Compare commits

...

6 commits

15 changed files with 623 additions and 111 deletions

109
Cargo.lock generated
View file

@ -74,6 +74,56 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "anstream"
version = "0.6.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is_terminal_polyfill",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
[[package]]
name = "anstyle-parse"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e"
dependencies = [
"anstyle",
"once_cell",
"windows-sys 0.59.0",
]
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.91" version = "1.0.91"
@ -516,6 +566,52 @@ dependencies = [
"stacker", "stacker",
] ]
[[package]]
name = "clap"
version = "4.5.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "027bb0d98429ae334a8698531da7077bdf906419543a35a55c2cb1b66437d767"
dependencies = [
"clap_builder",
"clap_derive",
]
[[package]]
name = "clap_builder"
version = "4.5.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5589e0cba072e0f3d23791efac0fd8627b49c829c196a492e88168e6a669d863"
dependencies = [
"anstream",
"anstyle",
"clap_lex",
"strsim",
]
[[package]]
name = "clap_derive"
version = "4.5.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6"
[[package]]
name = "colorchoice"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
[[package]] [[package]]
name = "config" name = "config"
version = "0.14.1" version = "0.14.1"
@ -1568,6 +1664,12 @@ version = "2.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708"
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]] [[package]]
name = "itoa" name = "itoa"
version = "1.0.11" version = "1.0.11"
@ -2495,6 +2597,7 @@ dependencies = [
"axum-extra", "axum-extra",
"base64 0.22.1", "base64 0.22.1",
"chrono", "chrono",
"clap",
"config", "config",
"console_error_panic_hook", "console_error_panic_hook",
"deadpool-diesel", "deadpool-diesel",
@ -3071,6 +3174,12 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "utf8parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.11.0" version = "1.11.0"

View file

@ -32,3 +32,4 @@ diesel = { version = "2.2.6", features = ["postgres", "chrono", "uuid", "serde_j
tower = "0.5.2" tower = "0.5.2"
regex = "1.11.1" regex = "1.11.1"
lettre = { version = "0.11.12", features = ["tokio1", "serde", "tracing", "tokio1-native-tls"] } lettre = { version = "0.11.12", features = ["tokio1", "serde", "tracing", "tokio1-native-tls"] }
clap = { version = "4.5.31", features = ["derive"] }

View file

@ -1,7 +1,114 @@
[jobs.webserver] # This is a configuration file for the bacon tool
command = ["cargo", "run"] #
# Complete help on configuration: https://dystroy.org/bacon/config/
#
# You may check the current default at
# https://github.com/Canop/bacon/blob/main/defaults/default-bacon.toml
default_job = "check"
env.CARGO_TERM_COLOR = "always"
[jobs.check]
command = ["cargo", "check"]
need_stdout = false
[jobs.check-all]
command = ["cargo", "check", "--all-targets"]
need_stdout = false
# Run clippy on the default target
[jobs.clippy]
command = ["cargo", "clippy"]
need_stdout = false
# Run clippy on all targets
# To disable some lints, you may change the job this way:
# [jobs.clippy-all]
# command = [
# "cargo", "clippy",
# "--all-targets",
# "--",
# "-A", "clippy::bool_to_int_with_if",
# "-A", "clippy::collapsible_if",
# "-A", "clippy::derive_partial_eq_without_eq",
# ]
# need_stdout = false
[jobs.clippy-all]
command = ["cargo", "clippy", "--all-targets"]
need_stdout = false
# This job lets you run
# - all tests: bacon test
# - a specific test: bacon test -- config::test_default_files
# - the tests of a package: bacon test -- -- -p config
[jobs.test]
command = ["cargo", "test"]
need_stdout = true need_stdout = true
[jobs.nextest]
command = [
"cargo", "nextest", "run",
"--hide-progress-bar", "--failure-output", "final"
]
need_stdout = true
analyzer = "nextest"
[jobs.doc]
command = ["cargo", "doc", "--no-deps"]
need_stdout = false
# If the doc compiles, then it opens in your browser and bacon switches
# to the previous job
[jobs.doc-open]
command = ["cargo", "doc", "--no-deps", "--open"]
need_stdout = false
on_success = "back" # so that we don't open the browser at each change
# You can run your application and have the result displayed in bacon,
# if it makes sense for this crate.
[jobs.run-worker]
command = [
"cargo", "run", "worker",
# put launch parameters for your program behind a `--` separator
]
need_stdout = true
allow_warnings = true
background = true
default_watch = false
# Run your long-running application (eg server) and have the result displayed in bacon.
# For programs that never stop (eg a server), `background` is set to false
# to have the cargo run output immediately displayed instead of waiting for
# program's end.
# 'on_change_strategy' is set to `kill_then_restart` to have your program restart
# on every change (an alternative would be to use the 'F5' key manually in bacon).
# If you often use this job, it makes sense to override the 'r' key by adding
# a binding `r = job:run-long` at the end of this file .
[jobs.run-server]
command = [
"cargo", "run", "serve"
# put launch parameters for your program behind a `--` separator
]
need_stdout = true
allow_warnings = true
background = false background = false
on_change_strategy = "kill_then_restart" on_change_strategy = "kill_then_restart"
kill = ["kill", "-s", "INT"] kill = ["kill", "-s", "INT"]
watch = ["src", "templates"] watch = ["src", "templates"]
# This parameterized job runs the example of your choice, as soon
# as the code compiles.
# Call it as
# bacon ex -- my-example
[jobs.ex]
command = ["cargo", "run", "--example"]
need_stdout = true
allow_warnings = true
# You may define here keybindings that would be specific to
# a project, for example a shortcut to launch a specific job.
# Shortcuts to internal functions (scrolling, toggling, etc.)
# should go in your personal global prefs.toml file instead.
[keybindings]
r = "job:run-server"
w = "job:run-worker"

View file

@ -7,7 +7,7 @@ CREATE INDEX ON users (uid);
CREATE TABLE IF NOT EXISTS csrf_tokens ( CREATE TABLE IF NOT EXISTS csrf_tokens (
id UUID NOT NULL PRIMARY KEY, id UUID NOT NULL PRIMARY KEY,
user_id UUID REFERENCES users(id), user_id UUID REFERENCES users(id) ON DELETE CASCADE,
created_at TIMESTAMPTZ NOT NULL created_at TIMESTAMPTZ NOT NULL
); );
CREATE INDEX ON csrf_tokens (created_at); CREATE INDEX ON csrf_tokens (created_at);
@ -18,9 +18,8 @@ CREATE TABLE teams (
); );
CREATE TABLE team_memberships ( CREATE TABLE team_memberships (
team_id UUID NOT NULL REFERENCES teams(id), team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
user_id UUID NOT NULL REFERENCES users(id), user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
roles TEXT[] NOT NULL DEFAULT '{}',
PRIMARY KEY (team_id, user_id) PRIMARY KEY (team_id, user_id)
); );
CREATE INDEX ON team_memberships (team_id); CREATE INDEX ON team_memberships (team_id);
@ -28,7 +27,7 @@ CREATE INDEX ON team_memberships (user_id);
CREATE TABLE api_keys ( CREATE TABLE api_keys (
id UUID NOT NULL PRIMARY KEY, id UUID NOT NULL PRIMARY KEY,
team_id UUID NOT NULL REFERENCES teams(id), team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_used_at TIMESTAMPTZ last_used_at TIMESTAMPTZ
); );
@ -44,15 +43,15 @@ CREATE INDEX ON projects(team_id);
CREATE TABLE IF NOT EXISTS channels ( CREATE TABLE IF NOT EXISTS channels (
id UUID NOT NULL PRIMARY KEY, id UUID NOT NULL PRIMARY KEY,
team_id UUID NOT NULL REFERENCES teams(id), team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
name TEXT NOT NULL, name TEXT NOT NULL,
enable_by_default BOOLEAN NOT NULL DEFAULT FALSE, enable_by_default BOOLEAN NOT NULL DEFAULT FALSE,
backend_config JSONB NOT NULL DEFAULT '{}'::JSONB backend_config JSONB NOT NULL
); );
CREATE TABLE IF NOT EXISTS channel_selections ( CREATE TABLE IF NOT EXISTS channel_selections (
project_id UUID NOT NULL REFERENCES projects(id), project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
channel_id UUID NOT NULL REFERENCES channels(id), channel_id UUID NOT NULL REFERENCES channels(id) ON DELETE CASCADE,
PRIMARY KEY (project_id, channel_id) PRIMARY KEY (project_id, channel_id)
); );
CREATE INDEX ON channel_selections (project_id); CREATE INDEX ON channel_selections (project_id);

View file

@ -1,6 +1,7 @@
CREATE TABLE IF NOT EXISTS messages ( CREATE TABLE IF NOT EXISTS messages (
id UUID PRIMARY KEY NOT NULL, id UUID PRIMARY KEY NOT NULL,
channel_id UUID NOT NULL REFERENCES channels (id), channel_id UUID NOT NULL REFERENCES channels (id) ON DELETE RESTRICT,
project_id UUID NOT NULL REFERENCES projects (id) ON DELETE RESTRICT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
sent_at TIMESTAMPTZ, sent_at TIMESTAMPTZ,
message TEXT NOT NULL message TEXT NOT NULL

View file

@ -13,7 +13,7 @@ use oauth2::{
ClientSecret, CsrfToken, RedirectUrl, RefreshToken, TokenResponse, TokenUrl, ClientSecret, CsrfToken, RedirectUrl, RefreshToken, TokenResponse, TokenUrl,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tracing::trace_span; use tracing::{trace_span, Instrument};
use crate::{ use crate::{
app_error::AppError, app_error::AppError,
@ -197,7 +197,7 @@ impl FromRequestParts<AppState> for AuthInfo {
parts: &mut Parts, parts: &mut Parts,
state: &AppState, state: &AppState,
) -> Result<Self, <Self as FromRequestParts<AppState>>::Rejection> { ) -> Result<Self, <Self as FromRequestParts<AppState>>::Rejection> {
let _ = trace_span!("AuthInfo from_request_parts()").enter(); async move {
let session = parts let session = parts
.extract_with_state::<AppSession, AppState>(state) .extract_with_state::<AppSession, AppState>(state)
.await? .await?
@ -210,4 +210,8 @@ impl FromRequestParts<AppState> for AuthInfo {
)?; )?;
Ok(user) Ok(user)
} }
// The Span.enter() guard pattern doesn't play nicely async
.instrument(trace_span!("AuthInfo from_request_parts()"))
.await
}
} }

View file

@ -7,7 +7,7 @@ use crate::app_state::AppState;
const POSTMARK_EMAIL_BATCH_URL: &'static str = "https://api.postmarkapp.com/email/batch"; const POSTMARK_EMAIL_BATCH_URL: &'static str = "https://api.postmarkapp.com/email/batch";
#[derive(Serialize)] #[derive(Clone, Serialize)]
pub struct Message { pub struct Message {
#[serde(rename = "From")] #[serde(rename = "From")]
pub from: lettre::message::Mailbox, pub from: lettre::message::Mailbox,
@ -21,7 +21,11 @@ pub struct Message {
} }
pub trait MailSender: Clone + Sync { pub trait MailSender: Clone + Sync {
async fn send_batch(&self, emails: Vec<Message>) -> Result<(), anyhow::Error>; /**
* Attempt to send all messages defined by the input Vec. Send as many as
* possible, returning exactly one Result<()> for each message.
*/
async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>>;
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -48,7 +52,7 @@ impl Mailer {
} }
impl MailSender for Mailer { impl MailSender for Mailer {
async fn send_batch(&self, emails: Vec<Message>) -> Result<(), anyhow::Error> { async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>> {
match self { match self {
Mailer::Smtp(sender) => sender.send_batch(emails).await, Mailer::Smtp(sender) => sender.send_batch(emails).await,
Mailer::Postmark(sender) => sender.send_batch(emails).await, Mailer::Postmark(sender) => sender.send_batch(emails).await,
@ -104,11 +108,25 @@ where
} }
impl MailSender for SmtpSender { impl MailSender for SmtpSender {
async fn send_batch(&self, emails: Vec<Message>) -> Result<()> { async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>> {
let mut results: Vec<Result<()>> = Vec::with_capacity(emails.len());
for email in emails { for email in emails {
self.transport.send(email.try_into()?).await?; match TryInto::<lettre::Message>::try_into(email) {
Ok(email) => {
results.push(
self.transport
.send(email)
.await
.map(|_| ())
.map_err(Into::<anyhow::Error>::into),
);
} }
Ok(()) Err(err) => {
results.push(Err(err));
}
}
}
results
} }
} }
@ -132,19 +150,114 @@ impl PostmarkSender {
} }
impl MailSender for PostmarkSender { impl MailSender for PostmarkSender {
async fn send_batch(&self, emails: Vec<Message>) -> Result<()> { /**
if emails.len() > 500 { * Recursively attempts to send messages, breaking them into smaller and
return Err(anyhow::anyhow!( * smaller batches as needed.
"Postmark sends no more than 500 messages per batch" */
)); async fn send_batch(&self, mut emails: Vec<Message>) -> Vec<Result<()>> {
/**
* Constructs a Vec with Ok(()) repeated n times.
*/
macro_rules! all_ok {
() => {{
let mut collection: Vec<Result<_>> = Vec::with_capacity(emails.len());
for _ in 0..emails.len() {
collection.push(Ok(()));
} }
self.client collection
}};
}
/**
* Constructs a Vec with a single specific error, followed by n-1
* generic errors referring back to it.
*/
macro_rules! cascade_err {
($err:expr) => {{
let mut collection: Vec<Result<_>> = Vec::with_capacity(emails.len());
collection.push(Err($err));
for _ in 1..emails.len() {
collection.push(Err(anyhow::anyhow!("could not send due to previous error")));
}
collection
}};
}
/**
* Recursively splits the email batch in half and tries to send each
* half independently, allowing both to run to completion and then
* returning the first error of the two results, if present.
*
* This is implemented as a macro in order to avoid unstable async
* closures.
*/
macro_rules! split_and_retry {
() => {
if emails.len() < 2 {
tracing::warn!("Postmark send batch cannot be split any further");
vec![Err(anyhow::anyhow!(
"unable to split Postmark batch into smaller slices"
))]
} else {
tracing::debug!("retrying Postmark send with smaller batches");
let mut results =
Box::pin(self.send_batch(emails.split_off(emails.len() / 2))).await;
results.extend(Box::pin(self.send_batch(emails)).await);
results
}
};
}
// https://postmarkapp.com/support/article/1056-what-are-the-attachment-and-email-size-limits
const POSTMARK_MAX_BATCH_ENTRIES: usize = 500;
const POSTMARK_MAX_REQUEST_BYTES: usize = 50 * 1000 * 1000;
// TODO: Check email subject and body size against Postmark limits
if emails.len() == 0 {
tracing::debug!("no Postmark messages to send");
vec![Ok(())]
} else if emails.len() > POSTMARK_MAX_BATCH_ENTRIES {
split_and_retry!()
} else {
let body = match serde_json::to_string(&emails) {
Ok(body) => body,
Err(err) => return cascade_err!(err.into()),
};
if body.len() > POSTMARK_MAX_REQUEST_BYTES {
if emails.len() > 1 {
split_and_retry!()
} else {
vec![Err(anyhow::anyhow!(
"Postmark requests may not exceed {} bytes",
POSTMARK_MAX_REQUEST_BYTES
))]
}
} else {
tracing::debug!("sending Postmark batch of {} messages", emails.len());
let resp = match self
.client
.post(POSTMARK_EMAIL_BATCH_URL) .post(POSTMARK_EMAIL_BATCH_URL)
.header("X-Postmark-Server-Token", &self.server_token) .header("X-Postmark-Server-Token", &self.server_token)
.json(&emails) .header(reqwest::header::CONTENT_TYPE, "application/json")
.body(body)
.send() .send()
.await?; .await
Ok(()) {
Ok(resp) => resp,
Err(err) => return cascade_err!(err.into()),
};
if resp.status().is_client_error() && emails.len() > 1 {
split_and_retry!()
} else {
if let Err(err) = resp.error_for_status() {
cascade_err!(err.into())
} else {
tracing::debug!("sent Postmark batch of {} messages", emails.len());
all_ok!()
}
}
}
}
} }
} }

View file

@ -18,16 +18,40 @@ mod team_memberships;
mod teams; mod teams;
mod users; mod users;
mod v0_router; mod v0_router;
mod worker;
use std::process::exit; use std::process::exit;
use chrono::{TimeDelta, Utc};
use clap::{Parser, Subcommand};
use email::SmtpOptions; use email::SmtpOptions;
use tokio::time::sleep;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
use crate::{ use crate::{
app_state::AppState, email::Mailer, router::new_router, sessions::PgStore, settings::Settings, app_state::AppState, email::Mailer, router::new_router, sessions::PgStore, settings::Settings,
worker::run_worker,
}; };
#[derive(Parser)]
#[command(version, about, long_about = None)]
struct Cli {
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand)]
enum Commands {
/// Run web server
Serve,
/// Run background worker
Worker {
/// Loop the every n seconds instead of exiting after execution
#[arg(long)]
auto_loop_seconds: Option<u32>,
},
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let settings = Settings::load().unwrap(); let settings = Settings::load().unwrap();
@ -36,6 +60,8 @@ async fn main() {
.with_env_filter(EnvFilter::from_default_env()) .with_env_filter(EnvFilter::from_default_env())
.init(); .init();
let cli = Cli::parse();
let database_url = settings.database_url.clone(); let database_url = settings.database_url.clone();
let manager = let manager =
deadpool_diesel::postgres::Manager::new(database_url, deadpool_diesel::Runtime::Tokio1); deadpool_diesel::postgres::Manager::new(database_url, deadpool_diesel::Runtime::Tokio1);
@ -66,7 +92,7 @@ async fn main() {
}; };
let app_state = AppState { let app_state = AppState {
db_pool, db_pool: db_pool.clone(),
mailer, mailer,
oauth_client, oauth_client,
reqwest_client, reqwest_client,
@ -74,9 +100,12 @@ async fn main() {
settings: settings.clone(), settings: settings.clone(),
}; };
match &cli.command {
Commands::Serve => {
let router = new_router(app_state); let router = new_router(app_state);
let listener = tokio::net::TcpListener::bind((settings.host.clone(), settings.port.clone())) let listener =
tokio::net::TcpListener::bind((settings.host.clone(), settings.port.clone()))
.await .await
.unwrap(); .unwrap();
tracing::info!( tracing::info!(
@ -87,3 +116,27 @@ async fn main() {
); );
axum::serve(listener, router).await.unwrap(); axum::serve(listener, router).await.unwrap();
} }
Commands::Worker { auto_loop_seconds } => {
if let Some(loop_seconds) = auto_loop_seconds {
let loop_delta = TimeDelta::seconds(i64::from(*loop_seconds));
loop {
let t_next_loop = Utc::now() + loop_delta;
if let Err(err) = run_worker(app_state.clone()).await {
tracing::error!("{}", err)
}
let sleep_delta = t_next_loop - Utc::now();
match sleep_delta.to_std() {
Ok(duration) => {
sleep(duration).await;
}
Err(_) => { /* sleep_delta was < 0, so don't sleep */ }
}
}
} else {
run_worker(app_state).await.unwrap();
}
}
}
}

View file

@ -13,8 +13,10 @@ use crate::{channels::Channel, schema::messages};
#[diesel(belongs_to(Channel))] #[diesel(belongs_to(Channel))]
pub struct Message { pub struct Message {
pub id: Uuid, pub id: Uuid,
pub project_id: Uuid,
pub channel_id: Uuid, pub channel_id: Uuid,
pub created_at: DateTime<Utc>, pub created_at: DateTime<Utc>,
pub sent_at: Option<DateTime<Utc>>,
pub message: String, pub message: String,
} }
@ -29,4 +31,9 @@ impl Message {
pub fn with_channel(channel_id: Uuid) -> _ { pub fn with_channel(channel_id: Uuid) -> _ {
messages::channel_id.eq(channel_id) messages::channel_id.eq(channel_id)
} }
#[auto_type(no_type_alias)]
pub fn is_not_sent() -> _ {
messages::sent_at.is_null()
}
} }

View file

@ -214,7 +214,6 @@ async fn post_new_team(
let team_membership = TeamMembership { let team_membership = TeamMembership {
team_id: team_id.clone(), team_id: team_id.clone(),
user_id: current_user.id, user_id: current_user.id,
roles: vec![Some("OWNER".to_string())],
}; };
db_conn db_conn
.interact(move |conn| { .interact(move |conn| {
@ -594,7 +593,7 @@ async fn update_channel_email_recipient(
subject: "Verify Your Email".to_string(), subject: "Verify Your Email".to_string(),
text_body: format!("Your email verification code is: {}", verification_code), text_body: format!("Your email verification code is: {}", verification_code),
}; };
mailer.send_batch(vec![email]).await?; mailer.send_batch(vec![email]).await.remove(0)?;
Ok(Redirect::to(&format!( Ok(Redirect::to(&format!(
"{}/teams/{}/channels/{}", "{}/teams/{}/channels/{}",

View file

@ -47,6 +47,7 @@ diesel::table! {
messages (id) { messages (id) {
id -> Uuid, id -> Uuid,
channel_id -> Uuid, channel_id -> Uuid,
project_id -> Uuid,
created_at -> Timestamptz, created_at -> Timestamptz,
sent_at -> Nullable<Timestamptz>, sent_at -> Nullable<Timestamptz>,
message -> Text, message -> Text,
@ -65,7 +66,6 @@ diesel::table! {
team_memberships (team_id, user_id) { team_memberships (team_id, user_id) {
team_id -> Uuid, team_id -> Uuid,
user_id -> Uuid, user_id -> Uuid,
roles -> Array<Nullable<Text>>,
} }
} }
@ -90,6 +90,7 @@ diesel::joinable!(channel_selections -> projects (project_id));
diesel::joinable!(channels -> teams (team_id)); diesel::joinable!(channels -> teams (team_id));
diesel::joinable!(csrf_tokens -> users (user_id)); diesel::joinable!(csrf_tokens -> users (user_id));
diesel::joinable!(messages -> channels (channel_id)); diesel::joinable!(messages -> channels (channel_id));
diesel::joinable!(messages -> projects (project_id));
diesel::joinable!(projects -> teams (team_id)); diesel::joinable!(projects -> teams (team_id));
diesel::joinable!(team_memberships -> teams (team_id)); diesel::joinable!(team_memberships -> teams (team_id));
diesel::joinable!(team_memberships -> users (user_id)); diesel::joinable!(team_memberships -> users (user_id));

View file

@ -8,7 +8,7 @@ use axum::{
use axum_extra::extract::CookieJar; use axum_extra::extract::CookieJar;
use chrono::{DateTime, TimeDelta, Utc}; use chrono::{DateTime, TimeDelta, Utc};
use diesel::{pg::Pg, prelude::*, upsert::excluded}; use diesel::{pg::Pg, prelude::*, upsert::excluded};
use tracing::trace_span; use tracing::{trace_span, Instrument};
use crate::{app_error::AppError, app_state::AppState, schema::browser_sessions}; use crate::{app_error::AppError, app_state::AppState, schema::browser_sessions};
@ -134,7 +134,7 @@ impl FromRequestParts<AppState> for AppSession {
parts: &mut Parts, parts: &mut Parts,
state: &AppState, state: &AppState,
) -> Result<Self, <Self as FromRequestParts<AppState>>::Rejection> { ) -> Result<Self, <Self as FromRequestParts<AppState>>::Rejection> {
let _ = trace_span!("AppSession::from_request_parts()").enter(); async move {
let jar = parts.extract::<CookieJar>().await.unwrap(); let jar = parts.extract::<CookieJar>().await.unwrap();
let session_cookie = match jar.get(&state.settings.auth.cookie_name) { let session_cookie = match jar.get(&state.settings.auth.cookie_name) {
Some(cookie) => cookie, Some(cookie) => cookie,
@ -167,5 +167,7 @@ impl FromRequestParts<AppState> for AppSession {
tracing::debug!("no matching session found in database"); tracing::debug!("no matching session found in database");
Ok(AppSession(None)) Ok(AppSession(None))
} }
// The Span.enter() guard pattern doesn't play nicely async
}.instrument(trace_span!("AppSession::from_request_parts()")).await
} }
} }

View file

@ -20,7 +20,6 @@ use crate::{
pub struct TeamMembership { pub struct TeamMembership {
pub team_id: Uuid, pub team_id: Uuid,
pub user_id: Uuid, pub user_id: Uuid,
pub roles: Vec<Option<String>>,
} }
impl TeamMembership { impl TeamMembership {

View file

@ -1,6 +1,6 @@
use anyhow::Context; use anyhow::Context;
use axum::{ use axum::{
extract::{Query, State}, extract::Query,
response::{IntoResponse, Json}, response::{IntoResponse, Json},
routing::get, routing::get,
Router, Router,
@ -14,11 +14,9 @@ use crate::{
api_keys::ApiKey, api_keys::ApiKey,
app_error::AppError, app_error::AppError,
app_state::{AppState, DbConn}, app_state::{AppState, DbConn},
channels::{Channel, EmailBackendConfig}, channels::Channel,
email::{MailSender as _, Mailer},
projects::Project, projects::Project,
schema::{api_keys, projects}, schema::{api_keys, messages, projects},
settings::Settings,
}; };
pub fn new_router(state: AppState) -> Router<AppState> { pub fn new_router(state: AppState) -> Router<AppState> {
@ -33,11 +31,6 @@ struct SayQuery {
} }
async fn say_get( async fn say_get(
State(Settings {
email: email_settings,
..
}): State<Settings>,
State(mailer): State<Mailer>,
DbConn(db_conn): DbConn, DbConn(db_conn): DbConn,
Query(query): Query<SayQuery>, Query(query): Query<SayQuery>,
) -> Result<impl IntoResponse, AppError> { ) -> Result<impl IntoResponse, AppError> {
@ -58,12 +51,13 @@ async fn say_get(
.unwrap()? .unwrap()?
}; };
let selected_channels = { let project = {
let project_name = query.project.to_lowercase(); let project_name = query.project.to_lowercase();
db_conn db_conn
.interact::<_, Result<Vec<Channel>, AppError>>(move |conn| { .interact::<_, Result<Project, AppError>>(move |conn| {
conn.transaction(move |conn| { conn.transaction(move |conn| {
let project = match Project::all() Ok(
match Project::all()
.filter(Project::with_team(api_key.team_id)) .filter(Project::with_team(api_key.team_id))
.filter(Project::with_name(project_name.clone())) .filter(Project::with_name(project_name.clone()))
.first(conn) .first(conn)
@ -79,34 +73,46 @@ async fn say_get(
)) ))
.get_result(conn) .get_result(conn)
.context("failed to insert project")?, .context("failed to insert project")?,
}; },
Ok(project )
.selected_channels()
.load(conn)
.context("failed to load selected channels")?)
}) })
}) })
.await .await
.unwrap()? .unwrap()?
}; };
let selected_channels = {
for channel in selected_channels { let project = project.clone();
if let Ok(config) = TryInto::<EmailBackendConfig>::try_into(channel.backend_config) { db_conn
if config.verified { .interact::<_, Result<Vec<Channel>, AppError>>(move |conn| {
let recipient: lettre::message::Mailbox = config.recipient.parse()?; Ok(project
let email = crate::email::Message { .selected_channels()
from: email_settings.message_from.clone().into(), .load(conn)
to: recipient.into(), .context("failed to load selected channels")?)
subject: "Shout".to_string(), })
text_body: query.message.clone(), .await
.unwrap()?
}; };
tracing::info!("Sending email to recipient for channel {}", channel.id);
mailer.send_batch(vec![email]).await?; {
} else { let selected_channels = selected_channels.clone();
tracing::info!("Email recipient for channel {} is not verified", channel.id); db_conn
} .interact::<_, Result<_, AppError>>(move |conn| {
for channel in selected_channels {
insert_into(messages::table)
.values((
messages::id.eq(Uuid::now_v7()),
messages::channel_id.eq(&channel.id),
messages::project_id.eq(&project.id),
messages::message.eq(&query.message),
))
.execute(conn)?;
} }
Ok(())
})
.await
.unwrap()?;
} }
tracing::debug!("queued {} messages", selected_channels.len());
Ok(Json(json!({ "ok": true }))) Ok(Json(json!({ "ok": true })))
} }

111
src/worker.rs Normal file
View file

@ -0,0 +1,111 @@
use anyhow::{Context as _, Result};
use diesel::prelude::*;
use tracing::Instrument as _;
use uuid::Uuid;
use crate::{
app_state::AppState,
channels::{Channel, EmailBackendConfig},
email::MailSender,
messages::Message,
schema::{channels, messages},
};
pub async fn run_worker(state: AppState) -> Result<()> {
async move {
process_messages(state).await?;
Ok(())
}
.instrument(tracing::debug_span!("run_worker()"))
.await
}
/**
* Process messages from the queue in the `messages` table. Insertions to the
* queue are rate limited per team and per project, so no effort should be
* needed here to enforce fairness.
*/
async fn process_messages(state: AppState) -> Result<()> {
async move {
const MESSAGE_QUEUE_LIMIT: i64 = 250;
let db_conn = state.db_pool.get().await?;
let queued_messages = db_conn
.interact::<_, Result<Vec<(Message, Channel)>>>(move |conn| {
messages::table
.inner_join(channels::table)
.select((Message::as_select(), Channel::as_select()))
.filter(Message::is_not_sent())
.order(messages::created_at.asc())
.limit(MESSAGE_QUEUE_LIMIT)
.load(conn)
.context("failed to load queued messages")
})
.await
.unwrap()?;
// Dispatch email messages together to take advantage of Postmark's
// batch send API
let emails: Vec<(Uuid, crate::email::Message)> = queued_messages
.iter()
.filter_map(|(message, channel)| {
if let Ok(backend_config) =
TryInto::<EmailBackendConfig>::try_into(channel.backend_config.clone())
{
if backend_config.verified {
let recipient: lettre::message::Mailbox = if let Ok(recipient) =
backend_config.recipient.parse()
{
recipient
} else {
tracing::error!("failed to parse recipient for channel {}", channel.id);
return None;
};
let email = crate::email::Message {
from: state.settings.email.message_from.clone().into(),
to: recipient.into(),
subject: "Shout".to_string(),
text_body: message.message.clone(),
};
tracing::debug!("Sending email to recipient for channel {}", channel.id);
Some((message.id.clone(), email))
} else {
tracing::info!(
"Email recipient for channel {} is not verified",
channel.id
);
None
}
} else {
None
}
})
.collect();
if !emails.is_empty() {
let message_ids: Vec<Uuid> = emails.iter().map(|(id, _)| id.clone()).collect();
let results = state
.mailer
.send_batch(emails.into_iter().map(|(_, email)| email).collect())
.await;
assert!(results.len() == message_ids.len());
let results_by_id = message_ids.into_iter().zip(results.into_iter());
db_conn
.interact::<_, Result<_>>(move |conn| {
for (id, result) in results_by_id {
if let Err(err) = result {
tracing::error!("error sending message {}: {:?}", id, err);
} else {
diesel::update(messages::table.filter(messages::id.eq(id)))
.set(messages::sent_at.eq(diesel::dsl::now))
.execute(conn)?;
}
}
Ok(())
})
.await
.unwrap()?;
}
tracing::info!("finished processing messages");
Ok(())
}
.instrument(tracing::debug_span!("process_messages()"))
.await
}