implement watchdog timers

This commit is contained in:
Brent Schroeter 2025-04-23 12:57:10 -07:00
parent 6da087e17f
commit 819caae914
25 changed files with 964 additions and 434 deletions

32
Cargo.lock generated
View file

@ -764,6 +764,37 @@ dependencies = [
"powerfmt", "powerfmt",
] ]
[[package]]
name = "derive_builder"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "507dfb09ea8b7fa618fcf76e953f4f5e192547945816d5358edffe39f6f94947"
dependencies = [
"derive_builder_macro",
]
[[package]]
name = "derive_builder_core"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8"
dependencies = [
"darling",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "derive_builder_macro"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c"
dependencies = [
"derive_builder_core",
"syn",
]
[[package]] [[package]]
name = "diesel" name = "diesel"
version = "2.2.8" version = "2.2.8"
@ -2644,6 +2675,7 @@ dependencies = [
"clap", "clap",
"config", "config",
"deadpool-diesel", "deadpool-diesel",
"derive_builder",
"diesel", "diesel",
"diesel_migrations", "diesel_migrations",
"dotenvy", "dotenvy",

View file

@ -14,6 +14,7 @@ chrono = { version = "0.4.39", features = ["serde"] }
clap = { version = "4.5.31", features = ["derive"] } clap = { version = "4.5.31", features = ["derive"] }
config = "0.14.1" config = "0.14.1"
deadpool-diesel = { version = "0.6.1", features = ["postgres", "serde"] } deadpool-diesel = { version = "0.6.1", features = ["postgres", "serde"] }
derive_builder = "0.20.2"
diesel = { version = "2.2.6", features = ["postgres", "chrono", "uuid", "serde_json"] } diesel = { version = "2.2.6", features = ["postgres", "chrono", "uuid", "serde_json"] }
diesel_migrations = { version = "2.2.0", features = ["postgres"] } diesel_migrations = { version = "2.2.0", features = ["postgres"] }
dotenvy = "0.15.7" dotenvy = "0.15.7"

View file

@ -0,0 +1 @@
DROP TABLE IF EXISTS watchdogs;

View file

@ -0,0 +1,10 @@
CREATE TABLE IF NOT EXISTS watchdogs (
id UUID PRIMARY KEY NOT NULL,
project_id UUID UNIQUE NOT NULL REFERENCES projects (id) ON DELETE RESTRICT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_set_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expiration TIMESTAMPTZ NOT NULL,
notified BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE INDEX ON watchdogs (project_id);
CREATE INDEX ON watchdogs (expiration);

View file

@ -1,21 +1,21 @@
use anyhow::Result; use anyhow::{Context as _, Result};
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _}; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use deadpool_diesel::postgres::Connection;
use diesel::{ use diesel::{
dsl::{auto_type, AsSelect}, dsl::{auto_type, update, AsSelect},
pg::Pg, pg::Pg,
prelude::*, prelude::*,
}; };
use uuid::Uuid; use uuid::Uuid;
use crate::{app_error::AppError, schema::api_keys, teams::Team}; use crate::{app_error::AppError, schema::api_keys};
pub use crate::schema::api_keys::{dsl, table};
/// A team-scoped application key for authenticating API calls to /say, etc. /// A team-scoped application key for authenticating API calls to /say, etc.
/// Does not authorize any administrative functions besides creating projects. /// Does not authorize any administrative functions besides creating projects.
#[derive(Associations, Clone, Debug, Identifiable, Insertable, Queryable, Selectable)] #[derive(Clone, Debug, Identifiable, Insertable, Queryable, Selectable)]
#[diesel(table_name = api_keys)] #[diesel(table_name = api_keys)]
#[diesel(belongs_to(Team))]
pub struct ApiKey { pub struct ApiKey {
pub id: Uuid, pub id: Uuid,
pub team_id: Uuid, pub team_id: Uuid,
@ -23,38 +23,28 @@ pub struct ApiKey {
} }
impl ApiKey { impl ApiKey {
pub async fn generate_for_team(db_conn: &Connection, team_id: Uuid) -> Result<Self, AppError> { pub fn new_from_team_id(team_id: Uuid) -> Self {
let api_key = Self { Self {
team_id, team_id,
id: Uuid::new_v4(), id: Uuid::new_v4(),
last_used_at: None, last_used_at: None,
}; }
let api_key_copy = api_key.clone();
db_conn
.interact(move |conn| {
diesel::insert_into(api_keys::table)
.values(api_key_copy)
.execute(conn)
})
.await
.unwrap()?;
Ok(api_key)
} }
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn all() -> _ { pub fn all() -> _ {
let select: AsSelect<ApiKey, Pg> = ApiKey::as_select(); let select: AsSelect<Self, Pg> = Self::as_select();
api_keys::table.select(select) table.select(select)
} }
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn with_id<'a>(id: &'a Uuid) -> _ { pub fn with_id<'a>(id: &'a Uuid) -> _ {
api_keys::id.eq(id) dsl::id.eq(id)
} }
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn with_team<'a>(team_id: &'a Uuid) -> _ { pub fn with_team<'a>(team_id: &'a Uuid) -> _ {
api_keys::team_id.eq(team_id) dsl::team_id.eq(team_id)
} }
} }
@ -78,3 +68,15 @@ pub fn try_parse_as_uuid(value: &str) -> Result<Uuid> {
Uuid::try_parse(value).or(Err(anyhow::anyhow!("failed to parse"))) Uuid::try_parse(value).or(Err(anyhow::anyhow!("failed to parse")))
} }
} }
pub fn use_api_key(key_id: &str, db_conn: &mut PgConnection) -> Result<ApiKey, AppError> {
let normalized_id =
try_parse_as_uuid(key_id).or(Err(AppError::Forbidden("Key not accepted.".to_string())))?;
update(table.filter(ApiKey::with_id(&normalized_id)))
.set(dsl::last_used_at.eq(diesel::dsl::now))
.returning(ApiKey::as_returning())
.get_result(db_conn)
.optional()
.context("failed to load api key")?
.ok_or(AppError::Forbidden("Key not accepted.".to_owned()))
}

View file

@ -7,7 +7,9 @@ use uuid::Uuid;
use crate::schema::channel_selections; use crate::schema::channel_selections;
#[derive(Associations, Clone, Debug, Identifiable, Queryable, Selectable)] pub use crate::schema::channel_selections::{dsl, table};
#[derive(Associations, Clone, Debug, Identifiable, Insertable, Queryable, Selectable)]
#[diesel(belongs_to(crate::channels::Channel))] #[diesel(belongs_to(crate::channels::Channel))]
#[diesel(belongs_to(crate::projects::Project))] #[diesel(belongs_to(crate::projects::Project))]
#[diesel(primary_key(channel_id, project_id))] #[diesel(primary_key(channel_id, project_id))]

View file

