From 9d543eedd803504b68492bca61d0a07a4d1bdfd9 Mon Sep 17 00:00:00 2001 From: Brent Schroeter Date: Fri, 14 Mar 2025 15:28:20 -0700 Subject: [PATCH] extract cli and migrations into modules --- src/cli.rs | 83 ++++++++++++++++++++++++++++++ src/main.rs | 127 +++++++++++----------------------------------- src/migrations.rs | 3 ++ 3 files changed, 115 insertions(+), 98 deletions(-) create mode 100644 src/cli.rs create mode 100644 src/migrations.rs diff --git a/src/cli.rs b/src/cli.rs new file mode 100644 index 0000000..2bdd50a --- /dev/null +++ b/src/cli.rs @@ -0,0 +1,83 @@ +use anyhow::Result; +use axum::middleware::map_request; +use chrono::{TimeDelta, Utc}; +use clap::{Parser, Subcommand}; +use tokio::time::sleep; +use tower::ServiceBuilder; +use tower_http::{ + compression::CompressionLayer, normalize_path::NormalizePathLayer, trace::TraceLayer, +}; + +use crate::{ + app_state::AppState, middleware::lowercase_uri_path, router::new_router, worker::run_worker, +}; + +#[derive(Parser)] +#[command(version, about, long_about = None)] +pub struct Cli { + #[command(subcommand)] + pub command: Commands, +} + +#[derive(Parser)] +pub struct WorkerArgs { + /// Loop the every n seconds instead of exiting after execution + #[arg(long)] + auto_loop_seconds: Option, +} + +#[derive(Subcommand)] +pub enum Commands { + /// Run web server + Serve, + /// Run background worker + Worker(WorkerArgs), + // TODO: add a low-frequency worker task exclusively for self-healing + // mechanisms like Governor::reset_all() +} + +pub async fn serve_command(state: AppState) -> Result<()> { + let router = new_router(state.clone()).layer( + ServiceBuilder::new() + .layer(map_request(lowercase_uri_path)) + .layer(TraceLayer::new_for_http()) + .layer(CompressionLayer::new()) + .layer(NormalizePathLayer::trim_trailing_slash()), + ); + + let listener = + tokio::net::TcpListener::bind((state.settings.host.clone(), state.settings.port)) + .await + .unwrap(); + tracing::info!( + "App running at http://{}:{}{}", + state.settings.host, + state.settings.port, + state.settings.base_path + ); + + axum::serve(listener, router).await.map_err(Into::into) +} + +pub async fn worker_command(args: &WorkerArgs, state: AppState) -> Result<()> { + if let Some(loop_seconds) = args.auto_loop_seconds { + let loop_delta = TimeDelta::seconds(i64::from(loop_seconds)); + loop { + let t_next_loop = Utc::now() + loop_delta; + + if let Err(err) = run_worker(state.clone()).await { + tracing::error!("{}", err) + } + + let sleep_delta = t_next_loop - Utc::now(); + match sleep_delta.to_std() { + Ok(duration) => { + sleep(duration).await; + } + Err(_) => { /* sleep_delta was < 0, so don't sleep */ } + } + } + } else { + run_worker(state).await + } +} diff --git a/src/main.rs b/src/main.rs index a030c8f..e003e2f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,70 +1,43 @@ -use axum::middleware::map_request; -use chrono::{TimeDelta, Utc}; -use clap::{Parser, Subcommand}; -use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; +use clap::Parser as _; +use diesel_migrations::MigrationHarness; use dotenvy::dotenv; -use tokio::time::sleep; -use tower::ServiceBuilder; -use tower_http::{ - compression::CompressionLayer, normalize_path::NormalizePathLayer, trace::TraceLayer, -}; use tracing_subscriber::EnvFilter; use crate::{ app_state::{App, AppState}, - middleware::lowercase_uri_path, - router::new_router, + cli::{serve_command, worker_command, Cli, Commands}, + migrations::MIGRATIONS, settings::Settings, - worker::run_worker, }; -pub mod api_keys; -pub mod app_error; -pub mod app_state; -pub mod auth; -pub mod channel_selections; -pub mod channels; +mod api_keys; +mod app_error; +mod app_state; +mod auth; +mod channel_selections; +mod channels; mod channels_router; -pub mod csrf; -pub mod email; -pub mod governors; -pub mod guards; -pub mod messages; -pub mod middleware; +mod cli; +mod csrf; +mod email; +mod governors; +mod guards; +mod messages; +mod middleware; +mod migrations; mod nav_state; -pub mod projects; +mod projects; mod projects_router; -pub mod router; -pub mod schema; -pub mod sessions; -pub mod settings; -pub mod team_memberships; -pub mod teams; +mod router; +mod schema; +mod sessions; +mod settings; +mod team_memberships; +mod teams; mod teams_router; -pub mod users; +mod users; mod v0_router; -pub mod worker; - -#[derive(Parser)] -#[command(version, about, long_about = None)] -struct Cli { - #[command(subcommand)] - command: Commands, -} - -#[derive(Subcommand)] -enum Commands { - /// Run web server - Serve, - /// Run background worker - Worker { - /// Loop the every n seconds instead of exiting after execution - #[arg(long)] - auto_loop_seconds: Option, - }, - // TODO: add a low-frequency worker task exclusively for self-healing - // mechanisms like Governor::reset_all() -} +mod worker; /// Run CLI #[tokio::main] @@ -80,7 +53,6 @@ async fn main() { let state: AppState = App::from_settings(settings.clone()).await.unwrap().into(); if settings.run_database_migrations == Some(1) { - const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/"); // Run migrations on server startup let conn = state.db_pool.get().await.unwrap(); conn.interact(|conn| conn.run_pending_migrations(MIGRATIONS).map(|_| ())) @@ -91,48 +63,7 @@ async fn main() { let cli = Cli::parse(); match &cli.command { - Commands::Serve => { - let router = new_router(state.clone()).layer( - ServiceBuilder::new() - .layer(map_request(lowercase_uri_path)) - .layer(TraceLayer::new_for_http()) - .layer(CompressionLayer::new()) - .layer(NormalizePathLayer::trim_trailing_slash()), - ); - - let listener = tokio::net::TcpListener::bind((settings.host.clone(), settings.port)) - .await - .unwrap(); - tracing::info!( - "App running at http://{}:{}{}", - settings.host, - settings.port, - settings.base_path - ); - - axum::serve(listener, router).await.unwrap(); - } - Commands::Worker { auto_loop_seconds } => { - if let Some(loop_seconds) = auto_loop_seconds { - let loop_delta = TimeDelta::seconds(i64::from(*loop_seconds)); - loop { - let t_next_loop = Utc::now() + loop_delta; - - if let Err(err) = run_worker(state.clone()).await { - tracing::error!("{}", err) - } - - let sleep_delta = t_next_loop - Utc::now(); - match sleep_delta.to_std() { - Ok(duration) => { - sleep(duration).await; - } - Err(_) => { /* sleep_delta was < 0, so don't sleep */ } - } - } - } else { - run_worker(state).await.unwrap(); - } - } + Commands::Serve => serve_command(state).await.unwrap(), + Commands::Worker(args) => worker_command(args, state).await.unwrap(), } } diff --git a/src/migrations.rs b/src/migrations.rs new file mode 100644 index 0000000..9c47191 --- /dev/null +++ b/src/migrations.rs @@ -0,0 +1,3 @@ +use diesel_migrations::{embed_migrations, EmbeddedMigrations}; + +pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/");