diff --git a/fixtures/merger.json b/fixtures/merger.json index 74180cb..7e61114 100644 --- a/fixtures/merger.json +++ b/fixtures/merger.json @@ -19,7 +19,7 @@ { "id": "22222222-2222-2222-2222-222222222222", "type": "relation", - "constraint": "fk_order_customer", + "constraint": "fk_order_customer_person", "source_type": "order", "source_columns": [ "customer_id" @@ -41,8 +41,7 @@ "destination_type": "order", "destination_columns": [ "id" - ], - "prefix": "lines" + ] }, { "id": "44444444-4444-4444-4444-444444444444", diff --git a/fixtures/queryer.json b/fixtures/queryer.json index 59ee865..53ba91d 100644 --- a/fixtures/queryer.json +++ b/fixtures/queryer.json @@ -71,7 +71,7 @@ { "id": "22222222-2222-2222-2222-222222222222", "type": "relation", - "constraint": "fk_order_customer", + "constraint": "fk_order_customer_person", "source_type": "order", "source_columns": [ "customer_id" @@ -79,7 +79,8 @@ "destination_type": "person", "destination_columns": [ "id" - ] + ], + "prefix": "customer" }, { "id": "22222222-2222-2222-2222-222222222227", diff --git a/src/merger/mod.rs b/src/merger/mod.rs index 446e65b..7221575 100644 --- a/src/merger/mod.rs +++ b/src/merger/mod.rs @@ -3,8 +3,8 @@ pub mod cache; -use crate::database::r#type::Type; use crate::database::Database; +use crate::database::r#type::Type; use serde_json::Value; use std::sync::Arc; @@ -25,19 +25,19 @@ impl Merger { let mut notifications_queue = Vec::new(); let target_schema = match self.db.schemas.get(schema_id) { - Some(s) => Arc::new(s.clone()), - None => { - return crate::drop::Drop::with_errors(vec![crate::drop::Error { - code: "MERGE_FAILED".to_string(), - message: format!("Unknown schema_id: {}", schema_id), - details: crate::drop::ErrorDetails { - path: "".to_string(), - cause: None, - context: Some(data), - schema: None, - }, - }]); - } + Some(s) => Arc::new(s.clone()), + None => { + return crate::drop::Drop::with_errors(vec![crate::drop::Error { + code: "MERGE_FAILED".to_string(), + message: format!("Unknown schema_id: {}", schema_id), + details: crate::drop::ErrorDetails { + path: "".to_string(), + cause: None, + context: Some(data), + schema: None, + }, + }]); + } }; let result = self.merge_internal(target_schema, data.clone(), &mut notifications_queue); @@ -50,18 +50,24 @@ impl Merger { let mut final_cause = None; if let Ok(Value::Object(map)) = serde_json::from_str::(&msg) { - if let (Some(Value::String(e_msg)), Some(Value::String(e_code))) = (map.get("error"), map.get("code")) { + if let (Some(Value::String(e_msg)), Some(Value::String(e_code))) = + (map.get("error"), map.get("code")) + { final_message = e_msg.clone(); final_code = e_code.clone(); let mut cause_parts = Vec::new(); if let Some(Value::String(d)) = map.get("detail") { - if !d.is_empty() { cause_parts.push(d.clone()); } + if !d.is_empty() { + cause_parts.push(d.clone()); + } } if let Some(Value::String(h)) = map.get("hint") { - if !h.is_empty() { cause_parts.push(h.clone()); } + if !h.is_empty() { + cause_parts.push(h.clone()); + } } if !cause_parts.is_empty() { - final_cause = Some(cause_parts.join("\n")); + final_cause = Some(cause_parts.join("\n")); } } } @@ -144,11 +150,11 @@ impl Merger { ) -> Result { let mut item_schema = schema.clone(); if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) = &schema.obj.type_ { - if t == "array" { - if let Some(items_def) = &schema.obj.items { - item_schema = items_def.clone(); - } + if t == "array" { + if let Some(items_def) = &schema.obj.items { + item_schema = items_def.clone(); } + } } let mut resolved_items = Vec::new(); @@ -178,48 +184,48 @@ impl Merger { }; let compiled_props = match schema.obj.compiled_properties.get() { - Some(props) => props, - None => return Err("Schema has no compiled properties for merging".to_string()), + Some(props) => props, + None => return Err("Schema has no compiled properties for merging".to_string()), }; let mut entity_fields = serde_json::Map::new(); let mut entity_objects = std::collections::BTreeMap::new(); let mut entity_arrays = std::collections::BTreeMap::new(); - for (k, v) in obj { + for (k, v) in obj.clone() { // Always retain system and unmapped core fields natively implicitly mapped to the Postgres tables if k == "id" || k == "type" || k == "created" { - entity_fields.insert(k.clone(), v.clone()); - continue; + entity_fields.insert(k.clone(), v.clone()); + continue; } if let Some(prop_schema) = compiled_props.get(&k) { - let mut is_edge = false; - if let Some(edges) = schema.obj.compiled_edges.get() { - if edges.contains_key(&k) { - is_edge = true; - } - } - - if is_edge { - let typeof_v = match &v { - Value::Object(_) => "object", - Value::Array(_) => "array", - _ => "field", // Malformed edge data? - }; - if typeof_v == "object" { - entity_objects.insert(k.clone(), (v.clone(), prop_schema.clone())); - } else if typeof_v == "array" { - entity_arrays.insert(k.clone(), (v.clone(), prop_schema.clone())); - } else { - entity_fields.insert(k.clone(), v.clone()); - } - } else { - // Not an edge! It's a raw Postgres column (e.g., JSONB, text[]) - entity_fields.insert(k.clone(), v.clone()); - } - } else if type_def.fields.contains(&k) { + let mut is_edge = false; + if let Some(edges) = schema.obj.compiled_edges.get() { + if edges.contains_key(&k) { + is_edge = true; + } + } + + if is_edge { + let typeof_v = match &v { + Value::Object(_) => "object", + Value::Array(_) => "array", + _ => "field", // Malformed edge data? + }; + if typeof_v == "object" { + entity_objects.insert(k.clone(), (v.clone(), prop_schema.clone())); + } else if typeof_v == "array" { + entity_arrays.insert(k.clone(), (v.clone(), prop_schema.clone())); + } else { + entity_fields.insert(k.clone(), v.clone()); + } + } else { + // Not an edge! It's a raw Postgres column (e.g., JSONB, text[]) entity_fields.insert(k.clone(), v.clone()); + } + } else if type_def.fields.contains(&k) { + entity_fields.insert(k.clone(), v.clone()); } } @@ -253,12 +259,16 @@ impl Merger { }; if let Some(compiled_edges) = schema.obj.compiled_edges.get() { - println!("Compiled Edges keys for relation {}: {:?}", relation_name, compiled_edges.keys().collect::>()); + println!( + "Compiled Edges keys for relation {}: {:?}", + relation_name, + compiled_edges.keys().collect::>() + ); if let Some(edge) = compiled_edges.get(&relation_name) { println!("FOUND EDGE {} -> {:?}", relation_name, edge.constraint); if let Some(relation) = self.db.relations.get(&edge.constraint) { let parent_is_source = edge.forward; - + if parent_is_source { if !relative.contains_key("organization_id") { if let Some(org_id) = entity_fields.get("organization_id") { @@ -266,15 +276,16 @@ impl Merger { } } - let mut merged_relative = match self.merge_internal(rel_schema.clone(), Value::Object(relative), notifications)? { + let mut merged_relative = match self.merge_internal( + rel_schema.clone(), + Value::Object(relative), + notifications, + )? { Value::Object(m) => m, _ => continue, }; - merged_relative.insert( - "type".to_string(), - Value::String(relative_type_name), - ); + merged_relative.insert("type".to_string(), Value::String(relative_type_name)); Self::apply_entity_relation( &mut entity_fields, @@ -297,7 +308,11 @@ impl Merger { &entity_fields, ); - let merged_relative = match self.merge_internal(rel_schema.clone(), Value::Object(relative), notifications)? { + let merged_relative = match self.merge_internal( + rel_schema.clone(), + Value::Object(relative), + notifications, + )? { Value::Object(m) => m, _ => continue, }; @@ -318,6 +333,20 @@ impl Merger { entity_replaces = replaces; } + #[cfg(not(test))] + if type_name == "contact" || type_name == "person" { + pgrx::notice!("=== DEBUG {} PAYLOAD ===", type_name); + pgrx::notice!("1. Incoming obj iteration: {:?}", obj); + pgrx::notice!("2. Final mapped entity_fields: {:?}", entity_fields); + pgrx::notice!( + "3. TypeDef fields check: {:?}", + type_def.fields.contains(&"source_id".to_string()) + ); + if !entity_fields.contains_key("source_id") { + pgrx::notice!("CRITICAL ERROR: source_id was dropped during mapping loop!"); + } + } + self.merge_entity_fields( entity_change_kind.as_deref().unwrap_or(""), &type_name, @@ -360,19 +389,24 @@ impl Merger { ); let mut item_schema = rel_schema.clone(); - if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) = &rel_schema.obj.type_ { - if t == "array" { - if let Some(items_def) = &rel_schema.obj.items { - item_schema = items_def.clone(); - } + if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) = + &rel_schema.obj.type_ + { + if t == "array" { + if let Some(items_def) = &rel_schema.obj.items { + item_schema = items_def.clone(); } + } } - let merged_relative = - match self.merge_internal(item_schema, Value::Object(relative_item), notifications)? { - Value::Object(m) => m, - _ => continue, - }; + let merged_relative = match self.merge_internal( + item_schema, + Value::Object(relative_item), + notifications, + )? { + Value::Object(m) => m, + _ => continue, + }; relative_responses.push(Value::Object(merged_relative)); } @@ -433,8 +467,8 @@ impl Merger { // An anchor is STRICTLY a struct containing merely an `id` and `type`. // We aggressively bypass Database SPI `SELECT` fetches because there are no primitive // mutations to apply to the row. PostgreSQL inherently protects relationships via Foreign Keys downstream. - let is_anchor = entity_fields.len() == 2 - && entity_fields.contains_key("id") + let is_anchor = entity_fields.len() == 2 + && entity_fields.contains_key("id") && entity_fields.contains_key("type"); let has_valid_id = entity_fields @@ -450,13 +484,13 @@ impl Merger { let mut replaces_id = None; if let Some(ref fetched_row) = entity_fetched { - let provided_id = entity_fields.get("id").and_then(|v| v.as_str()); - let fetched_id = fetched_row.get("id").and_then(|v| v.as_str()); - if let (Some(pid), Some(fid)) = (provided_id, fetched_id) { - if !pid.is_empty() && pid != fid { - replaces_id = Some(pid.to_string()); - } + let provided_id = entity_fields.get("id").and_then(|v| v.as_str()); + let fetched_id = fetched_row.get("id").and_then(|v| v.as_str()); + if let (Some(pid), Some(fid)) = (provided_id, fetched_id) { + if !pid.is_empty() && pid != fid { + replaces_id = Some(pid.to_string()); } + } } let system_keys = vec![ @@ -548,7 +582,12 @@ impl Merger { entity_fields = new_fields; } - Ok((entity_fields, entity_change_kind, entity_fetched, replaces_id)) + Ok(( + entity_fields, + entity_change_kind, + entity_fetched, + replaces_id, + )) } fn fetch_entity( @@ -735,9 +774,7 @@ impl Merger { columns.join(", "), values.join(", ") ); - self - .db - .execute(&sql, None)?; + self.db.execute(&sql, None)?; } else if change_kind == "update" || change_kind == "delete" { entity_pairs.remove("id"); entity_pairs.remove("type"); @@ -769,9 +806,7 @@ impl Merger { set_clauses.join(", "), Self::quote_literal(&Value::String(id_str.to_string())) ); - self - .db - .execute(&sql, None)?; + self.db.execute(&sql, None)?; } } @@ -857,13 +892,13 @@ impl Merger { let mut notification = serde_json::Map::new(); notification.insert("complete".to_string(), Value::Object(complete)); notification.insert("new".to_string(), new_val_obj.clone()); - + if old_val_obj != Value::Null { - notification.insert("old".to_string(), old_val_obj.clone()); + notification.insert("old".to_string(), old_val_obj.clone()); } - + if let Some(rep) = replaces_id { - notification.insert("replaces".to_string(), Value::String(rep.to_string())); + notification.insert("replaces".to_string(), Value::String(rep.to_string())); } let mut notify_sql = None;