@ -1,5 +1,6 @@
use std::fmt::Debug; use std::fmt::Debug;
use derive_builder::Builder;
use diesel::{ use diesel::{
backend::Backend, backend::Backend,
deserialize::{self, FromSql, FromSqlRow}, deserialize::{self, FromSql, FromSqlRow},
@ -14,7 +15,7 @@ use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use uuid::Uuid; use uuid::Uuid;
use crate::{schema::channels, teams::Team}; use crate::schema::channels;
pub const CHANNEL_BACKEND_EMAIL: &str = "email"; pub const CHANNEL_BACKEND_EMAIL: &str = "email";
pub const CHANNEL_BACKEND_SLACK: &str = "slack"; pub const CHANNEL_BACKEND_SLACK: &str = "slack";
@ -25,8 +26,7 @@ pub use crate::schema::channels::{dsl, table};
/// defined in the backend_config field. A single channel may be attached to /// defined in the backend_config field. A single channel may be attached to
/// (in other words, "enabled" or "selected" for) any number of projects within /// (in other words, "enabled" or "selected" for) any number of projects within
/// the same team. /// the same team.
#[derive(Associations, Clone, Debug, Identifiable, Queryable, Selectable)] #[derive(Clone, Debug, Identifiable, Queryable, Selectable)]
#[diesel(belongs_to(Team))]
#[diesel(check_for_backend(Pg))] #[diesel(check_for_backend(Pg))]
pub struct Channel { pub struct Channel {
pub id: Uuid, pub id: Uuid,
@ -57,6 +57,23 @@ impl Channel {
pub fn where_enabled_by_default() -> _ { pub fn where_enabled_by_default() -> _ {
channels::enable_by_default.eq(true) channels::enable_by_default.eq(true)
} }
pub fn insertable_builder() -> InsertableChannelBuilder {
InsertableChannelBuilder::default()
}
}
#[derive(Builder, Clone, Debug, Insertable)]
#[diesel(table_name = channels)]
#[builder(pattern = "owned", setter(prefix = "with"))]
pub struct InsertableChannel {
#[builder(setter(skip), default = "uuid::Uuid::now_v7()")]
id: Uuid,
team_id: Uuid,
name: String,
#[builder(setter(strip_option), default)]
enable_by_default: Option<bool>,
backend_config: BackendConfig,
} }
// Note: In a previous implementation, channel configuration was handled by // Note: In a previous implementation, channel configuration was handled by

View file

@ -1,8 +1,8 @@
use anyhow::Context as _; use anyhow::{Context as _, Result};
use askama::Template; use askama::Template;
use axum::{ use axum::{
extract::{Path, State}, extract::{OriginalUri, Path, State},
response::{Html, IntoResponse, Redirect}, response::{Html, IntoResponse as _, Redirect, Response},
routing::{get, post}, routing::{get, post},
Router, Router,
}; };
@ -16,17 +16,16 @@ use crate::{
app_error::AppError, app_error::AppError,
app_state::{AppState, DbConn, ReqwestClient}, app_state::{AppState, DbConn, ReqwestClient},
channels::{ channels::{
BackendConfig, Channel, EmailBackendConfig, SlackBackendConfig, CHANNEL_BACKEND_EMAIL, self, BackendConfig, Channel, EmailBackendConfig, SlackBackendConfig,
CHANNEL_BACKEND_SLACK, CHANNEL_BACKEND_EMAIL, CHANNEL_BACKEND_SLACK,
}, },
csrf::generate_csrf_token, csrf::generate_csrf_token,
email::{is_permissible_email, MailSender as _, Mailer}, email::{is_permissible_email, MailSender as _, Mailer},
guards, guards,
nav::{BreadcrumbTrail, Navbar, NavbarBuilder, NAVBAR_ITEM_CHANNELS}, nav::{BreadcrumbTrail, Navbar, NavbarBuilder, NAVBAR_ITEM_CHANNELS},
schema::channels,
settings::{Settings, SlackSettings}, settings::{Settings, SlackSettings},
slack_auth, slack_auth,
slack_utils::{self, ConversationType, SlackClient}, slack_utils::{self, ConversationType, SlackClient, SlackError},
users::CurrentUser, users::CurrentUser,
}; };
@ -82,7 +81,7 @@ async fn channels_page(
DbConn(db_conn): DbConn, DbConn(db_conn): DbConn,
Path(team_id): Path<Uuid>, Path(team_id): Path<Uuid>,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?; let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?;
let channels = { let channels = {
@ -139,41 +138,42 @@ async fn post_new_channel(
Path(team_id): Path<Uuid>, Path(team_id): Path<Uuid>,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
Form(form_body): Form<NewChannelPostFormBody>, Form(form_body): Form<NewChannelPostFormBody>,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?; let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?;
guards::require_valid_csrf_token(&form_body.csrf_token, &current_user, &db_conn).await?; guards::require_valid_csrf_token(&form_body.csrf_token, &current_user, &db_conn).await?;
let channel_id = Uuid::now_v7();
let channel = match form_body.channel_type.as_str() { let channel = match form_body.channel_type.as_str() {
CHANNEL_BACKEND_EMAIL => db_conn CHANNEL_BACKEND_EMAIL => db_conn
.interact::<_, Result<Channel, AppError>>(move |conn| { .interact(move |conn| -> Result<Channel> {
Ok(diesel::insert_into(channels::table) diesel::insert_into(channels::table)
.values(( .values(
channels::id.eq(channel_id), Channel::insertable_builder()
channels::team_id.eq(team_id), .with_team_id(team_id)
channels::name.eq("Untitled Email Channel"), .with_name("Untitled Email Channel".to_owned())
channels::backend_config .with_backend_config(EmailBackendConfig::default().into())
.eq(Into::<BackendConfig>::into(EmailBackendConfig::default())), .build()
)) .context("failed to build insertable channel")?,
)
.returning(Channel::as_returning()) .returning(Channel::as_returning())
.get_result(conn) .get_result(conn)
.context("Failed to insert new EmailChannel.")?) .context("Failed to insert new EmailChannel.")
}) })
.await .await
.unwrap()?, .unwrap()?,
CHANNEL_BACKEND_SLACK => db_conn CHANNEL_BACKEND_SLACK => db_conn
.interact::<_, Result<Channel, AppError>>(move |conn| { .interact(move |conn| -> Result<Channel> {
Ok(diesel::insert_into(channels::table) diesel::insert_into(channels::table)
.values(( .values(
channels::id.eq(channel_id), Channel::insertable_builder()
channels::team_id.eq(team_id), .with_team_id(team_id)
channels::name.eq("Untitled Slack Channel"), .with_name("Untitled Slack Channel".to_owned())
channels::backend_config .with_backend_config(SlackBackendConfig::default().into())
.eq(Into::<BackendConfig>::into(SlackBackendConfig::default())), .build()
)) .context("failed to build insertable channel")?,
)
.returning(Channel::as_returning()) .returning(Channel::as_returning())
.get_result(conn) .get_result(conn)
.context("Failed to insert new SlackChannel.")?) .context("Failed to insert new SlackChannel.")
}) })
.await .await
.unwrap()?, .unwrap()?,
@ -189,7 +189,8 @@ async fn post_new_channel(
base_path, base_path,
team.id.simple(), team.id.simple(),
channel.id.simple() channel.id.simple()
))) ))
.into_response())
} }
async fn channel_page( async fn channel_page(
@ -206,7 +207,8 @@ async fn channel_page(
DbConn(db_conn): DbConn, DbConn(db_conn): DbConn,
Path((team_id, channel_id)): Path<(Uuid, Uuid)>, Path((team_id, channel_id)): Path<(Uuid, Uuid)>,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
) -> Result<impl IntoResponse, AppError> { OriginalUri(original_uri): OriginalUri,
) -> Result<Response, AppError> {
let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?; let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?;
let channel = { let channel = {
@ -260,21 +262,53 @@ async fn channel_page(
.build(), .build(),
} }
.render()?, .render()?,
)) )
.into_response())
} }
BackendConfig::Slack(slack_data) => { BackendConfig::Slack(slack_data) => {
let slack_client = slack_data.oauth_tokens.map(|tokens| { let slack_client = slack_data.oauth_tokens.clone().map(|tokens| {
SlackClient::new(&tokens.access_token) SlackClient::new(&tokens.access_token)
.with_reqwest_client(reqwest_client) .with_reqwest_client(reqwest_client)
.with_api_root(&slack_api_root) .with_api_root(&slack_api_root)
}); });
let slack_channels = if let Some(client) = slack_client { let slack_channels = if let Some(client) = slack_client {
client match client
.list_conversations() .list_conversations()
.with_types([ConversationType::PublicChannel]) .with_types([ConversationType::PublicChannel])
.with_exclude_archived(true) .with_exclude_archived(true)
.load_all() .load_all()
.await? .await
{
Err(SlackError::Api(slack_utils::ApiError {
error: slack_utils::ErrorCode::AccountInactive,
})) => {
// Access needs to be reauthorized.
tracing::info!("encountered account_inactive error for slack backend of channel {}; resetting oauth tokens", channel.id);
let new_slack_data = SlackBackendConfig {
oauth_tokens: None,
..slack_data
};
db_conn
.interact(move |conn| -> Result<()> {
diesel::update(
channels::table.filter(Channel::with_id(&channel.id)),
)
.set(
channels::dsl::backend_config
.eq(BackendConfig::from(new_slack_data)),
)
.execute(conn)
.context("failed to clear oauth tokens on slack backend config")
.and(Ok(()))
})
.await
.unwrap()?;
// Have the HTTP client refresh now that the old OAuth
// tokens have been cleared.
return Ok(Redirect::to(&original_uri.to_string()).into_response());
}
other => other,
}?
} else { } else {
Vec::new() Vec::new()
}; };
@ -306,7 +340,8 @@ async fn channel_page(
slack_channels, slack_channels,
} }
.render()?, .render()?,
)) )
.into_response())
} }
} }
} }
@ -324,7 +359,7 @@ async fn update_channel(
Path((team_id, channel_id)): Path<(Uuid, Uuid)>, Path((team_id, channel_id)): Path<(Uuid, Uuid)>,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
Form(form_body): Form<UpdateChannelFormBody>, Form(form_body): Form<UpdateChannelFormBody>,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
guards::require_valid_csrf_token(&form_body.csrf_token, &current_user, &db_conn).await?; guards::require_valid_csrf_token(&form_body.csrf_token, &current_user, &db_conn).await?;
guards::require_team_membership(&current_user, &team_id, &db_conn).await?; guards::require_team_membership(&current_user, &team_id, &db_conn).await?;
@ -337,8 +372,8 @@ async fn update_channel(
.filter(Channel::with_team(&team_id)), .filter(Channel::with_team(&team_id)),
) )
.set(( .set((
channels::name.eq(form_body.name), channels::dsl::name.eq(form_body.name),
channels::enable_by_default channels::dsl::enable_by_default
.eq(form_body.enable_by_default.unwrap_or("false".to_string()) == "true"), .eq(form_body.enable_by_default.unwrap_or("false".to_string()) == "true"),
)) ))
.execute(conn) .execute(conn)
@ -379,7 +414,7 @@ async fn update_channel_email_recipient(
Path((team_id, channel_id)): Path<(Uuid, Uuid)>, Path((team_id, channel_id)): Path<(Uuid, Uuid)>,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
Form(form_body): Form<UpdateChannelEmailRecipientFormBody>, Form(form_body): Form<UpdateChannelEmailRecipientFormBody>,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
guards::require_valid_csrf_token(&form_body.csrf_token, &current_user, &db_conn).await?; guards::require_valid_csrf_token(&form_body.csrf_token, &current_user, &db_conn).await?;
guards::require_team_membership(&current_user, &team_id, &db_conn).await?; guards::require_team_membership(&current_user, &team_id, &db_conn).await?;
@ -402,14 +437,14 @@ async fn update_channel_email_recipient(
// TODO: transaction retries // TODO: transaction retries
conn.transaction::<_, AppError, _>(move |conn| { conn.transaction::<_, AppError, _>(move |conn| {
let channel = get_channel_by_params(conn, &team_id, &channel_id)?; let channel = get_channel_by_params(conn, &team_id, &channel_id)?;
let new_config = BackendConfig::Email(EmailBackendConfig { let new_config = EmailBackendConfig {
recipient, recipient,
verification_code, verification_code,
verification_code_guesses: 0, verification_code_guesses: 0,
..channel.backend_config.try_into()? ..channel.backend_config.try_into()?
}); };
let num_rows = diesel::update(channels::table.filter(Channel::with_id(&channel.id))) let num_rows = diesel::update(channels::table.filter(Channel::with_id(&channel.id)))
.set(channels::backend_config.eq(new_config)) .set(channels::dsl::backend_config.eq(BackendConfig::from(new_config)))
.execute(conn)?; .execute(conn)?;
if num_rows != 1 { if num_rows != 1 {
return Err(anyhow::anyhow!( return Err(anyhow::anyhow!(
@ -447,7 +482,8 @@ async fn update_channel_email_recipient(
base_path, base_path,
team_id.simple(), team_id.simple(),
channel_id.simple() channel_id.simple()
))) ))
.into_response())
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -462,7 +498,7 @@ async fn update_channel_slack_conversation(
Path((team_id, channel_id)): Path<(Uuid, Uuid)>, Path((team_id, channel_id)): Path<(Uuid, Uuid)>,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
Form(form): Form<UpdateChannelSlackConversationFormBody>, Form(form): Form<UpdateChannelSlackConversationFormBody>,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
guards::require_valid_csrf_token(&form.csrf_token, &current_user, &db_conn).await?; guards::require_valid_csrf_token(&form.csrf_token, &current_user, &db_conn).await?;
guards::require_team_membership(&current_user, &team_id, &db_conn).await?; guards::require_team_membership(&current_user, &team_id, &db_conn).await?;
@ -491,7 +527,7 @@ async fn update_channel_slack_conversation(
// TODO: Ensure this holds true with private channels and groups. // TODO: Ensure this holds true with private channels and groups.
slack_data.conversation_id = Some(form.conversation_id); slack_data.conversation_id = Some(form.conversation_id);
let num_rows = diesel::update(channels::table.filter(Channel::with_id(&channel.id))) let num_rows = diesel::update(channels::table.filter(Channel::with_id(&channel.id)))
.set(channels::backend_config.eq(BackendConfig::from(slack_data))) .set(channels::dsl::backend_config.eq(BackendConfig::from(slack_data)))
.execute(conn)?; .execute(conn)?;
tracing::debug!("updated {} rows", num_rows); tracing::debug!("updated {} rows", num_rows);
// If the channel is deleted while this db interaction is running, 0 // If the channel is deleted while this db interaction is running, 0
@ -509,7 +545,8 @@ async fn update_channel_slack_conversation(
base_path, base_path,
team_id.simple(), team_id.simple(),
channel_id.simple() channel_id.simple()
))) ))
.into_response())
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -524,7 +561,7 @@ async fn verify_email(
Path((team_id, channel_id)): Path<(Uuid, Uuid)>, Path((team_id, channel_id)): Path<(Uuid, Uuid)>,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
Form(form_body): Form<VerifyEmailFormBody>, Form(form_body): Form<VerifyEmailFormBody>,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
guards::require_valid_csrf_token(&form_body.csrf_token, &current_user, &db_conn).await?; guards::require_valid_csrf_token(&form_body.csrf_token, &current_user, &db_conn).await?;
guards::require_team_membership(&current_user, &team_id, &db_conn).await?; guards::require_team_membership(&current_user, &team_id, &db_conn).await?;
@ -565,7 +602,7 @@ async fn verify_email(
} }
}; };
diesel::update(channels::table.filter(Channel::with_id(&channel_id))) diesel::update(channels::table.filter(Channel::with_id(&channel_id)))
.set(channels::backend_config.eq(Into::<BackendConfig>::into(new_config))) .set(channels::dsl::backend_config.eq(BackendConfig::from(new_config)))
.execute(conn)?; .execute(conn)?;
Ok(()) Ok(())
}) })
@ -579,5 +616,6 @@ async fn verify_email(
base_path, base_path,
team_id.simple(), team_id.simple(),
channel_id.simple() channel_id.simple()
))) ))
.into_response())
} }

View file

