merger now requires a schema id, queryer and merger now use pre-compiled edges for O(1) relations

This commit is contained in:
2026-03-21 20:33:28 -04:00
parent 9bdb767685
commit 882bdc6271
13 changed files with 1370 additions and 307 deletions

View File

@ -21,10 +21,26 @@ impl Merger {
}
}
pub fn merge(&self, data: Value) -> crate::drop::Drop {
pub fn merge(&self, schema_id: &str, data: Value) -> crate::drop::Drop {
let mut notifications_queue = Vec::new();
let result = self.merge_internal(data.clone(), &mut notifications_queue);
let target_schema = match self.db.schemas.get(schema_id) {
Some(s) => Arc::new(s.clone()),
None => {
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "MERGE_FAILED".to_string(),
message: format!("Unknown schema_id: {}", schema_id),
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: None,
context: Some(data),
schema: None,
},
}]);
}
};
let result = self.merge_internal(target_schema, data.clone(), &mut notifications_queue);
let val_resolved = match result {
Ok(val) => val,
@ -88,24 +104,35 @@ impl Merger {
pub(crate) fn merge_internal(
&self,
schema: Arc<crate::database::schema::Schema>,
data: Value,
notifications: &mut Vec<String>,
) -> Result<Value, String> {
match data {
Value::Array(items) => self.merge_array(items, notifications),
Value::Object(map) => self.merge_object(map, notifications),
Value::Array(items) => self.merge_array(schema, items, notifications),
Value::Object(map) => self.merge_object(schema, map, notifications),
_ => Err("Invalid merge payload: root must be an Object or Array".to_string()),
}
}
fn merge_array(
&self,
schema: Arc<crate::database::schema::Schema>,
items: Vec<Value>,
notifications: &mut Vec<String>,
) -> Result<Value, String> {
let mut item_schema = schema.clone();
if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) = &schema.obj.type_ {
if t == "array" {
if let Some(items_def) = &schema.obj.items {
item_schema = items_def.clone();
}
}
}
let mut resolved_items = Vec::new();
for item in items {
let resolved = self.merge_internal(item, notifications)?;
let resolved = self.merge_internal(item_schema.clone(), item, notifications)?;
resolved_items.push(resolved);
}
Ok(Value::Array(resolved_items))
@ -113,6 +140,7 @@ impl Merger {
fn merge_object(
&self,
schema: Arc<crate::database::schema::Schema>,
obj: serde_json::Map<String, Value>,
notifications: &mut Vec<String>,
) -> Result<Value, String> {
@ -128,25 +156,49 @@ impl Merger {
None => return Err(format!("Unknown entity type: {}", type_name)),
};
// 1. Segment the entity: fields in type_def.fields are database fields, others are relationships
let compiled_props = match schema.obj.compiled_properties.get() {
Some(props) => props,
None => return Err("Schema has no compiled properties for merging".to_string()),
};
let mut entity_fields = serde_json::Map::new();
let mut entity_objects = serde_json::Map::new();
let mut entity_arrays = serde_json::Map::new();
let mut entity_objects = std::collections::BTreeMap::new();
let mut entity_arrays = std::collections::BTreeMap::new();
for (k, v) in obj {
let is_field = type_def.fields.contains(&k) || k == "created";
let typeof_v = match &v {
Value::Object(_) => "object",
Value::Array(_) => "array",
_ => "other",
};
// Always retain system and unmapped core fields natively implicitly mapped to the Postgres tables
if k == "id" || k == "type" || k == "created" {
entity_fields.insert(k.clone(), v.clone());
continue;
}
if is_field {
entity_fields.insert(k, v);
} else if typeof_v == "object" {
entity_objects.insert(k, v);
} else if typeof_v == "array" {
entity_arrays.insert(k, v);
if let Some(prop_schema) = compiled_props.get(&k) {
let mut is_edge = false;
if let Some(edges) = schema.obj.compiled_edges.get() {
if edges.contains_key(&k) {
is_edge = true;
}
}
if is_edge {
let typeof_v = match &v {
Value::Object(_) => "object",
Value::Array(_) => "array",
_ => "field", // Malformed edge data?
};
if typeof_v == "object" {
entity_objects.insert(k.clone(), (v.clone(), prop_schema.clone()));
} else if typeof_v == "array" {
entity_arrays.insert(k.clone(), (v.clone(), prop_schema.clone()));
} else {
entity_fields.insert(k.clone(), v.clone());
}
} else {
// Not an edge! It's a raw Postgres column (e.g., JSONB, text[])
entity_fields.insert(k.clone(), v.clone());
}
} else if type_def.fields.contains(&k) {
entity_fields.insert(k.clone(), v.clone());
}
}
@ -156,7 +208,6 @@ impl Merger {
let mut entity_change_kind = None;
let mut entity_fetched = None;
// 2. Pre-stage the entity (for non-relationships)
if !type_def.relationship {
let (fields, kind, fetched) =
self.stage_entity(entity_fields.clone(), type_def, &user_id, &timestamp)?;
@ -167,82 +218,74 @@ impl Merger {
let mut entity_response = serde_json::Map::new();
// 3. Handle related objects
for (relation_name, relative_val) in entity_objects {
for (relation_name, (relative_val, rel_schema)) in entity_objects {
let mut relative = match relative_val {
Value::Object(m) => m,
_ => continue,
};
// Attempt to extract relative object type name
let relative_type_name = match relative.get("type").and_then(|v| v.as_str()) {
Some(t) => t.to_string(),
None => continue,
};
let relative_keys: Vec<String> = relative.keys().cloned().collect();
if let Some(compiled_edges) = schema.obj.compiled_edges.get() {
println!("Compiled Edges keys for relation {}: {:?}", relation_name, compiled_edges.keys().collect::<Vec<_>>());
if let Some(edge) = compiled_edges.get(&relation_name) {
println!("FOUND EDGE {} -> {:?}", relation_name, edge.constraint);
if let Some(relation) = self.db.relations.get(&edge.constraint) {
let parent_is_source = edge.forward;
if parent_is_source {
if !relative.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
relative.insert("organization_id".to_string(), org_id.clone());
}
}
// Call central Database O(1) graph logic
let relative_relation = self.db.get_relation(
&type_def.name,
&relative_type_name,
&relation_name,
Some(&relative_keys),
);
let mut merged_relative = match self.merge_internal(rel_schema.clone(), Value::Object(relative), notifications)? {
Value::Object(m) => m,
_ => continue,
};
if let Some((relation, parent_is_source)) = relative_relation {
merged_relative.insert(
"type".to_string(),
Value::String(relative_type_name),
);
if parent_is_source {
// Parent holds FK to Child. Child MUST be generated FIRST.
if !relative.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
relative.insert("organization_id".to_string(), org_id.clone());
Self::apply_entity_relation(
&mut entity_fields,
&relation.source_columns,
&relation.destination_columns,
&merged_relative,
);
entity_response.insert(relation_name, Value::Object(merged_relative));
} else {
if !relative.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
relative.insert("organization_id".to_string(), org_id.clone());
}
}
Self::apply_entity_relation(
&mut relative,
&relation.source_columns,
&relation.destination_columns,
&entity_fields,
);
let merged_relative = match self.merge_internal(rel_schema.clone(), Value::Object(relative), notifications)? {
Value::Object(m) => m,
_ => continue,
};
entity_response.insert(relation_name, Value::Object(merged_relative));
}
}
let mut merged_relative = match self.merge_internal(Value::Object(relative), notifications)? {
Value::Object(m) => m,
_ => continue,
};
merged_relative.insert(
"type".to_string(),
Value::String(relative_type_name),
);
Self::apply_entity_relation(
&mut entity_fields,
&relation.source_columns,
&relation.destination_columns,
&merged_relative,
);
entity_response.insert(relation_name, Value::Object(merged_relative));
} else {
// Child holds FK back to Parent.
if !relative.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
relative.insert("organization_id".to_string(), org_id.clone());
}
}
Self::apply_entity_relation(
&mut relative,
&relation.source_columns,
&relation.destination_columns,
&entity_fields,
);
let merged_relative = match self.merge_internal(Value::Object(relative), notifications)? {
Value::Object(m) => m,
_ => continue,
};
entity_response.insert(relation_name, Value::Object(merged_relative));
}
}
}
// 4. Post-stage the entity (for relationships)
if type_def.relationship {
let (fields, kind, fetched) =
self.stage_entity(entity_fields.clone(), type_def, &user_id, &timestamp)?;
@ -251,7 +294,6 @@ impl Merger {
entity_fetched = fetched;
}
// 5. Process the main entity fields
self.merge_entity_fields(
entity_change_kind.as_deref().unwrap_or(""),
&type_name,
@ -260,13 +302,11 @@ impl Merger {
entity_fetched.as_ref(),
)?;
// Add main entity fields to response
for (k, v) in &entity_fields {
entity_response.insert(k.clone(), v.clone());
}
// 6. Handle related arrays
for (relation_name, relative_val) in entity_arrays {
for (relation_name, (relative_val, rel_schema)) in entity_arrays {
let relative_arr = match relative_val {
Value::Array(a) => a,
_ => continue,
@ -276,54 +316,46 @@ impl Merger {
continue;
}
let first_relative = match &relative_arr[0] {
Value::Object(m) => m,
_ => continue,
};
if let Some(compiled_edges) = schema.obj.compiled_edges.get() {
if let Some(edge) = compiled_edges.get(&relation_name) {
if let Some(relation) = self.db.relations.get(&edge.constraint) {
let mut relative_responses = Vec::new();
for relative_item_val in relative_arr {
if let Value::Object(mut relative_item) = relative_item_val {
if !relative_item.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
relative_item.insert("organization_id".to_string(), org_id.clone());
}
}
// Attempt to extract relative object type name
let relative_type_name = match first_relative.get("type").and_then(|v| v.as_str()) {
Some(t) => t,
None => continue,
};
Self::apply_entity_relation(
&mut relative_item,
&relation.source_columns,
&relation.destination_columns,
&entity_fields,
);
let relative_keys: Vec<String> = first_relative.keys().cloned().collect();
let mut item_schema = rel_schema.clone();
if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) = &rel_schema.obj.type_ {
if t == "array" {
if let Some(items_def) = &rel_schema.obj.items {
item_schema = items_def.clone();
}
}
}
// Call central Database O(1) graph logic
let relative_relation = self.db.get_relation(
&type_def.name,
relative_type_name,
&relation_name,
Some(&relative_keys),
);
let merged_relative =
match self.merge_internal(item_schema, Value::Object(relative_item), notifications)? {
Value::Object(m) => m,
_ => continue,
};
if let Some((relation, _)) = relative_relation {
let mut relative_responses = Vec::new();
for relative_item_val in relative_arr {
if let Value::Object(mut relative_item) = relative_item_val {
if !relative_item.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
relative_item.insert("organization_id".to_string(), org_id.clone());
relative_responses.push(Value::Object(merged_relative));
}
}
Self::apply_entity_relation(
&mut relative_item,
&relation.source_columns,
&relation.destination_columns,
&entity_fields,
);
let merged_relative =
match self.merge_internal(Value::Object(relative_item), notifications)? {
Value::Object(m) => m,
_ => continue,
};
relative_responses.push(Value::Object(merged_relative));
entity_response.insert(relation_name, Value::Array(relative_responses));
}
}
entity_response.insert(relation_name, Value::Array(relative_responses));
}
}