From ac056c0aa39006910672b53a7af48d9372942b2a Mon Sep 17 00:00:00 2001 From: Brent Schroeter Date: Sat, 8 Mar 2025 22:18:24 -0800 Subject: [PATCH] add background worker to send messages --- Cargo.lock | 109 +++++++++++++++++++ Cargo.toml | 1 + migrations/2025-01-31-045014_messages/up.sql | 1 + src/email.rs | 84 +++++++++++--- src/main.rs | 77 +++++++++++-- src/messages.rs | 7 ++ src/schema.rs | 2 + src/v0_router.rs | 104 +++++++++--------- src/worker.rs | 97 +++++++++++++++++ 9 files changed, 408 insertions(+), 74 deletions(-) create mode 100644 src/worker.rs diff --git a/Cargo.lock b/Cargo.lock index 1d3133c..58b88ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -74,6 +74,56 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" + +[[package]] +name = "anstyle-parse" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c" +dependencies = [ + "windows-sys 0.59.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e" +dependencies = [ + "anstyle", + "once_cell", + "windows-sys 0.59.0", +] + [[package]] name = "anyhow" version = "1.0.91" @@ -516,6 +566,52 @@ dependencies = [ "stacker", ] +[[package]] +name = "clap" +version = "4.5.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "027bb0d98429ae334a8698531da7077bdf906419543a35a55c2cb1b66437d767" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5589e0cba072e0f3d23791efac0fd8627b49c829c196a492e88168e6a669d863" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" + +[[package]] +name = "colorchoice" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" + [[package]] name = "config" version = "0.14.1" @@ -1568,6 +1664,12 @@ version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "itoa" version = "1.0.11" @@ -2495,6 +2597,7 @@ dependencies = [ "axum-extra", "base64 0.22.1", "chrono", + "clap", "config", "console_error_panic_hook", "deadpool-diesel", @@ -3071,6 +3174,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.11.0" diff --git a/Cargo.toml b/Cargo.toml index 30a8e40..b57a7cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,3 +32,4 @@ diesel = { version = "2.2.6", features = ["postgres", "chrono", "uuid", "serde_j tower = "0.5.2" regex = "1.11.1" lettre = { version = "0.11.12", features = ["tokio1", "serde", "tracing", "tokio1-native-tls"] } +clap = { version = "4.5.31", features = ["derive"] } diff --git a/migrations/2025-01-31-045014_messages/up.sql b/migrations/2025-01-31-045014_messages/up.sql index f03c60f..03ce902 100644 --- a/migrations/2025-01-31-045014_messages/up.sql +++ b/migrations/2025-01-31-045014_messages/up.sql @@ -1,6 +1,7 @@ CREATE TABLE IF NOT EXISTS messages ( id UUID PRIMARY KEY NOT NULL, channel_id UUID NOT NULL REFERENCES channels (id) ON DELETE RESTRICT, + project_id UUID NOT NULL REFERENCES projects (id) ON DELETE RESTRICT, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), sent_at TIMESTAMPTZ, message TEXT NOT NULL diff --git a/src/email.rs b/src/email.rs index 19faea8..aa330e6 100644 --- a/src/email.rs +++ b/src/email.rs @@ -7,7 +7,7 @@ use crate::app_state::AppState; const POSTMARK_EMAIL_BATCH_URL: &'static str = "https://api.postmarkapp.com/email/batch"; -#[derive(Serialize)] +#[derive(Clone, Serialize)] pub struct Message { #[serde(rename = "From")] pub from: lettre::message::Mailbox, @@ -21,6 +21,8 @@ pub struct Message { } pub trait MailSender: Clone + Sync { + // TODO: Document a consistent contract for how partial successes should + // behave async fn send_batch(&self, emails: Vec) -> Result<(), anyhow::Error>; } @@ -132,19 +134,75 @@ impl PostmarkSender { } impl MailSender for PostmarkSender { - async fn send_batch(&self, emails: Vec) -> Result<()> { - if emails.len() > 500 { - return Err(anyhow::anyhow!( - "Postmark sends no more than 500 messages per batch" - )); + async fn send_batch(&self, mut emails: Vec) -> Result<()> { + // https://postmarkapp.com/support/article/1056-what-are-the-attachment-and-email-size-limits + const POSTMARK_MAX_BATCH_ENTRIES: usize = 500; + const POSTMARK_MAX_REQUEST_BYTES: usize = 50 * 1000 * 1000; + // TODO: Check email subject and body size against Postmark limits + let count = emails.len(); + + // TODO: This may be more readable as a closure + /** + * Recursively splits the email batch in half and tries to send each + * half independently, allowing both to run to completion and then + * returning the first error of the two results, if present. Panics if + * the batch is too small to split, so the caller is responsible for + * handling that case before invoking the macro. + */ + macro_rules! split_and_retry { + () => { + tracing::debug!("retrying Postmark send with smaller batches"); + assert!(count > 1); + let res1 = Box::pin(self.send_batch(emails.split_off(count / 2))).await; + let res2 = Box::pin(self.send_batch(emails)).await; + match (res1, res2) { + (Err(err), _) => { + return Err(err); + } + (_, Err(err)) => { + return Err(err); + } + _ => {} + } + }; + } + + if count == 0 { + tracing::debug!("no Postmark messages to send"); + Ok(()) + } else if count > POSTMARK_MAX_BATCH_ENTRIES { + split_and_retry!(); + Ok(()) + } else { + let body = serde_json::to_string(&emails)?; + if body.len() > POSTMARK_MAX_REQUEST_BYTES { + if count > 1 { + split_and_retry!(); + Ok(()) + } else { + Err(anyhow::anyhow!( + "Postmark requests may not exceed {} bytes", + POSTMARK_MAX_REQUEST_BYTES + )) + } + } else { + tracing::debug!("sending Postmark batch of {} messages", count); + let resp = self + .client + .post(POSTMARK_EMAIL_BATCH_URL) + .header("X-Postmark-Server-Token", &self.server_token) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(body) + .send() + .await?; + if resp.status().is_client_error() && count > 1 { + split_and_retry!(); + } + resp.error_for_status()?; + tracing::debug!("sent Postmark batch of {} messages", count); + Ok(()) + } } - self.client - .post(POSTMARK_EMAIL_BATCH_URL) - .header("X-Postmark-Server-Token", &self.server_token) - .json(&emails) - .send() - .await?; - Ok(()) } } diff --git a/src/main.rs b/src/main.rs index 07eb4a1..4fcc0c7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,16 +18,40 @@ mod team_memberships; mod teams; mod users; mod v0_router; +mod worker; use std::process::exit; +use chrono::{TimeDelta, Utc}; +use clap::{Parser, Subcommand}; use email::SmtpOptions; +use tokio::time::sleep; use tracing_subscriber::EnvFilter; use crate::{ app_state::AppState, email::Mailer, router::new_router, sessions::PgStore, settings::Settings, + worker::run_worker, }; +#[derive(Parser)] +#[command(version, about, long_about = None)] +struct Cli { + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand)] +enum Commands { + /// Run web server + Serve, + /// Run background worker + Worker { + /// Loop the every n seconds instead of exiting after execution + #[arg(long)] + auto_loop_seconds: Option, + }, +} + #[tokio::main] async fn main() { let settings = Settings::load().unwrap(); @@ -36,6 +60,8 @@ async fn main() { .with_env_filter(EnvFilter::from_default_env()) .init(); + let cli = Cli::parse(); + let database_url = settings.database_url.clone(); let manager = deadpool_diesel::postgres::Manager::new(database_url, deadpool_diesel::Runtime::Tokio1); @@ -66,7 +92,7 @@ async fn main() { }; let app_state = AppState { - db_pool, + db_pool: db_pool.clone(), mailer, oauth_client, reqwest_client, @@ -74,16 +100,43 @@ async fn main() { settings: settings.clone(), }; - let router = new_router(app_state); + match &cli.command { + Commands::Serve => { + let router = new_router(app_state); - let listener = tokio::net::TcpListener::bind((settings.host.clone(), settings.port.clone())) - .await - .unwrap(); - tracing::info!( - "App running at http://{}:{}{}", - settings.host, - settings.port, - settings.base_path - ); - axum::serve(listener, router).await.unwrap(); + let listener = + tokio::net::TcpListener::bind((settings.host.clone(), settings.port.clone())) + .await + .unwrap(); + tracing::info!( + "App running at http://{}:{}{}", + settings.host, + settings.port, + settings.base_path + ); + axum::serve(listener, router).await.unwrap(); + } + Commands::Worker { auto_loop_seconds } => { + if let Some(loop_seconds) = auto_loop_seconds { + let loop_delta = TimeDelta::seconds(i64::from(*loop_seconds)); + loop { + let t_next_loop = Utc::now() + loop_delta; + + if let Err(err) = run_worker(app_state.clone()).await { + tracing::error!("{}", err) + } + + let sleep_delta = t_next_loop - Utc::now(); + match sleep_delta.to_std() { + Ok(duration) => { + sleep(duration).await; + } + Err(_) => { /* sleep_delta was < 0, so don't sleep */ } + } + } + } else { + run_worker(app_state).await.unwrap(); + } + } + } } diff --git a/src/messages.rs b/src/messages.rs index 0beac30..a84ec93 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -13,8 +13,10 @@ use crate::{channels::Channel, schema::messages}; #[diesel(belongs_to(Channel))] pub struct Message { pub id: Uuid, + pub project_id: Uuid, pub channel_id: Uuid, pub created_at: DateTime, + pub sent_at: Option>, pub message: String, } @@ -29,4 +31,9 @@ impl Message { pub fn with_channel(channel_id: Uuid) -> _ { messages::channel_id.eq(channel_id) } + + #[auto_type(no_type_alias)] + pub fn is_not_sent() -> _ { + messages::sent_at.is_null() + } } diff --git a/src/schema.rs b/src/schema.rs index 1ea0c74..3219607 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -47,6 +47,7 @@ diesel::table! { messages (id) { id -> Uuid, channel_id -> Uuid, + project_id -> Uuid, created_at -> Timestamptz, sent_at -> Nullable, message -> Text, @@ -89,6 +90,7 @@ diesel::joinable!(channel_selections -> projects (project_id)); diesel::joinable!(channels -> teams (team_id)); diesel::joinable!(csrf_tokens -> users (user_id)); diesel::joinable!(messages -> channels (channel_id)); +diesel::joinable!(messages -> projects (project_id)); diesel::joinable!(projects -> teams (team_id)); diesel::joinable!(team_memberships -> teams (team_id)); diesel::joinable!(team_memberships -> users (user_id)); diff --git a/src/v0_router.rs b/src/v0_router.rs index 5f1e41a..ada057b 100644 --- a/src/v0_router.rs +++ b/src/v0_router.rs @@ -1,6 +1,6 @@ use anyhow::Context; use axum::{ - extract::{Query, State}, + extract::Query, response::{IntoResponse, Json}, routing::get, Router, @@ -14,11 +14,9 @@ use crate::{ api_keys::ApiKey, app_error::AppError, app_state::{AppState, DbConn}, - channels::{Channel, EmailBackendConfig}, - email::{MailSender as _, Mailer}, + channels::Channel, projects::Project, - schema::{api_keys, projects}, - settings::Settings, + schema::{api_keys, messages, projects}, }; pub fn new_router(state: AppState) -> Router { @@ -33,11 +31,6 @@ struct SayQuery { } async fn say_get( - State(Settings { - email: email_settings, - .. - }): State, - State(mailer): State, DbConn(db_conn): DbConn, Query(query): Query, ) -> Result { @@ -58,55 +51,68 @@ async fn say_get( .unwrap()? }; - let selected_channels = { + let project = { let project_name = query.project.to_lowercase(); db_conn - .interact::<_, Result, AppError>>(move |conn| { + .interact::<_, Result>(move |conn| { conn.transaction(move |conn| { - let project = match Project::all() - .filter(Project::with_team(api_key.team_id)) - .filter(Project::with_name(project_name.clone())) - .first(conn) - .optional() - .context("failed to load project")? - { - Some(project) => project, - None => insert_into(projects::table) - .values(( - projects::id.eq(Uuid::now_v7()), - projects::team_id.eq(api_key.team_id), - projects::name.eq(project_name), - )) - .get_result(conn) - .context("failed to insert project")?, - }; - Ok(project - .selected_channels() - .load(conn) - .context("failed to load selected channels")?) + Ok( + match Project::all() + .filter(Project::with_team(api_key.team_id)) + .filter(Project::with_name(project_name.clone())) + .first(conn) + .optional() + .context("failed to load project")? + { + Some(project) => project, + None => insert_into(projects::table) + .values(( + projects::id.eq(Uuid::now_v7()), + projects::team_id.eq(api_key.team_id), + projects::name.eq(project_name), + )) + .get_result(conn) + .context("failed to insert project")?, + }, + ) }) }) .await .unwrap()? }; + let selected_channels = { + let project = project.clone(); + db_conn + .interact::<_, Result, AppError>>(move |conn| { + Ok(project + .selected_channels() + .load(conn) + .context("failed to load selected channels")?) + }) + .await + .unwrap()? + }; - for channel in selected_channels { - if let Ok(config) = TryInto::::try_into(channel.backend_config) { - if config.verified { - let recipient: lettre::message::Mailbox = config.recipient.parse()?; - let email = crate::email::Message { - from: email_settings.message_from.clone().into(), - to: recipient.into(), - subject: "Shout".to_string(), - text_body: query.message.clone(), - }; - tracing::info!("Sending email to recipient for channel {}", channel.id); - mailer.send_batch(vec![email]).await?; - } else { - tracing::info!("Email recipient for channel {} is not verified", channel.id); - } - } + { + let selected_channels = selected_channels.clone(); + db_conn + .interact::<_, Result<_, AppError>>(move |conn| { + for channel in selected_channels { + insert_into(messages::table) + .values(( + messages::id.eq(Uuid::now_v7()), + messages::channel_id.eq(&channel.id), + messages::project_id.eq(&project.id), + messages::message.eq(&query.message), + )) + .execute(conn)?; + } + Ok(()) + }) + .await + .unwrap()?; } + tracing::debug!("queued {} messages", selected_channels.len()); Ok(Json(json!({ "ok": true }))) } diff --git a/src/worker.rs b/src/worker.rs new file mode 100644 index 0000000..c86932b --- /dev/null +++ b/src/worker.rs @@ -0,0 +1,97 @@ +use anyhow::{Context as _, Result}; +use diesel::prelude::*; +use tracing::Instrument as _; + +use crate::{ + app_state::AppState, + channels::{Channel, EmailBackendConfig}, + email::MailSender, + messages::Message, + schema::{channels, messages}, +}; + +pub async fn run_worker(state: AppState) -> Result<()> { + async move { + process_messages(state).await?; + Ok(()) + } + .instrument(tracing::debug_span!("run_worker()")) + .await +} + +async fn process_messages(state: AppState) -> Result<()> { + async move { + const MESSAGE_QUEUE_LIMIT: i64 = 250; + let db_conn = state.db_pool.get().await?; + let queued_messages = db_conn + .interact::<_, Result>>(move |conn| { + messages::table + .inner_join(channels::table) + .select((Message::as_select(), Channel::as_select())) + .filter(Message::is_not_sent()) + .order(messages::created_at.asc()) + .limit(MESSAGE_QUEUE_LIMIT) + .load(conn) + .context("failed to load queued messages") + }) + .await + .unwrap()?; + // Dispatch email messages together to take advantage of Postmark's + // batch send API + let emails: Vec = queued_messages + .iter() + .filter_map(|(message, channel)| { + if let Ok(backend_config) = + TryInto::::try_into(channel.backend_config.clone()) + { + if backend_config.verified { + let recipient: lettre::message::Mailbox = if let Ok(recipient) = + backend_config.recipient.parse() + { + recipient + } else { + tracing::error!("failed to parse recipient for channel {}", channel.id); + return None; + }; + let email = crate::email::Message { + from: state.settings.email.message_from.clone().into(), + to: recipient.into(), + subject: "Shout".to_string(), + text_body: message.message.clone(), + }; + tracing::debug!("Sending email to recipient for channel {}", channel.id); + Some(email) + } else { + tracing::info!( + "Email recipient for channel {} is not verified", + channel.id + ); + None + } + } else { + None + } + }) + .collect(); + if !emails.is_empty() { + state.mailer.send_batch(emails).await?; + } + { + db_conn + .interact::<_, Result<_>>(move |conn| { + for (message, _) in queued_messages { + diesel::update(messages::table.filter(messages::id.eq(message.id))) + .set(messages::sent_at.eq(diesel::dsl::now)) + .execute(conn)?; + } + Ok(()) + }) + .await + .unwrap()?; + } + tracing::info!("finished processing messages"); + Ok(()) + } + .instrument(tracing::debug_span!("process_messages()")) + .await +}