//! The `merger` module handles executing Postgres SPI directives dynamically based on JSON payloads //! using the structurally isolated schema rules provided by the `Database` registry. pub mod cache; use crate::database::Database; use serde_json::Value; use std::sync::Arc; pub struct Merger { pub db: Arc, pub cache: cache::StatementCache, } impl Merger { pub fn new(db: Arc) -> Self { Self { db, cache: cache::StatementCache::new(10_000), } } pub fn merge(&self, data: Value) -> crate::drop::Drop { let mut val_resolved = Value::Null; let mut notifications_queue = Vec::new(); let result = self.merge_internal(data, &mut notifications_queue); match result { Ok(val) => { val_resolved = val; } Err(msg) => { return crate::drop::Drop::with_errors(vec![crate::drop::Error { code: "MERGE_FAILED".to_string(), message: msg, details: crate::drop::ErrorDetails { path: "".to_string(), cause: None, context: None, schema: None, }, }]); } }; // Execute the globally collected, pre-ordered notifications last! for notify_sql in notifications_queue { if let Err(e) = self.db.execute(¬ify_sql, None) { return crate::drop::Drop::with_errors(vec![crate::drop::Error { code: "MERGE_FAILED".to_string(), message: format!("Executor Error in pre-ordered notify: {:?}", e), details: crate::drop::ErrorDetails { path: "".to_string(), cause: None, context: None, schema: None, }, }]); } } let stripped_val = match val_resolved { Value::Object(mut map) => { let mut out = serde_json::Map::new(); if let Some(id) = map.remove("id") { out.insert("id".to_string(), id); } Value::Object(out) } Value::Array(arr) => { let mut out_arr = Vec::new(); for item in arr { if let Value::Object(mut map) = item { let mut out = serde_json::Map::new(); if let Some(id) = map.remove("id") { out.insert("id".to_string(), id); } out_arr.push(Value::Object(out)); } else { out_arr.push(Value::Null); } } Value::Array(out_arr) } other => other, }; crate::drop::Drop::success_with_val(stripped_val) } pub(crate) fn merge_internal(&self, data: Value, notifications: &mut Vec) -> Result { match data { Value::Array(items) => self.merge_array(items, notifications), Value::Object(map) => self.merge_object(map, notifications), _ => Err("Invalid merge payload: root must be an Object or Array".to_string()), } } fn merge_array(&self, items: Vec, notifications: &mut Vec) -> Result { let mut resolved_items = Vec::new(); for item in items { let resolved = self.merge_internal(item, notifications)?; resolved_items.push(resolved); } Ok(Value::Array(resolved_items)) } fn merge_object(&self, obj: serde_json::Map, notifications: &mut Vec) -> Result { let queue_start = notifications.len(); let type_name = match obj.get("type").and_then(|v| v.as_str()) { Some(t) => t.to_string(), None => return Err("Missing required 'type' field on object".to_string()), }; let type_def = match self.db.types.get(&type_name) { Some(t) => t, None => return Err(format!("Unknown entity type: {}", type_name)), }; // 1. Segment the entity: fields in type_def.fields are database fields, others are relationships let mut entity_fields = serde_json::Map::new(); let mut entity_objects = serde_json::Map::new(); let mut entity_arrays = serde_json::Map::new(); for (k, v) in obj { let is_field = type_def.fields.contains(&k) || k == "created"; let typeof_v = match &v { Value::Object(_) => "object", Value::Array(_) => "array", _ => "other", }; if is_field { entity_fields.insert(k, v); } else if typeof_v == "object" { entity_objects.insert(k, v); } else if typeof_v == "array" { entity_arrays.insert(k, v); } } let user_id = self.db.auth_user_id()?; let timestamp = self.db.timestamp()?; let mut entity_change_kind = None; let mut entity_fetched = None; // 2. Pre-stage the entity (for non-relationships) if !type_def.relationship { let (fields, kind, fetched) = self.stage_entity(entity_fields.clone(), type_def, &user_id, ×tamp)?; entity_fields = fields; entity_change_kind = kind; entity_fetched = fetched; } let mut entity_response = serde_json::Map::new(); // 3. Handle related objects for (relation_name, relative_val) in entity_objects { let mut relative = match relative_val { Value::Object(m) => m, _ => continue, }; let relative_relation = self.get_entity_relation(type_def, &relative, &relation_name)?; if let Some(relation) = relative_relation { let parent_is_source = type_def.hierarchy.contains(&relation.source_type); if parent_is_source { // Parent holds FK to Child. Child MUST be generated FIRST. if !relative.contains_key("organization_id") { if let Some(org_id) = entity_fields.get("organization_id") { relative.insert("organization_id".to_string(), org_id.clone()); } } let merged_relative = match self.merge_internal(Value::Object(relative), notifications)? { Value::Object(m) => m, _ => continue, }; Self::apply_entity_relation( &mut entity_fields, &relation.source_columns, &relation.destination_columns, &merged_relative, ); entity_response.insert(relation_name, Value::Object(merged_relative)); } else { // Child holds FK back to Parent. if !relative.contains_key("organization_id") { if let Some(org_id) = entity_fields.get("organization_id") { relative.insert("organization_id".to_string(), org_id.clone()); } } Self::apply_entity_relation( &mut relative, &relation.source_columns, &relation.destination_columns, &entity_fields, ); let merged_relative = match self.merge_internal(Value::Object(relative), notifications)? { Value::Object(m) => m, _ => continue, }; entity_response.insert(relation_name, Value::Object(merged_relative)); } } } // 4. Post-stage the entity (for relationships) if type_def.relationship { let (fields, kind, fetched) = self.stage_entity(entity_fields.clone(), type_def, &user_id, ×tamp)?; entity_fields = fields; entity_change_kind = kind; entity_fetched = fetched; } // 5. Process the main entity fields self.merge_entity_fields( entity_change_kind.as_deref().unwrap_or(""), &type_name, type_def, &entity_fields, entity_fetched.as_ref(), )?; // Add main entity fields to response for (k, v) in &entity_fields { entity_response.insert(k.clone(), v.clone()); } // 6. Handle related arrays for (relation_name, relative_val) in entity_arrays { let relative_arr = match relative_val { Value::Array(a) => a, _ => continue, }; if relative_arr.is_empty() { continue; } let first_relative = match &relative_arr[0] { Value::Object(m) => m, _ => continue, }; let relative_relation = self.get_entity_relation(type_def, first_relative, &relation_name)?; if let Some(relation) = relative_relation { let mut relative_responses = Vec::new(); for relative_item_val in relative_arr { if let Value::Object(mut relative_item) = relative_item_val { if !relative_item.contains_key("organization_id") { if let Some(org_id) = entity_fields.get("organization_id") { relative_item.insert("organization_id".to_string(), org_id.clone()); } } Self::apply_entity_relation( &mut relative_item, &relation.source_columns, &relation.destination_columns, &entity_fields, ); let merged_relative = match self.merge_internal(Value::Object(relative_item), notifications)? { Value::Object(m) => m, _ => continue, }; relative_responses.push(Value::Object(merged_relative)); } } entity_response.insert(relation_name, Value::Array(relative_responses)); } } // 7. Perform change tracking let notify_sql = self.merge_entity_change( &entity_fields, entity_fetched.as_ref(), entity_change_kind.as_deref(), &user_id, ×tamp, )?; if let Some(sql) = notify_sql { notifications.insert(queue_start, sql); } // Produce the full tree response let mut final_response = serde_json::Map::new(); if let Some(fetched) = entity_fetched { for (k, v) in fetched { final_response.insert(k, v); } } for (k, v) in entity_response { final_response.insert(k, v); } Ok(Value::Object(final_response)) } fn stage_entity( &self, mut entity_fields: serde_json::Map, type_def: &crate::database::r#type::Type, user_id: &str, timestamp: &str, ) -> Result< ( serde_json::Map, Option, Option>, ), String, > { let type_name = type_def.name.as_str(); let entity_fetched = self.fetch_entity(&entity_fields, type_def)?; let system_keys = vec![ "id".to_string(), "type".to_string(), "created_by".to_string(), "modified_by".to_string(), "created_at".to_string(), "modified_at".to_string(), ]; let changes = self.compare_entities( entity_fetched.as_ref(), &entity_fields, &type_def.fields, &system_keys, ); let mut entity_change_kind = None; if entity_fetched.is_none() { let entity_id = entity_fields .get("id") .and_then(|v| v.as_str()) .unwrap_or(""); let id_val = if entity_id.is_empty() { Value::String(uuid::Uuid::new_v4().to_string()) } else { Value::String(entity_id.to_string()) }; entity_change_kind = Some("create".to_string()); let mut new_fields = changes.clone(); new_fields.insert("id".to_string(), id_val); new_fields.insert("type".to_string(), Value::String(type_name.to_string())); new_fields.insert("created_by".to_string(), Value::String(user_id.to_string())); new_fields.insert( "created_at".to_string(), Value::String(timestamp.to_string()), ); new_fields.insert( "modified_by".to_string(), Value::String(user_id.to_string()), ); new_fields.insert( "modified_at".to_string(), Value::String(timestamp.to_string()), ); entity_fields = new_fields; } else if changes.is_empty() { let mut new_fields = serde_json::Map::new(); new_fields.insert( "id".to_string(), entity_fetched.as_ref().unwrap().get("id").unwrap().clone(), ); new_fields.insert("type".to_string(), Value::String(type_name.to_string())); entity_fields = new_fields; } else { let is_archived = changes .get("archived") .and_then(|v| v.as_bool()) .unwrap_or(false); entity_change_kind = if is_archived { Some("delete".to_string()) } else { Some("update".to_string()) }; let mut new_fields = changes.clone(); new_fields.insert( "id".to_string(), entity_fetched.as_ref().unwrap().get("id").unwrap().clone(), ); new_fields.insert("type".to_string(), Value::String(type_name.to_string())); new_fields.insert( "modified_by".to_string(), Value::String(user_id.to_string()), ); new_fields.insert( "modified_at".to_string(), Value::String(timestamp.to_string()), ); entity_fields = new_fields; } Ok((entity_fields, entity_change_kind, entity_fetched)) } fn fetch_entity( &self, entity_fields: &serde_json::Map, entity_type: &crate::database::r#type::Type, ) -> Result>, String> { let id_val = entity_fields.get("id"); let entity_type_name = entity_type.name.as_str(); let mut lookup_complete = false; if !entity_type.lookup_fields.is_empty() { lookup_complete = true; for column in &entity_type.lookup_fields { match entity_fields.get(column) { Some(Value::Null) | None => { lookup_complete = false; break; } Some(Value::String(s)) if s.is_empty() => { lookup_complete = false; break; } _ => {} } } } if id_val.is_none() && !lookup_complete { return Ok(None); } let fetch_sql_template = if let Some(cached) = self.cache.get(entity_type_name) { cached } else { let mut select_list = String::from("to_jsonb(t1.*)"); let mut join_clauses = format!("FROM agreego.\"{}\" t1", entity_type.hierarchy[0]); for (i, table_name) in entity_type.hierarchy.iter().enumerate().skip(1) { let t_alias = format!("t{}", i + 1); join_clauses.push_str(&format!( " LEFT JOIN agreego.\"{}\" {} ON {}.id = t1.id", table_name, t_alias, t_alias )); select_list.push_str(&format!(" || to_jsonb({}.*)", t_alias)); } let template = format!("SELECT {} {}", select_list, join_clauses); self .cache .insert(entity_type_name.to_string(), template.clone()); template }; let where_clause = if let Some(id) = id_val { format!("WHERE t1.id = {}", Self::quote_literal(id)) } else if lookup_complete { let mut lookup_predicates = Vec::new(); for column in &entity_type.lookup_fields { let val = entity_fields.get(column).unwrap_or(&Value::Null); if column == "type" { lookup_predicates.push(format!("t1.\"{}\" = {}", column, Self::quote_literal(val))); } else { lookup_predicates.push(format!("\"{}\" = {}", column, Self::quote_literal(val))); } } format!("WHERE {}", lookup_predicates.join(" AND ")) } else { return Ok(None); }; let final_sql = format!("{} {}", fetch_sql_template, where_clause); let fetched = match self.db.query(&final_sql, None) { Ok(Value::Array(table)) => { if table.len() > 1 { Err(format!( "TOO_MANY_LOOKUP_ROWS: Lookup for {} found too many existing rows", entity_type_name )) } else if table.is_empty() { Ok(None) } else { let row = table.first().unwrap(); match row { Value::Object(map) => Ok(Some(map.clone())), other => Err(format!("Expected JSON object, got: {:?}", other)), } } } Ok(_) => Err("Expected array from query in fetch_entity".to_string()), Err(e) => Err(format!("SPI error in fetch_entity: {:?}", e)), }?; Ok(fetched) } fn merge_entity_fields( &self, change_kind: &str, entity_type_name: &str, entity_type: &crate::database::r#type::Type, entity_fields: &serde_json::Map, _entity_fetched: Option<&serde_json::Map>, ) -> Result<(), String> { if change_kind.is_empty() { return Ok(()); } let id_str = match entity_fields.get("id").and_then(|v| v.as_str()) { Some(id) => id, None => return Err("Missing 'id' for merge execution".to_string()), }; let grouped_fields = match &entity_type.grouped_fields { Some(Value::Object(map)) => map, _ => { return Err(format!( "Grouped fields missing for type {}", entity_type_name )); } }; let mut execute_order: Vec = entity_type.hierarchy.clone(); if change_kind == "create" { execute_order.reverse(); } for table_name in execute_order { let table_fields = match grouped_fields.get(&table_name).and_then(|v| v.as_array()) { Some(arr) => arr .iter() .filter_map(|v| v.as_str().map(|s| s.to_string())) .collect::>(), None => continue, }; let mut entity_pairs = serde_json::Map::new(); for (k, v) in entity_fields { if table_fields.contains(k) { entity_pairs.insert(k.clone(), v.clone()); } } if change_kind == "create" { if !entity_pairs.contains_key("id") && table_fields.contains(&"id".to_string()) { entity_pairs.insert("id".to_string(), Value::String(id_str.to_string())); } if !entity_pairs.contains_key("type") && table_fields.contains(&"type".to_string()) { entity_pairs.insert( "type".to_string(), Value::String(entity_type_name.to_string()), ); } let mut columns = Vec::new(); let mut values = Vec::new(); let mut sorted_keys: Vec<_> = entity_pairs.keys().cloned().collect(); sorted_keys.sort(); for key in &sorted_keys { columns.push(format!("\"{}\"", key)); let val = entity_pairs.get(key).unwrap(); if val.as_str() == Some("") { values.push("NULL".to_string()); } else { values.push(Self::quote_literal(val)); } } if columns.is_empty() { continue; } let sql = format!( "INSERT INTO agreego.\"{}\" ({}) VALUES ({})", table_name, columns.join(", "), values.join(", ") ); self .db .execute(&sql, None) .map_err(|e| format!("SPI Error in INSERT: {:?}", e))?; } else if change_kind == "update" || change_kind == "delete" { entity_pairs.remove("id"); entity_pairs.remove("type"); if entity_pairs.is_empty() { continue; } let mut set_clauses = Vec::new(); let mut sorted_keys: Vec<_> = entity_pairs.keys().cloned().collect(); sorted_keys.sort(); for key in &sorted_keys { let val = entity_pairs.get(key).unwrap(); if val.as_str() == Some("") { set_clauses.push(format!("\"{}\" = NULL", key)); } else { set_clauses.push(format!("\"{}\" = {}", key, Self::quote_literal(val))); } } let sql = format!( "UPDATE agreego.\"{}\" SET {} WHERE id = {}", table_name, set_clauses.join(", "), Self::quote_literal(&Value::String(id_str.to_string())) ); self .db .execute(&sql, None) .map_err(|e| format!("SPI Error in UPDATE: {:?}", e))?; } } Ok(()) } fn merge_entity_change( &self, entity_fields: &serde_json::Map, entity_fetched: Option<&serde_json::Map>, entity_change_kind: Option<&str>, user_id: &str, timestamp: &str, ) -> Result, String> { let change_kind = match entity_change_kind { Some(k) => k, None => return Ok(None), }; let id_str = entity_fields.get("id").unwrap(); let type_name = entity_fields.get("type").unwrap(); let mut changes = serde_json::Map::new(); let is_update = change_kind == "update" || change_kind == "delete"; if !is_update { let system_keys = vec![ "id".to_string(), "created_by".to_string(), "modified_by".to_string(), "created_at".to_string(), "modified_at".to_string(), ]; for (k, v) in entity_fields { if !system_keys.contains(k) { changes.insert(k.clone(), v.clone()); } } } else { let system_keys = vec![ "id".to_string(), "type".to_string(), "created_by".to_string(), "modified_by".to_string(), "created_at".to_string(), "modified_at".to_string(), ]; for (k, v) in entity_fields { if !system_keys.contains(k) { if let Some(fetched) = entity_fetched { let old_val = fetched.get(k).unwrap_or(&Value::Null); if v != old_val { changes.insert(k.clone(), v.clone()); } } } } changes.insert("type".to_string(), type_name.clone()); } let mut complete = entity_fields.clone(); if is_update { if let Some(fetched) = entity_fetched { let mut temp = fetched.clone(); for (k, v) in entity_fields { temp.insert(k.clone(), v.clone()); } complete = temp; } } let mut notification = serde_json::Map::new(); notification.insert("complete".to_string(), Value::Object(complete)); if is_update { notification.insert("changes".to_string(), Value::Object(changes.clone())); } let change_sql = format!( "INSERT INTO agreego.change (changes, entity_id, id, kind, modified_at, modified_by) VALUES ({}, {}, {}, {}, {}, {})", Self::quote_literal(&Value::Object(changes)), Self::quote_literal(id_str), Self::quote_literal(&Value::String(uuid::Uuid::new_v4().to_string())), Self::quote_literal(&Value::String(change_kind.to_string())), Self::quote_literal(&Value::String(timestamp.to_string())), Self::quote_literal(&Value::String(user_id.to_string())) ); let notify_sql = format!( "SELECT pg_notify('entity', {})", Self::quote_literal(&Value::String(Value::Object(notification).to_string())) ); self .db .execute(&change_sql, None) .map_err(|e| format!("Executor Error in change: {:?}", e))?; Ok(Some(notify_sql)) } fn compare_entities( &self, fetched_entity: Option<&serde_json::Map>, new_fields: &serde_json::Map, type_fields: &[String], system_keys: &[String], ) -> serde_json::Map { let mut changes = serde_json::Map::new(); if fetched_entity.is_none() { for (k, v) in new_fields { if type_fields.contains(k) && !system_keys.contains(k) { changes.insert(k.clone(), v.clone()); } } return changes; } let old_map = fetched_entity.unwrap(); for (k, v) in new_fields { if type_fields.contains(k) && !system_keys.contains(k) { let old_val = old_map.get(k).unwrap_or(&Value::Null); if v != old_val { changes.insert(k.clone(), v.clone()); } } } changes } fn reduce_entity_relations( &self, mut matching_relations: Vec, relative: &serde_json::Map, relation_name: &str, ) -> Result, String> { if matching_relations.is_empty() { return Ok(None); } if matching_relations.len() == 1 { return Ok(Some(matching_relations.pop().unwrap())); } let exact_match: Vec<_> = matching_relations .iter() .filter(|r| r.prefix.as_deref() == Some(relation_name)) .cloned() .collect(); if exact_match.len() == 1 { return Ok(Some(exact_match.into_iter().next().unwrap())); } matching_relations.retain(|r| { if let Some(prefix) = &r.prefix { !relative.contains_key(prefix) } else { true } }); if matching_relations.len() == 1 { Ok(Some(matching_relations.pop().unwrap())) } else { let constraints: Vec<_> = matching_relations .iter() .map(|r| r.constraint.clone()) .collect(); Err(format!( "AMBIGUOUS_TYPE_RELATIONS: Could not reduce ambiguous type relations: {}", constraints.join(", ") )) } } fn get_entity_relation( &self, entity_type: &crate::database::r#type::Type, relative: &serde_json::Map, relation_name: &str, ) -> Result, String> { let relative_type_name = match relative.get("type").and_then(|v| v.as_str()) { Some(t) => t, None => return Ok(None), }; let relative_type = match self.db.types.get(relative_type_name) { Some(t) => t, None => return Ok(None), }; let mut relative_relations: Vec = Vec::new(); for r in self.db.relations.values() { if r.source_type != "entity" && r.destination_type != "entity" { let condition1 = relative_type.hierarchy.contains(&r.source_type) && entity_type.hierarchy.contains(&r.destination_type); let condition2 = entity_type.hierarchy.contains(&r.source_type) && relative_type.hierarchy.contains(&r.destination_type); if condition1 || condition2 { relative_relations.push(r.clone()); } } } let mut relative_relation = self.reduce_entity_relations(relative_relations, relative, relation_name)?; if relative_relation.is_none() { let mut poly_relations: Vec = Vec::new(); for r in self.db.relations.values() { if r.destination_type == "entity" { let condition1 = relative_type.hierarchy.contains(&r.source_type); let condition2 = entity_type.hierarchy.contains(&r.source_type); if condition1 || condition2 { poly_relations.push(r.clone()); } } } relative_relation = self.reduce_entity_relations(poly_relations, relative, relation_name)?; } Ok(relative_relation) } fn apply_entity_relation( source_entity: &mut serde_json::Map, source_columns: &[String], destination_columns: &[String], destination_entity: &serde_json::Map, ) { if source_columns.len() != destination_columns.len() { return; } for i in 0..source_columns.len() { if let Some(dest_val) = destination_entity.get(&destination_columns[i]) { source_entity.insert(source_columns[i].clone(), dest_val.clone()); } } } fn quote_literal(val: &Value) -> String { match val { Value::Null => "NULL".to_string(), Value::Bool(b) => { if *b { "true".to_string() } else { "false".to_string() } } Value::Number(n) => { if let Some(f) = n.as_f64() { if f.fract() == 0.0 { return f.trunc().to_string(); } } n.to_string() } Value::String(s) => { if s.is_empty() { "NULL".to_string() } else { format!("'{}'", s.replace('\'', "''")) } } _ => format!( "'{}'", serde_json::to_string(val).unwrap().replace('\'', "''") ), } } }