diff --git a/src/email.rs b/src/email.rs index aa330e6..870ec61 100644 --- a/src/email.rs +++ b/src/email.rs @@ -21,9 +21,11 @@ pub struct Message { } pub trait MailSender: Clone + Sync { - // TODO: Document a consistent contract for how partial successes should - // behave - async fn send_batch(&self, emails: Vec) -> Result<(), anyhow::Error>; + /** + * Attempt to send all messages defined by the input Vec. Send as many as + * possible, returning exactly one Result<()> for each message. + */ + async fn send_batch(&self, emails: Vec) -> Vec>; } #[derive(Clone, Debug)] @@ -50,7 +52,7 @@ impl Mailer { } impl MailSender for Mailer { - async fn send_batch(&self, emails: Vec) -> Result<(), anyhow::Error> { + async fn send_batch(&self, emails: Vec) -> Vec> { match self { Mailer::Smtp(sender) => sender.send_batch(emails).await, Mailer::Postmark(sender) => sender.send_batch(emails).await, @@ -106,11 +108,25 @@ where } impl MailSender for SmtpSender { - async fn send_batch(&self, emails: Vec) -> Result<()> { + async fn send_batch(&self, emails: Vec) -> Vec> { + let mut results: Vec> = Vec::with_capacity(emails.len()); for email in emails { - self.transport.send(email.try_into()?).await?; + match TryInto::::try_into(email) { + Ok(email) => { + results.push( + self.transport + .send(email) + .await + .map(|_| ()) + .map_err(Into::::into), + ); + } + Err(err) => { + results.push(Err(err)); + } + } } - Ok(()) + results } } @@ -134,73 +150,112 @@ impl PostmarkSender { } impl MailSender for PostmarkSender { - async fn send_batch(&self, mut emails: Vec) -> Result<()> { - // https://postmarkapp.com/support/article/1056-what-are-the-attachment-and-email-size-limits - const POSTMARK_MAX_BATCH_ENTRIES: usize = 500; - const POSTMARK_MAX_REQUEST_BYTES: usize = 50 * 1000 * 1000; - // TODO: Check email subject and body size against Postmark limits - let count = emails.len(); + /** + * Recursively attempts to send messages, breaking them into smaller and + * smaller batches as needed. + */ + async fn send_batch(&self, mut emails: Vec) -> Vec> { + /** + * Constructs a Vec with Ok(()) repeated n times. + */ + macro_rules! all_ok { + () => {{ + let mut collection: Vec> = Vec::with_capacity(emails.len()); + for _ in 0..emails.len() { + collection.push(Ok(())); + } + collection + }}; + } + + /** + * Constructs a Vec with a single specific error, followed by n-1 + * generic errors referring back to it. + */ + macro_rules! cascade_err { + ($err:expr) => {{ + let mut collection: Vec> = Vec::with_capacity(emails.len()); + collection.push(Err($err)); + for _ in 1..emails.len() { + collection.push(Err(anyhow::anyhow!("could not send due to previous error"))); + } + collection + }}; + } - // TODO: This may be more readable as a closure /** * Recursively splits the email batch in half and tries to send each * half independently, allowing both to run to completion and then - * returning the first error of the two results, if present. Panics if - * the batch is too small to split, so the caller is responsible for - * handling that case before invoking the macro. + * returning the first error of the two results, if present. + * + * This is implemented as a macro in order to avoid unstable async + * closures. */ macro_rules! split_and_retry { () => { - tracing::debug!("retrying Postmark send with smaller batches"); - assert!(count > 1); - let res1 = Box::pin(self.send_batch(emails.split_off(count / 2))).await; - let res2 = Box::pin(self.send_batch(emails)).await; - match (res1, res2) { - (Err(err), _) => { - return Err(err); - } - (_, Err(err)) => { - return Err(err); - } - _ => {} + if emails.len() < 2 { + tracing::warn!("Postmark send batch cannot be split any further"); + vec![Err(anyhow::anyhow!( + "unable to split Postmark batch into smaller slices" + ))] + } else { + tracing::debug!("retrying Postmark send with smaller batches"); + let mut results = + Box::pin(self.send_batch(emails.split_off(emails.len() / 2))).await; + results.extend(Box::pin(self.send_batch(emails)).await); + results } }; } - if count == 0 { + // https://postmarkapp.com/support/article/1056-what-are-the-attachment-and-email-size-limits + const POSTMARK_MAX_BATCH_ENTRIES: usize = 500; + const POSTMARK_MAX_REQUEST_BYTES: usize = 50 * 1000 * 1000; + // TODO: Check email subject and body size against Postmark limits + + if emails.len() == 0 { tracing::debug!("no Postmark messages to send"); - Ok(()) - } else if count > POSTMARK_MAX_BATCH_ENTRIES { - split_and_retry!(); - Ok(()) + vec![Ok(())] + } else if emails.len() > POSTMARK_MAX_BATCH_ENTRIES { + split_and_retry!() } else { - let body = serde_json::to_string(&emails)?; + let body = match serde_json::to_string(&emails) { + Ok(body) => body, + Err(err) => return cascade_err!(err.into()), + }; if body.len() > POSTMARK_MAX_REQUEST_BYTES { - if count > 1 { - split_and_retry!(); - Ok(()) + if emails.len() > 1 { + split_and_retry!() } else { - Err(anyhow::anyhow!( + vec![Err(anyhow::anyhow!( "Postmark requests may not exceed {} bytes", POSTMARK_MAX_REQUEST_BYTES - )) + ))] } } else { - tracing::debug!("sending Postmark batch of {} messages", count); - let resp = self + tracing::debug!("sending Postmark batch of {} messages", emails.len()); + let resp = match self .client .post(POSTMARK_EMAIL_BATCH_URL) .header("X-Postmark-Server-Token", &self.server_token) .header(reqwest::header::CONTENT_TYPE, "application/json") .body(body) .send() - .await?; - if resp.status().is_client_error() && count > 1 { - split_and_retry!(); + .await + { + Ok(resp) => resp, + Err(err) => return cascade_err!(err.into()), + }; + if resp.status().is_client_error() && emails.len() > 1 { + split_and_retry!() + } else { + if let Err(err) = resp.error_for_status() { + cascade_err!(err.into()) + } else { + tracing::debug!("sent Postmark batch of {} messages", emails.len()); + all_ok!() + } } - resp.error_for_status()?; - tracing::debug!("sent Postmark batch of {} messages", count); - Ok(()) } } } diff --git a/src/router.rs b/src/router.rs index 0c7d8b2..e2668b9 100644 --- a/src/router.rs +++ b/src/router.rs @@ -593,7 +593,7 @@ async fn update_channel_email_recipient( subject: "Verify Your Email".to_string(), text_body: format!("Your email verification code is: {}", verification_code), }; - mailer.send_batch(vec![email]).await?; + mailer.send_batch(vec![email]).await.remove(0)?; Ok(Redirect::to(&format!( "{}/teams/{}/channels/{}", diff --git a/src/worker.rs b/src/worker.rs index c86932b..4c15b1a 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,6 +1,7 @@ use anyhow::{Context as _, Result}; use diesel::prelude::*; use tracing::Instrument as _; +use uuid::Uuid; use crate::{ app_state::AppState, @@ -19,6 +20,11 @@ pub async fn run_worker(state: AppState) -> Result<()> { .await } +/** + * Process messages from the queue in the `messages` table. Insertions to the + * queue are rate limited per team and per project, so no effort should be + * needed here to enforce fairness. + */ async fn process_messages(state: AppState) -> Result<()> { async move { const MESSAGE_QUEUE_LIMIT: i64 = 250; @@ -38,7 +44,7 @@ async fn process_messages(state: AppState) -> Result<()> { .unwrap()?; // Dispatch email messages together to take advantage of Postmark's // batch send API - let emails: Vec = queued_messages + let emails: Vec<(Uuid, crate::email::Message)> = queued_messages .iter() .filter_map(|(message, channel)| { if let Ok(backend_config) = @@ -60,7 +66,7 @@ async fn process_messages(state: AppState) -> Result<()> { text_body: message.message.clone(), }; tracing::debug!("Sending email to recipient for channel {}", channel.id); - Some(email) + Some((message.id.clone(), email)) } else { tracing::info!( "Email recipient for channel {} is not verified", @@ -74,15 +80,23 @@ async fn process_messages(state: AppState) -> Result<()> { }) .collect(); if !emails.is_empty() { - state.mailer.send_batch(emails).await?; - } - { + let message_ids: Vec = emails.iter().map(|(id, _)| id.clone()).collect(); + let results = state + .mailer + .send_batch(emails.into_iter().map(|(_, email)| email).collect()) + .await; + assert!(results.len() == message_ids.len()); + let results_by_id = message_ids.into_iter().zip(results.into_iter()); db_conn .interact::<_, Result<_>>(move |conn| { - for (message, _) in queued_messages { - diesel::update(messages::table.filter(messages::id.eq(message.id))) - .set(messages::sent_at.eq(diesel::dsl::now)) - .execute(conn)?; + for (id, result) in results_by_id { + if let Err(err) = result { + tracing::error!("error sending message {}: {:?}", id, err); + } else { + diesel::update(messages::table.filter(messages::id.eq(id))) + .set(messages::sent_at.eq(diesel::dsl::now)) + .execute(conn)?; + } } Ok(()) })