test pgrx no fixes

This commit is contained in:
2026-03-27 18:02:24 -04:00
parent dd98bfac9e
commit 7523431007
3 changed files with 129 additions and 94 deletions

View File

@ -19,7 +19,7 @@
{
"id": "22222222-2222-2222-2222-222222222222",
"type": "relation",
"constraint": "fk_order_customer",
"constraint": "fk_order_customer_person",
"source_type": "order",
"source_columns": [
"customer_id"
@ -41,8 +41,7 @@
"destination_type": "order",
"destination_columns": [
"id"
],
"prefix": "lines"
]
},
{
"id": "44444444-4444-4444-4444-444444444444",

View File

@ -71,7 +71,7 @@
{
"id": "22222222-2222-2222-2222-222222222222",
"type": "relation",
"constraint": "fk_order_customer",
"constraint": "fk_order_customer_person",
"source_type": "order",
"source_columns": [
"customer_id"
@ -79,7 +79,8 @@
"destination_type": "person",
"destination_columns": [
"id"
]
],
"prefix": "customer"
},
{
"id": "22222222-2222-2222-2222-222222222227",

View File

@ -3,8 +3,8 @@
pub mod cache;
use crate::database::r#type::Type;
use crate::database::Database;
use crate::database::r#type::Type;
use serde_json::Value;
use std::sync::Arc;
@ -25,19 +25,19 @@ impl Merger {
let mut notifications_queue = Vec::new();
let target_schema = match self.db.schemas.get(schema_id) {
Some(s) => Arc::new(s.clone()),
None => {
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "MERGE_FAILED".to_string(),
message: format!("Unknown schema_id: {}", schema_id),
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: None,
context: Some(data),
schema: None,
},
}]);
}
Some(s) => Arc::new(s.clone()),
None => {
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "MERGE_FAILED".to_string(),
message: format!("Unknown schema_id: {}", schema_id),
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: None,
context: Some(data),
schema: None,
},
}]);
}
};
let result = self.merge_internal(target_schema, data.clone(), &mut notifications_queue);
@ -50,18 +50,24 @@ impl Merger {
let mut final_cause = None;
if let Ok(Value::Object(map)) = serde_json::from_str::<Value>(&msg) {
if let (Some(Value::String(e_msg)), Some(Value::String(e_code))) = (map.get("error"), map.get("code")) {
if let (Some(Value::String(e_msg)), Some(Value::String(e_code))) =
(map.get("error"), map.get("code"))
{
final_message = e_msg.clone();
final_code = e_code.clone();
let mut cause_parts = Vec::new();
if let Some(Value::String(d)) = map.get("detail") {
if !d.is_empty() { cause_parts.push(d.clone()); }
if !d.is_empty() {
cause_parts.push(d.clone());
}
}
if let Some(Value::String(h)) = map.get("hint") {
if !h.is_empty() { cause_parts.push(h.clone()); }
if !h.is_empty() {
cause_parts.push(h.clone());
}
}
if !cause_parts.is_empty() {
final_cause = Some(cause_parts.join("\n"));
final_cause = Some(cause_parts.join("\n"));
}
}
}
@ -144,11 +150,11 @@ impl Merger {
) -> Result<Value, String> {
let mut item_schema = schema.clone();
if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) = &schema.obj.type_ {
if t == "array" {
if let Some(items_def) = &schema.obj.items {
item_schema = items_def.clone();
}
if t == "array" {
if let Some(items_def) = &schema.obj.items {
item_schema = items_def.clone();
}
}
}
let mut resolved_items = Vec::new();
@ -178,48 +184,48 @@ impl Merger {
};
let compiled_props = match schema.obj.compiled_properties.get() {
Some(props) => props,
None => return Err("Schema has no compiled properties for merging".to_string()),
Some(props) => props,
None => return Err("Schema has no compiled properties for merging".to_string()),
};
let mut entity_fields = serde_json::Map::new();
let mut entity_objects = std::collections::BTreeMap::new();
let mut entity_arrays = std::collections::BTreeMap::new();
for (k, v) in obj {
for (k, v) in obj.clone() {
// Always retain system and unmapped core fields natively implicitly mapped to the Postgres tables
if k == "id" || k == "type" || k == "created" {
entity_fields.insert(k.clone(), v.clone());
continue;
entity_fields.insert(k.clone(), v.clone());
continue;
}
if let Some(prop_schema) = compiled_props.get(&k) {
let mut is_edge = false;
if let Some(edges) = schema.obj.compiled_edges.get() {
if edges.contains_key(&k) {
is_edge = true;
}
}
if is_edge {
let typeof_v = match &v {
Value::Object(_) => "object",
Value::Array(_) => "array",
_ => "field", // Malformed edge data?
};
if typeof_v == "object" {
entity_objects.insert(k.clone(), (v.clone(), prop_schema.clone()));
} else if typeof_v == "array" {
entity_arrays.insert(k.clone(), (v.clone(), prop_schema.clone()));
} else {
entity_fields.insert(k.clone(), v.clone());
}
} else {
// Not an edge! It's a raw Postgres column (e.g., JSONB, text[])
entity_fields.insert(k.clone(), v.clone());
}
} else if type_def.fields.contains(&k) {
let mut is_edge = false;
if let Some(edges) = schema.obj.compiled_edges.get() {
if edges.contains_key(&k) {
is_edge = true;
}
}
if is_edge {
let typeof_v = match &v {
Value::Object(_) => "object",
Value::Array(_) => "array",
_ => "field", // Malformed edge data?
};
if typeof_v == "object" {
entity_objects.insert(k.clone(), (v.clone(), prop_schema.clone()));
} else if typeof_v == "array" {
entity_arrays.insert(k.clone(), (v.clone(), prop_schema.clone()));
} else {
entity_fields.insert(k.clone(), v.clone());
}
} else {
// Not an edge! It's a raw Postgres column (e.g., JSONB, text[])
entity_fields.insert(k.clone(), v.clone());
}
} else if type_def.fields.contains(&k) {
entity_fields.insert(k.clone(), v.clone());
}
}
@ -253,12 +259,16 @@ impl Merger {
};
if let Some(compiled_edges) = schema.obj.compiled_edges.get() {
println!("Compiled Edges keys for relation {}: {:?}", relation_name, compiled_edges.keys().collect::<Vec<_>>());
println!(
"Compiled Edges keys for relation {}: {:?}",
relation_name,
compiled_edges.keys().collect::<Vec<_>>()
);
if let Some(edge) = compiled_edges.get(&relation_name) {
println!("FOUND EDGE {} -> {:?}", relation_name, edge.constraint);
if let Some(relation) = self.db.relations.get(&edge.constraint) {
let parent_is_source = edge.forward;
if parent_is_source {
if !relative.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
@ -266,15 +276,16 @@ impl Merger {
}
}
let mut merged_relative = match self.merge_internal(rel_schema.clone(), Value::Object(relative), notifications)? {
let mut merged_relative = match self.merge_internal(
rel_schema.clone(),
Value::Object(relative),
notifications,
)? {
Value::Object(m) => m,
_ => continue,
};
merged_relative.insert(
"type".to_string(),
Value::String(relative_type_name),
);
merged_relative.insert("type".to_string(), Value::String(relative_type_name));
Self::apply_entity_relation(
&mut entity_fields,
@ -297,7 +308,11 @@ impl Merger {
&entity_fields,
);
let merged_relative = match self.merge_internal(rel_schema.clone(), Value::Object(relative), notifications)? {
let merged_relative = match self.merge_internal(
rel_schema.clone(),
Value::Object(relative),
notifications,
)? {
Value::Object(m) => m,
_ => continue,
};
@ -318,6 +333,20 @@ impl Merger {
entity_replaces = replaces;
}
#[cfg(not(test))]
if type_name == "contact" || type_name == "person" {
pgrx::notice!("=== DEBUG {} PAYLOAD ===", type_name);
pgrx::notice!("1. Incoming obj iteration: {:?}", obj);
pgrx::notice!("2. Final mapped entity_fields: {:?}", entity_fields);
pgrx::notice!(
"3. TypeDef fields check: {:?}",
type_def.fields.contains(&"source_id".to_string())
);
if !entity_fields.contains_key("source_id") {
pgrx::notice!("CRITICAL ERROR: source_id was dropped during mapping loop!");
}
}
self.merge_entity_fields(
entity_change_kind.as_deref().unwrap_or(""),
&type_name,
@ -360,19 +389,24 @@ impl Merger {
);
let mut item_schema = rel_schema.clone();
if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) = &rel_schema.obj.type_ {
if t == "array" {
if let Some(items_def) = &rel_schema.obj.items {
item_schema = items_def.clone();
}
if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) =
&rel_schema.obj.type_
{
if t == "array" {
if let Some(items_def) = &rel_schema.obj.items {
item_schema = items_def.clone();
}
}
}
let merged_relative =
match self.merge_internal(item_schema, Value::Object(relative_item), notifications)? {
Value::Object(m) => m,
_ => continue,
};
let merged_relative = match self.merge_internal(
item_schema,
Value::Object(relative_item),
notifications,
)? {
Value::Object(m) => m,
_ => continue,
};
relative_responses.push(Value::Object(merged_relative));
}
@ -433,8 +467,8 @@ impl Merger {
// An anchor is STRICTLY a struct containing merely an `id` and `type`.
// We aggressively bypass Database SPI `SELECT` fetches because there are no primitive
// mutations to apply to the row. PostgreSQL inherently protects relationships via Foreign Keys downstream.
let is_anchor = entity_fields.len() == 2
&& entity_fields.contains_key("id")
let is_anchor = entity_fields.len() == 2
&& entity_fields.contains_key("id")
&& entity_fields.contains_key("type");
let has_valid_id = entity_fields
@ -450,13 +484,13 @@ impl Merger {
let mut replaces_id = None;
if let Some(ref fetched_row) = entity_fetched {
let provided_id = entity_fields.get("id").and_then(|v| v.as_str());
let fetched_id = fetched_row.get("id").and_then(|v| v.as_str());
if let (Some(pid), Some(fid)) = (provided_id, fetched_id) {
if !pid.is_empty() && pid != fid {
replaces_id = Some(pid.to_string());
}
let provided_id = entity_fields.get("id").and_then(|v| v.as_str());
let fetched_id = fetched_row.get("id").and_then(|v| v.as_str());
if let (Some(pid), Some(fid)) = (provided_id, fetched_id) {
if !pid.is_empty() && pid != fid {
replaces_id = Some(pid.to_string());
}
}
}
let system_keys = vec![
@ -548,7 +582,12 @@ impl Merger {
entity_fields = new_fields;
}
Ok((entity_fields, entity_change_kind, entity_fetched, replaces_id))
Ok((
entity_fields,
entity_change_kind,
entity_fetched,
replaces_id,
))
}
fn fetch_entity(
@ -735,9 +774,7 @@ impl Merger {
columns.join(", "),
values.join(", ")
);
self
.db
.execute(&sql, None)?;
self.db.execute(&sql, None)?;
} else if change_kind == "update" || change_kind == "delete" {
entity_pairs.remove("id");
entity_pairs.remove("type");
@ -769,9 +806,7 @@ impl Merger {
set_clauses.join(", "),
Self::quote_literal(&Value::String(id_str.to_string()))
);
self
.db
.execute(&sql, None)?;
self.db.execute(&sql, None)?;
}
}
@ -857,13 +892,13 @@ impl Merger {
let mut notification = serde_json::Map::new();
notification.insert("complete".to_string(), Value::Object(complete));
notification.insert("new".to_string(), new_val_obj.clone());
if old_val_obj != Value::Null {
notification.insert("old".to_string(), old_val_obj.clone());
notification.insert("old".to_string(), old_val_obj.clone());
}
if let Some(rep) = replaces_id {
notification.insert("replaces".to_string(), Value::String(rep.to_string()));
notification.insert("replaces".to_string(), Value::String(rep.to_string()));
}
let mut notify_sql = None;