Compare commits
No commits in common. "157eb37257aa62b4f7329b92182a5f4c97624339" and "e30c610de4e2eadf40b78741e10c5cf4d3326269" have entirely different histories.
157eb37257...e30c610de4
15 changed files with 111 additions and 623 deletions
Cargo.lock · 109 changed lines · generated
@@ -74,56 +74,6 @@ dependencies = [
  "libc",
 ]
 
-[[package]]
-name = "anstream"
-version = "0.6.18"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b"
-dependencies = [
- "anstyle",
- "anstyle-parse",
- "anstyle-query",
- "anstyle-wincon",
- "colorchoice",
- "is_terminal_polyfill",
- "utf8parse",
-]
-
-[[package]]
-name = "anstyle"
-version = "1.0.10"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
-
-[[package]]
-name = "anstyle-parse"
-version = "0.2.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9"
-dependencies = [
- "utf8parse",
-]
-
-[[package]]
-name = "anstyle-query"
-version = "1.1.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c"
-dependencies = [
- "windows-sys 0.59.0",
-]
-
-[[package]]
-name = "anstyle-wincon"
-version = "3.0.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e"
-dependencies = [
- "anstyle",
- "once_cell",
- "windows-sys 0.59.0",
-]
-
 [[package]]
 name = "anyhow"
 version = "1.0.91"
@@ -566,52 +516,6 @@ dependencies = [
  "stacker",
 ]
 
-[[package]]
-name = "clap"
-version = "4.5.31"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "027bb0d98429ae334a8698531da7077bdf906419543a35a55c2cb1b66437d767"
-dependencies = [
- "clap_builder",
- "clap_derive",
-]
-
-[[package]]
-name = "clap_builder"
-version = "4.5.31"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5589e0cba072e0f3d23791efac0fd8627b49c829c196a492e88168e6a669d863"
-dependencies = [
- "anstream",
- "anstyle",
- "clap_lex",
- "strsim",
-]
-
-[[package]]
-name = "clap_derive"
-version = "4.5.28"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed"
-dependencies = [
- "heck",
- "proc-macro2",
- "quote",
- "syn",
-]
-
-[[package]]
-name = "clap_lex"
-version = "0.7.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6"
-
-[[package]]
-name = "colorchoice"
-version = "1.0.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
-
 [[package]]
 name = "config"
 version = "0.14.1"
@@ -1664,12 +1568,6 @@ version = "2.10.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708"
 
-[[package]]
-name = "is_terminal_polyfill"
-version = "1.70.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
-
 [[package]]
 name = "itoa"
 version = "1.0.11"
@@ -2597,7 +2495,6 @@ dependencies = [
  "axum-extra",
  "base64 0.22.1",
  "chrono",
- "clap",
  "config",
  "console_error_panic_hook",
  "deadpool-diesel",
@@ -3174,12 +3071,6 @@ version = "1.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
 
-[[package]]
-name = "utf8parse"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
-
 [[package]]
 name = "uuid"
 version = "1.11.0"
Cargo.toml

@@ -32,4 +32,3 @@ diesel = { version = "2.2.6", features = ["postgres", "chrono", "uuid", "serde_j
 tower = "0.5.2"
 regex = "1.11.1"
 lettre = { version = "0.11.12", features = ["tokio1", "serde", "tracing", "tokio1-native-tls"] }
-clap = { version = "4.5.31", features = ["derive"] }
bacon.toml · 113 changed lines
@@ -1,114 +1,7 @@
-# This is a configuration file for the bacon tool
-#
-# Complete help on configuration: https://dystroy.org/bacon/config/
-#
-# You may check the current default at
-# https://github.com/Canop/bacon/blob/main/defaults/default-bacon.toml
-
 default_job = "check"
 env.CARGO_TERM_COLOR = "always"
 
-[jobs.check]
-command = ["cargo", "check"]
-need_stdout = false
-
-[jobs.check-all]
-command = ["cargo", "check", "--all-targets"]
-need_stdout = false
-
-# Run clippy on the default target
-[jobs.clippy]
-command = ["cargo", "clippy"]
-need_stdout = false
-
-# Run clippy on all targets
-# To disable some lints, you may change the job this way:
-# [jobs.clippy-all]
-# command = [
-#     "cargo", "clippy",
-#     "--all-targets",
-#     "--",
-#     "-A", "clippy::bool_to_int_with_if",
-#     "-A", "clippy::collapsible_if",
-#     "-A", "clippy::derive_partial_eq_without_eq",
-# ]
-# need_stdout = false
-[jobs.clippy-all]
-command = ["cargo", "clippy", "--all-targets"]
-need_stdout = false
-
-# This job lets you run
-# - all tests: bacon test
-# - a specific test: bacon test -- config::test_default_files
-# - the tests of a package: bacon test -- -- -p config
-[jobs.test]
-command = ["cargo", "test"]
+[jobs.webserver]
+command = ["cargo", "run"]
+need_stdout = true
-
-[jobs.nextest]
-command = [
-    "cargo", "nextest", "run",
-    "--hide-progress-bar", "--failure-output", "final"
-]
-need_stdout = true
-analyzer = "nextest"
-
-[jobs.doc]
-command = ["cargo", "doc", "--no-deps"]
-need_stdout = false
-
-# If the doc compiles, then it opens in your browser and bacon switches
-# to the previous job
-[jobs.doc-open]
-command = ["cargo", "doc", "--no-deps", "--open"]
-need_stdout = false
-on_success = "back" # so that we don't open the browser at each change
-
-# You can run your application and have the result displayed in bacon,
-# if it makes sense for this crate.
-[jobs.run-worker]
-command = [
-    "cargo", "run", "worker",
-    # put launch parameters for your program behind a `--` separator
-]
-need_stdout = true
-allow_warnings = true
-background = true
-default_watch = false
-
-# Run your long-running application (eg server) and have the result displayed in bacon.
-# For programs that never stop (eg a server), `background` is set to false
-# to have the cargo run output immediately displayed instead of waiting for
-# program's end.
-# 'on_change_strategy' is set to `kill_then_restart` to have your program restart
-# on every change (an alternative would be to use the 'F5' key manually in bacon).
-# If you often use this job, it makes sense to override the 'r' key by adding
-# a binding `r = job:run-long` at the end of this file .
-[jobs.run-server]
-command = [
-    "cargo", "run", "serve"
-    # put launch parameters for your program behind a `--` separator
-]
-need_stdout = true
-allow_warnings = true
-background = false
-on_change_strategy = "kill_then_restart"
-kill = ["kill", "-s", "INT"]
-watch = ["src", "templates"]
-
-# This parameterized job runs the example of your choice, as soon
-# as the code compiles.
-# Call it as
-#    bacon ex -- my-example
-[jobs.ex]
-command = ["cargo", "run", "--example"]
-need_stdout = true
-allow_warnings = true
-
-# You may define here keybindings that would be specific to
-# a project, for example a shortcut to launch a specific job.
-# Shortcuts to internal functions (scrolling, toggling, etc.)
-# should go in your personal global prefs.toml file instead.
-[keybindings]
-r = "job:run-server"
-w = "job:run-worker"
+watch =["src", "templates"]
@@ -7,7 +7,7 @@ CREATE INDEX ON users (uid);
 
 CREATE TABLE IF NOT EXISTS csrf_tokens (
     id UUID NOT NULL PRIMARY KEY,
-    user_id UUID REFERENCES users(id) ON DELETE CASCADE,
+    user_id UUID REFERENCES users(id),
    created_at TIMESTAMPTZ NOT NULL
 );
 CREATE INDEX ON csrf_tokens (created_at);
@@ -18,8 +18,9 @@ CREATE TABLE teams (
 );
 
 CREATE TABLE team_memberships (
-    team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
-    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
+    team_id UUID NOT NULL REFERENCES teams(id),
+    user_id UUID NOT NULL REFERENCES users(id),
+    roles TEXT[] NOT NULL DEFAULT '{}',
    PRIMARY KEY (team_id, user_id)
 );
 CREATE INDEX ON team_memberships (team_id);
@@ -27,7 +28,7 @@ CREATE INDEX ON team_memberships (user_id);
 
 CREATE TABLE api_keys (
     id UUID NOT NULL PRIMARY KEY,
-    team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
+    team_id UUID NOT NULL REFERENCES teams(id),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    last_used_at TIMESTAMPTZ
 );
@@ -43,15 +44,15 @@ CREATE INDEX ON projects(team_id);
 
 CREATE TABLE IF NOT EXISTS channels (
    id UUID NOT NULL PRIMARY KEY,
-    team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
+    team_id UUID NOT NULL REFERENCES teams(id),
    name TEXT NOT NULL,
    enable_by_default BOOLEAN NOT NULL DEFAULT FALSE,
-    backend_config JSONB NOT NULL
+    backend_config JSONB NOT NULL DEFAULT '{}'::JSONB
 );
 
 CREATE TABLE IF NOT EXISTS channel_selections (
-    project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
-    channel_id UUID NOT NULL REFERENCES channels(id) ON DELETE CASCADE,
+    project_id UUID NOT NULL REFERENCES projects(id),
+    channel_id UUID NOT NULL REFERENCES channels(id),
    PRIMARY KEY (project_id, channel_id)
 );
 CREATE INDEX ON channel_selections (project_id);
@@ -1,7 +1,6 @@
 CREATE TABLE IF NOT EXISTS messages (
    id UUID PRIMARY KEY NOT NULL,
-    channel_id UUID NOT NULL REFERENCES channels (id) ON DELETE RESTRICT,
-    project_id UUID NOT NULL REFERENCES projects (id) ON DELETE RESTRICT,
+    channel_id UUID NOT NULL REFERENCES channels (id),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    sent_at TIMESTAMPTZ,
    message TEXT NOT NULL
@@ -13,7 +13,7 @@ use oauth2::{
     ClientSecret, CsrfToken, RedirectUrl, RefreshToken, TokenResponse, TokenUrl,
 };
 use serde::{Deserialize, Serialize};
-use tracing::{trace_span, Instrument};
+use tracing::trace_span;
 
 use crate::{
     app_error::AppError,
@@ -197,7 +197,7 @@ impl FromRequestParts<AppState> for AuthInfo {
         parts: &mut Parts,
         state: &AppState,
     ) -> Result<Self, <Self as FromRequestParts<AppState>>::Rejection> {
-        async move {
+        let _ = trace_span!("AuthInfo from_request_parts()").enter();
         let session = parts
             .extract_with_state::<AppSession, AppState>(state)
             .await?
@@ -210,8 +210,4 @@ impl FromRequestParts<AppState> for AuthInfo {
         )?;
         Ok(user)
-        }
-        // The Span.enter() guard pattern doesn't play nicely async
-        .instrument(trace_span!("AuthInfo from_request_parts()"))
-        .await
     }
 }
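This file and the sessions file further down make the same swap: an `async move { ... }.instrument(span)` wrapper on one side, a bare `trace_span!(...).enter()` guard on the other. A small sketch of the difference between the two patterns, using only the tracing crate (function names here are illustrative, not from the repo). Note as well that binding a guard to `_` drops it immediately; a named binding like `_guard` is needed to keep the span entered:

```rust
use tracing::{trace_span, Instrument as _};

async fn some_io() {}

async fn guard_pattern() {
    // An entered-span guard held across an .await is unreliable: while this
    // task is suspended, the thread may poll other tasks, and they would run
    // inside this span. That is the "doesn't play nicely async" problem the
    // removed comment refers to.
    let _guard = trace_span!("handler").entered();
    some_io().await;
}

async fn instrument_pattern() {
    // Attaching the span to the future re-enters it on every poll, so the
    // span stays correct however the executor interleaves tasks.
    async { some_io().await }
        .instrument(trace_span!("handler"))
        .await;
}
```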
src/email.rs · 143 changed lines
@@ -7,7 +7,7 @@ use crate::app_state::AppState;
 
 const POSTMARK_EMAIL_BATCH_URL: &'static str = "https://api.postmarkapp.com/email/batch";
 
-#[derive(Clone, Serialize)]
+#[derive(Serialize)]
 pub struct Message {
     #[serde(rename = "From")]
     pub from: lettre::message::Mailbox,
@@ -21,11 +21,7 @@ pub struct Message {
 }
 
 pub trait MailSender: Clone + Sync {
-    /**
-     * Attempt to send all messages defined by the input Vec. Send as many as
-     * possible, returning exactly one Result<()> for each message.
-     */
-    async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>>;
+    async fn send_batch(&self, emails: Vec<Message>) -> Result<(), anyhow::Error>;
 }
 
 #[derive(Clone, Debug)]
@@ -52,7 +48,7 @@ impl Mailer {
 }
 
 impl MailSender for Mailer {
-    async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>> {
+    async fn send_batch(&self, emails: Vec<Message>) -> Result<(), anyhow::Error> {
         match self {
             Mailer::Smtp(sender) => sender.send_batch(emails).await,
             Mailer::Postmark(sender) => sender.send_batch(emails).await,
@@ -108,25 +104,11 @@ where
 }
 
 impl MailSender for SmtpSender {
-    async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>> {
-        let mut results: Vec<Result<()>> = Vec::with_capacity(emails.len());
+    async fn send_batch(&self, emails: Vec<Message>) -> Result<()> {
         for email in emails {
-            match TryInto::<lettre::Message>::try_into(email) {
-                Ok(email) => {
-                    results.push(
-                        self.transport
-                            .send(email)
-                            .await
-                            .map(|_| ())
-                            .map_err(Into::<anyhow::Error>::into),
-                    );
+            self.transport.send(email.try_into()?).await?;
-                }
-                Err(err) => {
-                    results.push(Err(err));
-                }
-            }
         }
-        results
+        Ok(())
     }
 }
@@ -150,114 +132,19 @@ impl PostmarkSender {
 }
 
 impl MailSender for PostmarkSender {
-    /**
-     * Recursively attempts to send messages, breaking them into smaller and
-     * smaller batches as needed.
-     */
-    async fn send_batch(&self, mut emails: Vec<Message>) -> Vec<Result<()>> {
-        /**
-         * Constructs a Vec with Ok(()) repeated n times.
-         */
-        macro_rules! all_ok {
-            () => {{
-                let mut collection: Vec<Result<_>> = Vec::with_capacity(emails.len());
-                for _ in 0..emails.len() {
-                    collection.push(Ok(()));
-                }
-                collection
-            }};
-        }
+    async fn send_batch(&self, emails: Vec<Message>) -> Result<()> {
+        if emails.len() > 500 {
+            return Err(anyhow::anyhow!(
+                "Postmark sends no more than 500 messages per batch"
+            ));
+        }
-
-        /**
-         * Constructs a Vec with a single specific error, followed by n-1
-         * generic errors referring back to it.
-         */
-        macro_rules! cascade_err {
-            ($err:expr) => {{
-                let mut collection: Vec<Result<_>> = Vec::with_capacity(emails.len());
-                collection.push(Err($err));
-                for _ in 1..emails.len() {
-                    collection.push(Err(anyhow::anyhow!("could not send due to previous error")));
-                }
-                collection
-            }};
-        }
-
-        /**
-         * Recursively splits the email batch in half and tries to send each
-         * half independently, allowing both to run to completion and then
-         * returning the first error of the two results, if present.
-         *
-         * This is implemented as a macro in order to avoid unstable async
-         * closures.
-         */
-        macro_rules! split_and_retry {
-            () => {
-                if emails.len() < 2 {
-                    tracing::warn!("Postmark send batch cannot be split any further");
-                    vec![Err(anyhow::anyhow!(
-                        "unable to split Postmark batch into smaller slices"
-                    ))]
-                } else {
-                    tracing::debug!("retrying Postmark send with smaller batches");
-                    let mut results =
-                        Box::pin(self.send_batch(emails.split_off(emails.len() / 2))).await;
-                    results.extend(Box::pin(self.send_batch(emails)).await);
-                    results
-                }
-            };
-        }
-
-        // https://postmarkapp.com/support/article/1056-what-are-the-attachment-and-email-size-limits
-        const POSTMARK_MAX_BATCH_ENTRIES: usize = 500;
-        const POSTMARK_MAX_REQUEST_BYTES: usize = 50 * 1000 * 1000;
+        // TODO: Check email subject and body size against Postmark limits
 
-        if emails.len() == 0 {
-            tracing::debug!("no Postmark messages to send");
-            vec![Ok(())]
-        } else if emails.len() > POSTMARK_MAX_BATCH_ENTRIES {
-            split_and_retry!()
-        } else {
-            let body = match serde_json::to_string(&emails) {
-                Ok(body) => body,
-                Err(err) => return cascade_err!(err.into()),
-            };
-            if body.len() > POSTMARK_MAX_REQUEST_BYTES {
-                if emails.len() > 1 {
-                    split_and_retry!()
-                } else {
-                    vec![Err(anyhow::anyhow!(
-                        "Postmark requests may not exceed {} bytes",
-                        POSTMARK_MAX_REQUEST_BYTES
-                    ))]
-                }
-            } else {
-                tracing::debug!("sending Postmark batch of {} messages", emails.len());
-                let resp = match self
-                    .client
+        self.client
             .post(POSTMARK_EMAIL_BATCH_URL)
             .header("X-Postmark-Server-Token", &self.server_token)
             .header(reqwest::header::CONTENT_TYPE, "application/json")
-                    .body(body)
+            .json(&emails)
             .send()
-                    .await
-                {
-                    Ok(resp) => resp,
-                    Err(err) => return cascade_err!(err.into()),
-                };
-                if resp.status().is_client_error() && emails.len() > 1 {
-                    split_and_retry!()
-                } else {
-                    if let Err(err) = resp.error_for_status() {
-                        cascade_err!(err.into())
-                    } else {
-                        tracing::debug!("sent Postmark batch of {} messages", emails.len());
-                        all_ok!()
-                    }
-                }
-            }
-        }
+            .await?;
+        Ok(())
     }
 }
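The removed PostmarkSender body leans on an async-recursion trick worth noting: `send_batch` calls itself on each half of the batch, and each recursive call is wrapped in `Box::pin`, because a directly self-referential `async fn` would have an infinitely sized future type. A standalone sketch of that shape (the String payloads and halving rule are illustrative, not from the repo):

```rust
// Recursively "send" a batch by splitting it in half, mirroring the shape of
// the removed split_and_retry! macro. Box::pin breaks the infinitely sized
// future type that direct async recursion would otherwise create.
async fn send_halves(mut batch: Vec<String>) -> Vec<Result<(), String>> {
    if batch.len() <= 1 {
        // Base case: pretend the single message (or empty batch) succeeds.
        return batch.into_iter().map(|_| Ok(())).collect();
    }
    let back_half = batch.split_off(batch.len() / 2);
    // Each recursive call is boxed; both halves run to completion and the
    // per-message results are concatenated front-half first.
    let mut results = Box::pin(send_halves(batch)).await;
    results.extend(Box::pin(send_halves(back_half)).await);
    results
}
```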
src/main.rs · 57 changed lines
@@ -18,40 +18,16 @@ mod team_memberships;
 mod teams;
 mod users;
 mod v0_router;
-mod worker;
 
-use std::process::exit;
-
-use chrono::{TimeDelta, Utc};
-use clap::{Parser, Subcommand};
 use email::SmtpOptions;
-use tokio::time::sleep;
 use tracing_subscriber::EnvFilter;
 
 use crate::{
     app_state::AppState, email::Mailer, router::new_router, sessions::PgStore, settings::Settings,
-    worker::run_worker,
 };
 
-#[derive(Parser)]
-#[command(version, about, long_about = None)]
-struct Cli {
-    #[command(subcommand)]
-    command: Commands,
-}
-
-#[derive(Subcommand)]
-enum Commands {
-    /// Run web server
-    Serve,
-    /// Run background worker
-    Worker {
-        /// Loop the every n seconds instead of exiting after execution
-        #[arg(long)]
-        auto_loop_seconds: Option<u32>,
-    },
-}
-
 #[tokio::main]
 async fn main() {
     let settings = Settings::load().unwrap();
@@ -60,8 +36,6 @@ async fn main() {
         .with_env_filter(EnvFilter::from_default_env())
         .init();
 
-    let cli = Cli::parse();
-
     let database_url = settings.database_url.clone();
     let manager =
         deadpool_diesel::postgres::Manager::new(database_url, deadpool_diesel::Runtime::Tokio1);
@@ -92,7 +66,7 @@ async fn main() {
     };
 
     let app_state = AppState {
-        db_pool: db_pool.clone(),
+        db_pool,
         mailer,
         oauth_client,
         reqwest_client,
@@ -100,12 +74,9 @@ async fn main() {
         settings: settings.clone(),
     };
 
-    match &cli.command {
-        Commands::Serve => {
     let router = new_router(app_state);
 
-            let listener =
-                tokio::net::TcpListener::bind((settings.host.clone(), settings.port.clone()))
+    let listener = tokio::net::TcpListener::bind((settings.host.clone(), settings.port.clone()))
         .await
         .unwrap();
     tracing::info!(
@@ -115,28 +86,4 @@ async fn main() {
         settings.base_path
     );
     axum::serve(listener, router).await.unwrap();
-        }
-        Commands::Worker { auto_loop_seconds } => {
-            if let Some(loop_seconds) = auto_loop_seconds {
-                let loop_delta = TimeDelta::seconds(i64::from(*loop_seconds));
-                loop {
-                    let t_next_loop = Utc::now() + loop_delta;
-
-                    if let Err(err) = run_worker(app_state.clone()).await {
-                        tracing::error!("{}", err)
-                    }
-
-                    let sleep_delta = t_next_loop - Utc::now();
-                    match sleep_delta.to_std() {
-                        Ok(duration) => {
-                            sleep(duration).await;
-                        }
-                        Err(_) => { /* sleep_delta was < 0, so don't sleep */ }
-                    }
-                }
-            } else {
-                run_worker(app_state).await.unwrap();
-            }
-        }
-    }
 }
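The removed Worker arm implements a fixed-cadence loop: it computes the next start time before running, then sleeps only for whatever time remains. A self-contained sketch of just that timing logic, assuming chrono and tokio as used in the diff (the `run_every` helper is illustrative):

```rust
use chrono::{TimeDelta, Utc};
use tokio::time::sleep;

// Run `work` roughly every `seconds`, measured start-to-start. If a run
// overshoots its slot, the remainder is negative, TimeDelta::to_std() fails,
// and the next run starts immediately, matching the deleted match arm.
async fn run_every(seconds: u32, mut work: impl FnMut()) {
    let period = TimeDelta::seconds(i64::from(seconds));
    loop {
        let next_start = Utc::now() + period;
        work();
        if let Ok(remaining) = (next_start - Utc::now()).to_std() {
            sleep(remaining).await;
        }
    }
}
```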
@@ -13,10 +13,8 @@ use crate::{channels::Channel, schema::messages};
 #[diesel(belongs_to(Channel))]
 pub struct Message {
     pub id: Uuid,
-    pub project_id: Uuid,
     pub channel_id: Uuid,
     pub created_at: DateTime<Utc>,
     pub sent_at: Option<DateTime<Utc>>,
     pub message: String,
 }
 
@@ -31,9 +29,4 @@ impl Message {
     pub fn with_channel(channel_id: Uuid) -> _ {
         messages::channel_id.eq(channel_id)
     }
-
-    #[auto_type(no_type_alias)]
-    pub fn is_not_sent() -> _ {
-        messages::sent_at.is_null()
-    }
 }
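These helpers use diesel's `#[auto_type]` attribute, which is what makes the `-> _` return type legal. A hedged, self-contained sketch of the mechanism (column types simplified so the example stands alone; assumes diesel 2.2's `dsl::auto_type`):

```rust
use diesel::prelude::*;

// Stub table so the sketch is self-contained; the real columns are Uuid and
// Timestamptz.
diesel::table! {
    messages (id) {
        id -> Integer,
        sent_at -> Nullable<Timestamp>,
    }
}

// #[auto_type] infers the opaque return type behind `-> _`, letting helpers
// hand back reusable query fragments without naming diesel's deeply nested
// generic types.
#[diesel::dsl::auto_type(no_type_alias)]
fn is_not_sent() -> _ {
    messages::sent_at.is_null()
}

// Call sites compose it like any other boolean expression, e.g.:
//     messages::table.filter(is_not_sent())
```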
@@ -214,6 +214,7 @@ async fn post_new_team(
     let team_membership = TeamMembership {
         team_id: team_id.clone(),
         user_id: current_user.id,
+        roles: vec![Some("OWNER".to_string())],
     };
     db_conn
         .interact(move |conn| {
@@ -593,7 +594,7 @@ async fn update_channel_email_recipient(
         subject: "Verify Your Email".to_string(),
         text_body: format!("Your email verification code is: {}", verification_code),
     };
-    mailer.send_batch(vec![email]).await.remove(0)?;
+    mailer.send_batch(vec![email]).await?;
 
     Ok(Redirect::to(&format!(
         "{}/teams/{}/channels/{}",
@@ -47,7 +47,6 @@ diesel::table! {
     messages (id) {
         id -> Uuid,
         channel_id -> Uuid,
-        project_id -> Uuid,
         created_at -> Timestamptz,
         sent_at -> Nullable<Timestamptz>,
         message -> Text,
@@ -66,6 +65,7 @@ diesel::table! {
     team_memberships (team_id, user_id) {
         team_id -> Uuid,
         user_id -> Uuid,
+        roles -> Array<Nullable<Text>>,
     }
 }
 
@@ -90,7 +90,6 @@ diesel::joinable!(channel_selections -> projects (project_id));
 diesel::joinable!(channels -> teams (team_id));
 diesel::joinable!(csrf_tokens -> users (user_id));
 diesel::joinable!(messages -> channels (channel_id));
-diesel::joinable!(messages -> projects (project_id));
 diesel::joinable!(projects -> teams (team_id));
 diesel::joinable!(team_memberships -> teams (team_id));
 diesel::joinable!(team_memberships -> users (user_id));
@@ -8,7 +8,7 @@ use axum::{
 use axum_extra::extract::CookieJar;
 use chrono::{DateTime, TimeDelta, Utc};
 use diesel::{pg::Pg, prelude::*, upsert::excluded};
-use tracing::{trace_span, Instrument};
+use tracing::trace_span;
 
 use crate::{app_error::AppError, app_state::AppState, schema::browser_sessions};
 
@@ -134,7 +134,7 @@ impl FromRequestParts<AppState> for AppSession {
         parts: &mut Parts,
         state: &AppState,
     ) -> Result<Self, <Self as FromRequestParts<AppState>>::Rejection> {
-        async move {
+        let _ = trace_span!("AppSession::from_request_parts()").enter();
         let jar = parts.extract::<CookieJar>().await.unwrap();
         let session_cookie = match jar.get(&state.settings.auth.cookie_name) {
             Some(cookie) => cookie,
@@ -167,7 +167,5 @@ impl FromRequestParts<AppState> for AppSession {
             tracing::debug!("no matching session found in database");
             Ok(AppSession(None))
         }
-        // The Span.enter() guard pattern doesn't play nicely async
-        }.instrument(trace_span!("AppSession::from_request_parts()")).await
     }
 }
@@ -20,6 +20,7 @@ use crate::{
 pub struct TeamMembership {
     pub team_id: Uuid,
     pub user_id: Uuid,
+    pub roles: Vec<Option<String>>,
 }
 
 impl TeamMembership {
@@ -1,6 +1,6 @@
 use anyhow::Context;
 use axum::{
-    extract::Query,
+    extract::{Query, State},
     response::{IntoResponse, Json},
     routing::get,
     Router,
@@ -14,9 +14,11 @@ use crate::{
     api_keys::ApiKey,
     app_error::AppError,
     app_state::{AppState, DbConn},
-    channels::Channel,
+    channels::{Channel, EmailBackendConfig},
+    email::{MailSender as _, Mailer},
     projects::Project,
-    schema::{api_keys, messages, projects},
+    schema::{api_keys, projects},
+    settings::Settings,
 };
 
 pub fn new_router(state: AppState) -> Router<AppState> {
@@ -31,6 +33,11 @@ struct SayQuery {
 }
 
 async fn say_get(
+    State(Settings {
+        email: email_settings,
+        ..
+    }): State<Settings>,
+    State(mailer): State<Mailer>,
     DbConn(db_conn): DbConn,
     Query(query): Query<SayQuery>,
 ) -> Result<impl IntoResponse, AppError> {
@@ -51,13 +58,12 @@ async fn say_get(
         .unwrap()?
     };
 
-    let project = {
+    let selected_channels = {
         let project_name = query.project.to_lowercase();
         db_conn
-            .interact::<_, Result<Project, AppError>>(move |conn| {
+            .interact::<_, Result<Vec<Channel>, AppError>>(move |conn| {
+                conn.transaction(move |conn| {
-                Ok(
-                    match Project::all()
+                    let project = match Project::all()
                         .filter(Project::with_team(api_key.team_id))
                         .filter(Project::with_name(project_name.clone()))
                         .first(conn)
@@ -73,46 +79,34 @@ async fn say_get(
                 ))
                 .get_result(conn)
                 .context("failed to insert project")?,
-                },
-            )
-        })
-        .await
-        .unwrap()?
-    };
-    let selected_channels = {
-        let project = project.clone();
-        db_conn
-            .interact::<_, Result<Vec<Channel>, AppError>>(move |conn| {
-                Ok(project
+                };
+                Ok(project
                     .selected_channels()
                     .load(conn)
                     .context("failed to load selected channels")?)
+                })
             })
             .await
             .unwrap()?
     };
 
-    {
-        let selected_channels = selected_channels.clone();
-        db_conn
-            .interact::<_, Result<_, AppError>>(move |conn| {
-                for channel in selected_channels {
-                    insert_into(messages::table)
-                        .values((
-                            messages::id.eq(Uuid::now_v7()),
-                            messages::channel_id.eq(&channel.id),
-                            messages::project_id.eq(&project.id),
-                            messages::message.eq(&query.message),
-                        ))
-                        .execute(conn)?;
+    for channel in selected_channels {
+        if let Ok(config) = TryInto::<EmailBackendConfig>::try_into(channel.backend_config) {
+            if config.verified {
+                let recipient: lettre::message::Mailbox = config.recipient.parse()?;
+                let email = crate::email::Message {
+                    from: email_settings.message_from.clone().into(),
+                    to: recipient.into(),
+                    subject: "Shout".to_string(),
+                    text_body: query.message.clone(),
+                };
+                tracing::info!("Sending email to recipient for channel {}", channel.id);
+                mailer.send_batch(vec![email]).await?;
+            } else {
+                tracing::info!("Email recipient for channel {} is not verified", channel.id);
+            }
+        }
-                }
-                Ok(())
-            })
-            .await
-            .unwrap()?;
-    }
-    tracing::debug!("queued {} messages", selected_channels.len());
+    }
 
     Ok(Json(json!({ "ok": true })))
 }
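The changed `say_get` signature pulls `Settings` and `Mailer` straight out of the application state via two `State<_>` extractors. In axum, that works when the router's state type implements `FromRef` for each sub-state; a minimal sketch of the mechanism (field names assumed to mirror the diff; requires axum's `macros` feature for the derive):

```rust
use axum::extract::{FromRef, State};

#[derive(Clone)]
struct Settings;
#[derive(Clone)]
struct Mailer;

// Deriving FromRef generates impls so axum can produce State<Settings> and
// State<Mailer> for handlers on a router whose state is AppState.
#[derive(Clone, FromRef)]
struct AppState {
    settings: Settings,
    mailer: Mailer,
}

async fn say_get(
    State(_settings): State<Settings>,
    State(_mailer): State<Mailer>,
) {
    // Each extractor clones only the sub-state it needs.
}
```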
src/worker.rs · 111 changed lines
@@ -1,111 +0,0 @@
-use anyhow::{Context as _, Result};
-use diesel::prelude::*;
-use tracing::Instrument as _;
-use uuid::Uuid;
-
-use crate::{
-    app_state::AppState,
-    channels::{Channel, EmailBackendConfig},
-    email::MailSender,
-    messages::Message,
-    schema::{channels, messages},
-};
-
-pub async fn run_worker(state: AppState) -> Result<()> {
-    async move {
-        process_messages(state).await?;
-        Ok(())
-    }
-    .instrument(tracing::debug_span!("run_worker()"))
-    .await
-}
-
-/**
- * Process messages from the queue in the `messages` table. Insertions to the
- * queue are rate limited per team and per project, so no effort should be
- * needed here to enforce fairness.
- */
-async fn process_messages(state: AppState) -> Result<()> {
-    async move {
-        const MESSAGE_QUEUE_LIMIT: i64 = 250;
-        let db_conn = state.db_pool.get().await?;
-        let queued_messages = db_conn
-            .interact::<_, Result<Vec<(Message, Channel)>>>(move |conn| {
-                messages::table
-                    .inner_join(channels::table)
-                    .select((Message::as_select(), Channel::as_select()))
-                    .filter(Message::is_not_sent())
-                    .order(messages::created_at.asc())
-                    .limit(MESSAGE_QUEUE_LIMIT)
-                    .load(conn)
-                    .context("failed to load queued messages")
-            })
-            .await
-            .unwrap()?;
-        // Dispatch email messages together to take advantage of Postmark's
-        // batch send API
-        let emails: Vec<(Uuid, crate::email::Message)> = queued_messages
-            .iter()
-            .filter_map(|(message, channel)| {
-                if let Ok(backend_config) =
-                    TryInto::<EmailBackendConfig>::try_into(channel.backend_config.clone())
-                {
-                    if backend_config.verified {
-                        let recipient: lettre::message::Mailbox = if let Ok(recipient) =
-                            backend_config.recipient.parse()
-                        {
-                            recipient
-                        } else {
-                            tracing::error!("failed to parse recipient for channel {}", channel.id);
-                            return None;
-                        };
-                        let email = crate::email::Message {
-                            from: state.settings.email.message_from.clone().into(),
-                            to: recipient.into(),
-                            subject: "Shout".to_string(),
-                            text_body: message.message.clone(),
-                        };
-                        tracing::debug!("Sending email to recipient for channel {}", channel.id);
-                        Some((message.id.clone(), email))
-                    } else {
-                        tracing::info!(
-                            "Email recipient for channel {} is not verified",
-                            channel.id
-                        );
-                        None
-                    }
-                } else {
-                    None
-                }
-            })
-            .collect();
-        if !emails.is_empty() {
-            let message_ids: Vec<Uuid> = emails.iter().map(|(id, _)| id.clone()).collect();
-            let results = state
-                .mailer
-                .send_batch(emails.into_iter().map(|(_, email)| email).collect())
-                .await;
-            assert!(results.len() == message_ids.len());
-            let results_by_id = message_ids.into_iter().zip(results.into_iter());
-            db_conn
-                .interact::<_, Result<_>>(move |conn| {
-                    for (id, result) in results_by_id {
-                        if let Err(err) = result {
-                            tracing::error!("error sending message {}: {:?}", id, err);
-                        } else {
-                            diesel::update(messages::table.filter(messages::id.eq(id)))
-                                .set(messages::sent_at.eq(diesel::dsl::now))
-                                .execute(conn)?;
-                        }
-                    }
-                    Ok(())
-                })
-                .await
-                .unwrap()?;
-        }
-        tracing::info!("finished processing messages");
-        Ok(())
-    }
-    .instrument(tracing::debug_span!("process_messages()"))
-    .await
-}
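The deleted worker relies on an invariant from the old `MailSender` contract: `send_batch` returns exactly one `Result` per input message, in order, so zipping the saved IDs with the results attributes each failure to a specific message. A dependency-free sketch of that bookkeeping (IDs and error strings are made up for illustration):

```rust
fn main() {
    // One result per message, in input order, as the old trait doc promised.
    let message_ids = vec![101u32, 102, 103];
    let results: Vec<Result<(), String>> = vec![Ok(()), Err("bounced".into()), Ok(())];
    assert_eq!(message_ids.len(), results.len());

    for (id, result) in message_ids.into_iter().zip(results) {
        match result {
            // The real worker UPDATEd messages.sent_at here; we just log.
            Ok(()) => println!("message {id} sent, marking sent_at"),
            Err(err) => eprintln!("error sending message {id}: {err}"),
        }
    }
}
```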