//! The `merger` module handles executing Postgres SPI directives dynamically based on JSON payloads //! using the structurally isolated schema rules provided by the `Database` registry. pub mod cache; use crate::database::Database; use serde_json::Value; use std::sync::Arc; pub struct Merger { pub db: Arc, pub cache: cache::StatementCache, } impl Merger { pub fn new(db: Arc) -> Self { Self { db, cache: cache::StatementCache::new(10_000), } } /// Primary recursive entrypoint that separates Array lists from Object branches pub fn merge(&self, data: Value) -> Result { let result = match data { Value::Array(items) => self.merge_array(items)?, Value::Object(map) => self.merge_object(map)?, // Nulls, Strings, Bools, Numbers at root are invalid merge payloads _ => return Err("Invalid merge payload: root must be an Object or Array".to_string()), }; Ok(match result { Value::Object(mut map) => { let mut out = serde_json::Map::new(); if let Some(id) = map.remove("id") { out.insert("id".to_string(), id); } Value::Object(out) } Value::Array(arr) => { let mut out_arr = Vec::new(); for item in arr { if let Value::Object(mut map) = item { let mut out = serde_json::Map::new(); if let Some(id) = map.remove("id") { out.insert("id".to_string(), id); } out_arr.push(Value::Object(out)); } else { out_arr.push(Value::Null); } } Value::Array(out_arr) } other => other, }) } /// Handles mapping over an array of entities, executing merge logic on each and returning the resolved list. fn merge_array(&self, items: Vec) -> Result { let mut resolved_items = Vec::new(); for item in items { // Recursively evaluate each object in the array let resolved = self.merge(item)?; resolved_items.push(resolved); } Ok(Value::Array(resolved_items)) } /// Core processing algorithm for a single Entity Object dictionary. fn merge_object(&self, mut obj: serde_json::Map) -> Result { // Step 1: Ensure it has a `type` definition to proceed let type_name = match obj.get("type").and_then(|v| v.as_str()) { Some(t) => t.to_string(), None => return Err("Missing required 'type' field on object".to_string()), }; // Step 2: Extract Type mapping from the Engine let type_def = match self.db.types.get(&type_name) { Some(t) => t, None => return Err(format!("Unknown entity type: {}", type_name)), }; // Step 3 & 4: (Pre/Post Staging based on `relationship` flag) if type_def.relationship { // Relationships: process children FIRST (Post-Staging) self.process_children(&mut obj, type_def)?; Ok(Value::Object(self.stage_entity(obj)?)) } else { // Entities: process core FIRST (Pre-Staging) let mut staged_obj_map = self.stage_entity(obj)?; self.process_children(&mut staged_obj_map, type_def)?; Ok(Value::Object(staged_obj_map)) } } /// Iterates values of `obj`, if they are structural (Array/Object), executes `self.merge()` on them. /// Uses the `Database` registry to find FK relations and apply the IDs upstream/downstream appropriately. fn process_children( &self, obj: &mut serde_json::Map, type_def: &crate::database::r#type::Type, ) -> Result<(), String> { let keys: Vec = obj.keys().cloned().collect(); for key in keys { // Temporarily extract value to process without borrowing Map mutably let val = match obj.remove(&key) { Some(v) => v, None => continue, }; if val.is_object() || val.is_array() { // Pre-Process: Propagate parent data to children BEFORE recursing and applying relations let mut child_val = val; let mut relation_info = None; // Try to peek at the child type for relational mapping let peek_obj = match &child_val { Value::Object(m) => Some(m), Value::Array(arr) if !arr.is_empty() => arr[0].as_object(), _ => None, }; if let Some(child_map) = peek_obj { if let Ok(Some(relation)) = self.get_entity_relation(obj, type_def, child_map, &key) { let child_type_name = child_map.get("type").and_then(|v| v.as_str()).unwrap_or(""); if let Some(c_type) = self.db.types.get(child_type_name) { let parent_is_source = type_def.hierarchy.contains(&relation.source_type); let child_is_source = c_type.hierarchy.contains(&relation.source_type); relation_info = Some((relation, parent_is_source, child_is_source)); } } } // Apply pre-merge mutations mapping IDs if let Some((relation, _parent_is_source, child_is_source)) = relation_info.as_ref() { match &mut child_val { Value::Object(child_map) => { // Cascade Organization ID if !child_map.contains_key("organization_id") { if let Some(org_id) = obj.get("organization_id") { child_map.insert("organization_id".to_string(), org_id.clone()); } } // If child owns FK, parent provides it if *child_is_source { Self::apply_entity_relation( child_map, &relation.source_columns, &relation.destination_columns, obj, ); } } Value::Array(items) => { for item in items.iter_mut() { if let Value::Object(child_map) = item { if !child_map.contains_key("organization_id") { if let Some(org_id) = obj.get("organization_id") { child_map.insert("organization_id".to_string(), org_id.clone()); } } if *child_is_source { Self::apply_entity_relation( child_map, &relation.source_columns, &relation.destination_columns, obj, ); } } } } _ => {} } } // RECURSE: Merge the modified children let merged_val = self.merge(child_val)?; // Post-Process: Apply relations upwards if parent owns the FK if let Some((relation, parent_is_source, _child_is_source)) = relation_info { if parent_is_source { match &merged_val { Value::Object(merged_child_map) => { Self::apply_entity_relation( obj, &relation.source_columns, &relation.destination_columns, merged_child_map, ); } Value::Array(items) if !items.is_empty() => { if let Value::Object(merged_child_map) = &items[0] { Self::apply_entity_relation( obj, &relation.source_columns, &relation.destination_columns, merged_child_map, ); } } _ => {} } } } obj.insert(key, merged_val); } else { obj.insert(key, val); } } Ok(()) } /// Evaluates `lk_` structures, fetches existing rows via SPI, computes `compare_entities` diff, /// executes UPDATE/INSERT SPI, and handles `agreego.change` auditing. fn stage_entity( &self, mut obj: serde_json::Map, ) -> Result, String> { let type_name = obj .get("type") .and_then(|v| v.as_str()) .unwrap() .to_string(); let type_def = self.db.types.get(&type_name).unwrap(); // 1. Fetch Existing Entity let existing_entity = self.fetch_entity(&obj, type_def)?; // 2. Identify System Keys we don't want to diff let system_keys = vec![ "id".to_string(), "type".to_string(), "organization_id".to_string(), "created_by".to_string(), "modified_by".to_string(), "created_at".to_string(), "modified_at".to_string(), ]; // 3. Compare entities to find exact changes let changes = self.compare_entities( existing_entity.as_ref(), &obj, &type_def.fields, &system_keys, ); // 4. If no changes and an entity existed, we skip let is_update = existing_entity.is_some(); if is_update && changes.is_empty() { return Ok(obj); } // 5. Apply correct system fields let user_id = self.db.auth_user_id()?; let timestamp = self.db.timestamp()?; let entity_change_kind = if !is_update { if !obj.contains_key("id") { use uuid::Uuid; obj.insert("id".to_string(), Value::String(Uuid::new_v4().to_string())); } obj.insert("created_by".to_string(), Value::String(user_id.clone())); obj.insert("created_at".to_string(), Value::String(timestamp.clone())); obj.insert("modified_by".to_string(), Value::String(user_id.clone())); obj.insert("modified_at".to_string(), Value::String(timestamp.clone())); "create" } else { obj.insert("modified_by".to_string(), Value::String(user_id.clone())); obj.insert("modified_at".to_string(), Value::String(timestamp.clone())); "update" }; // 6. Execute SQL Merges self.merge_entity_fields(is_update, &type_name, type_def, &changes, &obj)?; // 7. Fire agreego.change let mut complete = obj.clone(); if is_update { // overlay on top of existing for complete state if let Some(mut existing) = existing_entity { for (k, v) in &obj { existing.insert(k.clone(), v.clone()); } complete = existing; } } let mut notification = serde_json::Map::new(); notification.insert("complete".to_string(), Value::Object(complete.clone())); let changes_val = if !is_update { let mut c = changes.clone(); c.insert("type".to_string(), Value::String(type_name.clone())); Value::Object(c) } else { notification.insert("changes".to_string(), Value::Object(changes.clone())); Value::Object(changes.clone()) }; let change_sql = format!( "INSERT INTO agreego.change (changes, entity_id, id, kind, modified_at, modified_by) VALUES ({}, {}, {}, {}, {}, {})", Self::quote_literal(&changes_val), Self::quote_literal(obj.get("id").unwrap()), Self::quote_literal(&Value::String(uuid::Uuid::new_v4().to_string())), Self::quote_literal(&Value::String(entity_change_kind.to_string())), Self::quote_literal(&Value::String(timestamp.clone())), Self::quote_literal(&Value::String(user_id.clone())) ); let notification_json = Value::Object(notification); let notify_sql = format!( "SELECT pg_notify('entity', {})", Self::quote_literal(&Value::String(notification_json.to_string())) ); self .db .execute(&change_sql, None) .map_err(|e| format!("Executor Error in change: {:?}", e))?; self .db .execute(¬ify_sql, None) .map_err(|e| format!("Executor Error in notify: {:?}", e))?; Ok(obj) } /// Exact replica of `agreego.compare_entities`. Takes a fetched `old` entity from the DB (if any), /// the `new_fields` from the JSON payload, the `fields` defined on the `Type` hierarchy, and a list of `system_keys`. /// Returns a clean JSON object containing ONLY the modified keys, or an empty map if No-Op. fn compare_entities( &self, fetched_entity: Option<&serde_json::Map>, new_fields: &serde_json::Map, type_fields: &[String], system_keys: &[String], ) -> serde_json::Map { let mut changes = serde_json::Map::new(); for (key, new_val) in new_fields { // 1. Skip if key is not part of the Type's total field schema mapping if !type_fields.contains(key) { continue; } // 2. Skip strictly managed system audit keys if system_keys.contains(key) { continue; } match fetched_entity { // 3a. If no old entity, every valid field is a new "change" None => { changes.insert(key.clone(), new_val.clone()); } // 3b. If old entity exists, strictly compare the values Some(old_map) => { let old_val = old_map.get(key).unwrap_or(&Value::Null); if new_val != old_val { changes.insert(key.clone(), new_val.clone()); } } } } changes } /// Exact replica of `agreego.reduce_entity_relations`. Resolves Ambiguous Graph paths /// down to a single distinct FK relationship path based on prefix rules. fn reduce_entity_relations( &self, mut matching_relations: Vec, relative: &serde_json::Map, relation_name: &str, ) -> Result, String> { // 0 or 1 relations is an immediate fast-path resolution if matching_relations.is_empty() { return Ok(None); } if matching_relations.len() == 1 { return Ok(Some(matching_relations.pop().unwrap())); } // Step 1: Check for exact prefix match with the relation_name pointer let exact_match: Vec<_> = matching_relations .iter() .filter(|r| r.prefix.as_deref() == Some(relation_name)) .cloned() .collect(); if exact_match.len() == 1 { return Ok(Some(exact_match.into_iter().next().unwrap())); } // Step 2: Inverse filter - Remove any relations where their configured prefix IS found // inside the actual payload data on `relative` matching_relations.retain(|r| { if let Some(prefix) = &r.prefix { // If the prefix exists as a key in the relative JSON payload, we KEEP iter // (Wait, actually the SQL is `WHERE NOT EXISTS (select mr.prefix where relative ? mr.prefix)` // Translating: Keep relation R if its prefix is NOT matched in the payload !relative.contains_key(prefix) } else { true // No prefix means we keep it by default } }); if matching_relations.len() == 1 { Ok(Some(matching_relations.pop().unwrap())) } else { let constraints: Vec<_> = matching_relations .iter() .map(|r| r.constraint.clone()) .collect(); Err(format!( "AMBIGUOUS_TYPE_RELATIONS: Could not reduce ambiguous type relations: {}", constraints.join(", ") )) } } /// Exact replica of `agreego.get_entity_relation`. Given two entities (`entity` and `relative`) and the JSON key linking them, /// it searches the Database graphs for a concrete FK constraint. fn get_entity_relation( &self, _entity: &serde_json::Map, entity_type: &crate::database::r#type::Type, relative: &serde_json::Map, relation_name: &str, ) -> Result, String> { let relative_type_name = relative.get("type").and_then(|v| v.as_str()).unwrap_or(""); let relative_type = match self.db.types.get(relative_type_name) { Some(t) => t, None => return Ok(None), }; let mut relative_relations: Vec = Vec::new(); // 1. Look for direct relationships first for r in self.db.relations.values() { if r.source_type != "entity" && r.destination_type != "entity" { let condition1 = relative_type.hierarchy.contains(&r.source_type) && entity_type.hierarchy.contains(&r.destination_type); let condition2 = entity_type.hierarchy.contains(&r.source_type) && relative_type.hierarchy.contains(&r.destination_type); if condition1 || condition2 { relative_relations.push(r.clone()); } } } let mut relative_relation = self.reduce_entity_relations(relative_relations, relative, relation_name)?; // 2. Look for polymorphic relationships if no direct relationship is found if relative_relation.is_none() { let mut poly_relations: Vec = Vec::new(); for r in self.db.relations.values() { if r.destination_type == "entity" { let condition1 = relative_type.hierarchy.contains(&r.source_type); let condition2 = entity_type.hierarchy.contains(&r.source_type); if condition1 || condition2 { poly_relations.push(r.clone()); } } } relative_relation = self.reduce_entity_relations(poly_relations, relative, relation_name)?; } Ok(relative_relation) } /// Exact replica of `agreego.apply_entity_relation`. Syncs FK column values from the destination to the source. fn apply_entity_relation( source_entity: &mut serde_json::Map, source_columns: &[String], destination_columns: &[String], destination_entity: &serde_json::Map, ) { if source_columns.len() != destination_columns.len() { // In theory, validation should prevent this, but fail gracefully/ignore if lengths diverge. return; } for i in 0..source_columns.len() { let dest_val = destination_entity .get(&destination_columns[i]) .unwrap_or(&Value::Null) .clone(); source_entity.insert(source_columns[i].clone(), dest_val); } } /// Exact replica of `agreego.fetch_entity`. Dynamically constructs a `SELECT to_jsonb(t1.*) || to_jsonb(t2.*)` /// based on the Type hierarchy and available `id` or `lookup_fields` presence. fn fetch_entity( &self, entity_fields: &serde_json::Map, entity_type: &crate::database::r#type::Type, ) -> Result>, String> { let id_val = entity_fields.get("id"); let entity_type_name = entity_type.name.as_str(); // Check if all required lookup keys are PRESENT (value can be anything, including NULL) let lookup_complete = if entity_type.lookup_fields.is_empty() { false } else { entity_type .lookup_fields .iter() .all(|f| entity_fields.contains_key(f)) }; if id_val.is_none() && !lookup_complete { return Ok(None); } // Build or Retrieve Cached Select/Join clauses let fetch_sql_template = if let Some(cached) = self.cache.get(entity_type_name) { cached } else { let mut select_list = String::from("to_jsonb(t1.*)"); let mut join_clauses = format!("FROM agreego.\"{}\" t1", entity_type.hierarchy[0]); for (i, table_name) in entity_type.hierarchy.iter().enumerate().skip(1) { let t_alias = format!("t{}", i + 1); join_clauses.push_str(&format!( " LEFT JOIN agreego.\"{}\" {} ON {}.id = t1.id", table_name, t_alias, t_alias )); select_list.push_str(&format!(" || to_jsonb({}.*)", t_alias)); } let template = format!("SELECT {} {}", select_list, join_clauses); self .cache .insert(entity_type_name.to_string(), template.clone()); template }; // Build WHERE Clauses let mut id_condition = None; if let Some(id) = id_val { id_condition = Some(format!("t1.id = {}", Self::quote_literal(id))); } let mut lookup_condition = None; if lookup_complete { let mut lookup_predicates = Vec::new(); for column in &entity_type.lookup_fields { let val = entity_fields.get(column).unwrap_or(&Value::Null); if column == "type" { lookup_predicates.push(format!("t1.\"{}\" = {}", column, Self::quote_literal(val))); } else { if val.as_str() == Some("") || val.is_null() { lookup_predicates.push(format!("\"{}\" IS NULL", column)); } else { lookup_predicates.push(format!("\"{}\" = {}", column, Self::quote_literal(val))); } } } lookup_condition = Some(lookup_predicates.join(" AND ")); } // Determine final WHERE clause based on available conditions let where_clause = match (id_condition, lookup_condition) { (Some(id_cond), Some(lookup_cond)) => format!("WHERE ({}) OR ({})", id_cond, lookup_cond), (Some(id_cond), None) => format!("WHERE {}", id_cond), (None, Some(lookup_cond)) => format!("WHERE {}", lookup_cond), (None, None) => return Ok(None), }; // Construct Final Query let fetch_sql = format!("{} {}", fetch_sql_template, where_clause); // Execute and Return Result via Database Executor let fetched = match self.db.query(&fetch_sql, None) { Ok(Value::Array(table)) => { if table.len() > 1 { Err(format!( "TOO_MANY_LOOKUP_ROWS: Lookup for {} found too many existing rows", entity_type_name )) } else if table.is_empty() { Ok(None) } else { let row = table.first().unwrap(); match row { Value::Object(map) => Ok(Some(map.clone())), other => Err(format!( "Expected fetch_entity to return JSON object, got: {:?}", other )), } } } Ok(other) => Err(format!( "Expected array from query in fetch_entity, got: {:?}", other )), Err(e) => Err(format!("SPI error in fetch_entity: {:?}", e)), }?; Ok(fetched) } /// Exact replica of `agreego.merge_entity_fields`. Issues an INSERT or UPDATE per table /// in the Type's hierarchy, filtering out keys that don't belong to the specific table block. fn merge_entity_fields( &self, is_update: bool, entity_type_name: &str, entity_type: &crate::database::r#type::Type, changes: &serde_json::Map, full_entity: &serde_json::Map, ) -> Result<(), String> { let id_str = match full_entity.get("id").and_then(|v| v.as_str()) { Some(id) => id, None => return Err("Missing 'id' for merge execution".to_string()), }; let grouped_fields = match &entity_type.grouped_fields { Some(Value::Object(map)) => map, _ => { return Err(format!( "Grouped fields missing for type {}", entity_type_name )); } }; for table_name in &entity_type.hierarchy { // get the fields for this specific table (from grouped_fields) let table_fields = match grouped_fields.get(table_name).and_then(|v| v.as_array()) { Some(arr) => arr .iter() .filter_map(|v| v.as_str().map(|s| s.to_string())) .collect::>(), None => continue, }; let mut my_changes = Vec::new(); for field in &table_fields { if changes.contains_key(field) || (!is_update && full_entity.contains_key(field)) { // For inserts we want all provided fields. For updates we only want changes. my_changes.push(field.clone()); } } if is_update { if my_changes.is_empty() { continue; } let mut set_clauses = Vec::new(); for field in &my_changes { let val = changes.get(field).unwrap(); set_clauses.push(format!("\"{}\" = {}", field, Self::quote_literal(val))); } let sql = format!( "UPDATE agreego.\"{}\" SET {} WHERE id = {}", table_name, set_clauses.join(", "), Self::quote_literal(&Value::String(id_str.to_string())) ); self .db .execute(&sql, None) .map_err(|e| format!("SPI Error in UPDATE: {:?}", e))?; } else { // INSERT let mut columns = Vec::new(); let mut values = Vec::new(); for field in &my_changes { columns.push(format!("\"{}\"", field)); let val = full_entity.get(field).unwrap(); values.push(Self::quote_literal(val)); } // Ensure 'id' and 'type' are present if required by this specific table schema chunk if !columns.contains(&"\"id\"".to_string()) && table_fields.contains(&"id".to_string()) { columns.push("\"id\"".to_string()); values.push(Self::quote_literal(&Value::String(id_str.to_string()))); } if !columns.contains(&"\"type\"".to_string()) && table_fields.contains(&"type".to_string()) { columns.push("\"type\"".to_string()); values.push(Self::quote_literal(&Value::String( entity_type_name.to_string(), ))); } if columns.is_empty() { continue; } let sql = format!( "INSERT INTO agreego.\"{}\" ({}) VALUES ({})", table_name, columns.join(", "), values.join(", ") ); self .db .execute(&sql, None) .map_err(|e| format!("SPI Error in INSERT: {:?}", e))?; } } Ok(()) } /// Helper to emulate Postgres `quote_literal` fn quote_literal(val: &Value) -> String { match val { Value::Null => "NULL".to_string(), Value::Bool(b) => { if *b { "true".to_string() } else { "false".to_string() } } Value::Number(n) => n.to_string(), Value::String(s) => format!("'{}'", s.replace('\'', "''")), _ => format!( "'{}'", serde_json::to_string(val).unwrap().replace('\'', "''") ), } } }