From 6de75ba52515e940e3950bf40657ba7121a92e81 Mon Sep 17 00:00:00 2001 From: Alex Groleau Date: Sun, 15 Mar 2026 07:31:14 -0400 Subject: [PATCH] merger notification process order testing --- fixtures/merger.json | 30 +++++------ src/merger/mod.rs | 118 ++++++++++++++++++++++++++----------------- 2 files changed, 86 insertions(+), 62 deletions(-) diff --git a/fixtures/merger.json b/fixtures/merger.json index b873348..8370e5b 100644 --- a/fixtures/merger.json +++ b/fixtures/merger.json @@ -1213,21 +1213,6 @@ " '00000000-0000-0000-0000-000000000000'", ")" ], - [ - "SELECT pg_notify('entity', '{", - " \"complete\":{", - " \"created_at\":\"{{timestamp}}\",", - " \"created_by\":\"00000000-0000-0000-0000-000000000000\",", - " \"date_of_birth\":\"2000-01-01\",", - " \"first_name\":\"Bob\",", - " \"id\":\"{{uuid:customer_id}}\",", - " \"last_name\":\"Smith\",", - " \"modified_at\":\"{{timestamp}}\",", - " \"modified_by\":\"00000000-0000-0000-0000-000000000000\",", - " \"type\":\"person\"", - " }", - "}')" - ], [ "INSERT INTO agreego.\"entity\" (", " \"created_at\",", @@ -1295,6 +1280,21 @@ " \"type\":\"order\"", " }", "}')" + ], + [ + "SELECT pg_notify('entity', '{", + " \"complete\":{", + " \"created_at\":\"{{timestamp}}\",", + " \"created_by\":\"00000000-0000-0000-0000-000000000000\",", + " \"date_of_birth\":\"2000-01-01\",", + " \"first_name\":\"Bob\",", + " \"id\":\"{{uuid:customer_id}}\",", + " \"last_name\":\"Smith\",", + " \"modified_at\":\"{{timestamp}}\",", + " \"modified_by\":\"00000000-0000-0000-0000-000000000000\",", + " \"type\":\"person\"", + " }", + "}')" ] ] } diff --git a/src/merger/mod.rs b/src/merger/mod.rs index 4744f29..7a67e4a 100644 --- a/src/merger/mod.rs +++ b/src/merger/mod.rs @@ -21,63 +21,87 @@ impl Merger { } pub fn merge(&self, data: Value) -> crate::drop::Drop { - match self.merge_internal(data) { + let mut val_resolved = Value::Null; + let mut notifications_queue = Vec::new(); + + let result = self.merge_internal(data, &mut notifications_queue); + + match result { Ok(val) => { - let stripped_val = match val { - Value::Object(mut map) => { + val_resolved = val; + } + Err(msg) => { + return crate::drop::Drop::with_errors(vec![crate::drop::Error { + code: "MERGE_FAILED".to_string(), + message: msg, + details: crate::drop::ErrorDetails { + path: "".to_string(), + }, + }]); + } + }; + + // Execute the globally collected, pre-ordered notifications last! + for notify_sql in notifications_queue { + if let Err(e) = self.db.execute(¬ify_sql, None) { + return crate::drop::Drop::with_errors(vec![crate::drop::Error { + code: "MERGE_FAILED".to_string(), + message: format!("Executor Error in pre-ordered notify: {:?}", e), + details: crate::drop::ErrorDetails { + path: "".to_string(), + }, + }]); + } + } + + let stripped_val = match val_resolved { + Value::Object(mut map) => { + let mut out = serde_json::Map::new(); + if let Some(id) = map.remove("id") { + out.insert("id".to_string(), id); + } + Value::Object(out) + } + Value::Array(arr) => { + let mut out_arr = Vec::new(); + for item in arr { + if let Value::Object(mut map) = item { let mut out = serde_json::Map::new(); if let Some(id) = map.remove("id") { out.insert("id".to_string(), id); } - Value::Object(out) + out_arr.push(Value::Object(out)); + } else { + out_arr.push(Value::Null); } - Value::Array(arr) => { - let mut out_arr = Vec::new(); - for item in arr { - if let Value::Object(mut map) = item { - let mut out = serde_json::Map::new(); - if let Some(id) = map.remove("id") { - out.insert("id".to_string(), id); - } - out_arr.push(Value::Object(out)); - } else { - out_arr.push(Value::Null); - } - } - Value::Array(out_arr) - } - other => other, - }; - crate::drop::Drop::success_with_val(stripped_val) + } + Value::Array(out_arr) } - Err(msg) => crate::drop::Drop::with_errors(vec![crate::drop::Error { - code: "MERGE_FAILED".to_string(), - message: msg, - details: crate::drop::ErrorDetails { - path: "".to_string(), - }, - }]), - } + other => other, + }; + crate::drop::Drop::success_with_val(stripped_val) } - pub(crate) fn merge_internal(&self, data: Value) -> Result { + pub(crate) fn merge_internal(&self, data: Value, notifications: &mut Vec) -> Result { match data { - Value::Array(items) => self.merge_array(items), - Value::Object(map) => self.merge_object(map), + Value::Array(items) => self.merge_array(items, notifications), + Value::Object(map) => self.merge_object(map, notifications), _ => Err("Invalid merge payload: root must be an Object or Array".to_string()), } } - fn merge_array(&self, items: Vec) -> Result { + fn merge_array(&self, items: Vec, notifications: &mut Vec) -> Result { let mut resolved_items = Vec::new(); for item in items { - let resolved = self.merge_internal(item)?; + let resolved = self.merge_internal(item, notifications)?; resolved_items.push(resolved); } Ok(Value::Array(resolved_items)) } - fn merge_object(&self, obj: serde_json::Map) -> Result { + fn merge_object(&self, obj: serde_json::Map, notifications: &mut Vec) -> Result { + let queue_start = notifications.len(); + let type_name = match obj.get("type").and_then(|v| v.as_str()) { Some(t) => t.to_string(), None => return Err("Missing required 'type' field on object".to_string()), @@ -147,7 +171,7 @@ impl Merger { } } - let merged_relative = match self.merge_internal(Value::Object(relative))? { + let merged_relative = match self.merge_internal(Value::Object(relative), notifications)? { Value::Object(m) => m, _ => continue, }; @@ -174,7 +198,7 @@ impl Merger { &entity_fields, ); - let merged_relative = match self.merge_internal(Value::Object(relative))? { + let merged_relative = match self.merge_internal(Value::Object(relative), notifications)? { Value::Object(m) => m, _ => continue, }; @@ -242,7 +266,7 @@ impl Merger { &entity_fields, ); - let merged_relative = match self.merge_internal(Value::Object(relative_item))? { + let merged_relative = match self.merge_internal(Value::Object(relative_item), notifications)? { Value::Object(m) => m, _ => continue, }; @@ -255,7 +279,7 @@ impl Merger { } // 7. Perform change tracking - self.merge_entity_change( + let notify_sql = self.merge_entity_change( &entity_fields, entity_fetched.as_ref(), entity_change_kind.as_deref(), @@ -263,6 +287,10 @@ impl Merger { ×tamp, )?; + if let Some(sql) = notify_sql { + notifications.insert(queue_start, sql); + } + // Produce the full tree response let mut final_response = serde_json::Map::new(); if let Some(fetched) = entity_fetched { @@ -614,10 +642,10 @@ impl Merger { entity_change_kind: Option<&str>, user_id: &str, timestamp: &str, - ) -> Result<(), String> { + ) -> Result, String> { let change_kind = match entity_change_kind { Some(k) => k, - None => return Ok(()), + None => return Ok(None), }; let id_str = entity_fields.get("id").unwrap(); @@ -697,12 +725,8 @@ impl Merger { .db .execute(&change_sql, None) .map_err(|e| format!("Executor Error in change: {:?}", e))?; - self - .db - .execute(¬ify_sql, None) - .map_err(|e| format!("Executor Error in notify: {:?}", e))?; - Ok(()) + Ok(Some(notify_sql)) } fn compare_entities(