use anyhow::{Context, Result}; use axum::extract::FromRef; use futures::Future; use lettre::{AsyncSmtpTransport, AsyncTransport, Tokio1Executor}; use serde::{Serialize, Serializer}; use crate::app_state::AppState; const POSTMARK_EMAIL_BATCH_URL: &str = "https://api.postmarkapp.com/email/batch"; #[derive(Clone, Serialize)] pub struct Message { #[serde(rename = "From")] pub from: lettre::message::Mailbox, #[serde(rename = "To")] #[serde(serialize_with = "serialize_mailboxes")] pub to: lettre::message::Mailboxes, #[serde(rename = "Subject")] pub subject: String, #[serde(rename = "TextBody")] pub text_body: String, } pub trait MailSender: Clone + Sync { /// Attempt to send all messages defined by the input Vec. Send as many as /// possible, returning exactly one Result<()> for each message. fn send_batch(&self, emails: Vec) -> impl Future>>; } #[derive(Clone, Debug)] pub enum Mailer { Smtp(SmtpSender), Postmark(PostmarkSender), } impl Mailer { pub fn new_smtp(opts: SmtpOptions) -> Result { Ok(Self::Smtp(SmtpSender::new(opts)?)) } pub fn new_postmark(server_token: String) -> Result { Ok(Self::Postmark(PostmarkSender::new(server_token)?)) } pub fn with_reqwest_client(self, client: reqwest::Client) -> Self { match self { Self::Postmark(sender) => Self::Postmark(sender.with_reqwest_client(client)), _ => self, } } } impl MailSender for Mailer { async fn send_batch(&self, emails: Vec) -> Vec> { match self { Mailer::Smtp(sender) => sender.send_batch(emails).await, Mailer::Postmark(sender) => sender.send_batch(emails).await, } } } #[derive(Clone, Debug)] pub struct SmtpSender { transport: AsyncSmtpTransport, } #[derive(Debug)] pub struct SmtpOptions { pub server: String, pub username: String, pub password: String, } impl SmtpSender { fn new(opts: SmtpOptions) -> Result { let mailer_creds = lettre::transport::smtp::authentication::Credentials::new(opts.username, opts.password); let transport = lettre::AsyncSmtpTransport::::starttls_relay(&opts.server) .context("unable to initialize starttls_relay")? .credentials(mailer_creds) .build(); Ok(Self { transport }) } } impl TryInto for Message { type Error = anyhow::Error; fn try_into(self) -> Result { let mut builder = lettre::Message::builder() .from(self.from.clone()) .subject(self.subject) .header(lettre::message::header::ContentType::TEXT_PLAIN); for to_addr in self.to { builder = builder.to(to_addr); } let message = builder.body(self.text_body)?; Ok(message) } } fn serialize_mailboxes(t: &lettre::message::Mailboxes, s: S) -> Result where S: Serializer, { s.serialize_str(&t.to_string()) } impl MailSender for SmtpSender { async fn send_batch(&self, emails: Vec) -> Vec> { let mut results: Vec> = Vec::with_capacity(emails.len()); for email in emails { match TryInto::::try_into(email) { Ok(email) => { results.push( self.transport .send(email) .await .map(|_| ()) .map_err(Into::::into), ); } Err(err) => { results.push(Err(err)); } } } results } } #[derive(Clone, Debug)] pub struct PostmarkSender { client: reqwest::Client, server_token: String, } impl PostmarkSender { fn new(server_token: String) -> Result { Ok(Self { client: reqwest::ClientBuilder::new().https_only(true).build()?, server_token, }) } pub fn with_reqwest_client(self, client: reqwest::Client) -> Self { Self { client, ..self } } } impl MailSender for PostmarkSender { /// Recursively attempts to send messages, breaking them into smaller and /// smaller batches as needed. async fn send_batch(&self, mut emails: Vec) -> Vec> { /// Constructs a Vec with Ok(()) repeated n times. macro_rules! all_ok { () => {{ let mut collection: Vec> = Vec::with_capacity(emails.len()); for _ in 0..emails.len() { collection.push(Ok(())); } collection }}; } /// Constructs a Vec with a single specific error, followed by n-1 /// generic errors referring back to it. macro_rules! cascade_err { ($err:expr) => {{ let mut collection: Vec> = Vec::with_capacity(emails.len()); collection.push(Err($err)); for _ in 1..emails.len() { collection.push(Err(anyhow::anyhow!("could not send due to previous error"))); } collection }}; } /// Recursively splits the email batch in half and tries to send each /// half independently, allowing both to run to completion and then /// returning the first error of the two results, if present. macro_rules! split_and_retry { // This is implemented as a macro in order to avoid unstable async // closures. () => { if emails.len() < 2 { tracing::warn!("Postmark send batch cannot be split any further"); vec![Err(anyhow::anyhow!( "unable to split Postmark batch into smaller slices" ))] } else { tracing::debug!("retrying Postmark send with smaller batches"); let mut results = Box::pin(self.send_batch(emails.split_off(emails.len() / 2))).await; results.extend(Box::pin(self.send_batch(emails)).await); results } }; } // https://postmarkapp.com/support/article/1056-what-are-the-attachment-and-email-size-limits const POSTMARK_MAX_BATCH_ENTRIES: usize = 500; const POSTMARK_MAX_REQUEST_BYTES: usize = 50 * 1000 * 1000; // TODO: Check email subject and body size against Postmark limits if emails.is_empty() { tracing::debug!("no Postmark messages to send"); vec![Ok(())] } else if emails.len() > POSTMARK_MAX_BATCH_ENTRIES { split_and_retry!() } else { let body = match serde_json::to_string(&emails) { Ok(body) => body, Err(err) => return cascade_err!(err.into()), }; if body.len() > POSTMARK_MAX_REQUEST_BYTES { if emails.len() > 1 { split_and_retry!() } else { vec![Err(anyhow::anyhow!( "Postmark requests may not exceed {} bytes", POSTMARK_MAX_REQUEST_BYTES ))] } } else { tracing::debug!("sending Postmark batch of {} messages", emails.len()); let resp = match self .client .post(POSTMARK_EMAIL_BATCH_URL) .header("X-Postmark-Server-Token", &self.server_token) .header(reqwest::header::CONTENT_TYPE, "application/json") .body(body) .send() .await { Ok(resp) => resp, Err(err) => return cascade_err!(err.into()), }; if resp.status().is_client_error() && emails.len() > 1 { split_and_retry!() } else if let Err(err) = resp.error_for_status() { cascade_err!(err.into()) } else { tracing::debug!("sent Postmark batch of {} messages", emails.len()); all_ok!() } } } } } impl FromRef for Mailer { fn from_ref(state: &AppState) -> Mailer { state.mailer.clone() } }