diff --git a/src/database/executors/mock.rs b/src/database/executors/mock.rs index 4a037b6..e734864 100644 --- a/src/database/executors/mock.rs +++ b/src/database/executors/mock.rs @@ -85,6 +85,14 @@ impl DatabaseExecutor for MockExecutor { Ok("2026-03-10T00:00:00Z".to_string()) } + fn auth_origin(&self) -> Result, String> { + Ok(None) + } + + fn punc_trigger(&self) -> Result, String> { + Ok(None) + } + #[cfg(test)] fn get_queries(&self) -> Vec { MOCK_STATE.with(|state| state.borrow().captured_queries.clone()) diff --git a/src/database/executors/mod.rs b/src/database/executors/mod.rs index 3923d13..ad97c5e 100644 --- a/src/database/executors/mod.rs +++ b/src/database/executors/mod.rs @@ -20,6 +20,12 @@ pub trait DatabaseExecutor: Send + Sync { /// Returns the current transaction timestamp fn timestamp(&self) -> Result; + /// Returns the current auth.origin session context if configured + fn auth_origin(&self) -> Result, String>; + + /// Returns the current punc.name session context if configured + fn punc_trigger(&self) -> Result, String>; + #[cfg(test)] fn get_queries(&self) -> Vec; diff --git a/src/database/executors/pgrx.rs b/src/database/executors/pgrx.rs index e348bd5..f58a90e 100644 --- a/src/database/executors/pgrx.rs +++ b/src/database/executors/pgrx.rs @@ -150,4 +150,46 @@ impl DatabaseExecutor for SpiExecutor { }) }) } + + fn auth_origin(&self) -> Result, String> { + self.transact(|| { + Spi::connect(|client| { + let mut tup_table = client + .select( + "SELECT NULLIF(current_setting('auth.origin', true), '')::jsonb", + None, + &[], + ) + .map_err(|e| format!("SPI Select Error: {}", e))?; + + if let Some(row) = tup_table.next() { + if let Ok(Some(jsonb)) = row.get::(1) { + return Ok(Some(jsonb.0)); + } + } + Ok(None) + }) + }) + } + + fn punc_trigger(&self) -> Result, String> { + self.transact(|| { + Spi::connect(|client| { + let mut tup_table = client + .select( + "SELECT NULLIF(current_setting('punc.name', true), '')", + None, + &[], + ) + .map_err(|e| format!("SPI Select Error: {}", e))?; + + if let Some(row) = tup_table.next() { + if let Ok(val_opt) = row.get::(1) { + return Ok(val_opt); + } + } + Ok(None) + }) + }) + } } diff --git a/src/database/mod.rs b/src/database/mod.rs index e36ff1e..dc8fad2 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -206,6 +206,16 @@ impl Database { self.executor.timestamp() } + /// Returns the current auth.origin session context if configured + pub fn auth_origin(&self) -> Result, String> { + self.executor.auth_origin() + } + + /// Returns the current punc.name session context if configured + pub fn punc_trigger(&self) -> Result, String> { + self.executor.punc_trigger() + } + pub fn compile(&mut self, errors: &mut Vec) { // Phase 1: Registration self.collect_schemas(errors); diff --git a/src/merger/mod.rs b/src/merger/mod.rs index 851bc05..7941d0e 100644 --- a/src/merger/mod.rs +++ b/src/merger/mod.rs @@ -946,7 +946,25 @@ impl Merger { Value::Object(old_vals) }; + let origin = match self.db.auth_origin() { + Ok(Some(orig)) => orig, + _ => serde_json::json!({ + "kind": "user", + "user_id": user_id + }), + }; + + let trigger = match self.db.punc_trigger() { + Ok(Some(trig)) => trig, + _ => "merge_entity".to_string(), + }; + + let entity_type_name = type_name.as_str().unwrap_or(&type_obj.name); + let mut notification = serde_json::Map::new(); + notification.insert("type".to_string(), Value::String(entity_type_name.to_string())); + notification.insert("trigger".to_string(), Value::String(trigger)); + notification.insert("origin".to_string(), origin.clone()); notification.insert("complete".to_string(), Value::Object(complete)); notification.insert("new".to_string(), new_val_obj.clone()); @@ -961,14 +979,16 @@ impl Merger { let mut notify_sql = None; if type_obj.historical && change_kind != "replace" { let change_sql = format!( - "INSERT INTO agreego.change (\"old\", \"new\", entity_id, id, kind, modified_at, modified_by) VALUES ({}, {}, {}, {}, {}, {}, {})", + "INSERT INTO agreego.change (\"old\", \"new\", entity_id, id, kind, modified_at, modified_by, origin, entity_type) VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {})", Self::quote_literal(&old_val_obj), Self::quote_literal(&new_val_obj), Self::quote_literal(id_str), Self::quote_literal(&Value::String(uuid::Uuid::new_v4().to_string())), Self::quote_literal(&Value::String(change_kind.to_string())), Self::quote_literal(&Value::String(timestamp.to_string())), - Self::quote_literal(&Value::String(user_id.to_string())) + Self::quote_literal(&Value::String(user_id.to_string())), + Self::quote_literal(&origin), + Self::quote_literal(&Value::String(entity_type_name.to_string())) ); self.db.execute(&change_sql, None)?;