phonograph/interim-server/src/base_pooler.rs

138 lines
4.4 KiB
Rust
Raw Normal View History

2025-05-26 22:08:21 -07:00
use std::{collections::HashMap, sync::Arc, time::Duration};
2025-08-04 13:59:42 -07:00
use anyhow::Result;
2025-05-26 22:08:21 -07:00
use axum::extract::FromRef;
2025-08-04 13:59:42 -07:00
use interim_models::{base::Base, client::AppDbClient};
use interim_pgtypes::client::BaseClient;
use sqlx::{Executor, PgPool, postgres::PgPoolOptions, raw_sql};
2025-05-26 22:08:21 -07:00
use tokio::sync::{OnceCell, RwLock};
use uuid::Uuid;
2025-08-04 13:59:42 -07:00
use crate::app_state::AppState;
2025-05-26 22:08:21 -07:00
/// Upper bound on the number of connections each per-base pool may open.
const MAX_CONNECTIONS: u32 = 4;
/// Idle timeout, in seconds, configured on each per-base pool (one hour).
const IDLE_SECONDS: u64 = 60 * 60;
2025-08-04 13:59:42 -07:00
// TODO: The Arc<RwLock> this uses will probably need to be cleaned up for
2025-05-26 22:08:21 -07:00
// performance eventually.
/// A collection of multiple SQLx Pools.
///
/// Keeps at most one [`PgPool`] per base, keyed by the base's ID and created
/// lazily the first time a connection to that base is requested. Clones of a
/// `BasePooler` share the same underlying map of pools through the `Arc`.
#[derive(Clone)]
pub struct BasePooler {
/// Per-base pools. Each entry is a `OnceCell` so that a pool is connected
/// at most once even when several tasks request the same base at once.
pools: Arc<RwLock<HashMap<Uuid, OnceCell<PgPool>>>>,
/// Pool for the application database, used to look up base records.
app_db: PgPool,
}
impl BasePooler {
/// Creates a pooler with no per-base pools yet.
///
/// `app_db` is the application database pool that will later be used to
/// look up base records when a per-base pool has to be created.
pub fn new_with_app_db(app_db: PgPool) -> Self {
    let pools = Arc::new(RwLock::new(HashMap::new()));
    Self { pools, app_db }
}
async fn get_pool_for(&mut self, base_id: Uuid) -> Result<PgPool> {
let init_cell = || async {
2025-08-04 13:59:42 -07:00
let mut app_db = AppDbClient::from_pool_conn(self.app_db.acquire().await?);
let base = Base::with_id(base_id).fetch_one(&mut app_db).await?;
2025-05-26 22:08:21 -07:00
Ok(PgPoolOptions::new()
.min_connections(0)
.max_connections(MAX_CONNECTIONS)
.idle_timeout(Some(Duration::from_secs(IDLE_SECONDS)))
.after_release(|conn, _| {
Box::pin(async move {
// Essentially "DISCARD ALL" without "DEALLOCATE ALL"
conn.execute(raw_sql(
"
close all;
set session authorization default;
reset all;
unlisten *;
select pg_advisory_unlock_all();
discard plans;
discard temp;
discard sequences;
",
))
.await?;
Ok(true)
})
})
.connect(&base.url)
.await?)
};
// Attempt to get an existing pool without write-locking the map
let pools = self.pools.read().await;
if let Some(cell) = pools.get(&base_id) {
return Ok(cell
.get_or_try_init::<anyhow::Error, _, _>(init_cell)
.await?
.clone());
}
drop(pools); // Release read lock
let mut pools = self.pools.write().await;
let entry = pools.entry(base_id).or_insert(OnceCell::new());
Ok(entry
.get_or_try_init::<anyhow::Error, _, _>(init_cell)
.await?
.clone())
}
2025-08-04 13:59:42 -07:00
/// Note that while using `set role` simulates impersonation for most data
/// access and RLS purposes, it is both incomplete and easily reversible:
/// some commands and system tables will still behave according to the
/// privileges of the session user, and clients relying on this abstraction
/// should **NEVER** execute untrusted SQL.
pub async fn acquire_for(
&mut self,
base_id: Uuid,
set_role: RoleAssignment,
) -> Result<BaseClient> {
let mut app_db = AppDbClient::from_pool_conn(self.app_db.acquire().await?);
2025-05-26 22:08:21 -07:00
let pool = self.get_pool_for(base_id).await?;
2025-08-04 13:59:42 -07:00
let mut client = BaseClient::from_pool_conn(pool.acquire().await?);
match set_role {
RoleAssignment::User(id) => {
let base = Base::with_id(base_id).fetch_one(&mut app_db).await?;
let prefix = base.user_role_prefix;
let user_id = id.simple();
client.init_role(&format!("{prefix}{user_id}")).await?;
}
RoleAssignment::Root => {}
}
Ok(client)
2025-05-26 22:08:21 -07:00
}
/// Removes the pool registered for `base_id`, if any, and closes it.
///
/// The entry is looked up and removed under a single write lock, so the pool
/// that gets closed is exactly the one that was removed from the map. An
/// entry whose pool was never successfully connected is removed as well.
/// The lock is released before the pool is closed, because closing waits on
/// the pool's connections.
///
/// Currently always returns `Ok(())`.
pub async fn close_for(&mut self, base_id: Uuid) -> Result<()> {
    let removed = {
        let mut pools = self.pools.write().await;
        pools.remove(&base_id)
    }; // Write lock released here

    if let Some(pool) = removed.and_then(|cell| cell.into_inner()) {
        pool.close().await;
    }
    Ok(())
}
// TODO: Add a cleanup method to remove entries with no connections
}
/// Allows a [`BasePooler`] to be obtained from any state type that can be
/// cloned and converted into [`AppState`].
impl<S> FromRef<S> for BasePooler
where
    S: Clone + Into<AppState>,
{
    fn from_ref(state: &S) -> Self {
        let app_state: AppState = state.clone().into();
        app_state.base_pooler.clone()
    }
}
2025-08-04 13:59:42 -07:00
pub enum RoleAssignment {
Root,
User(Uuid),
}