merger notification process order testing

This commit is contained in:
2026-03-15 07:31:14 -04:00
parent 6632570712
commit 6de75ba525
2 changed files with 86 additions and 62 deletions

View File

@ -1213,21 +1213,6 @@
" '00000000-0000-0000-0000-000000000000'", " '00000000-0000-0000-0000-000000000000'",
")" ")"
], ],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"date_of_birth\":\"2000-01-01\",",
" \"first_name\":\"Bob\",",
" \"id\":\"{{uuid:customer_id}}\",",
" \"last_name\":\"Smith\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"type\":\"person\"",
" }",
"}')"
],
[ [
"INSERT INTO agreego.\"entity\" (", "INSERT INTO agreego.\"entity\" (",
" \"created_at\",", " \"created_at\",",
@ -1295,6 +1280,21 @@
" \"type\":\"order\"", " \"type\":\"order\"",
" }", " }",
"}')" "}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"date_of_birth\":\"2000-01-01\",",
" \"first_name\":\"Bob\",",
" \"id\":\"{{uuid:customer_id}}\",",
" \"last_name\":\"Smith\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"type\":\"person\"",
" }",
"}')"
] ]
] ]
} }

View File

@ -21,63 +21,87 @@ impl Merger {
} }
pub fn merge(&self, data: Value) -> crate::drop::Drop { pub fn merge(&self, data: Value) -> crate::drop::Drop {
match self.merge_internal(data) { let mut val_resolved = Value::Null;
let mut notifications_queue = Vec::new();
let result = self.merge_internal(data, &mut notifications_queue);
match result {
Ok(val) => { Ok(val) => {
let stripped_val = match val { val_resolved = val;
Value::Object(mut map) => { }
Err(msg) => {
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "MERGE_FAILED".to_string(),
message: msg,
details: crate::drop::ErrorDetails {
path: "".to_string(),
},
}]);
}
};
// Execute the globally collected, pre-ordered notifications last!
for notify_sql in notifications_queue {
if let Err(e) = self.db.execute(&notify_sql, None) {
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "MERGE_FAILED".to_string(),
message: format!("Executor Error in pre-ordered notify: {:?}", e),
details: crate::drop::ErrorDetails {
path: "".to_string(),
},
}]);
}
}
let stripped_val = match val_resolved {
Value::Object(mut map) => {
let mut out = serde_json::Map::new();
if let Some(id) = map.remove("id") {
out.insert("id".to_string(), id);
}
Value::Object(out)
}
Value::Array(arr) => {
let mut out_arr = Vec::new();
for item in arr {
if let Value::Object(mut map) = item {
let mut out = serde_json::Map::new(); let mut out = serde_json::Map::new();
if let Some(id) = map.remove("id") { if let Some(id) = map.remove("id") {
out.insert("id".to_string(), id); out.insert("id".to_string(), id);
} }
Value::Object(out) out_arr.push(Value::Object(out));
} else {
out_arr.push(Value::Null);
} }
Value::Array(arr) => { }
let mut out_arr = Vec::new(); Value::Array(out_arr)
for item in arr {
if let Value::Object(mut map) = item {
let mut out = serde_json::Map::new();
if let Some(id) = map.remove("id") {
out.insert("id".to_string(), id);
}
out_arr.push(Value::Object(out));
} else {
out_arr.push(Value::Null);
}
}
Value::Array(out_arr)
}
other => other,
};
crate::drop::Drop::success_with_val(stripped_val)
} }
Err(msg) => crate::drop::Drop::with_errors(vec![crate::drop::Error { other => other,
code: "MERGE_FAILED".to_string(), };
message: msg, crate::drop::Drop::success_with_val(stripped_val)
details: crate::drop::ErrorDetails {
path: "".to_string(),
},
}]),
}
} }
pub(crate) fn merge_internal(&self, data: Value) -> Result<Value, String> { pub(crate) fn merge_internal(&self, data: Value, notifications: &mut Vec<String>) -> Result<Value, String> {
match data { match data {
Value::Array(items) => self.merge_array(items), Value::Array(items) => self.merge_array(items, notifications),
Value::Object(map) => self.merge_object(map), Value::Object(map) => self.merge_object(map, notifications),
_ => Err("Invalid merge payload: root must be an Object or Array".to_string()), _ => Err("Invalid merge payload: root must be an Object or Array".to_string()),
} }
} }
fn merge_array(&self, items: Vec<Value>) -> Result<Value, String> { fn merge_array(&self, items: Vec<Value>, notifications: &mut Vec<String>) -> Result<Value, String> {
let mut resolved_items = Vec::new(); let mut resolved_items = Vec::new();
for item in items { for item in items {
let resolved = self.merge_internal(item)?; let resolved = self.merge_internal(item, notifications)?;
resolved_items.push(resolved); resolved_items.push(resolved);
} }
Ok(Value::Array(resolved_items)) Ok(Value::Array(resolved_items))
} }
fn merge_object(&self, obj: serde_json::Map<String, Value>) -> Result<Value, String> { fn merge_object(&self, obj: serde_json::Map<String, Value>, notifications: &mut Vec<String>) -> Result<Value, String> {
let queue_start = notifications.len();
let type_name = match obj.get("type").and_then(|v| v.as_str()) { let type_name = match obj.get("type").and_then(|v| v.as_str()) {
Some(t) => t.to_string(), Some(t) => t.to_string(),
None => return Err("Missing required 'type' field on object".to_string()), None => return Err("Missing required 'type' field on object".to_string()),
@ -147,7 +171,7 @@ impl Merger {
} }
} }
let merged_relative = match self.merge_internal(Value::Object(relative))? { let merged_relative = match self.merge_internal(Value::Object(relative), notifications)? {
Value::Object(m) => m, Value::Object(m) => m,
_ => continue, _ => continue,
}; };
@ -174,7 +198,7 @@ impl Merger {
&entity_fields, &entity_fields,
); );
let merged_relative = match self.merge_internal(Value::Object(relative))? { let merged_relative = match self.merge_internal(Value::Object(relative), notifications)? {
Value::Object(m) => m, Value::Object(m) => m,
_ => continue, _ => continue,
}; };
@ -242,7 +266,7 @@ impl Merger {
&entity_fields, &entity_fields,
); );
let merged_relative = match self.merge_internal(Value::Object(relative_item))? { let merged_relative = match self.merge_internal(Value::Object(relative_item), notifications)? {
Value::Object(m) => m, Value::Object(m) => m,
_ => continue, _ => continue,
}; };
@ -255,7 +279,7 @@ impl Merger {
} }
// 7. Perform change tracking // 7. Perform change tracking
self.merge_entity_change( let notify_sql = self.merge_entity_change(
&entity_fields, &entity_fields,
entity_fetched.as_ref(), entity_fetched.as_ref(),
entity_change_kind.as_deref(), entity_change_kind.as_deref(),
@ -263,6 +287,10 @@ impl Merger {
&timestamp, &timestamp,
)?; )?;
if let Some(sql) = notify_sql {
notifications.insert(queue_start, sql);
}
// Produce the full tree response // Produce the full tree response
let mut final_response = serde_json::Map::new(); let mut final_response = serde_json::Map::new();
if let Some(fetched) = entity_fetched { if let Some(fetched) = entity_fetched {
@ -614,10 +642,10 @@ impl Merger {
entity_change_kind: Option<&str>, entity_change_kind: Option<&str>,
user_id: &str, user_id: &str,
timestamp: &str, timestamp: &str,
) -> Result<(), String> { ) -> Result<Option<String>, String> {
let change_kind = match entity_change_kind { let change_kind = match entity_change_kind {
Some(k) => k, Some(k) => k,
None => return Ok(()), None => return Ok(None),
}; };
let id_str = entity_fields.get("id").unwrap(); let id_str = entity_fields.get("id").unwrap();
@ -697,12 +725,8 @@ impl Merger {
.db .db
.execute(&change_sql, None) .execute(&change_sql, None)
.map_err(|e| format!("Executor Error in change: {:?}", e))?; .map_err(|e| format!("Executor Error in change: {:?}", e))?;
self
.db
.execute(&notify_sql, None)
.map_err(|e| format!("Executor Error in notify: {:?}", e))?;
Ok(()) Ok(Some(notify_sql))
} }
fn compare_entities( fn compare_entities(