queryer supports subfiltering now

This commit is contained in:
2026-03-15 07:49:05 -04:00
parent 6de75ba525
commit db5183930d
4 changed files with 195 additions and 138 deletions

View File

@ -81,6 +81,7 @@ The Merger provides an automated, high-performance graph synchronization engine
* **The Archive Paradigm**: Data is never deleted in the Punc system. The Merger securely enforces referential integrity by toggling the `archived` Boolean flag on the base `entity` table rather than issuing SQL `DELETE` commands.
* **Change Tracking & Reactivity**: The Merger diffs the incoming JSON against the existing database row (utilizing static, `DashMap`-cached `lk_` SELECT string templates). Every detected change is recorded into the `agreego.change` audit table, tracking the user mapping. It then natively uses `pg_notify` to broadcast a completely flat row-level diff out to the Go WebSocket server for O(1) routing.
* **Flat Structural Beats (Unidirectional Flow)**: The Merger purposefully DOES NOT trace or hydrate outbound Foreign Keys or nested parent structures during writes. It emits completely flat, mathematically perfect structural deltas via `pg_notify` representing only the exact Postgres rows that changed. This guarantees the write-path remains O(1) lightning fast. It is the strict responsibility of the upstream Punc Framework (the Go `Speaker`) to intercept these flat beats, evaluate them against active Websocket Schema Topologies, and dynamically issue targeted `jspg_query` reads to hydrate the exact contextual subgraphs required by listening clients.
* **Pre-Order Notification Traversal**: To support proper topological hydration on the upstream Go Framework, the Merger decouples the `pg_notify` execution from the physical database write execution. The engine collects structural changes and explicitly fires `pg_notify` SQL statements in strict **Pre-Order** (Parent -> Relations -> Children). This guarantees that WebSocket clients receive the parent entity `Beat` prior to any nested child entities, ensuring stable unidirectional data flows without hydration race conditions.
* **Many-to-Many Graph Edge Management**: Operates seamlessly with the global `agreego.relationship` table, allowing the system to represent and merge arbitrary reified M:M relationships directionally between any two entities.
* **Sparse Updates**: Empty JSON strings `""` are directly bound as explicit SQL `NULL` directives to clear data, whilst omitted (missing) properties skip UPDATE execution entirely, ensuring partial UI submissions do not wipe out sibling fields.
* **Unified Return Structure**: To eliminate UI hydration race conditions and multi-user duplication, `jspg_merge` explicitly strips the response graph and returns only the root `{ "id": "uuid" }` (or an array of IDs for list insertions). External APIs can then explicitly call read APIs to fetch the resulting graph, while the UI relies 100% implicitly on the flat `pg_notify` pipeline for reactive state synchronization.

View File

@ -1409,21 +1409,6 @@
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:line_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"order_id\":\"abc\",",
" \"price\":99.0,",
" \"product\":\"Widget\",",
" \"type\":\"order_line\"",
" }",
"}')"
],
[
"INSERT INTO agreego.change (",
" changes,",
@ -1457,6 +1442,21 @@
" \"type\":\"order\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:line_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"order_id\":\"abc\",",
" \"price\":99.0,",
" \"product\":\"Widget\",",
" \"type\":\"order_line\"",
" }",
"}')"
]
]
}
@ -1587,19 +1587,6 @@
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:phone1_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"number\":\"555-0001\",",
" \"type\":\"phone_number\"",
" }",
"}')"
],
[
"INSERT INTO agreego.\"entity\" (",
" \"created_at\",",
@ -1661,23 +1648,6 @@
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact1_id}}\",",
" \"is_primary\":true,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:phone1_id}}\",",
" \"target_type\":\"phone_number\",",
" \"type\":\"contact\"",
" }",
"}')"
],
[
"INSERT INTO agreego.\"entity\" (",
" \"created_at\",",
@ -1722,19 +1692,6 @@
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:phone2_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"number\":\"555-0002\",",
" \"type\":\"phone_number\"",
" }",
"}')"
],
[
"INSERT INTO agreego.\"entity\" (",
" \"created_at\",",
@ -1796,23 +1753,6 @@
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact2_id}}\",",
" \"is_primary\":false,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:phone2_id}}\",",
" \"target_type\":\"phone_number\",",
" \"type\":\"contact\"",
" }",
"}')"
],
[
"INSERT INTO agreego.\"entity\" (",
" \"created_at\",",
@ -1857,19 +1797,6 @@
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"address\":\"test@example.com\",",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:email1_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"type\":\"email_address\"",
" }",
"}')"
],
[
"INSERT INTO agreego.\"entity\" (",
" \"created_at\",",
@ -1931,23 +1858,6 @@
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact3_id}}\",",
" \"is_primary\":false,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:email1_id}}\",",
" \"target_type\":\"email_address\",",
" \"type\":\"contact\"",
" }",
"}')"
],
[
"INSERT INTO agreego.change (",
" changes,",
@ -1982,6 +1892,96 @@
" \"type\":\"person\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact1_id}}\",",
" \"is_primary\":true,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:phone1_id}}\",",
" \"target_type\":\"phone_number\",",
" \"type\":\"contact\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:phone1_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"number\":\"555-0001\",",
" \"type\":\"phone_number\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact2_id}}\",",
" \"is_primary\":false,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:phone2_id}}\",",
" \"target_type\":\"phone_number\",",
" \"type\":\"contact\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:phone2_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"number\":\"555-0002\",",
" \"type\":\"phone_number\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact3_id}}\",",
" \"is_primary\":false,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:email1_id}}\",",
" \"target_type\":\"email_address\",",
" \"type\":\"contact\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"address\":\"test@example.com\",",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:email1_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"type\":\"email_address\"",
" }",
"}')"
]
]
}

