Compare commits

...

7 Commits

Author SHA1 Message Date
b6c5561d2f version: 1.0.79 2026-03-20 05:58:53 -04:00
e01b778d68 jsob and test array handling improved in merger 2026-03-20 05:58:43 -04:00
6eb134c0d6 test checkpoint 2026-03-20 05:17:28 -04:00
7ccc4b7cce version: 1.0.78 2026-03-20 04:41:46 -04:00
77bfa4cd18 historical and notify respected 2026-03-20 04:41:35 -04:00
b47a5abd26 version: 1.0.77 2026-03-20 01:59:56 -04:00
fcd8310ed8 added new and old to changes and pg notify 2026-03-20 01:59:48 -04:00
5 changed files with 599 additions and 245 deletions

File diff suppressed because it is too large Load Diff

View File

@ -15,6 +15,8 @@ pub struct Type {
#[serde(default)] #[serde(default)]
pub historical: bool, pub historical: bool,
#[serde(default)] #[serde(default)]
pub notify: bool,
#[serde(default)]
pub sensitive: bool, pub sensitive: bool,
#[serde(default)] #[serde(default)]
pub ownable: bool, pub ownable: bool,

View File

@ -3,6 +3,7 @@
pub mod cache; pub mod cache;
use crate::database::r#type::Type;
use crate::database::Database; use crate::database::Database;
use serde_json::Value; use serde_json::Value;
use std::sync::Arc; use std::sync::Arc;
@ -321,8 +322,9 @@ impl Merger {
} }
} }
// 7. Perform change tracking // 7. Perform change tracking dynamically suppressing noise based on type bounds!
let notify_sql = self.merge_entity_change( let notify_sql = self.merge_entity_change(
type_def,
&entity_fields, &entity_fields,
entity_fetched.as_ref(), entity_fetched.as_ref(),
entity_change_kind.as_deref(), entity_change_kind.as_deref(),
@ -620,11 +622,7 @@ impl Merger {
for key in &sorted_keys { for key in &sorted_keys {
columns.push(format!("\"{}\"", key)); columns.push(format!("\"{}\"", key));
let val = entity_pairs.get(key).unwrap(); let val = entity_pairs.get(key).unwrap();
if val.as_str() == Some("") { values.push(Self::format_sql_value(val, key, entity_type));
values.push("NULL".to_string());
} else {
values.push(Self::quote_literal(val));
}
} }
if columns.is_empty() { if columns.is_empty() {
@ -658,7 +656,11 @@ impl Merger {
if val.as_str() == Some("") { if val.as_str() == Some("") {
set_clauses.push(format!("\"{}\" = NULL", key)); set_clauses.push(format!("\"{}\" = NULL", key));
} else { } else {
set_clauses.push(format!("\"{}\" = {}", key, Self::quote_literal(val))); set_clauses.push(format!(
"\"{}\" = {}",
key,
Self::format_sql_value(val, key, entity_type)
));
} }
} }
@ -680,6 +682,7 @@ impl Merger {
fn merge_entity_change( fn merge_entity_change(
&self, &self,
type_obj: &Type,
entity_fields: &serde_json::Map<String, Value>, entity_fields: &serde_json::Map<String, Value>,
entity_fetched: Option<&serde_json::Map<String, Value>>, entity_fetched: Option<&serde_json::Map<String, Value>>,
entity_change_kind: Option<&str>, entity_change_kind: Option<&str>,
@ -694,7 +697,8 @@ impl Merger {
let id_str = entity_fields.get("id").unwrap(); let id_str = entity_fields.get("id").unwrap();
let type_name = entity_fields.get("type").unwrap(); let type_name = entity_fields.get("type").unwrap();
let mut changes = serde_json::Map::new(); let mut old_vals = serde_json::Map::new();
let mut new_vals = serde_json::Map::new();
let is_update = change_kind == "update" || change_kind == "delete"; let is_update = change_kind == "update" || change_kind == "delete";
if !is_update { if !is_update {
@ -707,7 +711,7 @@ impl Merger {
]; ];
for (k, v) in entity_fields { for (k, v) in entity_fields {
if !system_keys.contains(k) { if !system_keys.contains(k) {
changes.insert(k.clone(), v.clone()); new_vals.insert(k.clone(), v.clone());
} }
} }
} else { } else {
@ -724,12 +728,13 @@ impl Merger {
if let Some(fetched) = entity_fetched { if let Some(fetched) = entity_fetched {
let old_val = fetched.get(k).unwrap_or(&Value::Null); let old_val = fetched.get(k).unwrap_or(&Value::Null);
if v != old_val { if v != old_val {
changes.insert(k.clone(), v.clone()); new_vals.insert(k.clone(), v.clone());
old_vals.insert(k.clone(), old_val.clone());
} }
} }
} }
} }
changes.insert("type".to_string(), type_name.clone()); new_vals.insert("type".to_string(), type_name.clone());
} }
let mut complete = entity_fields.clone(); let mut complete = entity_fields.clone();
@ -743,33 +748,48 @@ impl Merger {
} }
} }
let new_val_obj = Value::Object(new_vals);
let old_val_obj = if old_vals.is_empty() {
Value::Null
} else {
Value::Object(old_vals)
};
let mut notification = serde_json::Map::new(); let mut notification = serde_json::Map::new();
notification.insert("complete".to_string(), Value::Object(complete)); notification.insert("complete".to_string(), Value::Object(complete));
if is_update { notification.insert("new".to_string(), new_val_obj.clone());
notification.insert("changes".to_string(), Value::Object(changes.clone()));
if old_val_obj != Value::Null {
notification.insert("old".to_string(), old_val_obj.clone());
} }
let change_sql = format!( let mut notify_sql = None;
"INSERT INTO agreego.change (changes, entity_id, id, kind, modified_at, modified_by) VALUES ({}, {}, {}, {}, {}, {})", if type_obj.historical {
Self::quote_literal(&Value::Object(changes)), let change_sql = format!(
Self::quote_literal(id_str), "INSERT INTO agreego.change (\"old\", \"new\", entity_id, id, kind, modified_at, modified_by) VALUES ({}, {}, {}, {}, {}, {}, {})",
Self::quote_literal(&Value::String(uuid::Uuid::new_v4().to_string())), Self::quote_literal(&old_val_obj),
Self::quote_literal(&Value::String(change_kind.to_string())), Self::quote_literal(&new_val_obj),
Self::quote_literal(&Value::String(timestamp.to_string())), Self::quote_literal(id_str),
Self::quote_literal(&Value::String(user_id.to_string())) Self::quote_literal(&Value::String(uuid::Uuid::new_v4().to_string())),
); Self::quote_literal(&Value::String(change_kind.to_string())),
Self::quote_literal(&Value::String(timestamp.to_string())),
Self::quote_literal(&Value::String(user_id.to_string()))
);
let notify_sql = format!( self
"SELECT pg_notify('entity', {})", .db
Self::quote_literal(&Value::String(Value::Object(notification).to_string())) .execute(&change_sql, None)
); .map_err(|e| format!("Executor Error in change: {:?}", e))?;
}
self if type_obj.notify {
.db notify_sql = Some(format!(
.execute(&change_sql, None) "SELECT pg_notify('entity', {})",
.map_err(|e| format!("Executor Error in change: {:?}", e))?; Self::quote_literal(&Value::String(Value::Object(notification).to_string()))
));
}
Ok(Some(notify_sql)) Ok(notify_sql)
} }
fn compare_entities( fn compare_entities(
@ -821,6 +841,34 @@ impl Merger {
} }
} }
fn format_sql_value(val: &Value, key: &str, entity_type: &Type) -> String {
if val.as_str() == Some("") {
return "NULL".to_string();
}
let mut is_pg_array = false;
if let Some(field_types_map) = entity_type.field_types.as_ref().and_then(|v| v.as_object()) {
if let Some(t_val) = field_types_map.get(key) {
if let Some(t_str) = t_val.as_str() {
if t_str.starts_with('_') {
is_pg_array = true;
}
}
}
}
if is_pg_array && val.is_array() {
let mut s = val.to_string();
if s.starts_with('[') && s.ends_with(']') {
s.replace_range(0..1, "{");
s.replace_range(s.len() - 1..s.len(), "}");
}
Self::quote_literal(&Value::String(s))
} else {
Self::quote_literal(val)
}
}
fn quote_literal(val: &Value) -> String { fn quote_literal(val: &Value) -> String {
match val { match val {
Value::Null => "NULL".to_string(), Value::Null => "NULL".to_string(),

View File

@ -8536,3 +8536,9 @@ fn test_merger_0_7() {
let path = format!("{}/fixtures/merger.json", env!("CARGO_MANIFEST_DIR")); let path = format!("{}/fixtures/merger.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 7).unwrap(); crate::tests::runner::run_test_case(&path, 0, 7).unwrap();
} }
#[test]
fn test_merger_0_8() {
let path = format!("{}/fixtures/merger.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 8).unwrap();
}

View File

@ -1 +1 @@
1.0.76 1.0.79