1
0
Fork 0
forked from 2sys/shoutdotdev

make Mailer::send_batch() return a result for each message

This commit is contained in:
Brent Schroeter 2025-03-08 23:24:02 -08:00
parent ac056c0aa3
commit e7d4eaaf81
3 changed files with 127 additions and 58 deletions

View file

@ -21,9 +21,11 @@ pub struct Message {
}
pub trait MailSender: Clone + Sync {
// TODO: Document a consistent contract for how partial successes should
// behave
async fn send_batch(&self, emails: Vec<Message>) -> Result<(), anyhow::Error>;
/**
* Attempt to send all messages defined by the input Vec. Send as many as
* possible, returning exactly one Result<()> for each message.
*/
async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>>;
}
#[derive(Clone, Debug)]
@ -50,7 +52,7 @@ impl Mailer {
}
impl MailSender for Mailer {
async fn send_batch(&self, emails: Vec<Message>) -> Result<(), anyhow::Error> {
async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>> {
match self {
Mailer::Smtp(sender) => sender.send_batch(emails).await,
Mailer::Postmark(sender) => sender.send_batch(emails).await,
@ -106,11 +108,25 @@ where
}
impl MailSender for SmtpSender {
async fn send_batch(&self, emails: Vec<Message>) -> Result<()> {
async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>> {
let mut results: Vec<Result<()>> = Vec::with_capacity(emails.len());
for email in emails {
self.transport.send(email.try_into()?).await?;
match TryInto::<lettre::Message>::try_into(email) {
Ok(email) => {
results.push(
self.transport
.send(email)
.await
.map(|_| ())
.map_err(Into::<anyhow::Error>::into),
);
}
Err(err) => {
results.push(Err(err));
}
}
}
Ok(())
results
}
}
@ -134,73 +150,112 @@ impl PostmarkSender {
}
impl MailSender for PostmarkSender {
async fn send_batch(&self, mut emails: Vec<Message>) -> Result<()> {
// https://postmarkapp.com/support/article/1056-what-are-the-attachment-and-email-size-limits
const POSTMARK_MAX_BATCH_ENTRIES: usize = 500;
const POSTMARK_MAX_REQUEST_BYTES: usize = 50 * 1000 * 1000;
// TODO: Check email subject and body size against Postmark limits
let count = emails.len();
/**
* Recursively attempts to send messages, breaking them into smaller and
* smaller batches as needed.
*/
async fn send_batch(&self, mut emails: Vec<Message>) -> Vec<Result<()>> {
/**
* Constructs a Vec with Ok(()) repeated n times.
*/
macro_rules! all_ok {
() => {{
let mut collection: Vec<Result<_>> = Vec::with_capacity(emails.len());
for _ in 0..emails.len() {
collection.push(Ok(()));
}
collection
}};
}
/**
* Constructs a Vec with a single specific error, followed by n-1
* generic errors referring back to it.
*/
macro_rules! cascade_err {
($err:expr) => {{
let mut collection: Vec<Result<_>> = Vec::with_capacity(emails.len());
collection.push(Err($err));
for _ in 1..emails.len() {
collection.push(Err(anyhow::anyhow!("could not send due to previous error")));
}
collection
}};
}
// TODO: This may be more readable as a closure
/**
* Recursively splits the email batch in half and tries to send each
* half independently, allowing both to run to completion and then
* returning the first error of the two results, if present. Panics if
* the batch is too small to split, so the caller is responsible for
* handling that case before invoking the macro.
* returning the first error of the two results, if present.
*
* This is implemented as a macro in order to avoid unstable async
* closures.
*/
macro_rules! split_and_retry {
() => {
tracing::debug!("retrying Postmark send with smaller batches");
assert!(count > 1);
let res1 = Box::pin(self.send_batch(emails.split_off(count / 2))).await;
let res2 = Box::pin(self.send_batch(emails)).await;
match (res1, res2) {
(Err(err), _) => {
return Err(err);
}
(_, Err(err)) => {
return Err(err);
}
_ => {}
if emails.len() < 2 {
tracing::warn!("Postmark send batch cannot be split any further");
vec![Err(anyhow::anyhow!(
"unable to split Postmark batch into smaller slices"
))]
} else {
tracing::debug!("retrying Postmark send with smaller batches");
let mut results =
Box::pin(self.send_batch(emails.split_off(emails.len() / 2))).await;
results.extend(Box::pin(self.send_batch(emails)).await);
results
}
};
}
if count == 0 {
// https://postmarkapp.com/support/article/1056-what-are-the-attachment-and-email-size-limits
const POSTMARK_MAX_BATCH_ENTRIES: usize = 500;
const POSTMARK_MAX_REQUEST_BYTES: usize = 50 * 1000 * 1000;
// TODO: Check email subject and body size against Postmark limits
if emails.len() == 0 {
tracing::debug!("no Postmark messages to send");
Ok(())
} else if count > POSTMARK_MAX_BATCH_ENTRIES {
split_and_retry!();
Ok(())
vec![Ok(())]
} else if emails.len() > POSTMARK_MAX_BATCH_ENTRIES {
split_and_retry!()
} else {
let body = serde_json::to_string(&emails)?;
let body = match serde_json::to_string(&emails) {
Ok(body) => body,
Err(err) => return cascade_err!(err.into()),
};
if body.len() > POSTMARK_MAX_REQUEST_BYTES {
if count > 1 {
split_and_retry!();
Ok(())
if emails.len() > 1 {
split_and_retry!()
} else {
Err(anyhow::anyhow!(
vec![Err(anyhow::anyhow!(
"Postmark requests may not exceed {} bytes",
POSTMARK_MAX_REQUEST_BYTES
))
))]
}
} else {
tracing::debug!("sending Postmark batch of {} messages", count);
let resp = self
tracing::debug!("sending Postmark batch of {} messages", emails.len());
let resp = match self
.client
.post(POSTMARK_EMAIL_BATCH_URL)
.header("X-Postmark-Server-Token", &self.server_token)
.header(reqwest::header::CONTENT_TYPE, "application/json")
.body(body)
.send()
.await?;
if resp.status().is_client_error() && count > 1 {
split_and_retry!();
.await
{
Ok(resp) => resp,
Err(err) => return cascade_err!(err.into()),
};
if resp.status().is_client_error() && emails.len() > 1 {
split_and_retry!()
} else {
if let Err(err) = resp.error_for_status() {
cascade_err!(err.into())
} else {
tracing::debug!("sent Postmark batch of {} messages", emails.len());
all_ok!()
}
}
resp.error_for_status()?;
tracing::debug!("sent Postmark batch of {} messages", count);
Ok(())
}
}
}

View file

@ -593,7 +593,7 @@ async fn update_channel_email_recipient(
subject: "Verify Your Email".to_string(),
text_body: format!("Your email verification code is: {}", verification_code),
};
mailer.send_batch(vec![email]).await?;
mailer.send_batch(vec![email]).await.remove(0)?;
Ok(Redirect::to(&format!(
"{}/teams/{}/channels/{}",

View file

@ -1,6 +1,7 @@
use anyhow::{Context as _, Result};
use diesel::prelude::*;
use tracing::Instrument as _;
use uuid::Uuid;
use crate::{
app_state::AppState,
@ -19,6 +20,11 @@ pub async fn run_worker(state: AppState) -> Result<()> {
.await
}
/**
* Process messages from the queue in the `messages` table. Insertions to the
* queue are rate limited per team and per project, so no effort should be
* needed here to enforce fairness.
*/
async fn process_messages(state: AppState) -> Result<()> {
async move {
const MESSAGE_QUEUE_LIMIT: i64 = 250;
@ -38,7 +44,7 @@ async fn process_messages(state: AppState) -> Result<()> {
.unwrap()?;
// Dispatch email messages together to take advantage of Postmark's
// batch send API
let emails: Vec<crate::email::Message> = queued_messages
let emails: Vec<(Uuid, crate::email::Message)> = queued_messages
.iter()
.filter_map(|(message, channel)| {
if let Ok(backend_config) =
@ -60,7 +66,7 @@ async fn process_messages(state: AppState) -> Result<()> {
text_body: message.message.clone(),
};
tracing::debug!("Sending email to recipient for channel {}", channel.id);
Some(email)
Some((message.id.clone(), email))
} else {
tracing::info!(
"Email recipient for channel {} is not verified",
@ -74,15 +80,23 @@ async fn process_messages(state: AppState) -> Result<()> {
})
.collect();
if !emails.is_empty() {
state.mailer.send_batch(emails).await?;
}
{
let message_ids: Vec<Uuid> = emails.iter().map(|(id, _)| id.clone()).collect();
let results = state
.mailer
.send_batch(emails.into_iter().map(|(_, email)| email).collect())
.await;
assert!(results.len() == message_ids.len());
let results_by_id = message_ids.into_iter().zip(results.into_iter());
db_conn
.interact::<_, Result<_>>(move |conn| {
for (message, _) in queued_messages {
diesel::update(messages::table.filter(messages::id.eq(message.id)))
.set(messages::sent_at.eq(diesel::dsl::now))
.execute(conn)?;
for (id, result) in results_by_id {
if let Err(err) = result {
tracing::error!("error sending message {}: {:?}", id, err);
} else {
diesel::update(messages::table.filter(messages::id.eq(id)))
.set(messages::sent_at.eq(diesel::dsl::now))
.execute(conn)?;
}
}
Ok(())
})