Compare commits

...

6 commits

15 changed files with 623 additions and 111 deletions

Cargo.lock (generated)

@@ -74,6 +74,56 @@ dependencies = [
"libc",
]
[[package]]
name = "anstream"
version = "0.6.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is_terminal_polyfill",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
[[package]]
name = "anstyle-parse"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e"
dependencies = [
"anstyle",
"once_cell",
"windows-sys 0.59.0",
]
[[package]]
name = "anyhow"
version = "1.0.91"
@@ -516,6 +566,52 @@ dependencies = [
"stacker",
]
[[package]]
name = "clap"
version = "4.5.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "027bb0d98429ae334a8698531da7077bdf906419543a35a55c2cb1b66437d767"
dependencies = [
"clap_builder",
"clap_derive",
]
[[package]]
name = "clap_builder"
version = "4.5.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5589e0cba072e0f3d23791efac0fd8627b49c829c196a492e88168e6a669d863"
dependencies = [
"anstream",
"anstyle",
"clap_lex",
"strsim",
]
[[package]]
name = "clap_derive"
version = "4.5.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6"
[[package]]
name = "colorchoice"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
[[package]]
name = "config"
version = "0.14.1"
@@ -1568,6 +1664,12 @@ version = "2.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708"
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]]
name = "itoa"
version = "1.0.11"
@@ -2495,6 +2597,7 @@ dependencies = [
"axum-extra",
"base64 0.22.1",
"chrono",
"clap",
"config",
"console_error_panic_hook",
"deadpool-diesel",
@@ -3071,6 +3174,12 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "utf8parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.11.0"


@@ -32,3 +32,4 @@ diesel = { version = "2.2.6", features = ["postgres", "chrono", "uuid", "serde_j
tower = "0.5.2"
regex = "1.11.1"
lettre = { version = "0.11.12", features = ["tokio1", "serde", "tracing", "tokio1-native-tls"] }
clap = { version = "4.5.31", features = ["derive"] }


@@ -1,7 +1,114 @@
[jobs.webserver]
command = ["cargo", "run"]
# This is a configuration file for the bacon tool
#
# Complete help on configuration: https://dystroy.org/bacon/config/
#
# You may check the current default at
# https://github.com/Canop/bacon/blob/main/defaults/default-bacon.toml
default_job = "check"
env.CARGO_TERM_COLOR = "always"
[jobs.check]
command = ["cargo", "check"]
need_stdout = false
[jobs.check-all]
command = ["cargo", "check", "--all-targets"]
need_stdout = false
# Run clippy on the default target
[jobs.clippy]
command = ["cargo", "clippy"]
need_stdout = false
# Run clippy on all targets
# To disable some lints, you may change the job this way:
# [jobs.clippy-all]
# command = [
# "cargo", "clippy",
# "--all-targets",
# "--",
# "-A", "clippy::bool_to_int_with_if",
# "-A", "clippy::collapsible_if",
# "-A", "clippy::derive_partial_eq_without_eq",
# ]
# need_stdout = false
[jobs.clippy-all]
command = ["cargo", "clippy", "--all-targets"]
need_stdout = false
# This job lets you run
# - all tests: bacon test
# - a specific test: bacon test -- config::test_default_files
# - the tests of a package: bacon test -- -- -p config
[jobs.test]
command = ["cargo", "test"]
need_stdout = true
[jobs.nextest]
command = [
"cargo", "nextest", "run",
"--hide-progress-bar", "--failure-output", "final"
]
need_stdout = true
analyzer = "nextest"
[jobs.doc]
command = ["cargo", "doc", "--no-deps"]
need_stdout = false
# If the doc compiles, then it opens in your browser and bacon switches
# to the previous job
[jobs.doc-open]
command = ["cargo", "doc", "--no-deps", "--open"]
need_stdout = false
on_success = "back" # so that we don't open the browser at each change
# You can run your application and have the result displayed in bacon,
# if it makes sense for this crate.
[jobs.run-worker]
command = [
"cargo", "run", "worker",
# put launch parameters for your program behind a `--` separator
]
need_stdout = true
allow_warnings = true
background = true
default_watch = false
# Run your long-running application (eg server) and have the result displayed in bacon.
# For programs that never stop (eg a server), `background` is set to false
# to have the cargo run output immediately displayed instead of waiting for
# the program's end.
# 'on_change_strategy' is set to `kill_then_restart` to have your program restart
# on every change (an alternative would be to use the 'F5' key manually in bacon).
# If you often use this job, it makes sense to override the 'r' key by adding
# a binding `r = job:run-server` at the end of this file.
[jobs.run-server]
command = [
"cargo", "run", "serve"
# put launch parameters for your program behind a `--` separator
]
need_stdout = true
allow_warnings = true
background = false
on_change_strategy = "kill_then_restart"
kill = ["kill", "-s", "INT"]
watch =["src", "templates"]
watch = ["src", "templates"]
# This parameterized job runs the example of your choice, as soon
# as the code compiles.
# Call it as
# bacon ex -- my-example
[jobs.ex]
command = ["cargo", "run", "--example"]
need_stdout = true
allow_warnings = true
# You may define here keybindings that would be specific to
# a project, for example a shortcut to launch a specific job.
# Shortcuts to internal functions (scrolling, toggling, etc.)
# should go in your personal global prefs.toml file instead.
[keybindings]
r = "job:run-server"
w = "job:run-worker"


@@ -7,7 +7,7 @@ CREATE INDEX ON users (uid);
CREATE TABLE IF NOT EXISTS csrf_tokens (
id UUID NOT NULL PRIMARY KEY,
user_id UUID REFERENCES users(id),
user_id UUID REFERENCES users(id) ON DELETE CASCADE,
created_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX ON csrf_tokens (created_at);
@@ -18,9 +18,8 @@ CREATE TABLE teams (
);
CREATE TABLE team_memberships (
team_id UUID NOT NULL REFERENCES teams(id),
user_id UUID NOT NULL REFERENCES users(id),
roles TEXT[] NOT NULL DEFAULT '{}',
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
PRIMARY KEY (team_id, user_id)
);
CREATE INDEX ON team_memberships (team_id);
@@ -28,7 +27,7 @@ CREATE INDEX ON team_memberships (user_id);
CREATE TABLE api_keys (
id UUID NOT NULL PRIMARY KEY,
team_id UUID NOT NULL REFERENCES teams(id),
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_used_at TIMESTAMPTZ
);
@@ -44,15 +43,15 @@ CREATE INDEX ON projects(team_id);
CREATE TABLE IF NOT EXISTS channels (
id UUID NOT NULL PRIMARY KEY,
team_id UUID NOT NULL REFERENCES teams(id),
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
name TEXT NOT NULL,
enable_by_default BOOLEAN NOT NULL DEFAULT FALSE,
backend_config JSONB NOT NULL DEFAULT '{}'::JSONB
backend_config JSONB NOT NULL
);
CREATE TABLE IF NOT EXISTS channel_selections (
project_id UUID NOT NULL REFERENCES projects(id),
channel_id UUID NOT NULL REFERENCES channels(id),
project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
channel_id UUID NOT NULL REFERENCES channels(id) ON DELETE CASCADE,
PRIMARY KEY (project_id, channel_id)
);
CREATE INDEX ON channel_selections (project_id);


@@ -1,6 +1,7 @@
CREATE TABLE IF NOT EXISTS messages (
id UUID PRIMARY KEY NOT NULL,
channel_id UUID NOT NULL REFERENCES channels (id),
channel_id UUID NOT NULL REFERENCES channels (id) ON DELETE RESTRICT,
project_id UUID NOT NULL REFERENCES projects (id) ON DELETE RESTRICT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
sent_at TIMESTAMPTZ,
message TEXT NOT NULL
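
To make the new referential actions concrete, here is a hedged diesel sketch (the helper names are hypothetical, and the module paths assume the generated schema.rs shown later in this diff). CASCADE lets a single DELETE clean up dependent rows; RESTRICT turns the same DELETE into a foreign-key error for as long as dependents exist.

    use diesel::prelude::*;
    use uuid::Uuid;
    use crate::schema::{channels, users};

    // ON DELETE CASCADE: removing a user also removes that user's
    // csrf_tokens and team_memberships rows in the same statement
    // (tables referencing users without CASCADE keep their own rules).
    fn delete_user(conn: &mut PgConnection, user_id: Uuid) -> QueryResult<usize> {
        diesel::delete(users::table.filter(users::id.eq(user_id))).execute(conn)
    }

    // ON DELETE RESTRICT: removing a channel fails with a foreign-key
    // violation while messages rows still reference it, instead of
    // orphaning or silently deleting queued messages.
    fn delete_channel(conn: &mut PgConnection, channel_id: Uuid) -> QueryResult<usize> {
        diesel::delete(channels::table.filter(channels::id.eq(channel_id))).execute(conn)
    }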


@@ -13,7 +13,7 @@ use oauth2::{
ClientSecret, CsrfToken, RedirectUrl, RefreshToken, TokenResponse, TokenUrl,
};
use serde::{Deserialize, Serialize};
use tracing::trace_span;
use tracing::{trace_span, Instrument};
use crate::{
app_error::AppError,
@@ -197,17 +197,21 @@ impl FromRequestParts<AppState> for AuthInfo {
parts: &mut Parts,
state: &AppState,
) -> Result<Self, <Self as FromRequestParts<AppState>>::Rejection> {
let _ = trace_span!("AuthInfo from_request_parts()").enter();
let session = parts
.extract_with_state::<AppSession, AppState>(state)
.await?
.0
.ok_or(AppError::auth_redirect_from_base_path(
state.settings.base_path.clone(),
))?;
let user = session.get::<AuthInfo>(SESSION_KEY_AUTH_INFO).ok_or(
AppError::auth_redirect_from_base_path(state.settings.base_path.clone()),
)?;
Ok(user)
async move {
let session = parts
.extract_with_state::<AppSession, AppState>(state)
.await?
.0
.ok_or(AppError::auth_redirect_from_base_path(
state.settings.base_path.clone(),
))?;
let user = session.get::<AuthInfo>(SESSION_KEY_AUTH_INFO).ok_or(
AppError::auth_redirect_from_base_path(state.settings.base_path.clone()),
)?;
Ok(user)
}
// The Span::enter() guard pattern doesn't play nicely with async
.instrument(trace_span!("AuthInfo from_request_parts()"))
.await
}
}
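
The same fix appears in sessions.rs further down. The guard returned by Span::enter() is tied to the current thread, so holding it across an .await can attribute whatever the executor polls next to the span; Instrument::instrument instead enters and exits the span on every poll of the future. A minimal standalone sketch (function names are hypothetical):

    use tracing::{info_span, Instrument};

    async fn handler() {
        // Anti-pattern: `let _guard = info_span!("handler").enter();`
        // held across an .await leaks the span onto unrelated tasks.
        async {
            do_work().await; // awaits in here are attributed to the span
        }
        .instrument(info_span!("handler"))
        .await;
    }

    async fn do_work() {}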


@@ -7,7 +7,7 @@ use crate::app_state::AppState;
const POSTMARK_EMAIL_BATCH_URL: &'static str = "https://api.postmarkapp.com/email/batch";
#[derive(Serialize)]
#[derive(Clone, Serialize)]
pub struct Message {
#[serde(rename = "From")]
pub from: lettre::message::Mailbox,
@@ -21,7 +21,11 @@ pub struct Message {
}
pub trait MailSender: Clone + Sync {
async fn send_batch(&self, emails: Vec<Message>) -> Result<(), anyhow::Error>;
/**
* Attempt to send all messages defined by the input Vec. Send as many as
* possible, returning exactly one Result<()> for each message.
*/
async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>>;
}
#[derive(Clone, Debug)]
@@ -48,7 +52,7 @@ impl Mailer {
}
impl MailSender for Mailer {
async fn send_batch(&self, emails: Vec<Message>) -> Result<(), anyhow::Error> {
async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>> {
match self {
Mailer::Smtp(sender) => sender.send_batch(emails).await,
Mailer::Postmark(sender) => sender.send_batch(emails).await,
@@ -104,11 +108,25 @@
}
impl MailSender for SmtpSender {
async fn send_batch(&self, emails: Vec<Message>) -> Result<()> {
async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>> {
let mut results: Vec<Result<()>> = Vec::with_capacity(emails.len());
for email in emails {
self.transport.send(email.try_into()?).await?;
match TryInto::<lettre::Message>::try_into(email) {
Ok(email) => {
results.push(
self.transport
.send(email)
.await
.map(|_| ())
.map_err(Into::<anyhow::Error>::into),
);
}
Err(err) => {
results.push(Err(err));
}
}
}
Ok(())
results
}
}
@@ -132,19 +150,114 @@ impl PostmarkSender {
}
impl MailSender for PostmarkSender {
async fn send_batch(&self, emails: Vec<Message>) -> Result<()> {
if emails.len() > 500 {
return Err(anyhow::anyhow!(
"Postmark sends no more than 500 messages per batch"
));
/**
* Recursively attempts to send messages, breaking them into smaller and
* smaller batches as needed.
*/
async fn send_batch(&self, mut emails: Vec<Message>) -> Vec<Result<()>> {
/**
* Constructs a Vec with Ok(()) repeated n times.
*/
macro_rules! all_ok {
() => {{
let mut collection: Vec<Result<_>> = Vec::with_capacity(emails.len());
for _ in 0..emails.len() {
collection.push(Ok(()));
}
collection
}};
}
/**
* Constructs a Vec with a single specific error, followed by n-1
* generic errors referring back to it.
*/
macro_rules! cascade_err {
($err:expr) => {{
let mut collection: Vec<Result<_>> = Vec::with_capacity(emails.len());
collection.push(Err($err));
for _ in 1..emails.len() {
collection.push(Err(anyhow::anyhow!("could not send due to previous error")));
}
collection
}};
}
/**
* Recursively splits the email batch in half and tries to send each
* half independently, letting both run to completion and then
* concatenating their result vectors in input order.
*
* This is implemented as a macro in order to avoid unstable async
* closures.
*/
macro_rules! split_and_retry {
() => {
if emails.len() < 2 {
tracing::warn!("Postmark send batch cannot be split any further");
vec![Err(anyhow::anyhow!(
"unable to split Postmark batch into smaller slices"
))]
} else {
tracing::debug!("retrying Postmark send with smaller batches");
// Send the halves in input order so the returned results stay
// positionally aligned with the input messages (callers zip them).
let second_half = emails.split_off(emails.len() / 2);
let mut results = Box::pin(self.send_batch(emails)).await;
results.extend(Box::pin(self.send_batch(second_half)).await);
results
}
};
}
// https://postmarkapp.com/support/article/1056-what-are-the-attachment-and-email-size-limits
const POSTMARK_MAX_BATCH_ENTRIES: usize = 500;
const POSTMARK_MAX_REQUEST_BYTES: usize = 50 * 1000 * 1000;
// TODO: Check email subject and body size against Postmark limits
if emails.is_empty() {
tracing::debug!("no Postmark messages to send");
// one Result per input message, so an empty batch yields no results
Vec::new()
} else if emails.len() > POSTMARK_MAX_BATCH_ENTRIES {
split_and_retry!()
} else {
let body = match serde_json::to_string(&emails) {
Ok(body) => body,
Err(err) => return cascade_err!(err.into()),
};
if body.len() > POSTMARK_MAX_REQUEST_BYTES {
if emails.len() > 1 {
split_and_retry!()
} else {
vec![Err(anyhow::anyhow!(
"Postmark requests may not exceed {} bytes",
POSTMARK_MAX_REQUEST_BYTES
))]
}
} else {
tracing::debug!("sending Postmark batch of {} messages", emails.len());
let resp = match self
.client
.post(POSTMARK_EMAIL_BATCH_URL)
.header("X-Postmark-Server-Token", &self.server_token)
.header(reqwest::header::CONTENT_TYPE, "application/json")
.body(body)
.send()
.await
{
Ok(resp) => resp,
Err(err) => return cascade_err!(err.into()),
};
if resp.status().is_client_error() && emails.len() > 1 {
split_and_retry!()
} else {
if let Err(err) = resp.error_for_status() {
cascade_err!(err.into())
} else {
tracing::debug!("sent Postmark batch of {} messages", emails.len());
all_ok!()
}
}
}
}
self.client
.post(POSTMARK_EMAIL_BATCH_URL)
.header("X-Postmark-Server-Token", &self.server_token)
.json(&emails)
.send()
.await?;
Ok(())
}
}
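
Two notes on the implementation above. The all_ok! macro loops instead of writing vec![Ok(()); n] because anyhow::Error is not Clone, so a Result containing it cannot be duplicated. And the halving converges quickly: a backlog of 1,200 messages splits into 600 + 600, then into four 300-message batches that fit under the 500-entry cap. For testing against the new contract (one Result per input message, in input order), a hedged sketch of a trivial test double:

    use anyhow::Result;

    // Hypothetical no-op sender: succeeds for every message while
    // preserving the one-result-per-message ordering contract.
    #[derive(Clone)]
    struct NoopSender;

    impl MailSender for NoopSender {
        async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>> {
            emails.into_iter().map(|_| Ok(())).collect()
        }
    }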


@@ -18,16 +18,40 @@ mod team_memberships;
mod teams;
mod users;
mod v0_router;
mod worker;
use std::process::exit;
use chrono::{TimeDelta, Utc};
use clap::{Parser, Subcommand};
use email::SmtpOptions;
use tokio::time::sleep;
use tracing_subscriber::EnvFilter;
use crate::{
app_state::AppState, email::Mailer, router::new_router, sessions::PgStore, settings::Settings,
worker::run_worker,
};
#[derive(Parser)]
#[command(version, about, long_about = None)]
struct Cli {
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand)]
enum Commands {
/// Run web server
Serve,
/// Run background worker
Worker {
/// Loop every n seconds instead of exiting after a single run
#[arg(long)]
auto_loop_seconds: Option<u32>,
},
}
#[tokio::main]
async fn main() {
let settings = Settings::load().unwrap();
@@ -36,6 +60,8 @@ async fn main() {
.with_env_filter(EnvFilter::from_default_env())
.init();
let cli = Cli::parse();
let database_url = settings.database_url.clone();
let manager =
deadpool_diesel::postgres::Manager::new(database_url, deadpool_diesel::Runtime::Tokio1);
@@ -66,7 +92,7 @@
};
let app_state = AppState {
db_pool,
db_pool: db_pool.clone(),
mailer,
oauth_client,
reqwest_client,
@@ -74,16 +100,43 @@
settings: settings.clone(),
};
let router = new_router(app_state);
match &cli.command {
Commands::Serve => {
let router = new_router(app_state);
let listener = tokio::net::TcpListener::bind((settings.host.clone(), settings.port.clone()))
.await
.unwrap();
tracing::info!(
"App running at http://{}:{}{}",
settings.host,
settings.port,
settings.base_path
);
axum::serve(listener, router).await.unwrap();
let listener =
tokio::net::TcpListener::bind((settings.host.clone(), settings.port.clone()))
.await
.unwrap();
tracing::info!(
"App running at http://{}:{}{}",
settings.host,
settings.port,
settings.base_path
);
axum::serve(listener, router).await.unwrap();
}
Commands::Worker { auto_loop_seconds } => {
if let Some(loop_seconds) = auto_loop_seconds {
let loop_delta = TimeDelta::seconds(i64::from(*loop_seconds));
loop {
let t_next_loop = Utc::now() + loop_delta;
if let Err(err) = run_worker(app_state.clone()).await {
tracing::error!("{}", err)
}
let sleep_delta = t_next_loop - Utc::now();
match sleep_delta.to_std() {
Ok(duration) => {
sleep(duration).await;
}
Err(_) => { /* sleep_delta was < 0, so don't sleep */ }
}
}
} else {
run_worker(app_state).await.unwrap();
}
}
}
}
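
Assuming clap's default kebab-case mapping for the derive types above, the two modes are invoked roughly like:

    cargo run -- serve
    cargo run -- worker --auto-loop-seconds 60

Note that the loop computes t_next_loop before running the worker, so a slow run shortens (or skips) the following sleep instead of letting the schedule drift.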


@@ -13,8 +13,10 @@ use crate::{channels::Channel, schema::messages};
#[diesel(belongs_to(Channel))]
pub struct Message {
pub id: Uuid,
pub project_id: Uuid,
pub channel_id: Uuid,
pub created_at: DateTime<Utc>,
pub sent_at: Option<DateTime<Utc>>,
pub message: String,
}
@@ -29,4 +31,9 @@ impl Message {
pub fn with_channel(channel_id: Uuid) -> _ {
messages::channel_id.eq(channel_id)
}
#[auto_type(no_type_alias)]
pub fn is_not_sent() -> _ {
messages::sent_at.is_null()
}
}
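
These #[auto_type] helpers compose like ordinary diesel expressions. A hedged usage sketch (variable names are hypothetical; the .select(Message::as_select()) matters because the struct's field order no longer matches the table's column order):

    use diesel::prelude::*;
    use crate::schema::messages;

    // Load the unsent messages for one channel, oldest first.
    let pending: Vec<Message> = messages::table
        .select(Message::as_select())
        .filter(Message::with_channel(channel_id))
        .filter(Message::is_not_sent())
        .order(messages::created_at.asc())
        .load(conn)?;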


@@ -214,7 +214,6 @@ async fn post_new_team(
let team_membership = TeamMembership {
team_id: team_id.clone(),
user_id: current_user.id,
roles: vec![Some("OWNER".to_string())],
};
db_conn
.interact(move |conn| {
@@ -594,7 +593,7 @@ async fn update_channel_email_recipient(
subject: "Verify Your Email".to_string(),
text_body: format!("Your email verification code is: {}", verification_code),
};
mailer.send_batch(vec![email]).await?;
mailer.send_batch(vec![email]).await.remove(0)?;
Ok(Redirect::to(&format!(
"{}/teams/{}/channels/{}",


@@ -47,6 +47,7 @@ diesel::table! {
messages (id) {
id -> Uuid,
channel_id -> Uuid,
project_id -> Uuid,
created_at -> Timestamptz,
sent_at -> Nullable<Timestamptz>,
message -> Text,
@@ -65,7 +66,6 @@ diesel::table! {
team_memberships (team_id, user_id) {
team_id -> Uuid,
user_id -> Uuid,
roles -> Array<Nullable<Text>>,
}
}
@@ -90,6 +90,7 @@ diesel::joinable!(channel_selections -> projects (project_id));
diesel::joinable!(channels -> teams (team_id));
diesel::joinable!(csrf_tokens -> users (user_id));
diesel::joinable!(messages -> channels (channel_id));
diesel::joinable!(messages -> projects (project_id));
diesel::joinable!(projects -> teams (team_id));
diesel::joinable!(team_memberships -> teams (team_id));
diesel::joinable!(team_memberships -> users (user_id));


@@ -8,7 +8,7 @@ use axum::{
use axum_extra::extract::CookieJar;
use chrono::{DateTime, TimeDelta, Utc};
use diesel::{pg::Pg, prelude::*, upsert::excluded};
use tracing::trace_span;
use tracing::{trace_span, Instrument};
use crate::{app_error::AppError, app_state::AppState, schema::browser_sessions};
@@ -134,7 +134,7 @@ impl FromRequestParts<AppState> for AppSession {
parts: &mut Parts,
state: &AppState,
) -> Result<Self, <Self as FromRequestParts<AppState>>::Rejection> {
let _ = trace_span!("AppSession::from_request_parts()").enter();
async move {
let jar = parts.extract::<CookieJar>().await.unwrap();
let session_cookie = match jar.get(&state.settings.auth.cookie_name) {
Some(cookie) => cookie,
@@ -167,5 +167,7 @@
tracing::debug!("no matching session found in database");
Ok(AppSession(None))
}
// The Span::enter() guard pattern doesn't play nicely with async
}.instrument(trace_span!("AppSession::from_request_parts()")).await
}
}


@@ -20,7 +20,6 @@ use crate::{
pub struct TeamMembership {
pub team_id: Uuid,
pub user_id: Uuid,
pub roles: Vec<Option<String>>,
}
impl TeamMembership {


@@ -1,6 +1,6 @@
use anyhow::Context;
use axum::{
extract::{Query, State},
extract::Query,
response::{IntoResponse, Json},
routing::get,
Router,
@@ -14,11 +14,9 @@ use crate::{
api_keys::ApiKey,
app_error::AppError,
app_state::{AppState, DbConn},
channels::{Channel, EmailBackendConfig},
email::{MailSender as _, Mailer},
channels::Channel,
projects::Project,
schema::{api_keys, projects},
settings::Settings,
schema::{api_keys, messages, projects},
};
pub fn new_router(state: AppState) -> Router<AppState> {
@@ -33,11 +31,6 @@ struct SayQuery {
}
async fn say_get(
State(Settings {
email: email_settings,
..
}): State<Settings>,
State(mailer): State<Mailer>,
DbConn(db_conn): DbConn,
Query(query): Query<SayQuery>,
) -> Result<impl IntoResponse, AppError> {
@@ -58,55 +51,68 @@ async fn say_get(
.unwrap()?
};
let selected_channels = {
let project = {
let project_name = query.project.to_lowercase();
db_conn
.interact::<_, Result<Vec<Channel>, AppError>>(move |conn| {
.interact::<_, Result<Project, AppError>>(move |conn| {
conn.transaction(move |conn| {
let project = match Project::all()
.filter(Project::with_team(api_key.team_id))
.filter(Project::with_name(project_name.clone()))
.first(conn)
.optional()
.context("failed to load project")?
{
Some(project) => project,
None => insert_into(projects::table)
.values((
projects::id.eq(Uuid::now_v7()),
projects::team_id.eq(api_key.team_id),
projects::name.eq(project_name),
))
.get_result(conn)
.context("failed to insert project")?,
};
Ok(project
.selected_channels()
.load(conn)
.context("failed to load selected channels")?)
Ok(
match Project::all()
.filter(Project::with_team(api_key.team_id))
.filter(Project::with_name(project_name.clone()))
.first(conn)
.optional()
.context("failed to load project")?
{
Some(project) => project,
None => insert_into(projects::table)
.values((
projects::id.eq(Uuid::now_v7()),
projects::team_id.eq(api_key.team_id),
projects::name.eq(project_name),
))
.get_result(conn)
.context("failed to insert project")?,
},
)
})
})
.await
.unwrap()?
};
let selected_channels = {
let project = project.clone();
db_conn
.interact::<_, Result<Vec<Channel>, AppError>>(move |conn| {
Ok(project
.selected_channels()
.load(conn)
.context("failed to load selected channels")?)
})
.await
.unwrap()?
};
for channel in selected_channels {
if let Ok(config) = TryInto::<EmailBackendConfig>::try_into(channel.backend_config) {
if config.verified {
let recipient: lettre::message::Mailbox = config.recipient.parse()?;
let email = crate::email::Message {
from: email_settings.message_from.clone().into(),
to: recipient.into(),
subject: "Shout".to_string(),
text_body: query.message.clone(),
};
tracing::info!("Sending email to recipient for channel {}", channel.id);
mailer.send_batch(vec![email]).await?;
} else {
tracing::info!("Email recipient for channel {} is not verified", channel.id);
}
}
{
let selected_channels = selected_channels.clone();
db_conn
.interact::<_, Result<_, AppError>>(move |conn| {
for channel in selected_channels {
insert_into(messages::table)
.values((
messages::id.eq(Uuid::now_v7()),
messages::channel_id.eq(&channel.id),
messages::project_id.eq(&project.id),
messages::message.eq(&query.message),
))
.execute(conn)?;
}
Ok(())
})
.await
.unwrap()?;
}
tracing::debug!("queued {} messages", selected_channels.len());
Ok(Json(json!({ "ok": true })))
}

src/worker.rs (new file)

@@ -0,0 +1,111 @@
use anyhow::{Context as _, Result};
use diesel::prelude::*;
use tracing::Instrument as _;
use uuid::Uuid;
use crate::{
app_state::AppState,
channels::{Channel, EmailBackendConfig},
email::MailSender,
messages::Message,
schema::{channels, messages},
};
pub async fn run_worker(state: AppState) -> Result<()> {
async move {
process_messages(state).await?;
Ok(())
}
.instrument(tracing::debug_span!("run_worker()"))
.await
}
/**
* Process messages from the queue in the `messages` table. Insertions to the
* queue are rate limited per team and per project, so no effort should be
* needed here to enforce fairness.
*/
async fn process_messages(state: AppState) -> Result<()> {
async move {
const MESSAGE_QUEUE_LIMIT: i64 = 250;
let db_conn = state.db_pool.get().await?;
let queued_messages = db_conn
.interact::<_, Result<Vec<(Message, Channel)>>>(move |conn| {
messages::table
.inner_join(channels::table)
.select((Message::as_select(), Channel::as_select()))
.filter(Message::is_not_sent())
.order(messages::created_at.asc())
.limit(MESSAGE_QUEUE_LIMIT)
.load(conn)
.context("failed to load queued messages")
})
.await
.unwrap()?;
// Dispatch email messages together to take advantage of Postmark's
// batch send API
let emails: Vec<(Uuid, crate::email::Message)> = queued_messages
.iter()
.filter_map(|(message, channel)| {
if let Ok(backend_config) =
TryInto::<EmailBackendConfig>::try_into(channel.backend_config.clone())
{
if backend_config.verified {
let recipient: lettre::message::Mailbox = if let Ok(recipient) =
backend_config.recipient.parse()
{
recipient
} else {
tracing::error!("failed to parse recipient for channel {}", channel.id);
return None;
};
let email = crate::email::Message {
from: state.settings.email.message_from.clone().into(),
to: recipient.into(),
subject: "Shout".to_string(),
text_body: message.message.clone(),
};
tracing::debug!("Sending email to recipient for channel {}", channel.id);
Some((message.id.clone(), email))
} else {
tracing::info!(
"Email recipient for channel {} is not verified",
channel.id
);
None
}
} else {
None
}
})
.collect();
if !emails.is_empty() {
let message_ids: Vec<Uuid> = emails.iter().map(|(id, _)| id.clone()).collect();
let results = state
.mailer
.send_batch(emails.into_iter().map(|(_, email)| email).collect())
.await;
assert!(results.len() == message_ids.len());
let results_by_id = message_ids.into_iter().zip(results.into_iter());
db_conn
.interact::<_, Result<_>>(move |conn| {
for (id, result) in results_by_id {
if let Err(err) = result {
tracing::error!("error sending message {}: {:?}", id, err);
} else {
diesel::update(messages::table.filter(messages::id.eq(id)))
.set(messages::sent_at.eq(diesel::dsl::now))
.execute(conn)?;
}
}
Ok(())
})
.await
.unwrap()?;
}
tracing::info!("finished processing messages");
Ok(())
}
.instrument(tracing::debug_span!("process_messages()"))
.await
}