Compare commits
6 commits
e30c610de4
...
157eb37257
Author | SHA1 | Date | |
---|---|---|---|
157eb37257 | |||
e7d4eaaf81 | |||
ac056c0aa3 | |||
4a62d66400 | |||
37e91c36a8 | |||
c9e64e5f0b |
15 changed files with 623 additions and 111 deletions
109
Cargo.lock
generated
109
Cargo.lock
generated
|
@ -74,6 +74,56 @@ dependencies = [
|
|||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anstream"
|
||||
version = "0.6.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b"
|
||||
dependencies = [
|
||||
"anstyle",
|
||||
"anstyle-parse",
|
||||
"anstyle-query",
|
||||
"anstyle-wincon",
|
||||
"colorchoice",
|
||||
"is_terminal_polyfill",
|
||||
"utf8parse",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anstyle"
|
||||
version = "1.0.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
|
||||
|
||||
[[package]]
|
||||
name = "anstyle-parse"
|
||||
version = "0.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9"
|
||||
dependencies = [
|
||||
"utf8parse",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anstyle-query"
|
||||
version = "1.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c"
|
||||
dependencies = [
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anstyle-wincon"
|
||||
version = "3.0.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e"
|
||||
dependencies = [
|
||||
"anstyle",
|
||||
"once_cell",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.91"
|
||||
|
@ -516,6 +566,52 @@ dependencies = [
|
|||
"stacker",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "4.5.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "027bb0d98429ae334a8698531da7077bdf906419543a35a55c2cb1b66437d767"
|
||||
dependencies = [
|
||||
"clap_builder",
|
||||
"clap_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_builder"
|
||||
version = "4.5.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5589e0cba072e0f3d23791efac0fd8627b49c829c196a492e88168e6a669d863"
|
||||
dependencies = [
|
||||
"anstream",
|
||||
"anstyle",
|
||||
"clap_lex",
|
||||
"strsim",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_derive"
|
||||
version = "4.5.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_lex"
|
||||
version = "0.7.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6"
|
||||
|
||||
[[package]]
|
||||
name = "colorchoice"
|
||||
version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
|
||||
|
||||
[[package]]
|
||||
name = "config"
|
||||
version = "0.14.1"
|
||||
|
@ -1568,6 +1664,12 @@ version = "2.10.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708"
|
||||
|
||||
[[package]]
|
||||
name = "is_terminal_polyfill"
|
||||
version = "1.70.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "1.0.11"
|
||||
|
@ -2495,6 +2597,7 @@ dependencies = [
|
|||
"axum-extra",
|
||||
"base64 0.22.1",
|
||||
"chrono",
|
||||
"clap",
|
||||
"config",
|
||||
"console_error_panic_hook",
|
||||
"deadpool-diesel",
|
||||
|
@ -3071,6 +3174,12 @@ version = "1.0.4"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
|
||||
|
||||
[[package]]
|
||||
name = "utf8parse"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
|
||||
|
||||
[[package]]
|
||||
name = "uuid"
|
||||
version = "1.11.0"
|
||||
|
|
|
@ -32,3 +32,4 @@ diesel = { version = "2.2.6", features = ["postgres", "chrono", "uuid", "serde_j
|
|||
tower = "0.5.2"
|
||||
regex = "1.11.1"
|
||||
lettre = { version = "0.11.12", features = ["tokio1", "serde", "tracing", "tokio1-native-tls"] }
|
||||
clap = { version = "4.5.31", features = ["derive"] }
|
||||
|
|
113
bacon.toml
113
bacon.toml
|
@ -1,7 +1,114 @@
|
|||
[jobs.webserver]
|
||||
command = ["cargo", "run"]
|
||||
# This is a configuration file for the bacon tool
|
||||
#
|
||||
# Complete help on configuration: https://dystroy.org/bacon/config/
|
||||
#
|
||||
# You may check the current default at
|
||||
# https://github.com/Canop/bacon/blob/main/defaults/default-bacon.toml
|
||||
|
||||
default_job = "check"
|
||||
env.CARGO_TERM_COLOR = "always"
|
||||
|
||||
[jobs.check]
|
||||
command = ["cargo", "check"]
|
||||
need_stdout = false
|
||||
|
||||
[jobs.check-all]
|
||||
command = ["cargo", "check", "--all-targets"]
|
||||
need_stdout = false
|
||||
|
||||
# Run clippy on the default target
|
||||
[jobs.clippy]
|
||||
command = ["cargo", "clippy"]
|
||||
need_stdout = false
|
||||
|
||||
# Run clippy on all targets
|
||||
# To disable some lints, you may change the job this way:
|
||||
# [jobs.clippy-all]
|
||||
# command = [
|
||||
# "cargo", "clippy",
|
||||
# "--all-targets",
|
||||
# "--",
|
||||
# "-A", "clippy::bool_to_int_with_if",
|
||||
# "-A", "clippy::collapsible_if",
|
||||
# "-A", "clippy::derive_partial_eq_without_eq",
|
||||
# ]
|
||||
# need_stdout = false
|
||||
[jobs.clippy-all]
|
||||
command = ["cargo", "clippy", "--all-targets"]
|
||||
need_stdout = false
|
||||
|
||||
# This job lets you run
|
||||
# - all tests: bacon test
|
||||
# - a specific test: bacon test -- config::test_default_files
|
||||
# - the tests of a package: bacon test -- -- -p config
|
||||
[jobs.test]
|
||||
command = ["cargo", "test"]
|
||||
need_stdout = true
|
||||
|
||||
[jobs.nextest]
|
||||
command = [
|
||||
"cargo", "nextest", "run",
|
||||
"--hide-progress-bar", "--failure-output", "final"
|
||||
]
|
||||
need_stdout = true
|
||||
analyzer = "nextest"
|
||||
|
||||
[jobs.doc]
|
||||
command = ["cargo", "doc", "--no-deps"]
|
||||
need_stdout = false
|
||||
|
||||
# If the doc compiles, then it opens in your browser and bacon switches
|
||||
# to the previous job
|
||||
[jobs.doc-open]
|
||||
command = ["cargo", "doc", "--no-deps", "--open"]
|
||||
need_stdout = false
|
||||
on_success = "back" # so that we don't open the browser at each change
|
||||
|
||||
# You can run your application and have the result displayed in bacon,
|
||||
# if it makes sense for this crate.
|
||||
[jobs.run-worker]
|
||||
command = [
|
||||
"cargo", "run", "worker",
|
||||
# put launch parameters for your program behind a `--` separator
|
||||
]
|
||||
need_stdout = true
|
||||
allow_warnings = true
|
||||
background = true
|
||||
default_watch = false
|
||||
|
||||
# Run your long-running application (eg server) and have the result displayed in bacon.
|
||||
# For programs that never stop (eg a server), `background` is set to false
|
||||
# to have the cargo run output immediately displayed instead of waiting for
|
||||
# program's end.
|
||||
# 'on_change_strategy' is set to `kill_then_restart` to have your program restart
|
||||
# on every change (an alternative would be to use the 'F5' key manually in bacon).
|
||||
# If you often use this job, it makes sense to override the 'r' key by adding
|
||||
# a binding `r = job:run-long` at the end of this file .
|
||||
[jobs.run-server]
|
||||
command = [
|
||||
"cargo", "run", "serve"
|
||||
# put launch parameters for your program behind a `--` separator
|
||||
]
|
||||
need_stdout = true
|
||||
allow_warnings = true
|
||||
background = false
|
||||
on_change_strategy = "kill_then_restart"
|
||||
kill = ["kill", "-s", "INT"]
|
||||
watch =["src", "templates"]
|
||||
watch = ["src", "templates"]
|
||||
|
||||
# This parameterized job runs the example of your choice, as soon
|
||||
# as the code compiles.
|
||||
# Call it as
|
||||
# bacon ex -- my-example
|
||||
[jobs.ex]
|
||||
command = ["cargo", "run", "--example"]
|
||||
need_stdout = true
|
||||
allow_warnings = true
|
||||
|
||||
# You may define here keybindings that would be specific to
|
||||
# a project, for example a shortcut to launch a specific job.
|
||||
# Shortcuts to internal functions (scrolling, toggling, etc.)
|
||||
# should go in your personal global prefs.toml file instead.
|
||||
[keybindings]
|
||||
r = "job:run-server"
|
||||
w = "job:run-worker"
|
||||
|
|
|
@ -7,7 +7,7 @@ CREATE INDEX ON users (uid);
|
|||
|
||||
CREATE TABLE IF NOT EXISTS csrf_tokens (
|
||||
id UUID NOT NULL PRIMARY KEY,
|
||||
user_id UUID REFERENCES users(id),
|
||||
user_id UUID REFERENCES users(id) ON DELETE CASCADE,
|
||||
created_at TIMESTAMPTZ NOT NULL
|
||||
);
|
||||
CREATE INDEX ON csrf_tokens (created_at);
|
||||
|
@ -18,9 +18,8 @@ CREATE TABLE teams (
|
|||
);
|
||||
|
||||
CREATE TABLE team_memberships (
|
||||
team_id UUID NOT NULL REFERENCES teams(id),
|
||||
user_id UUID NOT NULL REFERENCES users(id),
|
||||
roles TEXT[] NOT NULL DEFAULT '{}',
|
||||
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
|
||||
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
||||
PRIMARY KEY (team_id, user_id)
|
||||
);
|
||||
CREATE INDEX ON team_memberships (team_id);
|
||||
|
@ -28,7 +27,7 @@ CREATE INDEX ON team_memberships (user_id);
|
|||
|
||||
CREATE TABLE api_keys (
|
||||
id UUID NOT NULL PRIMARY KEY,
|
||||
team_id UUID NOT NULL REFERENCES teams(id),
|
||||
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
last_used_at TIMESTAMPTZ
|
||||
);
|
||||
|
@ -44,15 +43,15 @@ CREATE INDEX ON projects(team_id);
|
|||
|
||||
CREATE TABLE IF NOT EXISTS channels (
|
||||
id UUID NOT NULL PRIMARY KEY,
|
||||
team_id UUID NOT NULL REFERENCES teams(id),
|
||||
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
|
||||
name TEXT NOT NULL,
|
||||
enable_by_default BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
backend_config JSONB NOT NULL DEFAULT '{}'::JSONB
|
||||
backend_config JSONB NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS channel_selections (
|
||||
project_id UUID NOT NULL REFERENCES projects(id),
|
||||
channel_id UUID NOT NULL REFERENCES channels(id),
|
||||
project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
|
||||
channel_id UUID NOT NULL REFERENCES channels(id) ON DELETE CASCADE,
|
||||
PRIMARY KEY (project_id, channel_id)
|
||||
);
|
||||
CREATE INDEX ON channel_selections (project_id);
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
CREATE TABLE IF NOT EXISTS messages (
|
||||
id UUID PRIMARY KEY NOT NULL,
|
||||
channel_id UUID NOT NULL REFERENCES channels (id),
|
||||
channel_id UUID NOT NULL REFERENCES channels (id) ON DELETE RESTRICT,
|
||||
project_id UUID NOT NULL REFERENCES projects (id) ON DELETE RESTRICT,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
sent_at TIMESTAMPTZ,
|
||||
message TEXT NOT NULL
|
||||
|
|
30
src/auth.rs
30
src/auth.rs
|
@ -13,7 +13,7 @@ use oauth2::{
|
|||
ClientSecret, CsrfToken, RedirectUrl, RefreshToken, TokenResponse, TokenUrl,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::trace_span;
|
||||
use tracing::{trace_span, Instrument};
|
||||
|
||||
use crate::{
|
||||
app_error::AppError,
|
||||
|
@ -197,17 +197,21 @@ impl FromRequestParts<AppState> for AuthInfo {
|
|||
parts: &mut Parts,
|
||||
state: &AppState,
|
||||
) -> Result<Self, <Self as FromRequestParts<AppState>>::Rejection> {
|
||||
let _ = trace_span!("AuthInfo from_request_parts()").enter();
|
||||
let session = parts
|
||||
.extract_with_state::<AppSession, AppState>(state)
|
||||
.await?
|
||||
.0
|
||||
.ok_or(AppError::auth_redirect_from_base_path(
|
||||
state.settings.base_path.clone(),
|
||||
))?;
|
||||
let user = session.get::<AuthInfo>(SESSION_KEY_AUTH_INFO).ok_or(
|
||||
AppError::auth_redirect_from_base_path(state.settings.base_path.clone()),
|
||||
)?;
|
||||
Ok(user)
|
||||
async move {
|
||||
let session = parts
|
||||
.extract_with_state::<AppSession, AppState>(state)
|
||||
.await?
|
||||
.0
|
||||
.ok_or(AppError::auth_redirect_from_base_path(
|
||||
state.settings.base_path.clone(),
|
||||
))?;
|
||||
let user = session.get::<AuthInfo>(SESSION_KEY_AUTH_INFO).ok_or(
|
||||
AppError::auth_redirect_from_base_path(state.settings.base_path.clone()),
|
||||
)?;
|
||||
Ok(user)
|
||||
}
|
||||
// The Span.enter() guard pattern doesn't play nicely async
|
||||
.instrument(trace_span!("AuthInfo from_request_parts()"))
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
|
149
src/email.rs
149
src/email.rs
|
@ -7,7 +7,7 @@ use crate::app_state::AppState;
|
|||
|
||||
const POSTMARK_EMAIL_BATCH_URL: &'static str = "https://api.postmarkapp.com/email/batch";
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[derive(Clone, Serialize)]
|
||||
pub struct Message {
|
||||
#[serde(rename = "From")]
|
||||
pub from: lettre::message::Mailbox,
|
||||
|
@ -21,7 +21,11 @@ pub struct Message {
|
|||
}
|
||||
|
||||
pub trait MailSender: Clone + Sync {
|
||||
async fn send_batch(&self, emails: Vec<Message>) -> Result<(), anyhow::Error>;
|
||||
/**
|
||||
* Attempt to send all messages defined by the input Vec. Send as many as
|
||||
* possible, returning exactly one Result<()> for each message.
|
||||
*/
|
||||
async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>>;
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
|
@ -48,7 +52,7 @@ impl Mailer {
|
|||
}
|
||||
|
||||
impl MailSender for Mailer {
|
||||
async fn send_batch(&self, emails: Vec<Message>) -> Result<(), anyhow::Error> {
|
||||
async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>> {
|
||||
match self {
|
||||
Mailer::Smtp(sender) => sender.send_batch(emails).await,
|
||||
Mailer::Postmark(sender) => sender.send_batch(emails).await,
|
||||
|
@ -104,11 +108,25 @@ where
|
|||
}
|
||||
|
||||
impl MailSender for SmtpSender {
|
||||
async fn send_batch(&self, emails: Vec<Message>) -> Result<()> {
|
||||
async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>> {
|
||||
let mut results: Vec<Result<()>> = Vec::with_capacity(emails.len());
|
||||
for email in emails {
|
||||
self.transport.send(email.try_into()?).await?;
|
||||
match TryInto::<lettre::Message>::try_into(email) {
|
||||
Ok(email) => {
|
||||
results.push(
|
||||
self.transport
|
||||
.send(email)
|
||||
.await
|
||||
.map(|_| ())
|
||||
.map_err(Into::<anyhow::Error>::into),
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
results.push(Err(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
results
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -132,19 +150,114 @@ impl PostmarkSender {
|
|||
}
|
||||
|
||||
impl MailSender for PostmarkSender {
|
||||
async fn send_batch(&self, emails: Vec<Message>) -> Result<()> {
|
||||
if emails.len() > 500 {
|
||||
return Err(anyhow::anyhow!(
|
||||
"Postmark sends no more than 500 messages per batch"
|
||||
));
|
||||
/**
|
||||
* Recursively attempts to send messages, breaking them into smaller and
|
||||
* smaller batches as needed.
|
||||
*/
|
||||
async fn send_batch(&self, mut emails: Vec<Message>) -> Vec<Result<()>> {
|
||||
/**
|
||||
* Constructs a Vec with Ok(()) repeated n times.
|
||||
*/
|
||||
macro_rules! all_ok {
|
||||
() => {{
|
||||
let mut collection: Vec<Result<_>> = Vec::with_capacity(emails.len());
|
||||
for _ in 0..emails.len() {
|
||||
collection.push(Ok(()));
|
||||
}
|
||||
collection
|
||||
}};
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a Vec with a single specific error, followed by n-1
|
||||
* generic errors referring back to it.
|
||||
*/
|
||||
macro_rules! cascade_err {
|
||||
($err:expr) => {{
|
||||
let mut collection: Vec<Result<_>> = Vec::with_capacity(emails.len());
|
||||
collection.push(Err($err));
|
||||
for _ in 1..emails.len() {
|
||||
collection.push(Err(anyhow::anyhow!("could not send due to previous error")));
|
||||
}
|
||||
collection
|
||||
}};
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively splits the email batch in half and tries to send each
|
||||
* half independently, allowing both to run to completion and then
|
||||
* returning the first error of the two results, if present.
|
||||
*
|
||||
* This is implemented as a macro in order to avoid unstable async
|
||||
* closures.
|
||||
*/
|
||||
macro_rules! split_and_retry {
|
||||
() => {
|
||||
if emails.len() < 2 {
|
||||
tracing::warn!("Postmark send batch cannot be split any further");
|
||||
vec![Err(anyhow::anyhow!(
|
||||
"unable to split Postmark batch into smaller slices"
|
||||
))]
|
||||
} else {
|
||||
tracing::debug!("retrying Postmark send with smaller batches");
|
||||
let mut results =
|
||||
Box::pin(self.send_batch(emails.split_off(emails.len() / 2))).await;
|
||||
results.extend(Box::pin(self.send_batch(emails)).await);
|
||||
results
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// https://postmarkapp.com/support/article/1056-what-are-the-attachment-and-email-size-limits
|
||||
const POSTMARK_MAX_BATCH_ENTRIES: usize = 500;
|
||||
const POSTMARK_MAX_REQUEST_BYTES: usize = 50 * 1000 * 1000;
|
||||
// TODO: Check email subject and body size against Postmark limits
|
||||
|
||||
if emails.len() == 0 {
|
||||
tracing::debug!("no Postmark messages to send");
|
||||
vec![Ok(())]
|
||||
} else if emails.len() > POSTMARK_MAX_BATCH_ENTRIES {
|
||||
split_and_retry!()
|
||||
} else {
|
||||
let body = match serde_json::to_string(&emails) {
|
||||
Ok(body) => body,
|
||||
Err(err) => return cascade_err!(err.into()),
|
||||
};
|
||||
if body.len() > POSTMARK_MAX_REQUEST_BYTES {
|
||||
if emails.len() > 1 {
|
||||
split_and_retry!()
|
||||
} else {
|
||||
vec![Err(anyhow::anyhow!(
|
||||
"Postmark requests may not exceed {} bytes",
|
||||
POSTMARK_MAX_REQUEST_BYTES
|
||||
))]
|
||||
}
|
||||
} else {
|
||||
tracing::debug!("sending Postmark batch of {} messages", emails.len());
|
||||
let resp = match self
|
||||
.client
|
||||
.post(POSTMARK_EMAIL_BATCH_URL)
|
||||
.header("X-Postmark-Server-Token", &self.server_token)
|
||||
.header(reqwest::header::CONTENT_TYPE, "application/json")
|
||||
.body(body)
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(resp) => resp,
|
||||
Err(err) => return cascade_err!(err.into()),
|
||||
};
|
||||
if resp.status().is_client_error() && emails.len() > 1 {
|
||||
split_and_retry!()
|
||||
} else {
|
||||
if let Err(err) = resp.error_for_status() {
|
||||
cascade_err!(err.into())
|
||||
} else {
|
||||
tracing::debug!("sent Postmark batch of {} messages", emails.len());
|
||||
all_ok!()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
self.client
|
||||
.post(POSTMARK_EMAIL_BATCH_URL)
|
||||
.header("X-Postmark-Server-Token", &self.server_token)
|
||||
.json(&emails)
|
||||
.send()
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
77
src/main.rs
77
src/main.rs
|
@ -18,16 +18,40 @@ mod team_memberships;
|
|||
mod teams;
|
||||
mod users;
|
||||
mod v0_router;
|
||||
mod worker;
|
||||
|
||||
use std::process::exit;
|
||||
|
||||
use chrono::{TimeDelta, Utc};
|
||||
use clap::{Parser, Subcommand};
|
||||
use email::SmtpOptions;
|
||||
use tokio::time::sleep;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
use crate::{
|
||||
app_state::AppState, email::Mailer, router::new_router, sessions::PgStore, settings::Settings,
|
||||
worker::run_worker,
|
||||
};
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(version, about, long_about = None)]
|
||||
struct Cli {
|
||||
#[command(subcommand)]
|
||||
command: Commands,
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
enum Commands {
|
||||
/// Run web server
|
||||
Serve,
|
||||
/// Run background worker
|
||||
Worker {
|
||||
/// Loop the every n seconds instead of exiting after execution
|
||||
#[arg(long)]
|
||||
auto_loop_seconds: Option<u32>,
|
||||
},
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let settings = Settings::load().unwrap();
|
||||
|
@ -36,6 +60,8 @@ async fn main() {
|
|||
.with_env_filter(EnvFilter::from_default_env())
|
||||
.init();
|
||||
|
||||
let cli = Cli::parse();
|
||||
|
||||
let database_url = settings.database_url.clone();
|
||||
let manager =
|
||||
deadpool_diesel::postgres::Manager::new(database_url, deadpool_diesel::Runtime::Tokio1);
|
||||
|
@ -66,7 +92,7 @@ async fn main() {
|
|||
};
|
||||
|
||||
let app_state = AppState {
|
||||
db_pool,
|
||||
db_pool: db_pool.clone(),
|
||||
mailer,
|
||||
oauth_client,
|
||||
reqwest_client,
|
||||
|
@ -74,16 +100,43 @@ async fn main() {
|
|||
settings: settings.clone(),
|
||||
};
|
||||
|
||||
let router = new_router(app_state);
|
||||
match &cli.command {
|
||||
Commands::Serve => {
|
||||
let router = new_router(app_state);
|
||||
|
||||
let listener = tokio::net::TcpListener::bind((settings.host.clone(), settings.port.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
tracing::info!(
|
||||
"App running at http://{}:{}{}",
|
||||
settings.host,
|
||||
settings.port,
|
||||
settings.base_path
|
||||
);
|
||||
axum::serve(listener, router).await.unwrap();
|
||||
let listener =
|
||||
tokio::net::TcpListener::bind((settings.host.clone(), settings.port.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
tracing::info!(
|
||||
"App running at http://{}:{}{}",
|
||||
settings.host,
|
||||
settings.port,
|
||||
settings.base_path
|
||||
);
|
||||
axum::serve(listener, router).await.unwrap();
|
||||
}
|
||||
Commands::Worker { auto_loop_seconds } => {
|
||||
if let Some(loop_seconds) = auto_loop_seconds {
|
||||
let loop_delta = TimeDelta::seconds(i64::from(*loop_seconds));
|
||||
loop {
|
||||
let t_next_loop = Utc::now() + loop_delta;
|
||||
|
||||
if let Err(err) = run_worker(app_state.clone()).await {
|
||||
tracing::error!("{}", err)
|
||||
}
|
||||
|
||||
let sleep_delta = t_next_loop - Utc::now();
|
||||
match sleep_delta.to_std() {
|
||||
Ok(duration) => {
|
||||
sleep(duration).await;
|
||||
}
|
||||
Err(_) => { /* sleep_delta was < 0, so don't sleep */ }
|
||||
}
|
||||
}
|
||||
} else {
|
||||
run_worker(app_state).await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,8 +13,10 @@ use crate::{channels::Channel, schema::messages};
|
|||
#[diesel(belongs_to(Channel))]
|
||||
pub struct Message {
|
||||
pub id: Uuid,
|
||||
pub project_id: Uuid,
|
||||
pub channel_id: Uuid,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub sent_at: Option<DateTime<Utc>>,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
|
@ -29,4 +31,9 @@ impl Message {
|
|||
pub fn with_channel(channel_id: Uuid) -> _ {
|
||||
messages::channel_id.eq(channel_id)
|
||||
}
|
||||
|
||||
#[auto_type(no_type_alias)]
|
||||
pub fn is_not_sent() -> _ {
|
||||
messages::sent_at.is_null()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -214,7 +214,6 @@ async fn post_new_team(
|
|||
let team_membership = TeamMembership {
|
||||
team_id: team_id.clone(),
|
||||
user_id: current_user.id,
|
||||
roles: vec![Some("OWNER".to_string())],
|
||||
};
|
||||
db_conn
|
||||
.interact(move |conn| {
|
||||
|
@ -594,7 +593,7 @@ async fn update_channel_email_recipient(
|
|||
subject: "Verify Your Email".to_string(),
|
||||
text_body: format!("Your email verification code is: {}", verification_code),
|
||||
};
|
||||
mailer.send_batch(vec![email]).await?;
|
||||
mailer.send_batch(vec![email]).await.remove(0)?;
|
||||
|
||||
Ok(Redirect::to(&format!(
|
||||
"{}/teams/{}/channels/{}",
|
||||
|
|
|
@ -47,6 +47,7 @@ diesel::table! {
|
|||
messages (id) {
|
||||
id -> Uuid,
|
||||
channel_id -> Uuid,
|
||||
project_id -> Uuid,
|
||||
created_at -> Timestamptz,
|
||||
sent_at -> Nullable<Timestamptz>,
|
||||
message -> Text,
|
||||
|
@ -65,7 +66,6 @@ diesel::table! {
|
|||
team_memberships (team_id, user_id) {
|
||||
team_id -> Uuid,
|
||||
user_id -> Uuid,
|
||||
roles -> Array<Nullable<Text>>,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -90,6 +90,7 @@ diesel::joinable!(channel_selections -> projects (project_id));
|
|||
diesel::joinable!(channels -> teams (team_id));
|
||||
diesel::joinable!(csrf_tokens -> users (user_id));
|
||||
diesel::joinable!(messages -> channels (channel_id));
|
||||
diesel::joinable!(messages -> projects (project_id));
|
||||
diesel::joinable!(projects -> teams (team_id));
|
||||
diesel::joinable!(team_memberships -> teams (team_id));
|
||||
diesel::joinable!(team_memberships -> users (user_id));
|
||||
|
|
|
@ -8,7 +8,7 @@ use axum::{
|
|||
use axum_extra::extract::CookieJar;
|
||||
use chrono::{DateTime, TimeDelta, Utc};
|
||||
use diesel::{pg::Pg, prelude::*, upsert::excluded};
|
||||
use tracing::trace_span;
|
||||
use tracing::{trace_span, Instrument};
|
||||
|
||||
use crate::{app_error::AppError, app_state::AppState, schema::browser_sessions};
|
||||
|
||||
|
@ -134,7 +134,7 @@ impl FromRequestParts<AppState> for AppSession {
|
|||
parts: &mut Parts,
|
||||
state: &AppState,
|
||||
) -> Result<Self, <Self as FromRequestParts<AppState>>::Rejection> {
|
||||
let _ = trace_span!("AppSession::from_request_parts()").enter();
|
||||
async move {
|
||||
let jar = parts.extract::<CookieJar>().await.unwrap();
|
||||
let session_cookie = match jar.get(&state.settings.auth.cookie_name) {
|
||||
Some(cookie) => cookie,
|
||||
|
@ -167,5 +167,7 @@ impl FromRequestParts<AppState> for AppSession {
|
|||
tracing::debug!("no matching session found in database");
|
||||
Ok(AppSession(None))
|
||||
}
|
||||
// The Span.enter() guard pattern doesn't play nicely async
|
||||
}.instrument(trace_span!("AppSession::from_request_parts()")).await
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ use crate::{
|
|||
pub struct TeamMembership {
|
||||
pub team_id: Uuid,
|
||||
pub user_id: Uuid,
|
||||
pub roles: Vec<Option<String>>,
|
||||
}
|
||||
|
||||
impl TeamMembership {
|
||||
|
|
104
src/v0_router.rs
104
src/v0_router.rs
|
@ -1,6 +1,6 @@
|
|||
use anyhow::Context;
|
||||
use axum::{
|
||||
extract::{Query, State},
|
||||
extract::Query,
|
||||
response::{IntoResponse, Json},
|
||||
routing::get,
|
||||
Router,
|
||||
|
@ -14,11 +14,9 @@ use crate::{
|
|||
api_keys::ApiKey,
|
||||
app_error::AppError,
|
||||
app_state::{AppState, DbConn},
|
||||
channels::{Channel, EmailBackendConfig},
|
||||
email::{MailSender as _, Mailer},
|
||||
channels::Channel,
|
||||
projects::Project,
|
||||
schema::{api_keys, projects},
|
||||
settings::Settings,
|
||||
schema::{api_keys, messages, projects},
|
||||
};
|
||||
|
||||
pub fn new_router(state: AppState) -> Router<AppState> {
|
||||
|
@ -33,11 +31,6 @@ struct SayQuery {
|
|||
}
|
||||
|
||||
async fn say_get(
|
||||
State(Settings {
|
||||
email: email_settings,
|
||||
..
|
||||
}): State<Settings>,
|
||||
State(mailer): State<Mailer>,
|
||||
DbConn(db_conn): DbConn,
|
||||
Query(query): Query<SayQuery>,
|
||||
) -> Result<impl IntoResponse, AppError> {
|
||||
|
@ -58,55 +51,68 @@ async fn say_get(
|
|||
.unwrap()?
|
||||
};
|
||||
|
||||
let selected_channels = {
|
||||
let project = {
|
||||
let project_name = query.project.to_lowercase();
|
||||
db_conn
|
||||
.interact::<_, Result<Vec<Channel>, AppError>>(move |conn| {
|
||||
.interact::<_, Result<Project, AppError>>(move |conn| {
|
||||
conn.transaction(move |conn| {
|
||||
let project = match Project::all()
|
||||
.filter(Project::with_team(api_key.team_id))
|
||||
.filter(Project::with_name(project_name.clone()))
|
||||
.first(conn)
|
||||
.optional()
|
||||
.context("failed to load project")?
|
||||
{
|
||||
Some(project) => project,
|
||||
None => insert_into(projects::table)
|
||||
.values((
|
||||
projects::id.eq(Uuid::now_v7()),
|
||||
projects::team_id.eq(api_key.team_id),
|
||||
projects::name.eq(project_name),
|
||||
))
|
||||
.get_result(conn)
|
||||
.context("failed to insert project")?,
|
||||
};
|
||||
Ok(project
|
||||
.selected_channels()
|
||||
.load(conn)
|
||||
.context("failed to load selected channels")?)
|
||||
Ok(
|
||||
match Project::all()
|
||||
.filter(Project::with_team(api_key.team_id))
|
||||
.filter(Project::with_name(project_name.clone()))
|
||||
.first(conn)
|
||||
.optional()
|
||||
.context("failed to load project")?
|
||||
{
|
||||
Some(project) => project,
|
||||
None => insert_into(projects::table)
|
||||
.values((
|
||||
projects::id.eq(Uuid::now_v7()),
|
||||
projects::team_id.eq(api_key.team_id),
|
||||
projects::name.eq(project_name),
|
||||
))
|
||||
.get_result(conn)
|
||||
.context("failed to insert project")?,
|
||||
},
|
||||
)
|
||||
})
|
||||
})
|
||||
.await
|
||||
.unwrap()?
|
||||
};
|
||||
let selected_channels = {
|
||||
let project = project.clone();
|
||||
db_conn
|
||||
.interact::<_, Result<Vec<Channel>, AppError>>(move |conn| {
|
||||
Ok(project
|
||||
.selected_channels()
|
||||
.load(conn)
|
||||
.context("failed to load selected channels")?)
|
||||
})
|
||||
.await
|
||||
.unwrap()?
|
||||
};
|
||||
|
||||
for channel in selected_channels {
|
||||
if let Ok(config) = TryInto::<EmailBackendConfig>::try_into(channel.backend_config) {
|
||||
if config.verified {
|
||||
let recipient: lettre::message::Mailbox = config.recipient.parse()?;
|
||||
let email = crate::email::Message {
|
||||
from: email_settings.message_from.clone().into(),
|
||||
to: recipient.into(),
|
||||
subject: "Shout".to_string(),
|
||||
text_body: query.message.clone(),
|
||||
};
|
||||
tracing::info!("Sending email to recipient for channel {}", channel.id);
|
||||
mailer.send_batch(vec![email]).await?;
|
||||
} else {
|
||||
tracing::info!("Email recipient for channel {} is not verified", channel.id);
|
||||
}
|
||||
}
|
||||
{
|
||||
let selected_channels = selected_channels.clone();
|
||||
db_conn
|
||||
.interact::<_, Result<_, AppError>>(move |conn| {
|
||||
for channel in selected_channels {
|
||||
insert_into(messages::table)
|
||||
.values((
|
||||
messages::id.eq(Uuid::now_v7()),
|
||||
messages::channel_id.eq(&channel.id),
|
||||
messages::project_id.eq(&project.id),
|
||||
messages::message.eq(&query.message),
|
||||
))
|
||||
.execute(conn)?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.unwrap()?;
|
||||
}
|
||||
tracing::debug!("queued {} messages", selected_channels.len());
|
||||
|
||||
Ok(Json(json!({ "ok": true })))
|
||||
}
|
||||
|
|
111
src/worker.rs
Normal file
111
src/worker.rs
Normal file
|
@ -0,0 +1,111 @@
|
|||
use anyhow::{Context as _, Result};
|
||||
use diesel::prelude::*;
|
||||
use tracing::Instrument as _;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
app_state::AppState,
|
||||
channels::{Channel, EmailBackendConfig},
|
||||
email::MailSender,
|
||||
messages::Message,
|
||||
schema::{channels, messages},
|
||||
};
|
||||
|
||||
pub async fn run_worker(state: AppState) -> Result<()> {
|
||||
async move {
|
||||
process_messages(state).await?;
|
||||
Ok(())
|
||||
}
|
||||
.instrument(tracing::debug_span!("run_worker()"))
|
||||
.await
|
||||
}
|
||||
|
||||
/**
|
||||
* Process messages from the queue in the `messages` table. Insertions to the
|
||||
* queue are rate limited per team and per project, so no effort should be
|
||||
* needed here to enforce fairness.
|
||||
*/
|
||||
async fn process_messages(state: AppState) -> Result<()> {
|
||||
async move {
|
||||
const MESSAGE_QUEUE_LIMIT: i64 = 250;
|
||||
let db_conn = state.db_pool.get().await?;
|
||||
let queued_messages = db_conn
|
||||
.interact::<_, Result<Vec<(Message, Channel)>>>(move |conn| {
|
||||
messages::table
|
||||
.inner_join(channels::table)
|
||||
.select((Message::as_select(), Channel::as_select()))
|
||||
.filter(Message::is_not_sent())
|
||||
.order(messages::created_at.asc())
|
||||
.limit(MESSAGE_QUEUE_LIMIT)
|
||||
.load(conn)
|
||||
.context("failed to load queued messages")
|
||||
})
|
||||
.await
|
||||
.unwrap()?;
|
||||
// Dispatch email messages together to take advantage of Postmark's
|
||||
// batch send API
|
||||
let emails: Vec<(Uuid, crate::email::Message)> = queued_messages
|
||||
.iter()
|
||||
.filter_map(|(message, channel)| {
|
||||
if let Ok(backend_config) =
|
||||
TryInto::<EmailBackendConfig>::try_into(channel.backend_config.clone())
|
||||
{
|
||||
if backend_config.verified {
|
||||
let recipient: lettre::message::Mailbox = if let Ok(recipient) =
|
||||
backend_config.recipient.parse()
|
||||
{
|
||||
recipient
|
||||
} else {
|
||||
tracing::error!("failed to parse recipient for channel {}", channel.id);
|
||||
return None;
|
||||
};
|
||||
let email = crate::email::Message {
|
||||
from: state.settings.email.message_from.clone().into(),
|
||||
to: recipient.into(),
|
||||
subject: "Shout".to_string(),
|
||||
text_body: message.message.clone(),
|
||||
};
|
||||
tracing::debug!("Sending email to recipient for channel {}", channel.id);
|
||||
Some((message.id.clone(), email))
|
||||
} else {
|
||||
tracing::info!(
|
||||
"Email recipient for channel {} is not verified",
|
||||
channel.id
|
||||
);
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
if !emails.is_empty() {
|
||||
let message_ids: Vec<Uuid> = emails.iter().map(|(id, _)| id.clone()).collect();
|
||||
let results = state
|
||||
.mailer
|
||||
.send_batch(emails.into_iter().map(|(_, email)| email).collect())
|
||||
.await;
|
||||
assert!(results.len() == message_ids.len());
|
||||
let results_by_id = message_ids.into_iter().zip(results.into_iter());
|
||||
db_conn
|
||||
.interact::<_, Result<_>>(move |conn| {
|
||||
for (id, result) in results_by_id {
|
||||
if let Err(err) = result {
|
||||
tracing::error!("error sending message {}: {:?}", id, err);
|
||||
} else {
|
||||
diesel::update(messages::table.filter(messages::id.eq(id)))
|
||||
.set(messages::sent_at.eq(diesel::dsl::now))
|
||||
.execute(conn)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.unwrap()?;
|
||||
}
|
||||
tracing::info!("finished processing messages");
|
||||
Ok(())
|
||||
}
|
||||
.instrument(tracing::debug_span!("process_messages()"))
|
||||
.await
|
||||
}
|
Loading…
Add table
Reference in a new issue