@ -7,12 +7,14 @@ use diesel::{
}; };
use uuid::Uuid; use uuid::Uuid;
use crate::{app_error::AppError, schema::csrf_tokens::dsl::*}; use crate::{app_error::AppError, schema::csrf_tokens};
pub use crate::schema::csrf_tokens::{dsl, table};
const TOKEN_PREFIX: &str = "csrf-"; const TOKEN_PREFIX: &str = "csrf-";
#[derive(Clone, Debug, Identifiable, Queryable, Selectable)] #[derive(Clone, Debug, Identifiable, Queryable, Selectable)]
#[diesel(table_name = crate::schema::csrf_tokens)] #[diesel(table_name = csrf_tokens)]
#[diesel(check_for_backend(Pg))] #[diesel(check_for_backend(Pg))]
pub struct CsrfToken { pub struct CsrfToken {
pub id: Uuid, pub id: Uuid,
@ -21,24 +23,24 @@ pub struct CsrfToken {
} }
impl CsrfToken { impl CsrfToken {
fn all() -> Select<csrf_tokens, AsSelect<CsrfToken, Pg>> { fn all() -> Select<table, AsSelect<CsrfToken, Pg>> {
csrf_tokens.select(Self::as_select()) table.select(Self::as_select())
} }
pub fn is_not_expired() -> Gt<created_at, DateTime<Utc>> { pub fn is_not_expired() -> Gt<dsl::created_at, DateTime<Utc>> {
let ttl = TimeDelta::hours(24); let ttl = TimeDelta::hours(24);
let min_created_at: DateTime<Utc> = Utc::now() - ttl; let min_created_at: DateTime<Utc> = Utc::now() - ttl;
created_at.gt(min_created_at) dsl::created_at.gt(min_created_at)
} }
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn with_user_id<'a>(token_user_id: &'a Option<Uuid>) -> _ { pub fn with_user_id<'a>(token_user_id: &'a Option<Uuid>) -> _ {
user_id.is_not_distinct_from(token_user_id) dsl::user_id.is_not_distinct_from(token_user_id)
} }
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn with_token_id<'a>(token_id: &'a Uuid) -> _ { pub fn with_token_id<'a>(token_id: &'a Uuid) -> _ {
id.eq(token_id) dsl::id.eq(token_id)
} }
} }
@ -50,11 +52,11 @@ pub async fn generate_csrf_token(
let token_id = Uuid::new_v4(); let token_id = Uuid::new_v4();
db_conn db_conn
.interact(move |conn| { .interact(move |conn| {
diesel::insert_into(csrf_tokens) diesel::insert_into(table)
.values(( .values((
id.eq(token_id), dsl::id.eq(token_id),
user_id.eq(with_user_id), dsl::user_id.eq(with_user_id),
created_at.eq(diesel::dsl::now), dsl::created_at.eq(diesel::dsl::now),
)) ))
.execute(conn) .execute(conn)
}) })

View file

@ -1,9 +1,10 @@
// Fault tolerant rate limiting backed by Postgres. // Fault tolerant rate limiting backed by Postgres.
use anyhow::Result; use anyhow::{Context as _, Result};
use chrono::{DateTime, TimeDelta, Utc}; use chrono::{DateTime, TimeDelta, Utc};
use derive_builder::Builder;
use diesel::{ use diesel::{
dsl::{auto_type, insert_into, AsSelect}, dsl::{auto_type, AsSelect},
pg::Pg, pg::Pg,
prelude::*, prelude::*,
sql_types::Timestamptz, sql_types::Timestamptz,
@ -12,12 +13,14 @@ use uuid::Uuid;
use crate::schema::{governor_entries, governors}; use crate::schema::{governor_entries, governors};
pub use crate::schema::governors::{dsl, table};
// Expose built-in Postgres GREATEST() function to Diesel // Expose built-in Postgres GREATEST() function to Diesel
define_sql_function! { define_sql_function! {
fn greatest(a: diesel::sql_types::Integer, b: diesel::sql_types::Integer) -> Integer fn greatest(a: diesel::sql_types::Integer, b: diesel::sql_types::Integer) -> Integer
} }
#[derive(Clone, Debug, Identifiable, Insertable, Queryable, Selectable)] #[derive(Clone, Debug, Identifiable, Queryable, Selectable)]
#[diesel(table_name = governors)] #[diesel(table_name = governors)]
pub struct Governor { pub struct Governor {
pub id: Uuid, pub id: Uuid,
@ -29,75 +32,86 @@ pub struct Governor {
} }
impl Governor { impl Governor {
pub fn insert_new<'a>( pub fn insertable_builder() -> InsertableGovernorBuilder {
db_conn: &mut diesel::PgConnection, InsertableGovernorBuilder::default()
team_id: &'a Uuid, }
project_id: Option<&'a Uuid>,
window_size: &'a TimeDelta, pub fn lazy_getter() -> LazyGetterBuilder {
max_count: i32, LazyGetterBuilder::default()
) -> Result<Self> {
let id: Uuid = Uuid::now_v7();
Ok(insert_into(governors::table)
.values((
governors::team_id.eq(team_id),
governors::id.eq(id),
governors::project_id.eq(project_id),
governors::window_size.eq(window_size),
governors::max_count.eq(max_count),
))
.get_result(db_conn)?)
} }
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn all() -> _ { pub fn all() -> _ {
let select: AsSelect<Governor, Pg> = Governor::as_select(); let select: AsSelect<Governor, Pg> = Governor::as_select();
governors::table.select(select) table.select(select)
} }
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn with_id<'a>(governor_id: &'a Uuid) -> _ { pub fn with_id<'a>(governor_id: &'a Uuid) -> _ {
governors::id.eq(governor_id) dsl::id.eq(governor_id)
} }
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn with_team<'a>(team_id: &'a Uuid) -> _ { pub fn with_team<'a>(team_id: &'a Uuid) -> _ {
governors::team_id.eq(team_id) dsl::team_id.eq(team_id)
} }
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn with_project<'a>(project_id: &'a Option<Uuid>) -> _ { pub fn with_project<'a>(project_id: &'a Option<Uuid>) -> _ {
governors::project_id.is_not_distinct_from(project_id) dsl::project_id.is_not_distinct_from(project_id)
} }
// TODO: return a custom result enum instead of a Result<Option>, for
// better readability
/// Attempt to increment the rolling count. If the governor is not full, /// Attempt to increment the rolling count. If the governor is not full,
/// returns a GovernorEntry which can be used to cancel the operation and /// returns a list of GovernorEntry objects, each of which can be used to
/// restore the rolling count. If governor is full, returns None. /// cancel the operation and decrement the rolling count. If governor cannot
pub fn create_entry(&self, conn: &mut diesel::PgConnection) -> Result<Option<GovernorEntry>> { /// fit all new entries, returns a GovernorError::GovernorFull error.
let entry = diesel::insert_into(governor_entries::table) ///
/// To keep database queries to a reasonable size, num must be at least 1
/// and no more than 100.
pub fn create_entries(
&self,
num: usize,
conn: &mut PgConnection,
) -> Result<Vec<GovernorEntry>, GovernorError> {
if !(1..=100).contains(&num) {
return Err(anyhow::anyhow!(
"number of governor entries to create must be between 1 and 100"
)
.into());
}
let entries: Vec<GovernorEntry> = diesel::insert_into(governor_entries::table)
.values(( .values((
governor_entries::id.eq(Uuid::now_v7()), governor_entries::id.eq(Uuid::now_v7()),
governor_entries::governor_id.eq(self.id), governor_entries::governor_id.eq(self.id),
)) ))
.get_result(conn)?; .get_results(conn)
let n_rows = diesel::update( .context("failed to insert governor entry")?;
governors::table let n_rows =
.filter(governors::id.eq(self.id)) diesel::update(
.filter(governors::rolling_count.lt(self.max_count)), table
) .filter(dsl::id.eq(self.id))
.set(governors::rolling_count.eq(governors::rolling_count + 1)) .filter(dsl::rolling_count.le(self.max_count
.execute(conn)?; - i32::try_from(num).expect("bounds should already be validated"))),
)
.set(dsl::rolling_count.eq(dsl::rolling_count
+ i32::try_from(num).expect("bounds should already be validated")))
.execute(conn)
.context("failed to increment governor count")?;
assert!(n_rows < 2); assert!(n_rows < 2);
if n_rows == 1 { if n_rows == 1 {
Ok(Some(entry)) Ok(entries)
} else { } else {
// Clean up unused entry, or else it will artificially decrement // Clean up unused entries, or else they will artificially decrement
// rolling count when it expires // rolling count upon expiration
diesel::delete(governor_entries::table.filter(GovernorEntry::with_id(entry.id))) diesel::delete(
.execute(conn)?; governor_entries::table.filter(
Ok(None) governor_entries::dsl::id
.eq_any(entries.iter().map(|entry| entry.id).collect::<Vec<Uuid>>()),
),
)
.execute(conn)
.context("failed to delete governor entries")?;
Err(GovernorError::GovernorFull)
} }
} }
@ -105,7 +119,7 @@ impl Governor {
/// periodically decrementing it as entries fall out of the current window of /// periodically decrementing it as entries fall out of the current window of
/// time. This function performs the latter part of the cycle, sweeping out /// time. This function performs the latter part of the cycle, sweeping out
/// expired entries and adjusting the counter accordingly. /// expired entries and adjusting the counter accordingly.
pub fn reclaim(&self, conn: &mut diesel::PgConnection) -> Result<()> { pub fn reclaim(&self, conn: &mut PgConnection) -> Result<()> {
let n_expired_entries: i32 = diesel::delete( let n_expired_entries: i32 = diesel::delete(
GovernorEntry::belonging_to(self).filter( GovernorEntry::belonging_to(self).filter(
governor_entries::timestamp governor_entries::timestamp
@ -116,25 +130,22 @@ impl Governor {
.try_into() .try_into()
.expect("a governor should never have been allowed enough entries to overflow an i32"); .expect("a governor should never have been allowed enough entries to overflow an i32");
// Clamp rolling_count >= 0 // Clamp rolling_count >= 0
diesel::update(governors::table.filter(Self::with_id(&self.id))) diesel::update(table.filter(Self::with_id(&self.id)))
.set( .set(dsl::rolling_count.eq(greatest(dsl::rolling_count - n_expired_entries, 0)))
governors::rolling_count
.eq(greatest(governors::rolling_count - n_expired_entries, 0)),
)
.execute(conn)?; .execute(conn)?;
Ok(()) Ok(())
} }
/// Run reclaim() on all governors with expired entries. /// Run reclaim() on all governors with expired entries.
pub fn reclaim_all(conn: &mut diesel::PgConnection) -> Result<()> { pub fn reclaim_all(conn: &mut PgConnection) -> Result<()> {
let applicable_governors = governors::table let applicable_governors = table
.inner_join(governor_entries::table) .inner_join(governor_entries::table)
.filter( .filter(
governor_entries::timestamp governor_entries::timestamp
.lt(diesel::dsl::now.into_sql::<Timestamptz>() - governors::window_size), .lt(diesel::dsl::now.into_sql::<Timestamptz>() - dsl::window_size),
) )
.select(Self::as_select()) .select(Self::as_select())
.group_by(governors::id) .group_by(dsl::id)
.load(conn)?; .load(conn)?;
tracing::info!( tracing::info!(
"reclaiming counts for {} governors", "reclaiming counts for {} governors",
@ -148,18 +159,99 @@ impl Governor {
/// Reset all governors to a count of 0, to fix any accumulated error /// Reset all governors to a count of 0, to fix any accumulated error
/// between rolling counts and number of entries. /// between rolling counts and number of entries.
pub fn reset_all(conn: &mut diesel::PgConnection) -> Result<()> { pub fn reset_all(conn: &mut PgConnection) -> Result<()> {
// Delete entries and then reset counts, not vice-versa; otherwise // Delete entries and then reset counts, not vice-versa; otherwise
// concurrent inserts could result in rolling counts getting stuck // concurrent inserts could result in rolling counts getting stuck
// higher than they should be // higher than they should be
diesel::delete(governor_entries::table).execute(conn)?; diesel::delete(governor_entries::table).execute(conn)?;
diesel::update(governors::table) diesel::update(table)
.set(governors::rolling_count.eq(0)) .set(dsl::rolling_count.eq(0))
.execute(conn)?; .execute(conn)?;
Ok(()) Ok(())
} }
} }
#[derive(Builder, Clone, Debug, Insertable)]
#[diesel(table_name = governors)]
#[builder(pattern = "owned", setter(prefix = "with"))]
pub struct InsertableGovernor {
#[builder(setter(skip), default = "uuid::Uuid::now_v7()")]
id: Uuid,
team_id: Uuid,
#[builder(setter(strip_option), default)]
project_id: Option<Uuid>,
window_size: TimeDelta,
max_count: i32,
}
#[derive(Builder, Clone, Debug)]
#[builder(pattern = "owned", setter(prefix = "with"))]
pub struct LazyGetter {
team_id: Uuid,
#[builder(setter(strip_option), default)]
project_id: Option<Uuid>,
default_window_size: TimeDelta,
default_max_count: i32,
}
impl LazyGetter {
/// Loads a governor from the database, lazily creating it if needed. Note
/// that in contrast to an upsert, if the governor already exists then it
/// will not be updated with the default values.
pub fn load(self, db_conn: &mut PgConnection) -> Result<Governor> {
Ok(
if let Some(governor) = Governor::all()
.filter(Governor::with_team(&self.team_id))
.filter(Governor::with_project(&None))
.first(db_conn)
.optional()
.context("failed to load governor")?
{
governor
} else {
// Lazily insert governor
if let Some(governor) = diesel::insert_into(governors::table)
.values(
Governor::insertable_builder()
.with_team_id(self.team_id)
.with_window_size(self.default_window_size)
.with_max_count(self.default_max_count)
.build()
.context("failed to build insertable governor")?,
)
.on_conflict_do_nothing()
.get_result(db_conn)
.optional()
.context("failed to insert governor")?
{
governor
} else {
// A conflict occurred, meaning that a concurrent process
// likely inserted a matching governor and we should retry
// loading that one.
Governor::all()
.filter(Governor::with_team(&self.team_id))
.filter(Governor::with_project(&None))
.first(db_conn)
.context("failed to load governor")?
}
},
)
}
}
#[derive(Debug)]
pub enum GovernorError {
Unknown(anyhow::Error),
GovernorFull,
}
impl From<anyhow::Error> for GovernorError {
fn from(value: anyhow::Error) -> Self {
Self::Unknown(value)
}
}
#[derive(Associations, Clone, Debug, Identifiable, Insertable, Queryable, Selectable)] #[derive(Associations, Clone, Debug, Identifiable, Insertable, Queryable, Selectable)]
#[diesel(table_name = governor_entries)] #[diesel(table_name = governor_entries)]
#[diesel(belongs_to(Governor))] #[diesel(belongs_to(Governor))]

View file

@ -40,6 +40,7 @@ mod teams;
mod teams_router; mod teams_router;
mod users; mod users;
mod v0_router; mod v0_router;
mod watchdogs;
mod worker; mod worker;
/// Run CLI /// Run CLI

View file

@ -1,4 +1,5 @@
use chrono::{DateTime, Utc}; use anyhow::Context as _;
use chrono::{DateTime, TimeDelta, Utc};
use diesel::{ use diesel::{
dsl::{auto_type, AsSelect}, dsl::{auto_type, AsSelect},
pg::Pg, pg::Pg,
@ -6,10 +7,19 @@ use diesel::{
}; };
use uuid::Uuid; use uuid::Uuid;
use crate::{channels::Channel, schema::messages}; use crate::{
channels::Channel,
governors::{Governor, GovernorError},
projects::{self, Project},
schema::messages,
teams::{self, Team},
};
pub use crate::schema::messages::{dsl, table}; pub use crate::schema::messages::{dsl, table};
const TEAM_GOVERNOR_DEFAULT_WINDOW_SIZE_SEC: i64 = 300;
const TEAM_GOVERNOR_DEFAULT_MAX_COUNT: i32 = 50;
/// A "/say" message queued for sending /// A "/say" message queued for sending
#[derive(Associations, Clone, Debug, Identifiable, Queryable, Selectable)] #[derive(Associations, Clone, Debug, Identifiable, Queryable, Selectable)]
#[diesel(table_name = messages)] #[diesel(table_name = messages)]
@ -46,3 +56,66 @@ impl Message {
dsl::sent_at.is_null().and(dsl::failed_at.is_null()) dsl::sent_at.is_null().and(dsl::failed_at.is_null())
} }
} }
#[derive(Clone, Debug, Insertable)]
#[diesel(table_name = messages)]
struct InsertableMessage<'a> {
id: Uuid,
project_id: Uuid,
channel_id: Uuid,
message: &'a str,
}
#[derive(Clone, Debug)]
pub struct Broadcast {
pub project_id: Uuid,
pub message: String,
}
impl Broadcast {
pub fn queue_messages(&self, db_conn: &mut PgConnection) -> Result<Vec<Uuid>, GovernorError> {
let _guard = tracing::debug_span!(
"Broadcast::queue_messages()",
project = self.project_id.hyphenated().to_string()
)
.entered();
let (project, team): (Project, Team) = projects::table
.inner_join(teams::table)
.filter(Project::with_id(&self.project_id))
.first(db_conn)
.context("failed to load project and team")?;
let selected_channels = project
.selected_channels()
.load(db_conn)
.context("failed to load selected channels")?;
Governor::lazy_getter()
.with_team_id(team.id)
.with_default_window_size(TimeDelta::seconds(TEAM_GOVERNOR_DEFAULT_WINDOW_SIZE_SEC))
.with_default_max_count(TEAM_GOVERNOR_DEFAULT_MAX_COUNT)
.build()
.context("failed to build governor lazy getter")?
.load(db_conn)
.context("failed to lazily get governor")?
.create_entries(selected_channels.len(), db_conn)?;
let mut message_ids: Vec<Uuid> = Vec::with_capacity(selected_channels.len());
for channel in selected_channels {
message_ids.push(
diesel::insert_into(messages::table)
.values(InsertableMessage {
id: Uuid::now_v7(),
project_id: project.id,
channel_id: channel.id,
message: &self.message,
})
.returning(dsl::id)
.get_result(db_conn)
.context("failed to queue message")
.inspect_err(|_| {
tracing::error!("error queuing message for channel {}", channel.id);
})?,
);
}
tracing::debug!("queued {} messages", message_ids.len());
Ok(message_ids)
}
}

View file

@ -1,4 +1,5 @@
use anyhow::Result; use anyhow::{Context as _, Result};
use derive_builder::Builder;
use diesel::{ use diesel::{
dsl::{auto_type, insert_into, AsSelect, Eq}, dsl::{auto_type, insert_into, AsSelect, Eq},
pg::Pg, pg::Pg,
@ -7,18 +8,19 @@ use diesel::{
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
channels::Channel, channel_selections::{self, ChannelSelection},
schema::{channel_selections, channels, projects}, channels::{self, Channel},
teams::Team, schema::projects,
}; };
pub use crate::schema::projects::{dsl, table};
pub const DEFAULT_PROJECT_NAME: &str = "default"; pub const DEFAULT_PROJECT_NAME: &str = "default";
/// A project maps approximately to an application service, and allows messages /// A project maps approximately to an application service, and allows messages
/// to be directed to an adjustable set of output channels. /// to be directed to an adjustable set of output channels.
#[derive(Associations, Clone, Debug, Identifiable, Insertable, Queryable, Selectable)] #[derive(Clone, Debug, Identifiable, Insertable, Queryable, Selectable)]
#[diesel(table_name = projects)] #[diesel(table_name = projects)]
#[diesel(belongs_to(Team))]
pub struct Project { pub struct Project {
pub id: Uuid, pub id: Uuid,
pub team_id: Uuid, pub team_id: Uuid,
@ -26,63 +28,91 @@ pub struct Project {
} }
impl Project { impl Project {
pub fn insert_new<'a>(
db_conn: &mut diesel::PgConnection,
team_id: &'a Uuid,
name: &'a str,
) -> Result<Self> {
let default_channels = Channel::all()
.filter(Channel::with_team(team_id))
.filter(Channel::where_enabled_by_default())
.load(db_conn)?;
let id: Uuid = Uuid::now_v7();
let project: Self = insert_into(projects::table)
.values((
projects::id.eq(id),
projects::team_id.eq(team_id),
projects::name.eq(name),
))
.get_result(db_conn)?;
for channel in default_channels {
insert_into(channel_selections::table)
.values((
channel_selections::project_id.eq(&project.id),
channel_selections::channel_id.eq(&channel.id),
))
.execute(db_conn)?;
}
Ok(project)
}
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn all() -> _ { pub fn all() -> _ {
let select: AsSelect<Project, Pg> = Project::as_select(); let select: AsSelect<Project, Pg> = Project::as_select();
projects::table.select(select) table.select(select)
} }
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn with_id<'a>(project_id: &'a Uuid) -> _ { pub fn with_id<'a>(project_id: &'a Uuid) -> _ {
projects::id.eq(project_id) dsl::id.eq(project_id)
} }
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn with_team<'a>(team_id: &'a Uuid) -> _ { pub fn with_team<'a>(team_id: &'a Uuid) -> _ {
projects::team_id.eq(team_id) dsl::team_id.eq(team_id)
} }
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn with_name<'a>(name: &'a str) -> _ { pub fn with_name<'a>(name: &'a str) -> _ {
projects::name.eq(name) dsl::name.eq(name)
} }
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn selected_channels(&self) -> _ { pub fn selected_channels(&self) -> _ {
let select: AsSelect<Channel, Pg> = Channel::as_select(); let select: AsSelect<Channel, Pg> = Channel::as_select();
let project_filter: Eq<channel_selections::project_id, Uuid> = let project_filter: Eq<channel_selections::dsl::project_id, &Uuid> =
channel_selections::project_id.eq(self.id); ChannelSelection::with_project(&self.id);
channels::table channels::table
.inner_join(channel_selections::table) .inner_join(channel_selections::table)
.filter(project_filter) .filter(project_filter)
.select(select) .select(select)
} }
/// Lazily fetch a project from the database. That is, query for it, and
/// insert if it does not exist yet.
pub fn lazy_getter() -> LazyGetterBuilder {
LazyGetterBuilder::default()
}
}
#[derive(Builder)]
#[builder(pattern = "owned", setter(prefix = "with"))]
pub struct LazyGetter {
team_id: Uuid,
name: String,
}
impl LazyGetter {
pub fn execute(self, db_conn: &mut PgConnection) -> Result<Project> {
db_conn.transaction(move |conn| {
Ok(
if let Some(project) = Project::all()
.filter(Project::with_team(&self.team_id))
.filter(Project::with_name(&self.name))
.first(conn)
.optional()
.context("failed to load project")?
{
project
} else {
let default_channels = Channel::all()
.filter(Channel::with_team(&self.team_id))
.filter(Channel::where_enabled_by_default())
.load(conn)
.context("failed to load default channels")?;
let id: Uuid = Uuid::now_v7();
let project: Project = insert_into(table)
.values(Project {
id,
team_id: self.team_id,
name: self.name,
})
.get_result(conn)
.context("failed to insert project")?;
for channel in default_channels {
insert_into(channel_selections::table)
.values(ChannelSelection {
project_id: project.id,
channel_id: channel.id,
})
.execute(conn)
.context("failed to insert channel selection")?;
}
project
},
)
})
}
} }

View file

@ -17,15 +17,15 @@ use crate::{
api_keys::ApiKey, api_keys::ApiKey,
app_error::AppError, app_error::AppError,
app_state::{AppState, DbConn}, app_state::{AppState, DbConn},
channel_selections::ChannelSelection, channel_selections::{self, ChannelSelection},
channels::Channel, channels::Channel,
csrf::generate_csrf_token, csrf::generate_csrf_token,
guards, guards,
nav::{BreadcrumbTrail, Navbar, NavbarBuilder, NAVBAR_ITEM_PROJECTS}, nav::{BreadcrumbTrail, Navbar, NavbarBuilder, NAVBAR_ITEM_PROJECTS},
projects::Project, projects::{self, Project},
schema::channel_selections,
settings::Settings, settings::Settings,
users::CurrentUser, users::CurrentUser,
watchdogs::{self, Watchdog},
}; };
pub fn new_router() -> Router<AppState> { pub fn new_router() -> Router<AppState> {
@ -39,7 +39,11 @@ pub fn new_router() -> Router<AppState> {
} }
async fn projects_page( async fn projects_page(
State(Settings { base_path, .. }): State<Settings>, State(Settings {
base_path,
frontend_host,
..
}): State<Settings>,
State(navbar_template): State<NavbarBuilder>, State(navbar_template): State<NavbarBuilder>,
DbConn(db_conn): DbConn, DbConn(db_conn): DbConn,
Path(team_id): Path<Uuid>, Path(team_id): Path<Uuid>,
@ -82,6 +86,7 @@ async fn projects_page(
base_path: String, base_path: String,
breadcrumbs: BreadcrumbTrail, breadcrumbs: BreadcrumbTrail,
csrf_token: String, csrf_token: String,
frontend_host: String,
keys: Vec<ApiKey>, keys: Vec<ApiKey>,
navbar: Navbar, navbar: Navbar,
projects: Vec<Project>, projects: Vec<Project>,
@ -96,12 +101,13 @@ async fn projects_page(
.push_slug("Projects", "projects"), .push_slug("Projects", "projects"),
base_path, base_path,
csrf_token, csrf_token,
frontend_host,
keys: api_keys,
navbar: navbar_template navbar: navbar_template
.with_param("team_id", &team.id.simple().to_string()) .with_param("team_id", &team.id.simple().to_string())
.with_active_item(NAVBAR_ITEM_PROJECTS) .with_active_item(NAVBAR_ITEM_PROJECTS)
.build(), .build(),
projects, projects,
keys: api_keys,
} }
.render()?, .render()?,
) )
@ -109,7 +115,11 @@ async fn projects_page(
} }
async fn project_page( async fn project_page(
State(Settings { base_path, .. }): State<Settings>, State(Settings {
base_path,
frontend_host,
..
}): State<Settings>,
State(navbar_template): State<NavbarBuilder>, State(navbar_template): State<NavbarBuilder>,
DbConn(db_conn): DbConn, DbConn(db_conn): DbConn,
Path((team_id, project_id)): Path<(Uuid, Uuid)>, Path((team_id, project_id)): Path<(Uuid, Uuid)>,
@ -117,45 +127,38 @@ async fn project_page(
) -> Result<impl IntoResponse, AppError> { ) -> Result<impl IntoResponse, AppError> {
let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?; let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?;
let project = db_conn let (project, maybe_watchdog, enabled_channel_ids, team_channels) = db_conn
.interact(move |conn| { .interact(move |conn| -> Result<_, AppError> {
match Project::all() // Queried together to save a round trip to the database.
let (project, maybe_watchdog) = projects::table
.left_outer_join(watchdogs::table)
.select(<(Project, Option<Watchdog>)>::as_select())
.filter(Project::with_id(&project_id)) .filter(Project::with_id(&project_id))
.filter(Project::with_team(&team_id)) .filter(Project::with_team(&team_id))
.first(conn) .first(conn)
{ .optional()
diesel::QueryResult::Err(diesel::NotFound) => Err(AppError::NotFound( .context("failed to load project")?
.ok_or(AppError::NotFound(
"Project with that team and ID not found.".to_string(), "Project with that team and ID not found.".to_string(),
)), ))?;
other => other let enabled_channel_ids: HashSet<Uuid> = project
.context("failed to load project") .selected_channels()
.map_err(|err| err.into()), .load(conn)
} .context("failed to load selected channels")?
.iter()
.map(|channel| channel.id)
.collect();
let team_channels = Channel::all()
.filter(Channel::with_team(&team_id))
.load(conn)
.context("failed to load team channels")?;
Ok((project, maybe_watchdog, enabled_channel_ids, team_channels))
}) })
.await .await
.unwrap()?; .unwrap()?;
let selected_channels_query = project.selected_channels();
let enabled_channel_ids: HashSet<Uuid> = db_conn
.interact(move |conn| selected_channels_query.load(conn))
.await
.unwrap()
.context("failed to load selected channels")?
.iter()
.map(|channel| channel.id)
.collect();
let team_channels = db_conn
.interact(move |conn| {
Channel::all()
.filter(Channel::with_team(&team_id))
.load(conn)
})
.await
.unwrap()
.context("failed to load team channels")?;
let csrf_token = generate_csrf_token(&db_conn, Some(current_user.id)).await?; let csrf_token = generate_csrf_token(&db_conn, Some(current_user.id)).await?;
#[derive(Template)] #[derive(Template)]
#[template(path = "project.html")] #[template(path = "project.html")]
struct ResponseTemplate { struct ResponseTemplate {
@ -163,9 +166,11 @@ async fn project_page(
breadcrumbs: BreadcrumbTrail, breadcrumbs: BreadcrumbTrail,
csrf_token: String, csrf_token: String,
enabled_channel_ids: HashSet<Uuid>, enabled_channel_ids: HashSet<Uuid>,
frontend_host: String,
navbar: Navbar, navbar: Navbar,
project: Project, project: Project,
team_channels: Vec<Channel>, team_channels: Vec<Channel>,
watchdog: Option<Watchdog>,
} }
Ok(Html( Ok(Html(
ResponseTemplate { ResponseTemplate {
@ -178,12 +183,14 @@ async fn project_page(
base_path, base_path,
csrf_token, csrf_token,
enabled_channel_ids, enabled_channel_ids,
frontend_host,
project, project,
navbar: navbar_template navbar: navbar_template
.with_param("team_id", &team.id.simple().to_string()) .with_param("team_id", &team.id.simple().to_string())
.with_active_item(NAVBAR_ITEM_PROJECTS) .with_active_item(NAVBAR_ITEM_PROJECTS)
.build(), .build(),
team_channels, team_channels,
watchdog: maybe_watchdog,
} }
.render()?, .render()?,
)) ))
@ -223,15 +230,17 @@ async fn update_enabled_channels(
diesel::delete( diesel::delete(
channel_selections::table channel_selections::table
.filter(ChannelSelection::with_project(&project.id)) .filter(ChannelSelection::with_project(&project.id))
.filter(channel_selections::channel_id.ne_all(&form_body.enabled_channels)), .filter(
channel_selections::dsl::channel_id.ne_all(&form_body.enabled_channels),
),
) )
.execute(conn) .execute(conn)
.context("failed to remove unset channel selections")?; .context("failed to remove unset channel selections")?;
for channel_id in form_body.enabled_channels { for channel_id in form_body.enabled_channels {
diesel::insert_into(channel_selections::table) diesel::insert_into(channel_selections::table)
.values(( .values((
channel_selections::project_id.eq(&project.id), channel_selections::dsl::project_id.eq(&project.id),
channel_selections::channel_id.eq(channel_id), channel_selections::dsl::channel_id.eq(channel_id),
)) ))
.on_conflict_do_nothing() .on_conflict_do_nothing()
.execute(conn) .execute(conn)

View file

@ -114,6 +114,17 @@ diesel::table! {
} }
} }
diesel::table! {
watchdogs (id) {
id -> Uuid,
project_id -> Uuid,
created_at -> Timestamptz,
last_set_at -> Timestamptz,
expiration -> Timestamptz,
notified -> Bool,
}
}
diesel::joinable!(api_keys -> teams (team_id)); diesel::joinable!(api_keys -> teams (team_id));
diesel::joinable!(channel_selections -> channels (channel_id)); diesel::joinable!(channel_selections -> channels (channel_id));
diesel::joinable!(channel_selections -> projects (project_id)); diesel::joinable!(channel_selections -> projects (project_id));
@ -129,6 +140,7 @@ diesel::joinable!(team_invitations -> teams (team_id));
diesel::joinable!(team_invitations -> users (created_by)); diesel::joinable!(team_invitations -> users (created_by));
diesel::joinable!(team_memberships -> teams (team_id)); diesel::joinable!(team_memberships -> teams (team_id));
diesel::joinable!(team_memberships -> users (user_id)); diesel::joinable!(team_memberships -> users (user_id));
diesel::joinable!(watchdogs -> projects (project_id));
diesel::allow_tables_to_appear_in_same_query!( diesel::allow_tables_to_appear_in_same_query!(
api_keys, api_keys,
@ -144,4 +156,5 @@ diesel::allow_tables_to_appear_in_same_query!(
team_memberships, team_memberships,
teams, teams,
users, users,
watchdogs,
); );

View file

@ -10,13 +10,15 @@ use uuid::Uuid;
use crate::{ use crate::{
email::{is_permissible_email, MailSender as _, Mailer, Message}, email::{is_permissible_email, MailSender as _, Mailer, Message},
schema::{team_invitations, team_memberships, teams, users}, schema::team_invitations,
settings::Settings, settings::Settings,
team_memberships::TeamMembership, team_memberships::{self, TeamMembership},
teams::Team, teams::{self, Team},
users::User, users::{self, User},
}; };
pub use crate::schema::team_invitations::{dsl, table};
#[derive(Clone, Debug, Identifiable, Insertable, Queryable, Selectable)] #[derive(Clone, Debug, Identifiable, Insertable, Queryable, Selectable)]
#[diesel(table_name = team_invitations)] #[diesel(table_name = team_invitations)]
#[diesel(check_for_backend(Pg))] #[diesel(check_for_backend(Pg))]
@ -77,7 +79,7 @@ impl TeamInvitation {
.transaction::<(), anyhow::Error, _>(move |conn| { .transaction::<(), anyhow::Error, _>(move |conn| {
let n_inserted = diesel::insert_into(team_memberships::table) let n_inserted = diesel::insert_into(team_memberships::table)
.values(TeamMembership { user_id, team_id }) .values(TeamMembership { user_id, team_id })
.on_conflict((team_memberships::team_id, team_memberships::user_id)) .on_conflict((team_memberships::dsl::team_id, team_memberships::dsl::user_id))
.do_nothing() .do_nothing()
.execute(conn) .execute(conn)
.context("failed to create team membership")?; .context("failed to create team membership")?;
@ -154,7 +156,7 @@ impl PopulatedTeamInvitation {
let as_select: AsSelect<Self, Pg> = Self::as_select(); let as_select: AsSelect<Self, Pg> = Self::as_select();
team_invitations::table team_invitations::table
.inner_join(teams::table) .inner_join(teams::table)
.inner_join(users::table.on(users::id.eq(team_invitations::created_by))) .inner_join(users::table.on(users::dsl::id.eq(team_invitations::created_by)))
.select(as_select) .select(as_select)
} }

View file

@ -6,11 +6,13 @@ use diesel::{
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
schema::{team_memberships, teams, users}, schema::team_memberships,
teams::Team, teams::{self, Team},
users::User, users::{self, User},
}; };
pub use crate::schema::team_memberships::{dsl, table};
#[derive(Clone, Debug, Identifiable, Insertable, Queryable, Selectable)] #[derive(Clone, Debug, Identifiable, Insertable, Queryable, Selectable)]
#[diesel(table_name = team_memberships)] #[diesel(table_name = team_memberships)]
#[diesel(primary_key(team_id, user_id))] #[diesel(primary_key(team_id, user_id))]

View file

@ -6,13 +6,15 @@ use diesel::{
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
api_keys::ApiKey, api_keys::{self, ApiKey},
schema::{api_keys, team_invitations, team_memberships, teams, users}, schema::teams,
team_invitations::{PopulatedTeamInvitation, TeamInvitation}, team_invitations::{self, PopulatedTeamInvitation, TeamInvitation},
team_memberships::TeamMembership, team_memberships::{self, TeamMembership},
users::User, users::{self, User},
}; };
pub use crate::schema::teams::{dsl, table};
/// Teams are the fundamental organizing unit for billing and help to /// Teams are the fundamental organizing unit for billing and help to
/// distribute ownership of projects and other resources across multiple /// distribute ownership of projects and other resources across multiple
/// users rather than forcing a single user account to own them. /// users rather than forcing a single user account to own them.
@ -28,18 +30,18 @@ impl Team {
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn all() -> _ { pub fn all() -> _ {
let select: AsSelect<Team, Pg> = Team::as_select(); let select: AsSelect<Team, Pg> = Team::as_select();
teams::table.select(select) table.select(select)
} }
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn with_id(id: &Uuid) -> _ { pub fn with_id(id: &Uuid) -> _ {
teams::id.eq(id) dsl::id.eq(id)
} }
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn api_keys(&self) -> _ { pub fn api_keys(&self) -> _ {
let all: diesel::dsl::Select<api_keys::table, AsSelect<ApiKey, Pg>> = ApiKey::all(); let all: diesel::dsl::Select<api_keys::table, AsSelect<ApiKey, Pg>> = ApiKey::all();
let filter: Eq<api_keys::team_id, &Uuid> = ApiKey::with_team(&self.id); let filter: Eq<api_keys::dsl::team_id, &Uuid> = ApiKey::with_team(&self.id);
all.filter(filter) all.filter(filter)
} }
@ -51,19 +53,21 @@ impl Team {
diesel::dsl::InnerJoin<team_invitations::table, teams::table>, diesel::dsl::InnerJoin<team_invitations::table, teams::table>,
diesel::dsl::On< diesel::dsl::On<
users::table, users::table,
diesel::dsl::Eq<users::id, team_invitations::created_by>, diesel::dsl::Eq<users::dsl::id, team_invitations::dsl::created_by>,
>, >,
>, >,
AsSelect<PopulatedTeamInvitation, Pg>, AsSelect<PopulatedTeamInvitation, Pg>,
> = PopulatedTeamInvitation::all(); > = PopulatedTeamInvitation::all();
let filter: Eq<team_invitations::team_id, &Uuid> = TeamInvitation::with_team_id(&self.id); let filter: Eq<team_invitations::dsl::team_id, &Uuid> =
TeamInvitation::with_team_id(&self.id);
all.filter(filter) all.filter(filter)
} }
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn members(&self) -> _ { pub fn members(&self) -> _ {
let select: AsSelect<User, Pg> = User::as_select(); let select: AsSelect<User, Pg> = User::as_select();
let filter: Eq<team_memberships::team_id, &Uuid> = TeamMembership::with_team_id(&self.id); let filter: Eq<team_memberships::dsl::team_id, &Uuid> =
TeamMembership::with_team_id(&self.id);
team_memberships::table team_memberships::table
.inner_join(users::table) .inner_join(users::table)
.filter(filter) .filter(filter)

View file

@ -2,7 +2,7 @@ use anyhow::{Context as _, Result};
use askama::Template; use askama::Template;
use axum::{ use axum::{
extract::{Path, State}, extract::{Path, State},
response::{Html, IntoResponse, Redirect}, response::{Html, IntoResponse as _, Redirect, Response},
routing::{get, post}, routing::{get, post},
Router, Router,
}; };
@ -12,7 +12,7 @@ use serde::Deserialize;
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
api_keys::ApiKey, api_keys::{self, ApiKey},
app_error::AppError, app_error::AppError,
app_state::{AppState, DbConn}, app_state::{AppState, DbConn},
csrf::generate_csrf_token, csrf::generate_csrf_token,
@ -20,12 +20,11 @@ use crate::{
guards, guards,
nav::{BreadcrumbTrail, Navbar, NavbarBuilder, NAVBAR_ITEM_TEAMS, NAVBAR_ITEM_TEAM_MEMBERS}, nav::{BreadcrumbTrail, Navbar, NavbarBuilder, NAVBAR_ITEM_TEAMS, NAVBAR_ITEM_TEAM_MEMBERS},
projects::{Project, DEFAULT_PROJECT_NAME}, projects::{Project, DEFAULT_PROJECT_NAME},
schema::{api_keys, team_invitations, team_memberships, teams, users},
settings::Settings, settings::Settings,
team_invitations::{InvitationBuilder, PopulatedTeamInvitation, TeamInvitation}, team_invitations::{self, InvitationBuilder, PopulatedTeamInvitation, TeamInvitation},
team_memberships::TeamMembership, team_memberships::{self, TeamMembership},
teams::Team, teams::{self, Team},
users::{CurrentUser, User}, users::{self, CurrentUser, User},
}; };
pub fn new_router() -> Router<AppState> { pub fn new_router() -> Router<AppState> {
@ -64,7 +63,7 @@ async fn teams_page(
State(navbar_template): State<NavbarBuilder>, State(navbar_template): State<NavbarBuilder>,
DbConn(conn): DbConn, DbConn(conn): DbConn,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
let teams: Vec<Team> = { let teams: Vec<Team> = {
let current_user = current_user.clone(); let current_user = current_user.clone();
conn.interact(move |conn| current_user.team_memberships().load(conn)) conn.interact(move |conn| current_user.team_memberships().load(conn))
@ -91,7 +90,8 @@ async fn teams_page(
teams, teams,
} }
.render()?, .render()?,
)) )
.into_response())
} }
async fn new_team_page( async fn new_team_page(
@ -99,7 +99,7 @@ async fn new_team_page(
State(navbar_template): State<NavbarBuilder>, State(navbar_template): State<NavbarBuilder>,
DbConn(db_conn): DbConn, DbConn(db_conn): DbConn,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
let csrf_token = generate_csrf_token(&db_conn, Some(current_user.id)).await?; let csrf_token = generate_csrf_token(&db_conn, Some(current_user.id)).await?;
#[derive(Template)] #[derive(Template)]
@ -120,7 +120,8 @@ async fn new_team_page(
navbar: navbar_template.with_active_item(NAVBAR_ITEM_TEAMS).build(), navbar: navbar_template.with_active_item(NAVBAR_ITEM_TEAMS).build(),
} }
.render()?, .render()?,
)) )
.into_response())
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -134,7 +135,7 @@ async fn post_new_team(
State(Settings { base_path, .. }): State<Settings>, State(Settings { base_path, .. }): State<Settings>,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
Form(form): Form<PostNewTeamForm>, Form(form): Form<PostNewTeamForm>,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
guards::require_valid_csrf_token(&form.csrf_token, &current_user, &db_conn).await?; guards::require_valid_csrf_token(&form.csrf_token, &current_user, &db_conn).await?;
let team_id = Uuid::now_v7(); let team_id = Uuid::now_v7();
@ -147,30 +148,41 @@ async fn post_new_team(
user_id: current_user.id, user_id: current_user.id,
}; };
db_conn db_conn
.interact::<_, Result<(), AppError>>(move |conn| { .interact(move |conn| -> Result<(), AppError> {
conn.transaction::<(), AppError, _>(move |conn| { conn.transaction(move |conn| -> Result<(), AppError> {
diesel::insert_into(teams::table) diesel::insert_into(teams::table)
.values(&team) .values(&team)
.execute(conn)?; .execute(conn)
.context("failed to insert team")?;
diesel::insert_into(team_memberships::table) diesel::insert_into(team_memberships::table)
.values(&team_membership) .values(&team_membership)
.execute(conn)?; .execute(conn)
Project::insert_new(conn, &team.id, DEFAULT_PROJECT_NAME)?; .context("failed to insert team membership")?;
Ok(()) Ok(())
}) })?;
Project::lazy_getter()
.with_team_id(team_id)
.with_name(DEFAULT_PROJECT_NAME.to_owned())
.build()
.context("failed to build project lazy getter")?
.execute(conn)
.context("failed to insert project")?;
diesel::insert_into(api_keys::table)
.values(ApiKey::new_from_team_id(team_id))
.execute(conn)
.context("failed to insert api key")?;
Ok(())
}) })
.await .await
.unwrap() .unwrap()?;
.unwrap();
ApiKey::generate_for_team(&db_conn, team_id).await?;
Ok(Redirect::to(&format!("{}/en/teams/{}/projects", base_path, team_id)).into_response()) Ok(Redirect::to(&format!("{}/en/teams/{}/projects", base_path, team_id)).into_response())
} }
async fn team_page( async fn team_page(
State(Settings { base_path, .. }): State<Settings>, State(Settings { base_path, .. }): State<Settings>,
Path(team_id): Path<Uuid>, Path(team_id): Path<Uuid>,
) -> impl IntoResponse { ) -> Response {
Redirect::to(&format!("{}/en/teams/{}/projects", base_path, team_id)) Redirect::to(&format!("{}/en/teams/{}/projects", base_path, team_id)).into_response()
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -184,11 +196,21 @@ async fn post_new_api_key(
Path(team_id): Path<Uuid>, Path(team_id): Path<Uuid>,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
Form(form): Form<PostNewApiKeyForm>, Form(form): Form<PostNewApiKeyForm>,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
guards::require_valid_csrf_token(&form.csrf_token, &current_user, &db_conn).await?; guards::require_valid_csrf_token(&form.csrf_token, &current_user, &db_conn).await?;
let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?; let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?;
ApiKey::generate_for_team(&db_conn, team.id).await?; db_conn
.interact(move |conn| -> Result<()> {
diesel::insert_into(api_keys::table)
.values(ApiKey::new_from_team_id(team_id))
.execute(conn)
.context("failed to insert api key")
.map(|_| ())
})
.await
.unwrap()?;
Ok(Redirect::to(&format!( Ok(Redirect::to(&format!(
"{}/en/teams/{}/projects", "{}/en/teams/{}/projects",
base_path, base_path,
@ -209,7 +231,7 @@ async fn remove_api_key(
Path(team_id): Path<Uuid>, Path(team_id): Path<Uuid>,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
Form(form): Form<RemoveApiKeyForm>, Form(form): Form<RemoveApiKeyForm>,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
guards::require_valid_csrf_token(&form.csrf_token, &current_user, &db_conn).await?; guards::require_valid_csrf_token(&form.csrf_token, &current_user, &db_conn).await?;
let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?; let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?;
@ -253,15 +275,21 @@ async fn team_members_page(
DbConn(db_conn): DbConn, DbConn(db_conn): DbConn,
Path(team_id): Path<Uuid>, Path(team_id): Path<Uuid>,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?; let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?;
let (team_members, invitations) = { let (team_members, invitations) = {
let team = team.clone(); let team = team.clone();
db_conn db_conn
.interact::<_, Result<(Vec<User>, Vec<PopulatedTeamInvitation>)>>(move |conn| { .interact::<_, Result<(Vec<User>, Vec<PopulatedTeamInvitation>)>>(move |conn| {
let team_members = team.members().order_by(users::email.asc()).load(conn)?; let team_members = team
let invitations = team.invitations().order_by(users::email.asc()).load(conn)?; .members()
.order_by(users::dsl::email.asc())
.load(conn)?;
let invitations = team
.invitations()
.order_by(users::dsl::email.asc())
.load(conn)?;
Ok((team_members, invitations)) Ok((team_members, invitations))
}) })
.await .await
@ -317,7 +345,7 @@ async fn invite_team_member(
Path(team_id): Path<Uuid>, Path(team_id): Path<Uuid>,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
Form(form): Form<InviteTeamMemberForm>, Form(form): Form<InviteTeamMemberForm>,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?; let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?;
guards::require_valid_csrf_token(&form.csrf_token, &current_user, &db_conn).await?; guards::require_valid_csrf_token(&form.csrf_token, &current_user, &db_conn).await?;
@ -363,7 +391,7 @@ async fn accept_invitation_page(
Path(team_id): Path<Uuid>, Path(team_id): Path<Uuid>,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
Form(form): Form<AcceptInvitationPageForm>, Form(form): Form<AcceptInvitationPageForm>,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
let csrf_token = generate_csrf_token(&db_conn, Some(current_user.id)).await?; let csrf_token = generate_csrf_token(&db_conn, Some(current_user.id)).await?;
let maybe_invitation = db_conn let maybe_invitation = db_conn
@ -430,7 +458,7 @@ async fn post_accept_invitation(
Path(team_id): Path<Uuid>, Path(team_id): Path<Uuid>,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
Form(form): Form<PostAcceptInvitationForm>, Form(form): Form<PostAcceptInvitationForm>,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
guards::require_valid_csrf_token(&form.csrf_token, &current_user, &db_conn).await?; guards::require_valid_csrf_token(&form.csrf_token, &current_user, &db_conn).await?;
let maybe_invitation = { let maybe_invitation = {
@ -475,7 +503,7 @@ async fn remove_team_member(
Path(team_id): Path<Uuid>, Path(team_id): Path<Uuid>,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
Form(form): Form<RemoveTeamMemberForm>, Form(form): Form<RemoveTeamMemberForm>,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
guards::require_valid_csrf_token(&form.csrf_token, &current_user, &db_conn).await?; guards::require_valid_csrf_token(&form.csrf_token, &current_user, &db_conn).await?;
let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?; let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?;
@ -525,7 +553,7 @@ async fn remove_team_invitation(
Path(team_id): Path<Uuid>, Path(team_id): Path<Uuid>,
CurrentUser(current_user): CurrentUser, CurrentUser(current_user): CurrentUser,
Form(form): Form<RemoveTeamInvitationForm>, Form(form): Form<RemoveTeamInvitationForm>,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
guards::require_valid_csrf_token(&form.csrf_token, &current_user, &db_conn).await?; guards::require_valid_csrf_token(&form.csrf_token, &current_user, &db_conn).await?;
let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?; let team = guards::require_team_membership(&current_user, &team_id, &db_conn).await?;

View file

@ -24,12 +24,14 @@ use crate::{
app_error::AppError, app_error::AppError,
app_state::AppState, app_state::AppState,
auth::{AuthInfo, SESSION_KEY_AUTH_INFO, SESSION_KEY_AUTH_REDIRECT}, auth::{AuthInfo, SESSION_KEY_AUTH_INFO, SESSION_KEY_AUTH_REDIRECT},
schema::{team_memberships, teams, users}, schema::users,
sessions::AppSession, sessions::AppSession,
team_memberships::TeamMembership, team_memberships::{self, TeamMembership},
teams::Team, teams::{self, Team},
}; };
pub use crate::schema::users::{dsl, table};
#[derive(Clone, Debug, Identifiable, Insertable, Queryable, Selectable)] #[derive(Clone, Debug, Identifiable, Insertable, Queryable, Selectable)]
#[diesel(table_name = users)] #[diesel(table_name = users)]
#[diesel(check_for_backend(Pg))] #[diesel(check_for_backend(Pg))]
@ -51,7 +53,7 @@ impl User {
#[auto_type(no_type_alias)] #[auto_type(no_type_alias)]
pub fn team_memberships(&self) -> _ { pub fn team_memberships(&self) -> _ {
let user_id_filter: Eq<team_memberships::user_id, &Uuid> = let user_id_filter: Eq<team_memberships::dsl::user_id, &Uuid> =
TeamMembership::with_user_id(&self.id); TeamMembership::with_user_id(&self.id);
let select: AsSelect<(TeamMembership, Team), Pg> = <(TeamMembership, Team)>::as_select(); let select: AsSelect<(TeamMembership, Team), Pg> = <(TeamMembership, Team)>::as_select();
team_memberships::table team_memberships::table

View file

@ -1,38 +1,37 @@
use std::sync::LazyLock; use std::sync::LazyLock;
use anyhow::Context; use anyhow::{Context as _, Result};
use axum::{ use axum::{
extract::Query, extract::Query,
response::{IntoResponse, Json}, response::{IntoResponse as _, Json, Response},
routing::get, routing::get,
Router, Router,
}; };
use chrono::TimeDelta; use chrono::{Duration, Utc};
use diesel::{dsl::insert_into, prelude::*, update}; use diesel::prelude::*;
use regex::Regex; use regex::Regex;
use serde::Deserialize; use serde::Deserialize;
use serde_json::json; use serde_json::json;
use uuid::Uuid;
use validator::Validate; use validator::Validate;
use crate::{ use crate::{
api_keys::{try_parse_as_uuid, ApiKey}, api_keys::use_api_key,
app_error::AppError, app_error::AppError,
app_state::{AppState, DbConn}, app_state::{AppState, DbConn},
channels::Channel, governors::GovernorError,
governors::Governor, messages::Broadcast,
projects::{Project, DEFAULT_PROJECT_NAME}, projects::{Project, DEFAULT_PROJECT_NAME},
schema::{api_keys, messages}, watchdogs::{self, Watchdog},
}; };
const TEAM_GOVERNOR_DEFAULT_WINDOW_SIZE_SEC: i64 = 300;
const TEAM_GOVERNOR_DEFAULT_MAX_COUNT: i32 = 50;
static RE_PROJECT_NAME: LazyLock<Regex> = static RE_PROJECT_NAME: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^[a-z0-9_-]{1,100}$").unwrap()); LazyLock::new(|| Regex::new(r"^[a-z0-9_-]{1,100}$").unwrap());
pub fn new_router() -> Router<AppState> { pub fn new_router() -> Router<AppState> {
Router::new().route("/say", get(say_get)) Router::new()
.route("/say", get(say_get))
.route("/watchdog", get(watchdog_get))
.route("/pet", get(watchdog_get))
} }
#[derive(Deserialize, Validate)] #[derive(Deserialize, Validate)]
@ -62,23 +61,74 @@ fn default_project() -> String {
async fn say_get( async fn say_get(
DbConn(db_conn): DbConn, DbConn(db_conn): DbConn,
Query(mut query): Query<SayQuery>, Query(mut query): Query<SayQuery>,
) -> Result<impl IntoResponse, AppError> { ) -> Result<Response, AppError> {
query.project = query.project.to_lowercase().replace(" ", "_"); query.project = query.project.to_lowercase().replace(" ", "_");
query.validate().map_err(AppError::from_validation_errors)?; query.validate().map_err(AppError::from_validation_errors)?;
let api_key = { let api_key = {
let query_key = try_parse_as_uuid(&query.key) let key_id = query.key;
.or(Err(AppError::Forbidden("key not accepted".to_string())))?;
db_conn db_conn
.interact::<_, Result<ApiKey, AppError>>(move |conn| { .interact(move |conn| use_api_key(&key_id, conn))
update(api_keys::table.filter(ApiKey::with_id(&query_key))) .await
.set(api_keys::last_used_at.eq(diesel::dsl::now)) .unwrap()?
.returning(ApiKey::as_returning()) };
.get_result(conn)
.optional() db_conn
.context("failed to get API key")? .interact(move |conn| -> Result<_, AppError> {
.ok_or(AppError::Forbidden("key not accepted.".to_string())) let project = Project::lazy_getter()
.with_team_id(api_key.team_id)
.with_name(query.project)
.build()
.context("failed to build project lazy getter")?
.execute(conn)
.context("failed to lazily get project")?;
Broadcast {
project_id: project.id,
message: query.message,
}
.queue_messages(conn)
.map_err(|err| match err {
GovernorError::GovernorFull => {
AppError::TooManyRequests("team rate limit exceeded".to_owned())
}
GovernorError::Unknown(inner) => inner.into(),
}) })
})
.await
.unwrap()?;
Ok(Json(json!({ "ok": true })).into_response())
}
#[derive(Deserialize, Validate)]
struct WatchdogQuery {
#[serde(alias = "k")]
key: String,
#[serde(alias = "p")]
#[serde(default = "default_project")]
#[validate(regex(
path = *RE_PROJECT_NAME,
message = "may be no more than 100 characters and contain only alphanumerics, -, and _",
))]
project: String,
#[serde(alias = "timeout")]
#[serde(alias = "t")]
#[serde(alias = "minutes")]
#[validate(range(min = 1, max = 527040))]
timeout_minutes: i64,
}
async fn watchdog_get(
DbConn(db_conn): DbConn,
Query(mut query): Query<WatchdogQuery>,
) -> Result<Response, AppError> {
query.project = query.project.to_lowercase().replace(" ", "_");
query.validate().map_err(AppError::from_validation_errors)?;
let api_key = {
let key_id = query.key;
db_conn
.interact(move |conn| use_api_key(&key_id, conn))
.await .await
.unwrap()? .unwrap()?
}; };
@ -86,103 +136,53 @@ async fn say_get(
let project = { let project = {
let project_name = query.project.clone(); let project_name = query.project.clone();
db_conn db_conn
.interact::<_, Result<Project, AppError>>(move |conn| { .interact(move |conn| {
conn.transaction(move |conn| { Project::lazy_getter()
Ok( .with_team_id(api_key.team_id)
match Project::all() .with_name(project_name)
.filter(Project::with_team(&api_key.team_id)) .build()
.filter(Project::with_name(&project_name)) .context("failed to build project lazy getter")?
.first(conn) .execute(conn)
.optional() .context("failed to lazily get project")
.context("failed to load project")?
{
Some(project) => project,
None => Project::insert_new(conn, &api_key.team_id, &project_name)
.context("failed to insert project")?,
},
)
})
}) })
.await .await
.unwrap()? .unwrap()?
}; };
let team_governor = { let expiration = Utc::now() + Duration::minutes(query.timeout_minutes);
let team_id = project.team_id; tracing::debug!(
db_conn "updated watchdog for project {} with expiration {}",
.interact::<_, Result<Governor, AppError>>(move |conn| { project.id,
// TODO: extract this logic to a method in crate::governors, expiration
// and create governor proactively on team creation );
match Governor::all() let watchdog = db_conn
.filter(Governor::with_team(&team_id)) .interact(move |conn| -> Result<Watchdog, _> {
.filter(Governor::with_project(&None)) diesel::insert_into(watchdogs::table)
.first(conn) .values(
{ Watchdog::insertable_builder()
diesel::QueryResult::Ok(governor) => Ok(governor), .with_project_id(project.id)
diesel::QueryResult::Err(diesel::result::Error::NotFound) => { .with_expiration(expiration)
// Lazily initialize governor .build()
Governor::insert_new( .context("failed to build insertable watchdog")?,
conn, )
&team_id, .on_conflict(watchdogs::dsl::project_id)
None, .do_update()
&TimeDelta::seconds(TEAM_GOVERNOR_DEFAULT_WINDOW_SIZE_SEC), .set((
TEAM_GOVERNOR_DEFAULT_MAX_COUNT, watchdogs::dsl::expiration
) .eq(diesel::upsert::excluded(watchdogs::dsl::expiration)),
.map_err(Into::into) watchdogs::dsl::notified.eq(diesel::upsert::excluded(watchdogs::dsl::notified)),
} watchdogs::dsl::last_set_at.eq(diesel::dsl::now),
diesel::QueryResult::Err(err) => Err(err.into()), ))
} .get_result(conn)
}) .context("failed to upsert watchdog")
.await
.unwrap()?
};
if db_conn
.interact::<_, Result<Option<_>, anyhow::Error>>(move |conn| {
team_governor.create_entry(conn)
}) })
.await .await
.unwrap()? .unwrap()?;
.is_none() tracing::debug!(
{ "updated watchdog {} with expiration {}",
return Err(AppError::TooManyRequests( watchdog.id,
"team rate limit exceeded".to_string(), watchdog.expiration
)); );
}
let selected_channels = { Ok(Json(json!({ "ok": true })).into_response())
let project = project.clone();
db_conn
.interact::<_, Result<Vec<Channel>, AppError>>(move |conn| {
Ok(project
.selected_channels()
.load(conn)
.context("failed to load selected channels")?)
})
.await
.unwrap()?
};
{
let selected_channels = selected_channels.clone();
db_conn
.interact::<_, Result<_, AppError>>(move |conn| {
for channel in selected_channels {
insert_into(messages::table)
.values((
messages::id.eq(Uuid::now_v7()),
messages::channel_id.eq(&channel.id),
messages::project_id.eq(&project.id),
messages::message.eq(&query.message),
))
.execute(conn)?;
}
Ok(())
})
.await
.unwrap()?;
}
tracing::debug!("queued {} messages", selected_channels.len());
Ok(Json(json!({ "ok": true })))
} }

67
src/watchdogs.rs Normal file
View file

@ -0,0 +1,67 @@
use chrono::{DateTime, Utc};
use derive_builder::Builder;
use diesel::{
dsl::{auto_type, AsSelect, Lt},
pg::Pg,
prelude::*,
};
use uuid::Uuid;
use crate::schema::watchdogs;
pub use crate::schema::watchdogs::{dsl, table};
#[derive(Clone, Debug, Identifiable, Queryable, Selectable)]
#[diesel(table_name = watchdogs)]
pub struct Watchdog {
pub id: Uuid,
pub project_id: Uuid,
pub created_at: DateTime<Utc>,
pub last_set_at: DateTime<Utc>,
pub expiration: DateTime<Utc>,
pub notified: bool,
}
impl Watchdog {
#[auto_type(no_type_alias)]
pub fn all() -> _ {
let select: AsSelect<Watchdog, Pg> = Watchdog::as_select();
table.select(select)
}
#[auto_type(no_type_alias)]
pub fn with_id<'a>(id: &'a Uuid) -> _ {
dsl::id.eq(id)
}
#[auto_type(no_type_alias)]
pub fn with_project<'a>(project_id: &'a Uuid) -> _ {
dsl::project_id.eq(project_id)
}
pub fn with_expiration_before<T: Into<DateTime<Utc>>>(
time: T,
) -> Lt<dsl::expiration, DateTime<Utc>> {
dsl::expiration.lt(Into::<DateTime<Utc>>::into(time))
}
#[auto_type(no_type_alias)]
pub fn with_notified(notified: bool) -> _ {
dsl::notified.eq(notified)
}
pub fn insertable_builder() -> InsertableWatchdogBuilder {
InsertableWatchdogBuilder::default()
}
}
#[derive(Builder, Clone, Debug, Insertable)]
#[diesel(table_name = watchdogs)]
#[builder(pattern = "owned", setter(prefix = "with"))]
pub struct InsertableWatchdog {
#[builder(setter(skip), default = "uuid::Uuid::now_v7()")]
id: Uuid,
project_id: Uuid,
#[builder(setter(strip_option), default)]
expiration: Option<DateTime<Utc>>,
}

View file

@ -1,4 +1,5 @@
use anyhow::{Context as _, Result}; use anyhow::{Context as _, Result};
use chrono::Utc;
use diesel::prelude::*; use diesel::prelude::*;
use tracing::Instrument as _; use tracing::Instrument as _;
use uuid::Uuid; use uuid::Uuid;
@ -7,26 +8,89 @@ use crate::{
app_state::AppState, app_state::AppState,
channels::{self, BackendConfig, Channel, EmailBackendConfig}, channels::{self, BackendConfig, Channel, EmailBackendConfig},
email::MailSender, email::MailSender,
governors::Governor, governors::{Governor, GovernorError},
messages::{self, Message}, messages::{self, Broadcast, Message},
projects::{self, Project},
slack_utils::SlackClient, slack_utils::SlackClient,
watchdogs::{self, Watchdog},
}; };
pub async fn run_worker(state: AppState) -> Result<()> { pub async fn run_worker(state: AppState) -> Result<()> {
async move { async move {
process_watchdogs(state.clone()).await?;
process_messages(state.clone()).await?; process_messages(state.clone()).await?;
reclaim_governor_entries(state).await?; reclaim_governor_entries(state).await?;
// TODO: Add an optional meta Shout.dev watchdog hook
Ok(()) Ok(())
} }
.instrument(tracing::debug_span!("run_worker()")) .instrument(tracing::debug_span!("run_worker()"))
.await .await
} }
/** async fn process_watchdogs(state: AppState) -> Result<()> {
* Process messages from the queue in the `messages` table. Insertions to the // rustfmt has an issue that makes long string literals prevent formatting of
* queue are rate limited per team and per project, so no effort should be // the surrounding code. Breaking this out into a macro is used here as a
* needed here to enforce fairness. // workaround.
*/ macro_rules! watchdog_alert_message {
($project_name:expr) => {
format!(
"Hi, This is a friendly alert from Shout.dev that the watchdog timer for project {} has run out without being renewed. You may want to check that the associated service is running as expected.",
$project_name,
)
}
}
const WATCHDOG_QUEUE_LIMIT: i64 = 500;
let now = Utc::now();
async move {
let db_conn = state.db_pool.get().await?;
db_conn
.interact(move |conn| -> Result<_> {
let expired = watchdogs::table
.inner_join(projects::table)
.select(<(Watchdog, Project)>::as_select())
.filter(Watchdog::with_expiration_before(now))
.filter(Watchdog::with_notified(false))
.order_by(watchdogs::dsl::expiration.asc())
.limit(WATCHDOG_QUEUE_LIMIT)
.load(conn)
.context("failed to load expired watchdogs")?;
for (watchdog, project) in expired {
match (Broadcast {
project_id: project.id,
message: watchdog_alert_message!(project.name),
})
.queue_messages(conn)
{
Err(GovernorError::GovernorFull) => {
tracing::warn!(
"governor full and could not queue watchdog alert for project {}",
project.id
);
Ok(())
}
Err(GovernorError::Unknown(inner)) => Err(inner),
Ok(_) => Ok(()),
}?;
diesel::update(watchdogs::table.filter(Watchdog::with_id(&watchdog.id)))
.set(watchdogs::dsl::notified.eq(true))
.execute(conn)?;
}
Ok(())
})
.await
.unwrap()?;
Ok(())
}
.instrument(tracing::debug_span!(
"process_watchdogs()",
now = now.to_rfc3339()
))
.await
}
/// Process messages from the queue in the `messages` table. Insertions to the
/// queue are rate limited per team and per project, so no effort should be
/// needed here to enforce fairness.
async fn process_messages(state: AppState) -> Result<()> { async fn process_messages(state: AppState) -> Result<()> {
async move { async move {
const MESSAGE_QUEUE_LIMIT: i64 = 250; const MESSAGE_QUEUE_LIMIT: i64 = 250;

View file

@ -8,6 +8,44 @@
<section class="mb-4"> <section class="mb-4">
<h1>Project: <code>{{ project.name }}</code></h1> <h1>Project: <code>{{ project.name }}</code></h1>
</section> </section>
<section class="mb-4">
<h2>Watchdog Timer</h2>
{% if let Some(watchdog) = watchdog %}
<p>
Watchdog timer was last refreshed at
<code>{{ watchdog.last_set_at.to_rfc3339_opts(chrono::SecondsFormat::Secs, true) }}</code>
and is set to time out at
<code>{{ watchdog.expiration.to_rfc3339_opts(chrono::SecondsFormat::Secs, true) }}</code>.
</p>
<div class="alert alert-primary" role="alert">
{% else %}
<p>
Watchdog timer is not active.
</p>
<div class="alert alert-primary" role="alert">
<p>
Taking inspiration from the
<a
target="_blank"
rel="noopener noreferrer"
href="https://en.wikipedia.org/wiki/Watchdog_timer"
>"computer operating properly timers"</a>
of embedded programming, a project's watchdog timer will trigger an
alert to all enabled channels when it is not refreshed within a
certain duration. For example, if you have a background job that is
expected to run every 15 minutes, setting a watchdog timer with a
timeout of 20 minutes at the end of each run will cause Shout.dev to
notify you if the job begins to fail or hang.
</p>
{% endif %}
<p class="mb-0">
To set or refresh the watchdog for this project:
<code>
{{ frontend_host }}{{base_path}}/watchdog?project={{ project.name|urlencode }}&timeout_minutes=20&key=******
</code>
</p>
</div>
</section>
<section class="mb-4"> <section class="mb-4">
<h2>Enabled Channels</h2> <h2>Enabled Channels</h2>
<form <form
@ -24,30 +62,30 @@
</thead> </thead>
<tbody> <tbody>
{% for channel in team_channels %} {% for channel in team_channels %}
<tr> <tr>
<td> <td>
<label for="enable-channel-switch-{{ channel.id.simple() }}"> <label for="enable-channel-switch-{{ channel.id.simple() }}">
<a <a
target="_blank" target="_blank"
href="{{ breadcrumbs.join(format!("../../channels/{}", channel.id.simple()).as_str()) }}" href="{{ breadcrumbs.join(format!("../../channels/{}", channel.id.simple()).as_str()) }}"
>
{{ channel.name }}
</a>
</label>
</td>
<td>
<input
class="form-check-input"
{% if enabled_channel_ids.contains(channel.id) %}
checked=""
{% endif %}
type="checkbox"
name="enabled_channels"
value="{{ channel.id.simple() }}"
id="enable-channel-switch-{{ channel.id.simple() }}"
> >
</td> {{ channel.name }}
</tr> </a>
</label>
</td>
<td>
<input
class="form-check-input"
{% if enabled_channel_ids.contains(channel.id) %}
checked=""
{% endif %}
type="checkbox"
name="enabled_channels"
value="{{ channel.id.simple() }}"
id="enable-channel-switch-{{ channel.id.simple() }}"
>
</td>
</tr>
{% endfor %} {% endfor %}
</tbody> </tbody>
</table> </table>

View file

@ -17,12 +17,12 @@
</p> </p>
<p> <p>
<code> <code>
https://shout.dev{{ base_path }}/say?project=my-first-project&amp;key=***&amp;message=Hello,%20World {{ frontend_host }}{{ base_path }}/say?project=my-first-project&amp;key=***&amp;message=Hello,%20World
</code> </code>
</p> </p>
<p> <p>
<code> <code>
https://shout.dev{{ base_path }}/watchdog?project=my-first-project&amp;key=***&amp;seconds=300 {{ frontend_host }}{{ base_path }}/watchdog?project=my-first-project&amp;key=***&amp;timeout_minutes=15
</code> </code>
</p> </p>
</div> </div>