Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 271828ebe9 | |||
| 8c430d42e3 |
@ -85,6 +85,14 @@ impl DatabaseExecutor for MockExecutor {
|
|||||||
Ok("2026-03-10T00:00:00Z".to_string())
|
Ok("2026-03-10T00:00:00Z".to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn auth_origin(&self) -> Result<Option<Value>, String> {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn punc_trigger(&self) -> Result<Option<String>, String> {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
fn get_queries(&self) -> Vec<String> {
|
fn get_queries(&self) -> Vec<String> {
|
||||||
MOCK_STATE.with(|state| state.borrow().captured_queries.clone())
|
MOCK_STATE.with(|state| state.borrow().captured_queries.clone())
|
||||||
|
|||||||
@ -20,6 +20,12 @@ pub trait DatabaseExecutor: Send + Sync {
|
|||||||
/// Returns the current transaction timestamp
|
/// Returns the current transaction timestamp
|
||||||
fn timestamp(&self) -> Result<String, String>;
|
fn timestamp(&self) -> Result<String, String>;
|
||||||
|
|
||||||
|
/// Returns the current auth.origin session context if configured
|
||||||
|
fn auth_origin(&self) -> Result<Option<Value>, String>;
|
||||||
|
|
||||||
|
/// Returns the current punc.name session context if configured
|
||||||
|
fn punc_trigger(&self) -> Result<Option<String>, String>;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
fn get_queries(&self) -> Vec<String>;
|
fn get_queries(&self) -> Vec<String>;
|
||||||
|
|
||||||
|
|||||||
@ -150,4 +150,46 @@ impl DatabaseExecutor for SpiExecutor {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn auth_origin(&self) -> Result<Option<Value>, String> {
|
||||||
|
self.transact(|| {
|
||||||
|
Spi::connect(|client| {
|
||||||
|
let mut tup_table = client
|
||||||
|
.select(
|
||||||
|
"SELECT NULLIF(current_setting('auth.origin', true), '')::jsonb",
|
||||||
|
None,
|
||||||
|
&[],
|
||||||
|
)
|
||||||
|
.map_err(|e| format!("SPI Select Error: {}", e))?;
|
||||||
|
|
||||||
|
if let Some(row) = tup_table.next() {
|
||||||
|
if let Ok(Some(jsonb)) = row.get::<pgrx::JsonB>(1) {
|
||||||
|
return Ok(Some(jsonb.0));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn punc_trigger(&self) -> Result<Option<String>, String> {
|
||||||
|
self.transact(|| {
|
||||||
|
Spi::connect(|client| {
|
||||||
|
let mut tup_table = client
|
||||||
|
.select(
|
||||||
|
"SELECT NULLIF(current_setting('punc.name', true), '')",
|
||||||
|
None,
|
||||||
|
&[],
|
||||||
|
)
|
||||||
|
.map_err(|e| format!("SPI Select Error: {}", e))?;
|
||||||
|
|
||||||
|
if let Some(row) = tup_table.next() {
|
||||||
|
if let Ok(val_opt) = row.get::<String>(1) {
|
||||||
|
return Ok(val_opt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -206,6 +206,16 @@ impl Database {
|
|||||||
self.executor.timestamp()
|
self.executor.timestamp()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the current auth.origin session context if configured
|
||||||
|
pub fn auth_origin(&self) -> Result<Option<Value>, String> {
|
||||||
|
self.executor.auth_origin()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the current punc.name session context if configured
|
||||||
|
pub fn punc_trigger(&self) -> Result<Option<String>, String> {
|
||||||
|
self.executor.punc_trigger()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn compile(&mut self, errors: &mut Vec<crate::drop::Error>) {
|
pub fn compile(&mut self, errors: &mut Vec<crate::drop::Error>) {
|
||||||
// Phase 1: Registration
|
// Phase 1: Registration
|
||||||
self.collect_schemas(errors);
|
self.collect_schemas(errors);
|
||||||
|
|||||||
@ -946,7 +946,25 @@ impl Merger {
|
|||||||
Value::Object(old_vals)
|
Value::Object(old_vals)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let origin = match self.db.auth_origin() {
|
||||||
|
Ok(Some(orig)) => orig,
|
||||||
|
_ => serde_json::json!({
|
||||||
|
"kind": "user",
|
||||||
|
"user_id": user_id
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
|
||||||
|
let trigger = match self.db.punc_trigger() {
|
||||||
|
Ok(Some(trig)) => trig,
|
||||||
|
_ => "merge_entity".to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let entity_type_name = type_name.as_str().unwrap_or(&type_obj.name);
|
||||||
|
|
||||||
let mut notification = serde_json::Map::new();
|
let mut notification = serde_json::Map::new();
|
||||||
|
notification.insert("type".to_string(), Value::String(entity_type_name.to_string()));
|
||||||
|
notification.insert("trigger".to_string(), Value::String(trigger));
|
||||||
|
notification.insert("origin".to_string(), origin.clone());
|
||||||
notification.insert("complete".to_string(), Value::Object(complete));
|
notification.insert("complete".to_string(), Value::Object(complete));
|
||||||
notification.insert("new".to_string(), new_val_obj.clone());
|
notification.insert("new".to_string(), new_val_obj.clone());
|
||||||
|
|
||||||
@ -961,14 +979,16 @@ impl Merger {
|
|||||||
let mut notify_sql = None;
|
let mut notify_sql = None;
|
||||||
if type_obj.historical && change_kind != "replace" {
|
if type_obj.historical && change_kind != "replace" {
|
||||||
let change_sql = format!(
|
let change_sql = format!(
|
||||||
"INSERT INTO agreego.change (\"old\", \"new\", entity_id, id, kind, modified_at, modified_by) VALUES ({}, {}, {}, {}, {}, {}, {})",
|
"INSERT INTO agreego.change (\"old\", \"new\", entity_id, id, kind, modified_at, modified_by, origin, entity_type) VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {})",
|
||||||
Self::quote_literal(&old_val_obj),
|
Self::quote_literal(&old_val_obj),
|
||||||
Self::quote_literal(&new_val_obj),
|
Self::quote_literal(&new_val_obj),
|
||||||
Self::quote_literal(id_str),
|
Self::quote_literal(id_str),
|
||||||
Self::quote_literal(&Value::String(uuid::Uuid::new_v4().to_string())),
|
Self::quote_literal(&Value::String(uuid::Uuid::new_v4().to_string())),
|
||||||
Self::quote_literal(&Value::String(change_kind.to_string())),
|
Self::quote_literal(&Value::String(change_kind.to_string())),
|
||||||
Self::quote_literal(&Value::String(timestamp.to_string())),
|
Self::quote_literal(&Value::String(timestamp.to_string())),
|
||||||
Self::quote_literal(&Value::String(user_id.to_string()))
|
Self::quote_literal(&Value::String(user_id.to_string())),
|
||||||
|
Self::quote_literal(&origin),
|
||||||
|
Self::quote_literal(&Value::String(entity_type_name.to_string()))
|
||||||
);
|
);
|
||||||
|
|
||||||
self.db.execute(&change_sql, None)?;
|
self.db.execute(&change_sql, None)?;
|
||||||
|
|||||||
Reference in New Issue
Block a user