View File

@ -947,6 +947,9 @@
"$eq": true,
"$ne": false
},
"contacts.#.is_primary": {
"$eq": true
},
"created_at": {
"$eq": "2020-01-01T00:00:00Z",
"$gt": "2019-01-01T00:00:00Z",
@ -970,10 +973,6 @@
"Bob"
]
},
"last_name": {
"$eq": "%Doe%",
"$ne": "%Smith%"
},
"id": {
"$eq": "00000000-0000-0000-0000-000000000001",
"$in": [
@ -983,6 +982,13 @@
"$nin": [
"00000000-0000-0000-0000-000000000002"
]
},
"last_name": {
"$eq": "%Doe%",
"$ne": "%Smith%"
},
"phone_numbers.#.target.number": {
"$eq": "555-1234"
}
},
"expect": {
@ -1037,6 +1043,7 @@
" JOIN agreego.entity t1_obj_t2_contacts_t3 ON t1_obj_t2_contacts_t3.id = t1_obj_t2_contacts_t2.id",
" WHERE",
" NOT t1_obj_t2_contacts_t1.archived",
" AND t1_obj_t2_contacts_t1.is_primary = ($11#>>'{}')::boolean",
" AND t1_obj_t2_contacts_t1.parent_id = t1_obj_t2.id),",
" 'created_at', t1_obj_t2.created_at,",
" 'email_addresses',",
@ -1093,6 +1100,7 @@
" JOIN agreego.entity t1_obj_t2_phone_numbers_t3_target_t2 ON t1_obj_t2_phone_numbers_t3_target_t2.id = t1_obj_t2_phone_numbers_t3_target_t1.id",
" WHERE",
" NOT t1_obj_t2_phone_numbers_t3_target_t1.archived",
" AND t1_obj_t2_phone_numbers_t3_target_t1.number ILIKE $32#>>'{}'",
" AND t1_obj_t2_phone_numbers_t3_target_t1.parent_id = t1_obj_t2_phone_numbers_t3.id",
" ),",
" 'type', t1_obj_t2_phone_numbers_t3.type",
@ -1119,26 +1127,26 @@
" AND t1_obj_t1.age NOT IN (SELECT value::numeric FROM jsonb_array_elements_text(($8#>>'{}')::jsonb))",
" AND t1_obj_t2.archived = ($9#>>'{}')::boolean",
" AND t1_obj_t2.archived != ($10#>>'{}')::boolean",
" AND t1_obj_t2.created_at = ($11#>>'{}')::timestamptz",
" AND t1_obj_t2.created_at > ($12#>>'{}')::timestamptz",
" AND t1_obj_t2.created_at >= ($13#>>'{}')::timestamptz",
" AND t1_obj_t2.created_at < ($14#>>'{}')::timestamptz",
" AND t1_obj_t2.created_at <= ($15#>>'{}')::timestamptz",
" AND t1_obj_t2.created_at != ($16#>>'{}')::timestamptz",
" AND t1_obj_t1.first_name ILIKE $17#>>'{}'",
" AND t1_obj_t1.first_name > ($18#>>'{}')",
" AND t1_obj_t1.first_name >= ($19#>>'{}')",
" AND t1_obj_t1.first_name IN (SELECT value FROM jsonb_array_elements_text(($20#>>'{}')::jsonb))",
" AND t1_obj_t1.first_name < ($21#>>'{}')",
" AND t1_obj_t1.first_name <= ($22#>>'{}')",
" AND t1_obj_t1.first_name NOT ILIKE $23#>>'{}'",
" AND t1_obj_t1.first_name NOT IN (SELECT value FROM jsonb_array_elements_text(($24#>>'{}')::jsonb))",
" AND t1_obj_t2.id = ($25#>>'{}')::uuid",
" AND t1_obj_t2.id IN (SELECT value::uuid FROM jsonb_array_elements_text(($26#>>'{}')::jsonb))",
" AND t1_obj_t2.id != ($27#>>'{}')::uuid",
" AND t1_obj_t2.id NOT IN (SELECT value::uuid FROM jsonb_array_elements_text(($28#>>'{}')::jsonb))",
" AND t1_obj_t1.last_name ILIKE $29#>>'{}'",
" AND t1_obj_t1.last_name NOT ILIKE $30#>>'{}'",
" AND t1_obj_t2.created_at = ($12#>>'{}')::timestamptz",
" AND t1_obj_t2.created_at > ($13#>>'{}')::timestamptz",
" AND t1_obj_t2.created_at >= ($14#>>'{}')::timestamptz",
" AND t1_obj_t2.created_at < ($15#>>'{}')::timestamptz",
" AND t1_obj_t2.created_at <= ($16#>>'{}')::timestamptz",
" AND t1_obj_t2.created_at != ($17#>>'{}')::timestamptz",
" AND t1_obj_t1.first_name ILIKE $18#>>'{}'",
" AND t1_obj_t1.first_name > ($19#>>'{}')",
" AND t1_obj_t1.first_name >= ($20#>>'{}')",
" AND t1_obj_t1.first_name IN (SELECT value FROM jsonb_array_elements_text(($21#>>'{}')::jsonb))",
" AND t1_obj_t1.first_name < ($22#>>'{}')",
" AND t1_obj_t1.first_name <= ($23#>>'{}')",
" AND t1_obj_t1.first_name NOT ILIKE $24#>>'{}'",
" AND t1_obj_t1.first_name NOT IN (SELECT value FROM jsonb_array_elements_text(($25#>>'{}')::jsonb))",
" AND t1_obj_t2.id = ($26#>>'{}')::uuid",
" AND t1_obj_t2.id IN (SELECT value::uuid FROM jsonb_array_elements_text(($27#>>'{}')::jsonb))",
" AND t1_obj_t2.id != ($28#>>'{}')::uuid",
" AND t1_obj_t2.id NOT IN (SELECT value::uuid FROM jsonb_array_elements_text(($29#>>'{}')::jsonb))",
" AND t1_obj_t1.last_name ILIKE $30#>>'{}'",
" AND t1_obj_t1.last_name NOT ILIKE $31#>>'{}'",
")"
]
]

