add background worker to send messages

This commit is contained in:
Brent Schroeter 2025-03-08 22:18:24 -08:00
parent 4a62d66400
commit ac056c0aa3
9 changed files with 408 additions and 74 deletions

109
Cargo.lock generated
View file

@ -74,6 +74,56 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "anstream"
version = "0.6.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is_terminal_polyfill",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
[[package]]
name = "anstyle-parse"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e"
dependencies = [
"anstyle",
"once_cell",
"windows-sys 0.59.0",
]
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.91" version = "1.0.91"
@ -516,6 +566,52 @@ dependencies = [
"stacker", "stacker",
] ]
[[package]]
name = "clap"
version = "4.5.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "027bb0d98429ae334a8698531da7077bdf906419543a35a55c2cb1b66437d767"
dependencies = [
"clap_builder",
"clap_derive",
]
[[package]]
name = "clap_builder"
version = "4.5.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5589e0cba072e0f3d23791efac0fd8627b49c829c196a492e88168e6a669d863"
dependencies = [
"anstream",
"anstyle",
"clap_lex",
"strsim",
]
[[package]]
name = "clap_derive"
version = "4.5.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6"
[[package]]
name = "colorchoice"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
[[package]] [[package]]
name = "config" name = "config"
version = "0.14.1" version = "0.14.1"
@ -1568,6 +1664,12 @@ version = "2.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708"
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]] [[package]]
name = "itoa" name = "itoa"
version = "1.0.11" version = "1.0.11"
@ -2495,6 +2597,7 @@ dependencies = [
"axum-extra", "axum-extra",
"base64 0.22.1", "base64 0.22.1",
"chrono", "chrono",
"clap",
"config", "config",
"console_error_panic_hook", "console_error_panic_hook",
"deadpool-diesel", "deadpool-diesel",
@ -3071,6 +3174,12 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "utf8parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.11.0" version = "1.11.0"

View file

@ -32,3 +32,4 @@ diesel = { version = "2.2.6", features = ["postgres", "chrono", "uuid", "serde_j
tower = "0.5.2" tower = "0.5.2"
regex = "1.11.1" regex = "1.11.1"
lettre = { version = "0.11.12", features = ["tokio1", "serde", "tracing", "tokio1-native-tls"] } lettre = { version = "0.11.12", features = ["tokio1", "serde", "tracing", "tokio1-native-tls"] }
clap = { version = "4.5.31", features = ["derive"] }

View file

@ -1,6 +1,7 @@
CREATE TABLE IF NOT EXISTS messages ( CREATE TABLE IF NOT EXISTS messages (
id UUID PRIMARY KEY NOT NULL, id UUID PRIMARY KEY NOT NULL,
channel_id UUID NOT NULL REFERENCES channels (id) ON DELETE RESTRICT, channel_id UUID NOT NULL REFERENCES channels (id) ON DELETE RESTRICT,
project_id UUID NOT NULL REFERENCES projects (id) ON DELETE RESTRICT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
sent_at TIMESTAMPTZ, sent_at TIMESTAMPTZ,
message TEXT NOT NULL message TEXT NOT NULL

View file

@ -7,7 +7,7 @@ use crate::app_state::AppState;
const POSTMARK_EMAIL_BATCH_URL: &'static str = "https://api.postmarkapp.com/email/batch"; const POSTMARK_EMAIL_BATCH_URL: &'static str = "https://api.postmarkapp.com/email/batch";
#[derive(Serialize)] #[derive(Clone, Serialize)]
pub struct Message { pub struct Message {
#[serde(rename = "From")] #[serde(rename = "From")]
pub from: lettre::message::Mailbox, pub from: lettre::message::Mailbox,
@ -21,6 +21,8 @@ pub struct Message {
} }
pub trait MailSender: Clone + Sync { pub trait MailSender: Clone + Sync {
// TODO: Document a consistent contract for how partial successes should
// behave
async fn send_batch(&self, emails: Vec<Message>) -> Result<(), anyhow::Error>; async fn send_batch(&self, emails: Vec<Message>) -> Result<(), anyhow::Error>;
} }
@ -132,19 +134,75 @@ impl PostmarkSender {
} }
impl MailSender for PostmarkSender { impl MailSender for PostmarkSender {
async fn send_batch(&self, emails: Vec<Message>) -> Result<()> { async fn send_batch(&self, mut emails: Vec<Message>) -> Result<()> {
if emails.len() > 500 { // https://postmarkapp.com/support/article/1056-what-are-the-attachment-and-email-size-limits
return Err(anyhow::anyhow!( const POSTMARK_MAX_BATCH_ENTRIES: usize = 500;
"Postmark sends no more than 500 messages per batch" const POSTMARK_MAX_REQUEST_BYTES: usize = 50 * 1000 * 1000;
)); // TODO: Check email subject and body size against Postmark limits
let count = emails.len();
// TODO: This may be more readable as a closure
/**
* Recursively splits the email batch in half and tries to send each
* half independently, allowing both to run to completion and then
* returning the first error of the two results, if present. Panics if
* the batch is too small to split, so the caller is responsible for
* handling that case before invoking the macro.
*/
macro_rules! split_and_retry {
() => {
tracing::debug!("retrying Postmark send with smaller batches");
assert!(count > 1);
let res1 = Box::pin(self.send_batch(emails.split_off(count / 2))).await;
let res2 = Box::pin(self.send_batch(emails)).await;
match (res1, res2) {
(Err(err), _) => {
return Err(err);
}
(_, Err(err)) => {
return Err(err);
}
_ => {}
}
};
}
if count == 0 {
tracing::debug!("no Postmark messages to send");
Ok(())
} else if count > POSTMARK_MAX_BATCH_ENTRIES {
split_and_retry!();
Ok(())
} else {
let body = serde_json::to_string(&emails)?;
if body.len() > POSTMARK_MAX_REQUEST_BYTES {
if count > 1 {
split_and_retry!();
Ok(())
} else {
Err(anyhow::anyhow!(
"Postmark requests may not exceed {} bytes",
POSTMARK_MAX_REQUEST_BYTES
))
}
} else {
tracing::debug!("sending Postmark batch of {} messages", count);
let resp = self
.client
.post(POSTMARK_EMAIL_BATCH_URL)
.header("X-Postmark-Server-Token", &self.server_token)
.header(reqwest::header::CONTENT_TYPE, "application/json")
.body(body)
.send()
.await?;
if resp.status().is_client_error() && count > 1 {
split_and_retry!();
}
resp.error_for_status()?;
tracing::debug!("sent Postmark batch of {} messages", count);
Ok(())
}
} }
self.client
.post(POSTMARK_EMAIL_BATCH_URL)
.header("X-Postmark-Server-Token", &self.server_token)
.json(&emails)
.send()
.await?;
Ok(())
} }
} }

View file

@ -18,16 +18,40 @@ mod team_memberships;
mod teams; mod teams;
mod users; mod users;
mod v0_router; mod v0_router;
mod worker;
use std::process::exit; use std::process::exit;
use chrono::{TimeDelta, Utc};
use clap::{Parser, Subcommand};
use email::SmtpOptions; use email::SmtpOptions;
use tokio::time::sleep;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
use crate::{ use crate::{
app_state::AppState, email::Mailer, router::new_router, sessions::PgStore, settings::Settings, app_state::AppState, email::Mailer, router::new_router, sessions::PgStore, settings::Settings,
worker::run_worker,
}; };
#[derive(Parser)]
#[command(version, about, long_about = None)]
struct Cli {
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand)]
enum Commands {
/// Run web server
Serve,
/// Run background worker
Worker {
/// Loop the every n seconds instead of exiting after execution
#[arg(long)]
auto_loop_seconds: Option<u32>,
},
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let settings = Settings::load().unwrap(); let settings = Settings::load().unwrap();
@ -36,6 +60,8 @@ async fn main() {
.with_env_filter(EnvFilter::from_default_env()) .with_env_filter(EnvFilter::from_default_env())
.init(); .init();
let cli = Cli::parse();
let database_url = settings.database_url.clone(); let database_url = settings.database_url.clone();
let manager = let manager =
deadpool_diesel::postgres::Manager::new(database_url, deadpool_diesel::Runtime::Tokio1); deadpool_diesel::postgres::Manager::new(database_url, deadpool_diesel::Runtime::Tokio1);
@ -66,7 +92,7 @@ async fn main() {
}; };
let app_state = AppState { let app_state = AppState {
db_pool, db_pool: db_pool.clone(),
mailer, mailer,
oauth_client, oauth_client,
reqwest_client, reqwest_client,
@ -74,16 +100,43 @@ async fn main() {
settings: settings.clone(), settings: settings.clone(),
}; };
let router = new_router(app_state); match &cli.command {
Commands::Serve => {
let router = new_router(app_state);
let listener = tokio::net::TcpListener::bind((settings.host.clone(), settings.port.clone())) let listener =
.await tokio::net::TcpListener::bind((settings.host.clone(), settings.port.clone()))
.unwrap(); .await
tracing::info!( .unwrap();
"App running at http://{}:{}{}", tracing::info!(
settings.host, "App running at http://{}:{}{}",
settings.port, settings.host,
settings.base_path settings.port,
); settings.base_path
axum::serve(listener, router).await.unwrap(); );
axum::serve(listener, router).await.unwrap();
}
Commands::Worker { auto_loop_seconds } => {
if let Some(loop_seconds) = auto_loop_seconds {
let loop_delta = TimeDelta::seconds(i64::from(*loop_seconds));
loop {
let t_next_loop = Utc::now() + loop_delta;
if let Err(err) = run_worker(app_state.clone()).await {
tracing::error!("{}", err)
}
let sleep_delta = t_next_loop - Utc::now();
match sleep_delta.to_std() {
Ok(duration) => {
sleep(duration).await;
}
Err(_) => { /* sleep_delta was < 0, so don't sleep */ }
}
}
} else {
run_worker(app_state).await.unwrap();
}
}
}
} }

View file

@ -13,8 +13,10 @@ use crate::{channels::Channel, schema::messages};
#[diesel(belongs_to(Channel))] #[diesel(belongs_to(Channel))]
pub struct Message { pub struct Message {
pub id: Uuid, pub id: Uuid,
pub project_id: Uuid,
pub channel_id: Uuid, pub channel_id: Uuid,
pub created_at: DateTime<Utc>, pub created_at: DateTime<Utc>,
pub sent_at: Option<DateTime<Utc>>,
pub message: String, pub message: String,
} }
@ -29,4 +31,9 @@ impl Message {
pub fn with_channel(channel_id: Uuid) -> _ { pub fn with_channel(channel_id: Uuid) -> _ {
messages::channel_id.eq(channel_id) messages::channel_id.eq(channel_id)
} }
#[auto_type(no_type_alias)]
pub fn is_not_sent() -> _ {
messages::sent_at.is_null()
}
} }

View file

@ -47,6 +47,7 @@ diesel::table! {
messages (id) { messages (id) {
id -> Uuid, id -> Uuid,
channel_id -> Uuid, channel_id -> Uuid,
project_id -> Uuid,
created_at -> Timestamptz, created_at -> Timestamptz,
sent_at -> Nullable<Timestamptz>, sent_at -> Nullable<Timestamptz>,
message -> Text, message -> Text,
@ -89,6 +90,7 @@ diesel::joinable!(channel_selections -> projects (project_id));
diesel::joinable!(channels -> teams (team_id)); diesel::joinable!(channels -> teams (team_id));
diesel::joinable!(csrf_tokens -> users (user_id)); diesel::joinable!(csrf_tokens -> users (user_id));
diesel::joinable!(messages -> channels (channel_id)); diesel::joinable!(messages -> channels (channel_id));
diesel::joinable!(messages -> projects (project_id));
diesel::joinable!(projects -> teams (team_id)); diesel::joinable!(projects -> teams (team_id));
diesel::joinable!(team_memberships -> teams (team_id)); diesel::joinable!(team_memberships -> teams (team_id));
diesel::joinable!(team_memberships -> users (user_id)); diesel::joinable!(team_memberships -> users (user_id));

View file

@ -1,6 +1,6 @@
use anyhow::Context; use anyhow::Context;
use axum::{ use axum::{
extract::{Query, State}, extract::Query,
response::{IntoResponse, Json}, response::{IntoResponse, Json},
routing::get, routing::get,
Router, Router,
@ -14,11 +14,9 @@ use crate::{
api_keys::ApiKey, api_keys::ApiKey,
app_error::AppError, app_error::AppError,
app_state::{AppState, DbConn}, app_state::{AppState, DbConn},
channels::{Channel, EmailBackendConfig}, channels::Channel,
email::{MailSender as _, Mailer},
projects::Project, projects::Project,
schema::{api_keys, projects}, schema::{api_keys, messages, projects},
settings::Settings,
}; };
pub fn new_router(state: AppState) -> Router<AppState> { pub fn new_router(state: AppState) -> Router<AppState> {
@ -33,11 +31,6 @@ struct SayQuery {
} }
async fn say_get( async fn say_get(
State(Settings {
email: email_settings,
..
}): State<Settings>,
State(mailer): State<Mailer>,
DbConn(db_conn): DbConn, DbConn(db_conn): DbConn,
Query(query): Query<SayQuery>, Query(query): Query<SayQuery>,
) -> Result<impl IntoResponse, AppError> { ) -> Result<impl IntoResponse, AppError> {
@ -58,55 +51,68 @@ async fn say_get(
.unwrap()? .unwrap()?
}; };
let selected_channels = { let project = {
let project_name = query.project.to_lowercase(); let project_name = query.project.to_lowercase();
db_conn db_conn
.interact::<_, Result<Vec<Channel>, AppError>>(move |conn| { .interact::<_, Result<Project, AppError>>(move |conn| {
conn.transaction(move |conn| { conn.transaction(move |conn| {
let project = match Project::all() Ok(
.filter(Project::with_team(api_key.team_id)) match Project::all()
.filter(Project::with_name(project_name.clone())) .filter(Project::with_team(api_key.team_id))
.first(conn) .filter(Project::with_name(project_name.clone()))
.optional() .first(conn)
.context("failed to load project")? .optional()
{ .context("failed to load project")?
Some(project) => project, {
None => insert_into(projects::table) Some(project) => project,
.values(( None => insert_into(projects::table)
projects::id.eq(Uuid::now_v7()), .values((
projects::team_id.eq(api_key.team_id), projects::id.eq(Uuid::now_v7()),
projects::name.eq(project_name), projects::team_id.eq(api_key.team_id),
)) projects::name.eq(project_name),
.get_result(conn) ))
.context("failed to insert project")?, .get_result(conn)
}; .context("failed to insert project")?,
Ok(project },
.selected_channels() )
.load(conn)
.context("failed to load selected channels")?)
}) })
}) })
.await .await
.unwrap()? .unwrap()?
}; };
let selected_channels = {
let project = project.clone();
db_conn
.interact::<_, Result<Vec<Channel>, AppError>>(move |conn| {
Ok(project
.selected_channels()
.load(conn)
.context("failed to load selected channels")?)
})
.await
.unwrap()?
};
for channel in selected_channels { {
if let Ok(config) = TryInto::<EmailBackendConfig>::try_into(channel.backend_config) { let selected_channels = selected_channels.clone();
if config.verified { db_conn
let recipient: lettre::message::Mailbox = config.recipient.parse()?; .interact::<_, Result<_, AppError>>(move |conn| {
let email = crate::email::Message { for channel in selected_channels {
from: email_settings.message_from.clone().into(), insert_into(messages::table)
to: recipient.into(), .values((
subject: "Shout".to_string(), messages::id.eq(Uuid::now_v7()),
text_body: query.message.clone(), messages::channel_id.eq(&channel.id),
}; messages::project_id.eq(&project.id),
tracing::info!("Sending email to recipient for channel {}", channel.id); messages::message.eq(&query.message),
mailer.send_batch(vec![email]).await?; ))
} else { .execute(conn)?;
tracing::info!("Email recipient for channel {} is not verified", channel.id); }
} Ok(())
} })
.await
.unwrap()?;
} }
tracing::debug!("queued {} messages", selected_channels.len());
Ok(Json(json!({ "ok": true }))) Ok(Json(json!({ "ok": true })))
} }

97
src/worker.rs Normal file
View file

@ -0,0 +1,97 @@
use anyhow::{Context as _, Result};
use diesel::prelude::*;
use tracing::Instrument as _;
use crate::{
app_state::AppState,
channels::{Channel, EmailBackendConfig},
email::MailSender,
messages::Message,
schema::{channels, messages},
};
pub async fn run_worker(state: AppState) -> Result<()> {
async move {
process_messages(state).await?;
Ok(())
}
.instrument(tracing::debug_span!("run_worker()"))
.await
}
async fn process_messages(state: AppState) -> Result<()> {
async move {
const MESSAGE_QUEUE_LIMIT: i64 = 250;
let db_conn = state.db_pool.get().await?;
let queued_messages = db_conn
.interact::<_, Result<Vec<(Message, Channel)>>>(move |conn| {
messages::table
.inner_join(channels::table)
.select((Message::as_select(), Channel::as_select()))
.filter(Message::is_not_sent())
.order(messages::created_at.asc())
.limit(MESSAGE_QUEUE_LIMIT)
.load(conn)
.context("failed to load queued messages")
})
.await
.unwrap()?;
// Dispatch email messages together to take advantage of Postmark's
// batch send API
let emails: Vec<crate::email::Message> = queued_messages
.iter()
.filter_map(|(message, channel)| {
if let Ok(backend_config) =
TryInto::<EmailBackendConfig>::try_into(channel.backend_config.clone())
{
if backend_config.verified {
let recipient: lettre::message::Mailbox = if let Ok(recipient) =
backend_config.recipient.parse()
{
recipient
} else {
tracing::error!("failed to parse recipient for channel {}", channel.id);
return None;
};
let email = crate::email::Message {
from: state.settings.email.message_from.clone().into(),
to: recipient.into(),
subject: "Shout".to_string(),
text_body: message.message.clone(),
};
tracing::debug!("Sending email to recipient for channel {}", channel.id);
Some(email)
} else {
tracing::info!(
"Email recipient for channel {} is not verified",
channel.id
);
None
}
} else {
None
}
})
.collect();
if !emails.is_empty() {
state.mailer.send_batch(emails).await?;
}
{
db_conn
.interact::<_, Result<_>>(move |conn| {
for (message, _) in queued_messages {
diesel::update(messages::table.filter(messages::id.eq(message.id)))
.set(messages::sent_at.eq(diesel::dsl::now))
.execute(conn)?;
}
Ok(())
})
.await
.unwrap()?;
}
tracing::info!("finished processing messages");
Ok(())
}
.instrument(tracing::debug_span!("process_messages()"))
.await
}