Files
jspg/src/database/executors/pgrx.rs
2026-03-16 21:21:11 -04:00

113 lines
3.4 KiB
Rust

use crate::database::executors::DatabaseExecutor;
use pgrx::prelude::*;
use serde_json::Value;
/// The production executor that wraps `pgrx::spi::Spi`.
pub struct SpiExecutor;
impl SpiExecutor {
pub fn new() -> Self {
Self {}
}
}
impl DatabaseExecutor for SpiExecutor {
fn query(&self, sql: &str, args: Option<&[Value]>) -> Result<Value, String> {
let mut json_args = Vec::new();
let mut args_with_oid: Vec<pgrx::datum::DatumWithOid> = Vec::new();
if let Some(params) = args {
for val in params {
json_args.push(pgrx::JsonB(val.clone()));
}
for j_val in json_args.into_iter() {
args_with_oid.push(pgrx::datum::DatumWithOid::from(j_val));
}
}
pgrx::PgTryBuilder::new(|| {
Spi::connect(|client| {
pgrx::notice!("JSPG_SQL: {}", sql);
match client.select(sql, Some(args_with_oid.len() as i64), &args_with_oid) {
Ok(tup_table) => {
let mut results = Vec::new();
for row in tup_table {
if let Ok(Some(jsonb)) = row.get::<pgrx::JsonB>(1) {
results.push(jsonb.0);
}
}
Ok(Value::Array(results))
}
Err(e) => Err(format!("SPI Query Fetch Failure: {}", e)),
}
})
})
.catch_others(|cause| {
pgrx::warning!("JSPG Caught Native Postgres Error: {:?}", cause);
Err(format!("{:?}", cause))
})
.execute()
}
fn execute(&self, sql: &str, args: Option<&[Value]>) -> Result<(), String> {
let mut json_args = Vec::new();
let mut args_with_oid: Vec<pgrx::datum::DatumWithOid> = Vec::new();
if let Some(params) = args {
for val in params {
json_args.push(pgrx::JsonB(val.clone()));
}
for j_val in json_args.into_iter() {
args_with_oid.push(pgrx::datum::DatumWithOid::from(j_val));
}
}
pgrx::PgTryBuilder::new(|| {
Spi::connect_mut(|client| {
pgrx::notice!("JSPG_SQL: {}", sql);
match client.update(sql, Some(args_with_oid.len() as i64), &args_with_oid) {
Ok(_) => Ok(()),
Err(e) => Err(format!("SPI Execution Failure: {}", e)),
}
})
})
.catch_others(|cause| {
pgrx::warning!("JSPG Caught Native Postgres Error: {:?}", cause);
Err(format!("{:?}", cause))
})
.execute()
}
fn auth_user_id(&self) -> Result<String, String> {
Spi::connect(|client| {
let mut tup_table = client
.select(
"SELECT COALESCE(current_setting('auth.user_id', true), 'ffffffff-ffff-ffff-ffff-ffffffffffff')",
None,
&[],
)
.map_err(|e| format!("SPI Select Error: {}", e))?;
let row = tup_table
.next()
.ok_or("No user id setting returned from context".to_string())?;
let user_id: Option<String> = row.get(1).map_err(|e| e.to_string())?;
user_id.ok_or("Missing user_id".to_string())
})
}
fn timestamp(&self) -> Result<String, String> {
Spi::connect(|client| {
let mut tup_table = client
.select("SELECT clock_timestamp()::text", None, &[])
.map_err(|e| format!("SPI Select Error: {}", e))?;
let row = tup_table
.next()
.ok_or("No clock timestamp returned".to_string())?;
let timestamp: Option<String> = row.get(1).map_err(|e| e.to_string())?;
timestamp.ok_or("Missing timestamp".to_string())
})
}
}