phonograph/phono-server/src/workspace_pooler.rs
2025-12-18 20:05:46 +00:00

142 lines
4.8 KiB
Rust

use std::{collections::HashMap, sync::Arc, time::Duration};
use anyhow::Result;
use axum::extract::FromRef;
use derive_builder::Builder;
use phono_backends::{client::WorkspaceClient, rolnames::ROLE_PREFIX_USER};
use phono_models::{client::AppDbClient, workspace::Workspace};
use sqlx::{Executor, PgPool, postgres::PgPoolOptions, raw_sql};
use tokio::sync::{OnceCell, RwLock};
use uuid::Uuid;
use crate::app::App;
const MAX_CONNECTIONS: u32 = 4;
const IDLE_SECONDS: u64 = 3600;
/// A collection of multiple SQLx Pools.
#[derive(Builder, Clone, Debug)]
pub struct WorkspacePooler {
#[builder(default, setter(skip))]
pools: Arc<RwLock<HashMap<Uuid, OnceCell<PgPool>>>>,
app_db_pool: PgPool,
}
impl WorkspacePooler {
pub fn builder() -> WorkspacePoolerBuilder {
WorkspacePoolerBuilder::default()
}
async fn get_pool_for(&mut self, workspace_id: Uuid) -> Result<PgPool> {
let init_cell = || async {
let mut app_db = AppDbClient::from_pool_conn(self.app_db_pool.acquire().await?);
let workspace = Workspace::with_id(workspace_id)
.fetch_one(&mut app_db)
.await?;
Ok(PgPoolOptions::new()
.min_connections(0)
.max_connections(MAX_CONNECTIONS)
.idle_timeout(Some(Duration::from_secs(IDLE_SECONDS)))
.after_release(|conn, _| {
Box::pin(async move {
// Essentially "DISCARD ALL" without "DEALLOCATE ALL"
conn.execute(raw_sql(
r#"
close all;
set session authorization default;
reset all;
unlisten *;
select pg_advisory_unlock_all();
discard plans;
discard temp;
discard sequences;
"#,
))
.await?;
Ok(true)
})
})
.connect(
workspace
.fetch_cluster(&mut app_db)
.await?
.conn_str_for_db(&workspace.db_name, None)?
.expose_secret()
.as_str(),
)
.await?)
};
// Attempt to get an existing pool without write-locking the map
let pools = self.pools.read().await;
if let Some(cell) = pools.get(&workspace_id) {
return Ok(cell
.get_or_try_init::<anyhow::Error, _, _>(init_cell)
.await?
.clone());
}
drop(pools); // Release read lock
let mut pools = self.pools.write().await;
let entry = pools.entry(workspace_id).or_insert(OnceCell::new());
Ok(entry
.get_or_try_init::<anyhow::Error, _, _>(init_cell)
.await?
.clone())
}
/// Note that while using `SET ROLE` simulates impersonation for most data
/// access and RLS purposes, it is both incomplete and easily reversible:
/// some commands and system tables will still behave according to the
/// privileges of the session user, and clients relying on this abstraction
/// should **NEVER** execute untrusted SQL.
pub async fn acquire_for(
&mut self,
workspace_id: Uuid,
set_role: RoleAssignment,
) -> Result<WorkspaceClient> {
let pool = self.get_pool_for(workspace_id).await?;
let mut client = WorkspaceClient::from_pool_conn(pool.acquire().await?);
match set_role {
RoleAssignment::User(uid) => {
// TODO: Return error if user does not have "CONNECT" privileges
// on backing database. Note that this change will entail a
// fairly broad refactor of [`phono-server`] code for
// initializing user roles and for performing workspace auth
// checks.
let rolname = format!("{ROLE_PREFIX_USER}{uid}", uid = uid.simple());
client.ensure_role(&rolname).await?;
client.set_role(&rolname).await?;
}
RoleAssignment::Root => { /* no-op */ }
}
Ok(client)
}
pub async fn close_for(&mut self, workspace_id: Uuid) -> Result<()> {
let pools = self.pools.read().await;
if let Some(cell) = pools.get(&workspace_id)
&& let Some(pool) = cell.get()
{
let pool = pool.clone();
drop(pools); // Release read lock
let mut pools = self.pools.write().await;
pools.remove(&workspace_id);
drop(pools); // Release write lock
pool.close().await;
}
Ok(())
}
// TODO: Add a cleanup method to remove entries with no connections
}
impl FromRef<App> for WorkspacePooler {
fn from_ref(state: &App) -> Self {
state.workspace_pooler.clone()
}
}
pub enum RoleAssignment {
Root,
User(Uuid),
}