View File

@ -47,7 +47,7 @@ impl SqlCompiler {
// We expect the top level to typically be an Object or Array
let is_stem_query = stem_path.is_some();
let (sql, _) = self.walk_schema(target_schema, "t1", None, filter_keys, is_stem_query, 0)?;
let (sql, _) = self.walk_schema(target_schema, "t1", None, filter_keys, is_stem_query, 0, String::new())?;
Ok(sql)
}
@ -61,12 +61,19 @@ impl SqlCompiler {
filter_keys: &[String],
is_stem_query: bool,
depth: usize,
current_path: String,
) -> Result<(String, String), String> {
// Determine the base schema type (could be an array, object, or literal)
match &schema.obj.type_ {
Some(crate::database::schema::SchemaTypeOrArray::Single(t)) if t == "array" => {
// Handle Arrays:
if let Some(items) = &schema.obj.items {
let next_path = if current_path.is_empty() {
String::from("#")
} else {
format!("{}.#", current_path)
};
if let Some(ref_id) = &items.obj.r#ref {
if let Some(type_def) = self.db.types.get(ref_id) {
return self.compile_entity_node(
@ -78,6 +85,7 @@ impl SqlCompiler {
filter_keys,
is_stem_query,
depth,
next_path,
);
}
}
@ -88,6 +96,7 @@ impl SqlCompiler {
filter_keys,
is_stem_query,
depth + 1,
next_path,
)?;
return Ok((
format!("(SELECT jsonb_agg({}) FROM TODO)", item_sql),
@ -121,6 +130,7 @@ impl SqlCompiler {
filter_keys,
is_stem_query,
depth,
current_path,
);
}
@ -135,6 +145,7 @@ impl SqlCompiler {
filter_keys,
is_stem_query,
depth,
current_path,
);
}
return Err(format!("Unresolved $ref: {}", ref_id));
@ -148,6 +159,7 @@ impl SqlCompiler {
filter_keys,
is_stem_query,
depth,
current_path,
);
}
@ -195,6 +207,7 @@ impl SqlCompiler {
filter_keys: &[String],
is_stem_query: bool,
depth: usize,
current_path: String,
) -> Result<(String, String), String> {
let local_ctx = format!("{}_{}", parent_alias, prop_name.unwrap_or("obj"));
@ -210,6 +223,7 @@ impl SqlCompiler {
filter_keys,
is_stem_query,
depth,
&current_path,
)?;
let jsonb_obj_sql = if select_args.is_empty() {
@ -226,6 +240,7 @@ impl SqlCompiler {
parent_alias,
prop_name,
filter_keys,
&current_path,
)?;
let selection = if is_array {
@ -285,6 +300,7 @@ impl SqlCompiler {
filter_keys: &[String],
is_stem_query: bool,
depth: usize,
current_path: &str,
) -> Result<Vec<String>, String> {
let mut select_args = Vec::new();
let grouped_fields = type_def.grouped_fields.as_ref().and_then(|v| v.as_object());
@ -310,6 +326,12 @@ impl SqlCompiler {
}
}
let next_path = if current_path.is_empty() {
prop_key.clone()
} else {
format!("{}.{}", current_path, prop_key)
};
let (val_sql, val_type) = self.walk_schema(
prop_schema,
&owner_alias,
@ -317,6 +339,7 @@ impl SqlCompiler {
filter_keys,
is_stem_query,
depth + 1,
next_path,
)?;
if val_type != "abort" {
@ -334,6 +357,7 @@ impl SqlCompiler {
parent_alias: &str,
prop_name: Option<&str>,
filter_keys: &[String],
current_path: &str,
) -> Result<Vec<String>, String> {
let base_alias = table_aliases
.get(&type_def.name)
@ -343,12 +367,29 @@ impl SqlCompiler {
let mut where_clauses = Vec::new();
where_clauses.push(format!("NOT {}.archived", base_alias));
if parent_alias == "t1" {
for (i, filter_key) in filter_keys.iter().enumerate() {
let mut parts = filter_key.split(':');
let field_name = parts.next().unwrap_or(filter_key);
let full_field_path = parts.next().unwrap_or(filter_key);
let op = parts.next().unwrap_or("$eq");
let field_name = if current_path.is_empty() {
if full_field_path.contains('.') || full_field_path.contains('#') {
continue;
}
full_field_path
} else {
let prefix = format!("{}.", current_path);
if full_field_path.starts_with(&prefix) {
let remainder = &full_field_path[prefix.len()..];
if remainder.contains('.') || remainder.contains('#') {
continue;
}
remainder
} else {
continue;
}
};
let mut filter_alias = base_alias.clone();
if let Some(gf) = type_def.grouped_fields.as_ref().and_then(|v| v.as_object()) {
@ -455,7 +496,6 @@ impl SqlCompiler {
));
}
}
}
if let Some(_prop) = prop_name {
where_clauses.push(format!("{}.parent_id = {}.id", base_alias, parent_alias));
@ -471,9 +511,16 @@ impl SqlCompiler {
filter_keys: &[String],
is_stem_query: bool,
depth: usize,
current_path: String,
) -> Result<(String, String), String> {
let mut build_args = Vec::new();
for (k, v) in props {
let next_path = if current_path.is_empty() {
k.clone()
} else {
format!("{}.{}", current_path, k)
};
let (child_sql, val_type) = self.walk_schema(
v,
parent_alias,
@ -481,6 +528,7 @@ impl SqlCompiler {
filter_keys,
is_stem_query,
depth + 1,
next_path,
)?;
if val_type == "abort" {
continue;