Compare commits
No commits in common. "157eb37257aa62b4f7329b92182a5f4c97624339" and "e30c610de4e2eadf40b78741e10c5cf4d3326269" have entirely different histories.
157eb37257
...
e30c610de4
15 changed files with 111 additions and 623 deletions
109
Cargo.lock
generated
109
Cargo.lock
generated
|
@ -74,56 +74,6 @@ dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "anstream"
|
|
||||||
version = "0.6.18"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b"
|
|
||||||
dependencies = [
|
|
||||||
"anstyle",
|
|
||||||
"anstyle-parse",
|
|
||||||
"anstyle-query",
|
|
||||||
"anstyle-wincon",
|
|
||||||
"colorchoice",
|
|
||||||
"is_terminal_polyfill",
|
|
||||||
"utf8parse",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "anstyle"
|
|
||||||
version = "1.0.10"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "anstyle-parse"
|
|
||||||
version = "0.2.6"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9"
|
|
||||||
dependencies = [
|
|
||||||
"utf8parse",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "anstyle-query"
|
|
||||||
version = "1.1.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c"
|
|
||||||
dependencies = [
|
|
||||||
"windows-sys 0.59.0",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "anstyle-wincon"
|
|
||||||
version = "3.0.7"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e"
|
|
||||||
dependencies = [
|
|
||||||
"anstyle",
|
|
||||||
"once_cell",
|
|
||||||
"windows-sys 0.59.0",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "anyhow"
|
name = "anyhow"
|
||||||
version = "1.0.91"
|
version = "1.0.91"
|
||||||
|
@ -566,52 +516,6 @@ dependencies = [
|
||||||
"stacker",
|
"stacker",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "clap"
|
|
||||||
version = "4.5.31"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "027bb0d98429ae334a8698531da7077bdf906419543a35a55c2cb1b66437d767"
|
|
||||||
dependencies = [
|
|
||||||
"clap_builder",
|
|
||||||
"clap_derive",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "clap_builder"
|
|
||||||
version = "4.5.31"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "5589e0cba072e0f3d23791efac0fd8627b49c829c196a492e88168e6a669d863"
|
|
||||||
dependencies = [
|
|
||||||
"anstream",
|
|
||||||
"anstyle",
|
|
||||||
"clap_lex",
|
|
||||||
"strsim",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "clap_derive"
|
|
||||||
version = "4.5.28"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed"
|
|
||||||
dependencies = [
|
|
||||||
"heck",
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "clap_lex"
|
|
||||||
version = "0.7.4"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "colorchoice"
|
|
||||||
version = "1.0.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "config"
|
name = "config"
|
||||||
version = "0.14.1"
|
version = "0.14.1"
|
||||||
|
@ -1664,12 +1568,6 @@ version = "2.10.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708"
|
checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "is_terminal_polyfill"
|
|
||||||
version = "1.70.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "itoa"
|
name = "itoa"
|
||||||
version = "1.0.11"
|
version = "1.0.11"
|
||||||
|
@ -2597,7 +2495,6 @@ dependencies = [
|
||||||
"axum-extra",
|
"axum-extra",
|
||||||
"base64 0.22.1",
|
"base64 0.22.1",
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap",
|
|
||||||
"config",
|
"config",
|
||||||
"console_error_panic_hook",
|
"console_error_panic_hook",
|
||||||
"deadpool-diesel",
|
"deadpool-diesel",
|
||||||
|
@ -3174,12 +3071,6 @@ version = "1.0.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
|
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "utf8parse"
|
|
||||||
version = "0.2.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "uuid"
|
name = "uuid"
|
||||||
version = "1.11.0"
|
version = "1.11.0"
|
||||||
|
|
|
@ -32,4 +32,3 @@ diesel = { version = "2.2.6", features = ["postgres", "chrono", "uuid", "serde_j
|
||||||
tower = "0.5.2"
|
tower = "0.5.2"
|
||||||
regex = "1.11.1"
|
regex = "1.11.1"
|
||||||
lettre = { version = "0.11.12", features = ["tokio1", "serde", "tracing", "tokio1-native-tls"] }
|
lettre = { version = "0.11.12", features = ["tokio1", "serde", "tracing", "tokio1-native-tls"] }
|
||||||
clap = { version = "4.5.31", features = ["derive"] }
|
|
||||||
|
|
113
bacon.toml
113
bacon.toml
|
@ -1,114 +1,7 @@
|
||||||
# This is a configuration file for the bacon tool
|
[jobs.webserver]
|
||||||
#
|
command = ["cargo", "run"]
|
||||||
# Complete help on configuration: https://dystroy.org/bacon/config/
|
|
||||||
#
|
|
||||||
# You may check the current default at
|
|
||||||
# https://github.com/Canop/bacon/blob/main/defaults/default-bacon.toml
|
|
||||||
|
|
||||||
default_job = "check"
|
|
||||||
env.CARGO_TERM_COLOR = "always"
|
|
||||||
|
|
||||||
[jobs.check]
|
|
||||||
command = ["cargo", "check"]
|
|
||||||
need_stdout = false
|
|
||||||
|
|
||||||
[jobs.check-all]
|
|
||||||
command = ["cargo", "check", "--all-targets"]
|
|
||||||
need_stdout = false
|
|
||||||
|
|
||||||
# Run clippy on the default target
|
|
||||||
[jobs.clippy]
|
|
||||||
command = ["cargo", "clippy"]
|
|
||||||
need_stdout = false
|
|
||||||
|
|
||||||
# Run clippy on all targets
|
|
||||||
# To disable some lints, you may change the job this way:
|
|
||||||
# [jobs.clippy-all]
|
|
||||||
# command = [
|
|
||||||
# "cargo", "clippy",
|
|
||||||
# "--all-targets",
|
|
||||||
# "--",
|
|
||||||
# "-A", "clippy::bool_to_int_with_if",
|
|
||||||
# "-A", "clippy::collapsible_if",
|
|
||||||
# "-A", "clippy::derive_partial_eq_without_eq",
|
|
||||||
# ]
|
|
||||||
# need_stdout = false
|
|
||||||
[jobs.clippy-all]
|
|
||||||
command = ["cargo", "clippy", "--all-targets"]
|
|
||||||
need_stdout = false
|
|
||||||
|
|
||||||
# This job lets you run
|
|
||||||
# - all tests: bacon test
|
|
||||||
# - a specific test: bacon test -- config::test_default_files
|
|
||||||
# - the tests of a package: bacon test -- -- -p config
|
|
||||||
[jobs.test]
|
|
||||||
command = ["cargo", "test"]
|
|
||||||
need_stdout = true
|
need_stdout = true
|
||||||
|
|
||||||
[jobs.nextest]
|
|
||||||
command = [
|
|
||||||
"cargo", "nextest", "run",
|
|
||||||
"--hide-progress-bar", "--failure-output", "final"
|
|
||||||
]
|
|
||||||
need_stdout = true
|
|
||||||
analyzer = "nextest"
|
|
||||||
|
|
||||||
[jobs.doc]
|
|
||||||
command = ["cargo", "doc", "--no-deps"]
|
|
||||||
need_stdout = false
|
|
||||||
|
|
||||||
# If the doc compiles, then it opens in your browser and bacon switches
|
|
||||||
# to the previous job
|
|
||||||
[jobs.doc-open]
|
|
||||||
command = ["cargo", "doc", "--no-deps", "--open"]
|
|
||||||
need_stdout = false
|
|
||||||
on_success = "back" # so that we don't open the browser at each change
|
|
||||||
|
|
||||||
# You can run your application and have the result displayed in bacon,
|
|
||||||
# if it makes sense for this crate.
|
|
||||||
[jobs.run-worker]
|
|
||||||
command = [
|
|
||||||
"cargo", "run", "worker",
|
|
||||||
# put launch parameters for your program behind a `--` separator
|
|
||||||
]
|
|
||||||
need_stdout = true
|
|
||||||
allow_warnings = true
|
|
||||||
background = true
|
|
||||||
default_watch = false
|
|
||||||
|
|
||||||
# Run your long-running application (eg server) and have the result displayed in bacon.
|
|
||||||
# For programs that never stop (eg a server), `background` is set to false
|
|
||||||
# to have the cargo run output immediately displayed instead of waiting for
|
|
||||||
# program's end.
|
|
||||||
# 'on_change_strategy' is set to `kill_then_restart` to have your program restart
|
|
||||||
# on every change (an alternative would be to use the 'F5' key manually in bacon).
|
|
||||||
# If you often use this job, it makes sense to override the 'r' key by adding
|
|
||||||
# a binding `r = job:run-long` at the end of this file .
|
|
||||||
[jobs.run-server]
|
|
||||||
command = [
|
|
||||||
"cargo", "run", "serve"
|
|
||||||
# put launch parameters for your program behind a `--` separator
|
|
||||||
]
|
|
||||||
need_stdout = true
|
|
||||||
allow_warnings = true
|
|
||||||
background = false
|
background = false
|
||||||
on_change_strategy = "kill_then_restart"
|
on_change_strategy = "kill_then_restart"
|
||||||
kill = ["kill", "-s", "INT"]
|
kill = ["kill", "-s", "INT"]
|
||||||
watch = ["src", "templates"]
|
watch =["src", "templates"]
|
||||||
|
|
||||||
# This parameterized job runs the example of your choice, as soon
|
|
||||||
# as the code compiles.
|
|
||||||
# Call it as
|
|
||||||
# bacon ex -- my-example
|
|
||||||
[jobs.ex]
|
|
||||||
command = ["cargo", "run", "--example"]
|
|
||||||
need_stdout = true
|
|
||||||
allow_warnings = true
|
|
||||||
|
|
||||||
# You may define here keybindings that would be specific to
|
|
||||||
# a project, for example a shortcut to launch a specific job.
|
|
||||||
# Shortcuts to internal functions (scrolling, toggling, etc.)
|
|
||||||
# should go in your personal global prefs.toml file instead.
|
|
||||||
[keybindings]
|
|
||||||
r = "job:run-server"
|
|
||||||
w = "job:run-worker"
|
|
||||||
|
|
|
@ -7,7 +7,7 @@ CREATE INDEX ON users (uid);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS csrf_tokens (
|
CREATE TABLE IF NOT EXISTS csrf_tokens (
|
||||||
id UUID NOT NULL PRIMARY KEY,
|
id UUID NOT NULL PRIMARY KEY,
|
||||||
user_id UUID REFERENCES users(id) ON DELETE CASCADE,
|
user_id UUID REFERENCES users(id),
|
||||||
created_at TIMESTAMPTZ NOT NULL
|
created_at TIMESTAMPTZ NOT NULL
|
||||||
);
|
);
|
||||||
CREATE INDEX ON csrf_tokens (created_at);
|
CREATE INDEX ON csrf_tokens (created_at);
|
||||||
|
@ -18,8 +18,9 @@ CREATE TABLE teams (
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE team_memberships (
|
CREATE TABLE team_memberships (
|
||||||
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
|
team_id UUID NOT NULL REFERENCES teams(id),
|
||||||
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
user_id UUID NOT NULL REFERENCES users(id),
|
||||||
|
roles TEXT[] NOT NULL DEFAULT '{}',
|
||||||
PRIMARY KEY (team_id, user_id)
|
PRIMARY KEY (team_id, user_id)
|
||||||
);
|
);
|
||||||
CREATE INDEX ON team_memberships (team_id);
|
CREATE INDEX ON team_memberships (team_id);
|
||||||
|
@ -27,7 +28,7 @@ CREATE INDEX ON team_memberships (user_id);
|
||||||
|
|
||||||
CREATE TABLE api_keys (
|
CREATE TABLE api_keys (
|
||||||
id UUID NOT NULL PRIMARY KEY,
|
id UUID NOT NULL PRIMARY KEY,
|
||||||
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
|
team_id UUID NOT NULL REFERENCES teams(id),
|
||||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
last_used_at TIMESTAMPTZ
|
last_used_at TIMESTAMPTZ
|
||||||
);
|
);
|
||||||
|
@ -43,15 +44,15 @@ CREATE INDEX ON projects(team_id);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS channels (
|
CREATE TABLE IF NOT EXISTS channels (
|
||||||
id UUID NOT NULL PRIMARY KEY,
|
id UUID NOT NULL PRIMARY KEY,
|
||||||
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
|
team_id UUID NOT NULL REFERENCES teams(id),
|
||||||
name TEXT NOT NULL,
|
name TEXT NOT NULL,
|
||||||
enable_by_default BOOLEAN NOT NULL DEFAULT FALSE,
|
enable_by_default BOOLEAN NOT NULL DEFAULT FALSE,
|
||||||
backend_config JSONB NOT NULL
|
backend_config JSONB NOT NULL DEFAULT '{}'::JSONB
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS channel_selections (
|
CREATE TABLE IF NOT EXISTS channel_selections (
|
||||||
project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
|
project_id UUID NOT NULL REFERENCES projects(id),
|
||||||
channel_id UUID NOT NULL REFERENCES channels(id) ON DELETE CASCADE,
|
channel_id UUID NOT NULL REFERENCES channels(id),
|
||||||
PRIMARY KEY (project_id, channel_id)
|
PRIMARY KEY (project_id, channel_id)
|
||||||
);
|
);
|
||||||
CREATE INDEX ON channel_selections (project_id);
|
CREATE INDEX ON channel_selections (project_id);
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
CREATE TABLE IF NOT EXISTS messages (
|
CREATE TABLE IF NOT EXISTS messages (
|
||||||
id UUID PRIMARY KEY NOT NULL,
|
id UUID PRIMARY KEY NOT NULL,
|
||||||
channel_id UUID NOT NULL REFERENCES channels (id) ON DELETE RESTRICT,
|
channel_id UUID NOT NULL REFERENCES channels (id),
|
||||||
project_id UUID NOT NULL REFERENCES projects (id) ON DELETE RESTRICT,
|
|
||||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
sent_at TIMESTAMPTZ,
|
sent_at TIMESTAMPTZ,
|
||||||
message TEXT NOT NULL
|
message TEXT NOT NULL
|
||||||
|
|
30
src/auth.rs
30
src/auth.rs
|
@ -13,7 +13,7 @@ use oauth2::{
|
||||||
ClientSecret, CsrfToken, RedirectUrl, RefreshToken, TokenResponse, TokenUrl,
|
ClientSecret, CsrfToken, RedirectUrl, RefreshToken, TokenResponse, TokenUrl,
|
||||||
};
|
};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tracing::{trace_span, Instrument};
|
use tracing::trace_span;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
app_error::AppError,
|
app_error::AppError,
|
||||||
|
@ -197,21 +197,17 @@ impl FromRequestParts<AppState> for AuthInfo {
|
||||||
parts: &mut Parts,
|
parts: &mut Parts,
|
||||||
state: &AppState,
|
state: &AppState,
|
||||||
) -> Result<Self, <Self as FromRequestParts<AppState>>::Rejection> {
|
) -> Result<Self, <Self as FromRequestParts<AppState>>::Rejection> {
|
||||||
async move {
|
let _ = trace_span!("AuthInfo from_request_parts()").enter();
|
||||||
let session = parts
|
let session = parts
|
||||||
.extract_with_state::<AppSession, AppState>(state)
|
.extract_with_state::<AppSession, AppState>(state)
|
||||||
.await?
|
.await?
|
||||||
.0
|
.0
|
||||||
.ok_or(AppError::auth_redirect_from_base_path(
|
.ok_or(AppError::auth_redirect_from_base_path(
|
||||||
state.settings.base_path.clone(),
|
state.settings.base_path.clone(),
|
||||||
))?;
|
))?;
|
||||||
let user = session.get::<AuthInfo>(SESSION_KEY_AUTH_INFO).ok_or(
|
let user = session.get::<AuthInfo>(SESSION_KEY_AUTH_INFO).ok_or(
|
||||||
AppError::auth_redirect_from_base_path(state.settings.base_path.clone()),
|
AppError::auth_redirect_from_base_path(state.settings.base_path.clone()),
|
||||||
)?;
|
)?;
|
||||||
Ok(user)
|
Ok(user)
|
||||||
}
|
|
||||||
// The Span.enter() guard pattern doesn't play nicely async
|
|
||||||
.instrument(trace_span!("AuthInfo from_request_parts()"))
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
149
src/email.rs
149
src/email.rs
|
@ -7,7 +7,7 @@ use crate::app_state::AppState;
|
||||||
|
|
||||||
const POSTMARK_EMAIL_BATCH_URL: &'static str = "https://api.postmarkapp.com/email/batch";
|
const POSTMARK_EMAIL_BATCH_URL: &'static str = "https://api.postmarkapp.com/email/batch";
|
||||||
|
|
||||||
#[derive(Clone, Serialize)]
|
#[derive(Serialize)]
|
||||||
pub struct Message {
|
pub struct Message {
|
||||||
#[serde(rename = "From")]
|
#[serde(rename = "From")]
|
||||||
pub from: lettre::message::Mailbox,
|
pub from: lettre::message::Mailbox,
|
||||||
|
@ -21,11 +21,7 @@ pub struct Message {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait MailSender: Clone + Sync {
|
pub trait MailSender: Clone + Sync {
|
||||||
/**
|
async fn send_batch(&self, emails: Vec<Message>) -> Result<(), anyhow::Error>;
|
||||||
* Attempt to send all messages defined by the input Vec. Send as many as
|
|
||||||
* possible, returning exactly one Result<()> for each message.
|
|
||||||
*/
|
|
||||||
async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>>;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
@ -52,7 +48,7 @@ impl Mailer {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MailSender for Mailer {
|
impl MailSender for Mailer {
|
||||||
async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>> {
|
async fn send_batch(&self, emails: Vec<Message>) -> Result<(), anyhow::Error> {
|
||||||
match self {
|
match self {
|
||||||
Mailer::Smtp(sender) => sender.send_batch(emails).await,
|
Mailer::Smtp(sender) => sender.send_batch(emails).await,
|
||||||
Mailer::Postmark(sender) => sender.send_batch(emails).await,
|
Mailer::Postmark(sender) => sender.send_batch(emails).await,
|
||||||
|
@ -108,25 +104,11 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MailSender for SmtpSender {
|
impl MailSender for SmtpSender {
|
||||||
async fn send_batch(&self, emails: Vec<Message>) -> Vec<Result<()>> {
|
async fn send_batch(&self, emails: Vec<Message>) -> Result<()> {
|
||||||
let mut results: Vec<Result<()>> = Vec::with_capacity(emails.len());
|
|
||||||
for email in emails {
|
for email in emails {
|
||||||
match TryInto::<lettre::Message>::try_into(email) {
|
self.transport.send(email.try_into()?).await?;
|
||||||
Ok(email) => {
|
|
||||||
results.push(
|
|
||||||
self.transport
|
|
||||||
.send(email)
|
|
||||||
.await
|
|
||||||
.map(|_| ())
|
|
||||||
.map_err(Into::<anyhow::Error>::into),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
results.push(Err(err));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
results
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,114 +132,19 @@ impl PostmarkSender {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MailSender for PostmarkSender {
|
impl MailSender for PostmarkSender {
|
||||||
/**
|
async fn send_batch(&self, emails: Vec<Message>) -> Result<()> {
|
||||||
* Recursively attempts to send messages, breaking them into smaller and
|
if emails.len() > 500 {
|
||||||
* smaller batches as needed.
|
return Err(anyhow::anyhow!(
|
||||||
*/
|
"Postmark sends no more than 500 messages per batch"
|
||||||
async fn send_batch(&self, mut emails: Vec<Message>) -> Vec<Result<()>> {
|
));
|
||||||
/**
|
|
||||||
* Constructs a Vec with Ok(()) repeated n times.
|
|
||||||
*/
|
|
||||||
macro_rules! all_ok {
|
|
||||||
() => {{
|
|
||||||
let mut collection: Vec<Result<_>> = Vec::with_capacity(emails.len());
|
|
||||||
for _ in 0..emails.len() {
|
|
||||||
collection.push(Ok(()));
|
|
||||||
}
|
|
||||||
collection
|
|
||||||
}};
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructs a Vec with a single specific error, followed by n-1
|
|
||||||
* generic errors referring back to it.
|
|
||||||
*/
|
|
||||||
macro_rules! cascade_err {
|
|
||||||
($err:expr) => {{
|
|
||||||
let mut collection: Vec<Result<_>> = Vec::with_capacity(emails.len());
|
|
||||||
collection.push(Err($err));
|
|
||||||
for _ in 1..emails.len() {
|
|
||||||
collection.push(Err(anyhow::anyhow!("could not send due to previous error")));
|
|
||||||
}
|
|
||||||
collection
|
|
||||||
}};
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Recursively splits the email batch in half and tries to send each
|
|
||||||
* half independently, allowing both to run to completion and then
|
|
||||||
* returning the first error of the two results, if present.
|
|
||||||
*
|
|
||||||
* This is implemented as a macro in order to avoid unstable async
|
|
||||||
* closures.
|
|
||||||
*/
|
|
||||||
macro_rules! split_and_retry {
|
|
||||||
() => {
|
|
||||||
if emails.len() < 2 {
|
|
||||||
tracing::warn!("Postmark send batch cannot be split any further");
|
|
||||||
vec![Err(anyhow::anyhow!(
|
|
||||||
"unable to split Postmark batch into smaller slices"
|
|
||||||
))]
|
|
||||||
} else {
|
|
||||||
tracing::debug!("retrying Postmark send with smaller batches");
|
|
||||||
let mut results =
|
|
||||||
Box::pin(self.send_batch(emails.split_off(emails.len() / 2))).await;
|
|
||||||
results.extend(Box::pin(self.send_batch(emails)).await);
|
|
||||||
results
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
// https://postmarkapp.com/support/article/1056-what-are-the-attachment-and-email-size-limits
|
|
||||||
const POSTMARK_MAX_BATCH_ENTRIES: usize = 500;
|
|
||||||
const POSTMARK_MAX_REQUEST_BYTES: usize = 50 * 1000 * 1000;
|
|
||||||
// TODO: Check email subject and body size against Postmark limits
|
|
||||||
|
|
||||||
if emails.len() == 0 {
|
|
||||||
tracing::debug!("no Postmark messages to send");
|
|
||||||
vec![Ok(())]
|
|
||||||
} else if emails.len() > POSTMARK_MAX_BATCH_ENTRIES {
|
|
||||||
split_and_retry!()
|
|
||||||
} else {
|
|
||||||
let body = match serde_json::to_string(&emails) {
|
|
||||||
Ok(body) => body,
|
|
||||||
Err(err) => return cascade_err!(err.into()),
|
|
||||||
};
|
|
||||||
if body.len() > POSTMARK_MAX_REQUEST_BYTES {
|
|
||||||
if emails.len() > 1 {
|
|
||||||
split_and_retry!()
|
|
||||||
} else {
|
|
||||||
vec![Err(anyhow::anyhow!(
|
|
||||||
"Postmark requests may not exceed {} bytes",
|
|
||||||
POSTMARK_MAX_REQUEST_BYTES
|
|
||||||
))]
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
tracing::debug!("sending Postmark batch of {} messages", emails.len());
|
|
||||||
let resp = match self
|
|
||||||
.client
|
|
||||||
.post(POSTMARK_EMAIL_BATCH_URL)
|
|
||||||
.header("X-Postmark-Server-Token", &self.server_token)
|
|
||||||
.header(reqwest::header::CONTENT_TYPE, "application/json")
|
|
||||||
.body(body)
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(resp) => resp,
|
|
||||||
Err(err) => return cascade_err!(err.into()),
|
|
||||||
};
|
|
||||||
if resp.status().is_client_error() && emails.len() > 1 {
|
|
||||||
split_and_retry!()
|
|
||||||
} else {
|
|
||||||
if let Err(err) = resp.error_for_status() {
|
|
||||||
cascade_err!(err.into())
|
|
||||||
} else {
|
|
||||||
tracing::debug!("sent Postmark batch of {} messages", emails.len());
|
|
||||||
all_ok!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
self.client
|
||||||
|
.post(POSTMARK_EMAIL_BATCH_URL)
|
||||||
|
.header("X-Postmark-Server-Token", &self.server_token)
|
||||||
|
.json(&emails)
|
||||||
|
.send()
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
77
src/main.rs
77
src/main.rs
|
@ -18,40 +18,16 @@ mod team_memberships;
|
||||||
mod teams;
|
mod teams;
|
||||||
mod users;
|
mod users;
|
||||||
mod v0_router;
|
mod v0_router;
|
||||||
mod worker;
|
|
||||||
|
|
||||||
use std::process::exit;
|
use std::process::exit;
|
||||||
|
|
||||||
use chrono::{TimeDelta, Utc};
|
|
||||||
use clap::{Parser, Subcommand};
|
|
||||||
use email::SmtpOptions;
|
use email::SmtpOptions;
|
||||||
use tokio::time::sleep;
|
|
||||||
use tracing_subscriber::EnvFilter;
|
use tracing_subscriber::EnvFilter;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
app_state::AppState, email::Mailer, router::new_router, sessions::PgStore, settings::Settings,
|
app_state::AppState, email::Mailer, router::new_router, sessions::PgStore, settings::Settings,
|
||||||
worker::run_worker,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Parser)]
|
|
||||||
#[command(version, about, long_about = None)]
|
|
||||||
struct Cli {
|
|
||||||
#[command(subcommand)]
|
|
||||||
command: Commands,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Subcommand)]
|
|
||||||
enum Commands {
|
|
||||||
/// Run web server
|
|
||||||
Serve,
|
|
||||||
/// Run background worker
|
|
||||||
Worker {
|
|
||||||
/// Loop the every n seconds instead of exiting after execution
|
|
||||||
#[arg(long)]
|
|
||||||
auto_loop_seconds: Option<u32>,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
let settings = Settings::load().unwrap();
|
let settings = Settings::load().unwrap();
|
||||||
|
@ -60,8 +36,6 @@ async fn main() {
|
||||||
.with_env_filter(EnvFilter::from_default_env())
|
.with_env_filter(EnvFilter::from_default_env())
|
||||||
.init();
|
.init();
|
||||||
|
|
||||||
let cli = Cli::parse();
|
|
||||||
|
|
||||||
let database_url = settings.database_url.clone();
|
let database_url = settings.database_url.clone();
|
||||||
let manager =
|
let manager =
|
||||||
deadpool_diesel::postgres::Manager::new(database_url, deadpool_diesel::Runtime::Tokio1);
|
deadpool_diesel::postgres::Manager::new(database_url, deadpool_diesel::Runtime::Tokio1);
|
||||||
|
@ -92,7 +66,7 @@ async fn main() {
|
||||||
};
|
};
|
||||||
|
|
||||||
let app_state = AppState {
|
let app_state = AppState {
|
||||||
db_pool: db_pool.clone(),
|
db_pool,
|
||||||
mailer,
|
mailer,
|
||||||
oauth_client,
|
oauth_client,
|
||||||
reqwest_client,
|
reqwest_client,
|
||||||
|
@ -100,43 +74,16 @@ async fn main() {
|
||||||
settings: settings.clone(),
|
settings: settings.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
match &cli.command {
|
let router = new_router(app_state);
|
||||||
Commands::Serve => {
|
|
||||||
let router = new_router(app_state);
|
|
||||||
|
|
||||||
let listener =
|
let listener = tokio::net::TcpListener::bind((settings.host.clone(), settings.port.clone()))
|
||||||
tokio::net::TcpListener::bind((settings.host.clone(), settings.port.clone()))
|
.await
|
||||||
.await
|
.unwrap();
|
||||||
.unwrap();
|
tracing::info!(
|
||||||
tracing::info!(
|
"App running at http://{}:{}{}",
|
||||||
"App running at http://{}:{}{}",
|
settings.host,
|
||||||
settings.host,
|
settings.port,
|
||||||
settings.port,
|
settings.base_path
|
||||||
settings.base_path
|
);
|
||||||
);
|
axum::serve(listener, router).await.unwrap();
|
||||||
axum::serve(listener, router).await.unwrap();
|
|
||||||
}
|
|
||||||
Commands::Worker { auto_loop_seconds } => {
|
|
||||||
if let Some(loop_seconds) = auto_loop_seconds {
|
|
||||||
let loop_delta = TimeDelta::seconds(i64::from(*loop_seconds));
|
|
||||||
loop {
|
|
||||||
let t_next_loop = Utc::now() + loop_delta;
|
|
||||||
|
|
||||||
if let Err(err) = run_worker(app_state.clone()).await {
|
|
||||||
tracing::error!("{}", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
let sleep_delta = t_next_loop - Utc::now();
|
|
||||||
match sleep_delta.to_std() {
|
|
||||||
Ok(duration) => {
|
|
||||||
sleep(duration).await;
|
|
||||||
}
|
|
||||||
Err(_) => { /* sleep_delta was < 0, so don't sleep */ }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
run_worker(app_state).await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,10 +13,8 @@ use crate::{channels::Channel, schema::messages};
|
||||||
#[diesel(belongs_to(Channel))]
|
#[diesel(belongs_to(Channel))]
|
||||||
pub struct Message {
|
pub struct Message {
|
||||||
pub id: Uuid,
|
pub id: Uuid,
|
||||||
pub project_id: Uuid,
|
|
||||||
pub channel_id: Uuid,
|
pub channel_id: Uuid,
|
||||||
pub created_at: DateTime<Utc>,
|
pub created_at: DateTime<Utc>,
|
||||||
pub sent_at: Option<DateTime<Utc>>,
|
|
||||||
pub message: String,
|
pub message: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,9 +29,4 @@ impl Message {
|
||||||
pub fn with_channel(channel_id: Uuid) -> _ {
|
pub fn with_channel(channel_id: Uuid) -> _ {
|
||||||
messages::channel_id.eq(channel_id)
|
messages::channel_id.eq(channel_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[auto_type(no_type_alias)]
|
|
||||||
pub fn is_not_sent() -> _ {
|
|
||||||
messages::sent_at.is_null()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -214,6 +214,7 @@ async fn post_new_team(
|
||||||
let team_membership = TeamMembership {
|
let team_membership = TeamMembership {
|
||||||
team_id: team_id.clone(),
|
team_id: team_id.clone(),
|
||||||
user_id: current_user.id,
|
user_id: current_user.id,
|
||||||
|
roles: vec![Some("OWNER".to_string())],
|
||||||
};
|
};
|
||||||
db_conn
|
db_conn
|
||||||
.interact(move |conn| {
|
.interact(move |conn| {
|
||||||
|
@ -593,7 +594,7 @@ async fn update_channel_email_recipient(
|
||||||
subject: "Verify Your Email".to_string(),
|
subject: "Verify Your Email".to_string(),
|
||||||
text_body: format!("Your email verification code is: {}", verification_code),
|
text_body: format!("Your email verification code is: {}", verification_code),
|
||||||
};
|
};
|
||||||
mailer.send_batch(vec![email]).await.remove(0)?;
|
mailer.send_batch(vec![email]).await?;
|
||||||
|
|
||||||
Ok(Redirect::to(&format!(
|
Ok(Redirect::to(&format!(
|
||||||
"{}/teams/{}/channels/{}",
|
"{}/teams/{}/channels/{}",
|
||||||
|
|
|
@ -47,7 +47,6 @@ diesel::table! {
|
||||||
messages (id) {
|
messages (id) {
|
||||||
id -> Uuid,
|
id -> Uuid,
|
||||||
channel_id -> Uuid,
|
channel_id -> Uuid,
|
||||||
project_id -> Uuid,
|
|
||||||
created_at -> Timestamptz,
|
created_at -> Timestamptz,
|
||||||
sent_at -> Nullable<Timestamptz>,
|
sent_at -> Nullable<Timestamptz>,
|
||||||
message -> Text,
|
message -> Text,
|
||||||
|
@ -66,6 +65,7 @@ diesel::table! {
|
||||||
team_memberships (team_id, user_id) {
|
team_memberships (team_id, user_id) {
|
||||||
team_id -> Uuid,
|
team_id -> Uuid,
|
||||||
user_id -> Uuid,
|
user_id -> Uuid,
|
||||||
|
roles -> Array<Nullable<Text>>,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,7 +90,6 @@ diesel::joinable!(channel_selections -> projects (project_id));
|
||||||
diesel::joinable!(channels -> teams (team_id));
|
diesel::joinable!(channels -> teams (team_id));
|
||||||
diesel::joinable!(csrf_tokens -> users (user_id));
|
diesel::joinable!(csrf_tokens -> users (user_id));
|
||||||
diesel::joinable!(messages -> channels (channel_id));
|
diesel::joinable!(messages -> channels (channel_id));
|
||||||
diesel::joinable!(messages -> projects (project_id));
|
|
||||||
diesel::joinable!(projects -> teams (team_id));
|
diesel::joinable!(projects -> teams (team_id));
|
||||||
diesel::joinable!(team_memberships -> teams (team_id));
|
diesel::joinable!(team_memberships -> teams (team_id));
|
||||||
diesel::joinable!(team_memberships -> users (user_id));
|
diesel::joinable!(team_memberships -> users (user_id));
|
||||||
|
|
|
@ -8,7 +8,7 @@ use axum::{
|
||||||
use axum_extra::extract::CookieJar;
|
use axum_extra::extract::CookieJar;
|
||||||
use chrono::{DateTime, TimeDelta, Utc};
|
use chrono::{DateTime, TimeDelta, Utc};
|
||||||
use diesel::{pg::Pg, prelude::*, upsert::excluded};
|
use diesel::{pg::Pg, prelude::*, upsert::excluded};
|
||||||
use tracing::{trace_span, Instrument};
|
use tracing::trace_span;
|
||||||
|
|
||||||
use crate::{app_error::AppError, app_state::AppState, schema::browser_sessions};
|
use crate::{app_error::AppError, app_state::AppState, schema::browser_sessions};
|
||||||
|
|
||||||
|
@ -134,7 +134,7 @@ impl FromRequestParts<AppState> for AppSession {
|
||||||
parts: &mut Parts,
|
parts: &mut Parts,
|
||||||
state: &AppState,
|
state: &AppState,
|
||||||
) -> Result<Self, <Self as FromRequestParts<AppState>>::Rejection> {
|
) -> Result<Self, <Self as FromRequestParts<AppState>>::Rejection> {
|
||||||
async move {
|
let _ = trace_span!("AppSession::from_request_parts()").enter();
|
||||||
let jar = parts.extract::<CookieJar>().await.unwrap();
|
let jar = parts.extract::<CookieJar>().await.unwrap();
|
||||||
let session_cookie = match jar.get(&state.settings.auth.cookie_name) {
|
let session_cookie = match jar.get(&state.settings.auth.cookie_name) {
|
||||||
Some(cookie) => cookie,
|
Some(cookie) => cookie,
|
||||||
|
@ -167,7 +167,5 @@ impl FromRequestParts<AppState> for AppSession {
|
||||||
tracing::debug!("no matching session found in database");
|
tracing::debug!("no matching session found in database");
|
||||||
Ok(AppSession(None))
|
Ok(AppSession(None))
|
||||||
}
|
}
|
||||||
// The Span.enter() guard pattern doesn't play nicely async
|
|
||||||
}.instrument(trace_span!("AppSession::from_request_parts()")).await
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ use crate::{
|
||||||
pub struct TeamMembership {
|
pub struct TeamMembership {
|
||||||
pub team_id: Uuid,
|
pub team_id: Uuid,
|
||||||
pub user_id: Uuid,
|
pub user_id: Uuid,
|
||||||
|
pub roles: Vec<Option<String>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TeamMembership {
|
impl TeamMembership {
|
||||||
|
|
104
src/v0_router.rs
104
src/v0_router.rs
|
@ -1,6 +1,6 @@
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use axum::{
|
use axum::{
|
||||||
extract::Query,
|
extract::{Query, State},
|
||||||
response::{IntoResponse, Json},
|
response::{IntoResponse, Json},
|
||||||
routing::get,
|
routing::get,
|
||||||
Router,
|
Router,
|
||||||
|
@ -14,9 +14,11 @@ use crate::{
|
||||||
api_keys::ApiKey,
|
api_keys::ApiKey,
|
||||||
app_error::AppError,
|
app_error::AppError,
|
||||||
app_state::{AppState, DbConn},
|
app_state::{AppState, DbConn},
|
||||||
channels::Channel,
|
channels::{Channel, EmailBackendConfig},
|
||||||
|
email::{MailSender as _, Mailer},
|
||||||
projects::Project,
|
projects::Project,
|
||||||
schema::{api_keys, messages, projects},
|
schema::{api_keys, projects},
|
||||||
|
settings::Settings,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub fn new_router(state: AppState) -> Router<AppState> {
|
pub fn new_router(state: AppState) -> Router<AppState> {
|
||||||
|
@ -31,6 +33,11 @@ struct SayQuery {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn say_get(
|
async fn say_get(
|
||||||
|
State(Settings {
|
||||||
|
email: email_settings,
|
||||||
|
..
|
||||||
|
}): State<Settings>,
|
||||||
|
State(mailer): State<Mailer>,
|
||||||
DbConn(db_conn): DbConn,
|
DbConn(db_conn): DbConn,
|
||||||
Query(query): Query<SayQuery>,
|
Query(query): Query<SayQuery>,
|
||||||
) -> Result<impl IntoResponse, AppError> {
|
) -> Result<impl IntoResponse, AppError> {
|
||||||
|
@ -51,68 +58,55 @@ async fn say_get(
|
||||||
.unwrap()?
|
.unwrap()?
|
||||||
};
|
};
|
||||||
|
|
||||||
let project = {
|
let selected_channels = {
|
||||||
let project_name = query.project.to_lowercase();
|
let project_name = query.project.to_lowercase();
|
||||||
db_conn
|
db_conn
|
||||||
.interact::<_, Result<Project, AppError>>(move |conn| {
|
.interact::<_, Result<Vec<Channel>, AppError>>(move |conn| {
|
||||||
conn.transaction(move |conn| {
|
conn.transaction(move |conn| {
|
||||||
Ok(
|
let project = match Project::all()
|
||||||
match Project::all()
|
.filter(Project::with_team(api_key.team_id))
|
||||||
.filter(Project::with_team(api_key.team_id))
|
.filter(Project::with_name(project_name.clone()))
|
||||||
.filter(Project::with_name(project_name.clone()))
|
.first(conn)
|
||||||
.first(conn)
|
.optional()
|
||||||
.optional()
|
.context("failed to load project")?
|
||||||
.context("failed to load project")?
|
{
|
||||||
{
|
Some(project) => project,
|
||||||
Some(project) => project,
|
None => insert_into(projects::table)
|
||||||
None => insert_into(projects::table)
|
.values((
|
||||||
.values((
|
projects::id.eq(Uuid::now_v7()),
|
||||||
projects::id.eq(Uuid::now_v7()),
|
projects::team_id.eq(api_key.team_id),
|
||||||
projects::team_id.eq(api_key.team_id),
|
projects::name.eq(project_name),
|
||||||
projects::name.eq(project_name),
|
))
|
||||||
))
|
.get_result(conn)
|
||||||
.get_result(conn)
|
.context("failed to insert project")?,
|
||||||
.context("failed to insert project")?,
|
};
|
||||||
},
|
Ok(project
|
||||||
)
|
.selected_channels()
|
||||||
|
.load(conn)
|
||||||
|
.context("failed to load selected channels")?)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap()?
|
.unwrap()?
|
||||||
};
|
};
|
||||||
let selected_channels = {
|
|
||||||
let project = project.clone();
|
|
||||||
db_conn
|
|
||||||
.interact::<_, Result<Vec<Channel>, AppError>>(move |conn| {
|
|
||||||
Ok(project
|
|
||||||
.selected_channels()
|
|
||||||
.load(conn)
|
|
||||||
.context("failed to load selected channels")?)
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap()?
|
|
||||||
};
|
|
||||||
|
|
||||||
{
|
for channel in selected_channels {
|
||||||
let selected_channels = selected_channels.clone();
|
if let Ok(config) = TryInto::<EmailBackendConfig>::try_into(channel.backend_config) {
|
||||||
db_conn
|
if config.verified {
|
||||||
.interact::<_, Result<_, AppError>>(move |conn| {
|
let recipient: lettre::message::Mailbox = config.recipient.parse()?;
|
||||||
for channel in selected_channels {
|
let email = crate::email::Message {
|
||||||
insert_into(messages::table)
|
from: email_settings.message_from.clone().into(),
|
||||||
.values((
|
to: recipient.into(),
|
||||||
messages::id.eq(Uuid::now_v7()),
|
subject: "Shout".to_string(),
|
||||||
messages::channel_id.eq(&channel.id),
|
text_body: query.message.clone(),
|
||||||
messages::project_id.eq(&project.id),
|
};
|
||||||
messages::message.eq(&query.message),
|
tracing::info!("Sending email to recipient for channel {}", channel.id);
|
||||||
))
|
mailer.send_batch(vec![email]).await?;
|
||||||
.execute(conn)?;
|
} else {
|
||||||
}
|
tracing::info!("Email recipient for channel {} is not verified", channel.id);
|
||||||
Ok(())
|
}
|
||||||
})
|
}
|
||||||
.await
|
|
||||||
.unwrap()?;
|
|
||||||
}
|
}
|
||||||
tracing::debug!("queued {} messages", selected_channels.len());
|
|
||||||
|
|
||||||
Ok(Json(json!({ "ok": true })))
|
Ok(Json(json!({ "ok": true })))
|
||||||
}
|
}
|
||||||
|
|
111
src/worker.rs
111
src/worker.rs
|
@ -1,111 +0,0 @@
|
||||||
use anyhow::{Context as _, Result};
|
|
||||||
use diesel::prelude::*;
|
|
||||||
use tracing::Instrument as _;
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
app_state::AppState,
|
|
||||||
channels::{Channel, EmailBackendConfig},
|
|
||||||
email::MailSender,
|
|
||||||
messages::Message,
|
|
||||||
schema::{channels, messages},
|
|
||||||
};
|
|
||||||
|
|
||||||
pub async fn run_worker(state: AppState) -> Result<()> {
|
|
||||||
async move {
|
|
||||||
process_messages(state).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
.instrument(tracing::debug_span!("run_worker()"))
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Process messages from the queue in the `messages` table. Insertions to the
|
|
||||||
* queue are rate limited per team and per project, so no effort should be
|
|
||||||
* needed here to enforce fairness.
|
|
||||||
*/
|
|
||||||
async fn process_messages(state: AppState) -> Result<()> {
|
|
||||||
async move {
|
|
||||||
const MESSAGE_QUEUE_LIMIT: i64 = 250;
|
|
||||||
let db_conn = state.db_pool.get().await?;
|
|
||||||
let queued_messages = db_conn
|
|
||||||
.interact::<_, Result<Vec<(Message, Channel)>>>(move |conn| {
|
|
||||||
messages::table
|
|
||||||
.inner_join(channels::table)
|
|
||||||
.select((Message::as_select(), Channel::as_select()))
|
|
||||||
.filter(Message::is_not_sent())
|
|
||||||
.order(messages::created_at.asc())
|
|
||||||
.limit(MESSAGE_QUEUE_LIMIT)
|
|
||||||
.load(conn)
|
|
||||||
.context("failed to load queued messages")
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap()?;
|
|
||||||
// Dispatch email messages together to take advantage of Postmark's
|
|
||||||
// batch send API
|
|
||||||
let emails: Vec<(Uuid, crate::email::Message)> = queued_messages
|
|
||||||
.iter()
|
|
||||||
.filter_map(|(message, channel)| {
|
|
||||||
if let Ok(backend_config) =
|
|
||||||
TryInto::<EmailBackendConfig>::try_into(channel.backend_config.clone())
|
|
||||||
{
|
|
||||||
if backend_config.verified {
|
|
||||||
let recipient: lettre::message::Mailbox = if let Ok(recipient) =
|
|
||||||
backend_config.recipient.parse()
|
|
||||||
{
|
|
||||||
recipient
|
|
||||||
} else {
|
|
||||||
tracing::error!("failed to parse recipient for channel {}", channel.id);
|
|
||||||
return None;
|
|
||||||
};
|
|
||||||
let email = crate::email::Message {
|
|
||||||
from: state.settings.email.message_from.clone().into(),
|
|
||||||
to: recipient.into(),
|
|
||||||
subject: "Shout".to_string(),
|
|
||||||
text_body: message.message.clone(),
|
|
||||||
};
|
|
||||||
tracing::debug!("Sending email to recipient for channel {}", channel.id);
|
|
||||||
Some((message.id.clone(), email))
|
|
||||||
} else {
|
|
||||||
tracing::info!(
|
|
||||||
"Email recipient for channel {} is not verified",
|
|
||||||
channel.id
|
|
||||||
);
|
|
||||||
None
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
if !emails.is_empty() {
|
|
||||||
let message_ids: Vec<Uuid> = emails.iter().map(|(id, _)| id.clone()).collect();
|
|
||||||
let results = state
|
|
||||||
.mailer
|
|
||||||
.send_batch(emails.into_iter().map(|(_, email)| email).collect())
|
|
||||||
.await;
|
|
||||||
assert!(results.len() == message_ids.len());
|
|
||||||
let results_by_id = message_ids.into_iter().zip(results.into_iter());
|
|
||||||
db_conn
|
|
||||||
.interact::<_, Result<_>>(move |conn| {
|
|
||||||
for (id, result) in results_by_id {
|
|
||||||
if let Err(err) = result {
|
|
||||||
tracing::error!("error sending message {}: {:?}", id, err);
|
|
||||||
} else {
|
|
||||||
diesel::update(messages::table.filter(messages::id.eq(id)))
|
|
||||||
.set(messages::sent_at.eq(diesel::dsl::now))
|
|
||||||
.execute(conn)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap()?;
|
|
||||||
}
|
|
||||||
tracing::info!("finished processing messages");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
.instrument(tracing::debug_span!("process_messages()"))
|
|
||||||
.await
|
|
||||||
}
|
|
Loading…
Add table
Reference in a new issue