|
|
|
@ -3,6 +3,7 @@
|
|
|
|
|
|
|
|
|
|
|
|
pub mod cache;
|
|
|
|
pub mod cache;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
use crate::database::r#type::Type;
|
|
|
|
use crate::database::Database;
|
|
|
|
use crate::database::Database;
|
|
|
|
use serde_json::Value;
|
|
|
|
use serde_json::Value;
|
|
|
|
use std::sync::Arc;
|
|
|
|
use std::sync::Arc;
|
|
|
|
@ -321,8 +322,9 @@ impl Merger {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 7. Perform change tracking
|
|
|
|
// 7. Perform change tracking dynamically suppressing noise based on type bounds!
|
|
|
|
let notify_sql = self.merge_entity_change(
|
|
|
|
let notify_sql = self.merge_entity_change(
|
|
|
|
|
|
|
|
type_def,
|
|
|
|
&entity_fields,
|
|
|
|
&entity_fields,
|
|
|
|
entity_fetched.as_ref(),
|
|
|
|
entity_fetched.as_ref(),
|
|
|
|
entity_change_kind.as_deref(),
|
|
|
|
entity_change_kind.as_deref(),
|
|
|
|
@ -620,11 +622,7 @@ impl Merger {
|
|
|
|
for key in &sorted_keys {
|
|
|
|
for key in &sorted_keys {
|
|
|
|
columns.push(format!("\"{}\"", key));
|
|
|
|
columns.push(format!("\"{}\"", key));
|
|
|
|
let val = entity_pairs.get(key).unwrap();
|
|
|
|
let val = entity_pairs.get(key).unwrap();
|
|
|
|
if val.as_str() == Some("") {
|
|
|
|
values.push(Self::format_sql_value(val, key, entity_type));
|
|
|
|
values.push("NULL".to_string());
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
values.push(Self::quote_literal(val));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if columns.is_empty() {
|
|
|
|
if columns.is_empty() {
|
|
|
|
@ -658,7 +656,11 @@ impl Merger {
|
|
|
|
if val.as_str() == Some("") {
|
|
|
|
if val.as_str() == Some("") {
|
|
|
|
set_clauses.push(format!("\"{}\" = NULL", key));
|
|
|
|
set_clauses.push(format!("\"{}\" = NULL", key));
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
set_clauses.push(format!("\"{}\" = {}", key, Self::quote_literal(val)));
|
|
|
|
set_clauses.push(format!(
|
|
|
|
|
|
|
|
"\"{}\" = {}",
|
|
|
|
|
|
|
|
key,
|
|
|
|
|
|
|
|
Self::format_sql_value(val, key, entity_type)
|
|
|
|
|
|
|
|
));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@ -680,6 +682,7 @@ impl Merger {
|
|
|
|
|
|
|
|
|
|
|
|
fn merge_entity_change(
|
|
|
|
fn merge_entity_change(
|
|
|
|
&self,
|
|
|
|
&self,
|
|
|
|
|
|
|
|
type_obj: &Type,
|
|
|
|
entity_fields: &serde_json::Map<String, Value>,
|
|
|
|
entity_fields: &serde_json::Map<String, Value>,
|
|
|
|
entity_fetched: Option<&serde_json::Map<String, Value>>,
|
|
|
|
entity_fetched: Option<&serde_json::Map<String, Value>>,
|
|
|
|
entity_change_kind: Option<&str>,
|
|
|
|
entity_change_kind: Option<&str>,
|
|
|
|
@ -694,7 +697,8 @@ impl Merger {
|
|
|
|
let id_str = entity_fields.get("id").unwrap();
|
|
|
|
let id_str = entity_fields.get("id").unwrap();
|
|
|
|
let type_name = entity_fields.get("type").unwrap();
|
|
|
|
let type_name = entity_fields.get("type").unwrap();
|
|
|
|
|
|
|
|
|
|
|
|
let mut changes = serde_json::Map::new();
|
|
|
|
let mut old_vals = serde_json::Map::new();
|
|
|
|
|
|
|
|
let mut new_vals = serde_json::Map::new();
|
|
|
|
let is_update = change_kind == "update" || change_kind == "delete";
|
|
|
|
let is_update = change_kind == "update" || change_kind == "delete";
|
|
|
|
|
|
|
|
|
|
|
|
if !is_update {
|
|
|
|
if !is_update {
|
|
|
|
@ -707,7 +711,7 @@ impl Merger {
|
|
|
|
];
|
|
|
|
];
|
|
|
|
for (k, v) in entity_fields {
|
|
|
|
for (k, v) in entity_fields {
|
|
|
|
if !system_keys.contains(k) {
|
|
|
|
if !system_keys.contains(k) {
|
|
|
|
changes.insert(k.clone(), v.clone());
|
|
|
|
new_vals.insert(k.clone(), v.clone());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
@ -724,12 +728,13 @@ impl Merger {
|
|
|
|
if let Some(fetched) = entity_fetched {
|
|
|
|
if let Some(fetched) = entity_fetched {
|
|
|
|
let old_val = fetched.get(k).unwrap_or(&Value::Null);
|
|
|
|
let old_val = fetched.get(k).unwrap_or(&Value::Null);
|
|
|
|
if v != old_val {
|
|
|
|
if v != old_val {
|
|
|
|
changes.insert(k.clone(), v.clone());
|
|
|
|
new_vals.insert(k.clone(), v.clone());
|
|
|
|
|
|
|
|
old_vals.insert(k.clone(), old_val.clone());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
changes.insert("type".to_string(), type_name.clone());
|
|
|
|
new_vals.insert("type".to_string(), type_name.clone());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
let mut complete = entity_fields.clone();
|
|
|
|
let mut complete = entity_fields.clone();
|
|
|
|
@ -743,33 +748,48 @@ impl Merger {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let new_val_obj = Value::Object(new_vals);
|
|
|
|
|
|
|
|
let old_val_obj = if old_vals.is_empty() {
|
|
|
|
|
|
|
|
Value::Null
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
Value::Object(old_vals)
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
let mut notification = serde_json::Map::new();
|
|
|
|
let mut notification = serde_json::Map::new();
|
|
|
|
notification.insert("complete".to_string(), Value::Object(complete));
|
|
|
|
notification.insert("complete".to_string(), Value::Object(complete));
|
|
|
|
if is_update {
|
|
|
|
notification.insert("new".to_string(), new_val_obj.clone());
|
|
|
|
notification.insert("changes".to_string(), Value::Object(changes.clone()));
|
|
|
|
|
|
|
|
|
|
|
|
if old_val_obj != Value::Null {
|
|
|
|
|
|
|
|
notification.insert("old".to_string(), old_val_obj.clone());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
let change_sql = format!(
|
|
|
|
let mut notify_sql = None;
|
|
|
|
"INSERT INTO agreego.change (changes, entity_id, id, kind, modified_at, modified_by) VALUES ({}, {}, {}, {}, {}, {})",
|
|
|
|
if type_obj.historical {
|
|
|
|
Self::quote_literal(&Value::Object(changes)),
|
|
|
|
let change_sql = format!(
|
|
|
|
Self::quote_literal(id_str),
|
|
|
|
"INSERT INTO agreego.change (\"old\", \"new\", entity_id, id, kind, modified_at, modified_by) VALUES ({}, {}, {}, {}, {}, {}, {})",
|
|
|
|
Self::quote_literal(&Value::String(uuid::Uuid::new_v4().to_string())),
|
|
|
|
Self::quote_literal(&old_val_obj),
|
|
|
|
Self::quote_literal(&Value::String(change_kind.to_string())),
|
|
|
|
Self::quote_literal(&new_val_obj),
|
|
|
|
Self::quote_literal(&Value::String(timestamp.to_string())),
|
|
|
|
Self::quote_literal(id_str),
|
|
|
|
Self::quote_literal(&Value::String(user_id.to_string()))
|
|
|
|
Self::quote_literal(&Value::String(uuid::Uuid::new_v4().to_string())),
|
|
|
|
);
|
|
|
|
Self::quote_literal(&Value::String(change_kind.to_string())),
|
|
|
|
|
|
|
|
Self::quote_literal(&Value::String(timestamp.to_string())),
|
|
|
|
|
|
|
|
Self::quote_literal(&Value::String(user_id.to_string()))
|
|
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
|
|
let notify_sql = format!(
|
|
|
|
self
|
|
|
|
"SELECT pg_notify('entity', {})",
|
|
|
|
.db
|
|
|
|
Self::quote_literal(&Value::String(Value::Object(notification).to_string()))
|
|
|
|
.execute(&change_sql, None)
|
|
|
|
);
|
|
|
|
.map_err(|e| format!("Executor Error in change: {:?}", e))?;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
self
|
|
|
|
if type_obj.notify {
|
|
|
|
.db
|
|
|
|
notify_sql = Some(format!(
|
|
|
|
.execute(&change_sql, None)
|
|
|
|
"SELECT pg_notify('entity', {})",
|
|
|
|
.map_err(|e| format!("Executor Error in change: {:?}", e))?;
|
|
|
|
Self::quote_literal(&Value::String(Value::Object(notification).to_string()))
|
|
|
|
|
|
|
|
));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
Ok(Some(notify_sql))
|
|
|
|
Ok(notify_sql)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
fn compare_entities(
|
|
|
|
fn compare_entities(
|
|
|
|
@ -821,6 +841,34 @@ impl Merger {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fn format_sql_value(val: &Value, key: &str, entity_type: &Type) -> String {
|
|
|
|
|
|
|
|
if val.as_str() == Some("") {
|
|
|
|
|
|
|
|
return "NULL".to_string();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let mut is_pg_array = false;
|
|
|
|
|
|
|
|
if let Some(field_types_map) = entity_type.field_types.as_ref().and_then(|v| v.as_object()) {
|
|
|
|
|
|
|
|
if let Some(t_val) = field_types_map.get(key) {
|
|
|
|
|
|
|
|
if let Some(t_str) = t_val.as_str() {
|
|
|
|
|
|
|
|
if t_str.starts_with('_') {
|
|
|
|
|
|
|
|
is_pg_array = true;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if is_pg_array && val.is_array() {
|
|
|
|
|
|
|
|
let mut s = val.to_string();
|
|
|
|
|
|
|
|
if s.starts_with('[') && s.ends_with(']') {
|
|
|
|
|
|
|
|
s.replace_range(0..1, "{");
|
|
|
|
|
|
|
|
s.replace_range(s.len() - 1..s.len(), "}");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
Self::quote_literal(&Value::String(s))
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
Self::quote_literal(val)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
fn quote_literal(val: &Value) -> String {
|
|
|
|
fn quote_literal(val: &Value) -> String {
|
|
|
|
match val {
|
|
|
|
match val {
|
|
|
|
Value::Null => "NULL".to_string(),
|
|
|
|
Value::Null => "NULL".to_string(),
|
|
|
|
|