From ffd6c27da34af25a7e63c41efeffd3fc11fc531a Mon Sep 17 00:00:00 2001 From: Alex Groleau Date: Wed, 25 Mar 2026 19:31:51 -0400 Subject: [PATCH] more pg try catching and error handling --- src/database/executors/pgrx.rs | 117 +++++++++++++++++++++++---------- src/database/schema.rs | 24 ++++--- src/merger/mod.rs | 38 +++++++---- src/queryer/compiler.rs | 13 +--- src/validator/rules/object.rs | 20 ++---- 5 files changed, 135 insertions(+), 77 deletions(-) diff --git a/src/database/executors/pgrx.rs b/src/database/executors/pgrx.rs index bde39bb..207c1a1 100644 --- a/src/database/executors/pgrx.rs +++ b/src/database/executors/pgrx.rs @@ -9,6 +9,61 @@ impl SpiExecutor { pub fn new() -> Self { Self {} } + + fn transact(&self, f: F) -> Result + where + F: FnOnce() -> Result, + { + unsafe { + let oldcontext = pgrx::pg_sys::CurrentMemoryContext; + let oldowner = pgrx::pg_sys::CurrentResourceOwner; + pgrx::pg_sys::BeginInternalSubTransaction(std::ptr::null()); + pgrx::pg_sys::MemoryContextSwitchTo(oldcontext); + + let runner = std::panic::AssertUnwindSafe(move || { + let res = f(); + + pgrx::pg_sys::ReleaseCurrentSubTransaction(); + pgrx::pg_sys::MemoryContextSwitchTo(oldcontext); + pgrx::pg_sys::CurrentResourceOwner = oldowner; + + res + }); + + pgrx::PgTryBuilder::new(runner) + .catch_rust_panic(|cause| { + pgrx::pg_sys::RollbackAndReleaseCurrentSubTransaction(); + pgrx::pg_sys::MemoryContextSwitchTo(oldcontext); + pgrx::pg_sys::CurrentResourceOwner = oldowner; + + // Rust panics are fatal bugs, not validation errors. Rethrow so they bubble up. + cause.rethrow() + }) + .catch_others(|cause| { + pgrx::pg_sys::RollbackAndReleaseCurrentSubTransaction(); + pgrx::pg_sys::MemoryContextSwitchTo(oldcontext); + pgrx::pg_sys::CurrentResourceOwner = oldowner; + + let error_msg = match &cause { + pgrx::pg_sys::panic::CaughtError::PostgresError(e) + | pgrx::pg_sys::panic::CaughtError::ErrorReport(e) => { + let json_err = serde_json::json!({ + "error": e.message(), + "code": format!("{:?}", e.sql_error_code()), + "detail": e.detail(), + "hint": e.hint() + }); + json_err.to_string() + } + _ => format!("{:?}", cause), + }; + + pgrx::warning!("JSPG Caught Native Postgres Error: {}", error_msg); + Err(error_msg) + }) + .execute() + } + } } impl DatabaseExecutor for SpiExecutor { @@ -24,7 +79,7 @@ impl DatabaseExecutor for SpiExecutor { } } - pgrx::PgTryBuilder::new(|| { + self.transact(|| { Spi::connect(|client| { pgrx::notice!("JSPG_SQL: {}", sql); match client.select(sql, Some(args_with_oid.len() as i64), &args_with_oid) { @@ -41,11 +96,6 @@ impl DatabaseExecutor for SpiExecutor { } }) }) - .catch_others(|cause| { - pgrx::warning!("JSPG Caught Native Postgres Error: {:?}", cause); - Err(format!("{:?}", cause)) - }) - .execute() } fn execute(&self, sql: &str, args: Option<&[Value]>) -> Result<(), String> { @@ -60,7 +110,7 @@ impl DatabaseExecutor for SpiExecutor { } } - pgrx::PgTryBuilder::new(|| { + self.transact(|| { Spi::connect_mut(|client| { pgrx::notice!("JSPG_SQL: {}", sql); match client.update(sql, Some(args_with_oid.len() as i64), &args_with_oid) { @@ -69,44 +119,43 @@ impl DatabaseExecutor for SpiExecutor { } }) }) - .catch_others(|cause| { - pgrx::warning!("JSPG Caught Native Postgres Error: {:?}", cause); - Err(format!("{:?}", cause)) - }) - .execute() } fn auth_user_id(&self) -> Result { - Spi::connect(|client| { - let mut tup_table = client - .select( - "SELECT COALESCE(current_setting('auth.user_id', true), 'ffffffff-ffff-ffff-ffff-ffffffffffff')", - None, - &[], - ) - .map_err(|e| format!("SPI Select Error: {}", e))?; + self.transact(|| { + Spi::connect(|client| { + let mut tup_table = client + .select( + "SELECT COALESCE(current_setting('auth.user_id', true), 'ffffffff-ffff-ffff-ffff-ffffffffffff')", + None, + &[], + ) + .map_err(|e| format!("SPI Select Error: {}", e))?; - let row = tup_table - .next() - .ok_or("No user id setting returned from context".to_string())?; - let user_id: Option = row.get(1).map_err(|e| e.to_string())?; + let row = tup_table + .next() + .ok_or("No user id setting returned from context".to_string())?; + let user_id: Option = row.get(1).map_err(|e| e.to_string())?; - user_id.ok_or("Missing user_id".to_string()) + user_id.ok_or("Missing user_id".to_string()) + }) }) } fn timestamp(&self) -> Result { - Spi::connect(|client| { - let mut tup_table = client - .select("SELECT clock_timestamp()::text", None, &[]) - .map_err(|e| format!("SPI Select Error: {}", e))?; + self.transact(|| { + Spi::connect(|client| { + let mut tup_table = client + .select("SELECT clock_timestamp()::text", None, &[]) + .map_err(|e| format!("SPI Select Error: {}", e))?; - let row = tup_table - .next() - .ok_or("No clock timestamp returned".to_string())?; - let timestamp: Option = row.get(1).map_err(|e| e.to_string())?; + let row = tup_table + .next() + .ok_or("No clock timestamp returned".to_string())?; + let timestamp: Option = row.get(1).map_err(|e| e.to_string())?; - timestamp.ok_or("Missing timestamp".to_string()) + timestamp.ok_or("Missing timestamp".to_string()) + }) }) } } diff --git a/src/database/schema.rs b/src/database/schema.rs index 7d5ad1f..c5191e9 100644 --- a/src/database/schema.rs +++ b/src/database/schema.rs @@ -507,10 +507,8 @@ impl Schema { let mut parent_type_name = None; if let Some(family) = &self.obj.family { parent_type_name = Some(family.split('.').next_back().unwrap_or(family).to_string()); - } else if let Some(id) = &self.obj.id { - parent_type_name = Some(id.split('.').next_back().unwrap_or("").to_string()); - } else if let Some(ref_id) = &self.obj.r#ref { - parent_type_name = Some(ref_id.split('.').next_back().unwrap_or("").to_string()); + } else if let Some(identifier) = self.obj.identifier() { + parent_type_name = Some(identifier); } if let Some(p_type) = parent_type_name { @@ -531,12 +529,12 @@ impl Schema { if let Some(family) = &target_schema.obj.family { child_type_name = Some(family.split('.').next_back().unwrap_or(family).to_string()); - } else if let Some(ref_id) = target_schema.obj.r#ref.as_ref() { - child_type_name = Some(ref_id.split('.').next_back().unwrap_or("").to_string()); + } else if let Some(ref_id) = target_schema.obj.identifier() { + child_type_name = Some(ref_id); } else if let Some(arr) = &target_schema.obj.one_of { if let Some(first) = arr.first() { - if let Some(ref_id) = first.obj.id.as_ref().or(first.obj.r#ref.as_ref()) { - child_type_name = Some(ref_id.split('.').next_back().unwrap_or("").to_string()); + if let Some(ref_id) = first.obj.identifier() { + child_type_name = Some(ref_id); } } } @@ -697,6 +695,16 @@ impl<'de> Deserialize<'de> for Schema { } } +impl SchemaObject { + pub fn identifier(&self) -> Option { + if let Some(lookup_key) = self.id.as_ref().or(self.r#ref.as_ref()) { + Some(lookup_key.split('.').next_back().unwrap_or("").to_string()) + } else { + None + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(untagged)] pub enum SchemaTypeOrArray { diff --git a/src/merger/mod.rs b/src/merger/mod.rs index 38d13b2..da96766 100644 --- a/src/merger/mod.rs +++ b/src/merger/mod.rs @@ -45,12 +45,33 @@ impl Merger { let val_resolved = match result { Ok(val) => val, Err(msg) => { + let mut final_code = "MERGE_FAILED".to_string(); + let mut final_message = msg.clone(); + let mut final_cause = None; + + if let Ok(Value::Object(map)) = serde_json::from_str::(&msg) { + if let (Some(Value::String(e_msg)), Some(Value::String(e_code))) = (map.get("error"), map.get("code")) { + final_message = e_msg.clone(); + final_code = e_code.clone(); + let mut cause_parts = Vec::new(); + if let Some(Value::String(d)) = map.get("detail") { + if !d.is_empty() { cause_parts.push(d.clone()); } + } + if let Some(Value::String(h)) = map.get("hint") { + if !h.is_empty() { cause_parts.push(h.clone()); } + } + if !cause_parts.is_empty() { + final_cause = Some(cause_parts.join("\n")); + } + } + } + return crate::drop::Drop::with_errors(vec![crate::drop::Error { - code: "MERGE_FAILED".to_string(), - message: msg, + code: final_code, + message: final_message, details: crate::drop::ErrorDetails { path: "".to_string(), - cause: None, + cause: final_cause, context: Some(data), schema: None, }, @@ -691,8 +712,7 @@ impl Merger { ); self .db - .execute(&sql, None) - .map_err(|e| format!("SPI Error in INSERT: {:?}", e))?; + .execute(&sql, None)?; } else if change_kind == "update" || change_kind == "delete" { entity_pairs.remove("id"); entity_pairs.remove("type"); @@ -726,8 +746,7 @@ impl Merger { ); self .db - .execute(&sql, None) - .map_err(|e| format!("SPI Error in UPDATE: {:?}", e))?; + .execute(&sql, None)?; } } @@ -830,10 +849,7 @@ impl Merger { Self::quote_literal(&Value::String(user_id.to_string())) ); - self - .db - .execute(&change_sql, None) - .map_err(|e| format!("Executor Error in change: {:?}", e))?; + self.db.execute(&change_sql, None)?; } if type_obj.notify { diff --git a/src/queryer/compiler.rs b/src/queryer/compiler.rs index 642f5a6..7899988 100644 --- a/src/queryer/compiler.rs +++ b/src/queryer/compiler.rs @@ -98,14 +98,7 @@ impl<'a> Compiler<'a> { if let Some(family_target) = node.schema.obj.family.as_ref() { resolved_type = self.db.types.get(family_target); - } else if let Some(lookup_key) = node - .schema - .obj - .id - .as_ref() - .or(node.schema.obj.r#ref.as_ref()) - { - let base_type_name = lookup_key.split('.').next_back().unwrap_or("").to_string(); + } else if let Some(base_type_name) = node.schema.obj.identifier() { resolved_type = self.db.types.get(&base_type_name); } @@ -523,8 +516,8 @@ impl<'a> Compiler<'a> { let mut bound_type_name = None; if let Some(family_target) = prop_schema.obj.family.as_ref() { bound_type_name = Some(family_target.split('.').next_back().unwrap_or(family_target).to_string()); - } else if let Some(lookup_key) = prop_schema.obj.id.as_ref().or(prop_schema.obj.r#ref.as_ref()) { - bound_type_name = Some(lookup_key.split('.').next_back().unwrap_or(lookup_key).to_string()); + } else if let Some(lookup_key) = prop_schema.obj.identifier() { + bound_type_name = Some(lookup_key); } if let Some(type_name) = bound_type_name { diff --git a/src/validator/rules/object.rs b/src/validator/rules/object.rs index 53175b8..60ddb3c 100644 --- a/src/validator/rules/object.rs +++ b/src/validator/rules/object.rs @@ -14,16 +14,13 @@ impl<'a> ValidationContext<'a> { let current = self.instance; if let Some(obj) = current.as_object() { // Entity implicit type validation - // Use the specific schema id or ref as a fallback - if let Some(identifier) = self.schema.id.as_ref().or(self.schema.r#ref.as_ref()) { + if let Some(schema_identifier) = self.schema.identifier() { // Kick in if the data object has a type field if let Some(type_val) = obj.get("type") && let Some(type_str) = type_val.as_str() { - // Get the string or the final segment as the base - let base = identifier.split('.').next_back().unwrap_or("").to_string(); - // Check if the base is a global type name - if let Some(type_def) = self.db.types.get(&base) { + // Check if the identifier is a global type name + if let Some(type_def) = self.db.types.get(&schema_identifier) { // Ensure the instance type is a variation of the global type if type_def.variations.contains(type_str) { // Ensure it passes strict mode @@ -40,7 +37,7 @@ impl<'a> ValidationContext<'a> { } } else { // Ad-Hoc schemas natively use strict schema discriminator strings instead of variation inheritance - if type_str == identifier { + if type_str == schema_identifier.as_str() { result.evaluated_keys.insert("type".to_string()); } } @@ -128,14 +125,9 @@ impl<'a> ValidationContext<'a> { // Entity Bound Implicit Type Interception if key == "type" - && let Some(schema_bound) = sub_schema.id.as_ref().or(sub_schema.r#ref.as_ref()) + && let Some(schema_bound) = sub_schema.identifier() { - let physical_type_name = schema_bound - .split('.') - .next_back() - .unwrap_or("") - .to_string(); - if let Some(type_def) = self.db.types.get(&physical_type_name) + if let Some(type_def) = self.db.types.get(&schema_bound) && let Some(instance_type) = child_instance.as_str() && type_def.variations.contains(instance_type) {