diff --git a/GEMINI.md b/GEMINI.md index 521bcd2..3822455 100644 --- a/GEMINI.md +++ b/GEMINI.md @@ -81,6 +81,7 @@ The Merger provides an automated, high-performance graph synchronization engine * **The Archive Paradigm**: Data is never deleted in the Punc system. The Merger securely enforces referential integrity by toggling the `archived` Boolean flag on the base `entity` table rather than issuing SQL `DELETE` commands. * **Change Tracking & Reactivity**: The Merger diffs the incoming JSON against the existing database row (utilizing static, `DashMap`-cached `lk_` SELECT string templates). Every detected change is recorded into the `agreego.change` audit table, tracking the user mapping. It then natively uses `pg_notify` to broadcast a completely flat row-level diff out to the Go WebSocket server for O(1) routing. * **Flat Structural Beats (Unidirectional Flow)**: The Merger purposefully DOES NOT trace or hydrate outbound Foreign Keys or nested parent structures during writes. It emits completely flat, mathematically perfect structural deltas via `pg_notify` representing only the exact Postgres rows that changed. This guarantees the write-path remains O(1) lightning fast. It is the strict responsibility of the upstream Punc Framework (the Go `Speaker`) to intercept these flat beats, evaluate them against active Websocket Schema Topologies, and dynamically issue targeted `jspg_query` reads to hydrate the exact contextual subgraphs required by listening clients. +* **Pre-Order Notification Traversal**: To support proper topological hydration on the upstream Go Framework, the Merger decouples the `pg_notify` execution from the physical database write execution. The engine collects structural changes and explicitly fires `pg_notify` SQL statements in strict **Pre-Order** (Parent -> Relations -> Children). This guarantees that WebSocket clients receive the parent entity `Beat` prior to any nested child entities, ensuring stable unidirectional data flows without hydration race conditions. * **Many-to-Many Graph Edge Management**: Operates seamlessly with the global `agreego.relationship` table, allowing the system to represent and merge arbitrary reified M:M relationships directionally between any two entities. * **Sparse Updates**: Empty JSON strings `""` are directly bound as explicit SQL `NULL` directives to clear data, whilst omitted (missing) properties skip UPDATE execution entirely, ensuring partial UI submissions do not wipe out sibling fields. * **Unified Return Structure**: To eliminate UI hydration race conditions and multi-user duplication, `jspg_merge` explicitly strips the response graph and returns only the root `{ "id": "uuid" }` (or an array of IDs for list insertions). External APIs can then explicitly call read APIs to fetch the resulting graph, while the UI relies 100% implicitly on the flat `pg_notify` pipeline for reactive state synchronization. diff --git a/fixtures/merger.json b/fixtures/merger.json index 8370e5b..cafd284 100644 --- a/fixtures/merger.json +++ b/fixtures/merger.json @@ -1409,21 +1409,6 @@ " '00000000-0000-0000-0000-000000000000'", ")" ], - [ - "SELECT pg_notify('entity', '{", - " \"complete\":{", - " \"created_at\":\"{{timestamp}}\",", - " \"created_by\":\"00000000-0000-0000-0000-000000000000\",", - " \"id\":\"{{uuid:line_id}}\",", - " \"modified_at\":\"{{timestamp}}\",", - " \"modified_by\":\"00000000-0000-0000-0000-000000000000\",", - " \"order_id\":\"abc\",", - " \"price\":99.0,", - " \"product\":\"Widget\",", - " \"type\":\"order_line\"", - " }", - "}')" - ], [ "INSERT INTO agreego.change (", " changes,", @@ -1457,6 +1442,21 @@ " \"type\":\"order\"", " }", "}')" + ], + [ + "SELECT pg_notify('entity', '{", + " \"complete\":{", + " \"created_at\":\"{{timestamp}}\",", + " \"created_by\":\"00000000-0000-0000-0000-000000000000\",", + " \"id\":\"{{uuid:line_id}}\",", + " \"modified_at\":\"{{timestamp}}\",", + " \"modified_by\":\"00000000-0000-0000-0000-000000000000\",", + " \"order_id\":\"abc\",", + " \"price\":99.0,", + " \"product\":\"Widget\",", + " \"type\":\"order_line\"", + " }", + "}')" ] ] } @@ -1587,19 +1587,6 @@ " '00000000-0000-0000-0000-000000000000'", ")" ], - [ - "SELECT pg_notify('entity', '{", - " \"complete\":{", - " \"created_at\":\"{{timestamp}}\",", - " \"created_by\":\"00000000-0000-0000-0000-000000000000\",", - " \"id\":\"{{uuid:phone1_id}}\",", - " \"modified_at\":\"{{timestamp}}\",", - " \"modified_by\":\"00000000-0000-0000-0000-000000000000\",", - " \"number\":\"555-0001\",", - " \"type\":\"phone_number\"", - " }", - "}')" - ], [ "INSERT INTO agreego.\"entity\" (", " \"created_at\",", @@ -1661,23 +1648,6 @@ " '00000000-0000-0000-0000-000000000000'", ")" ], - [ - "SELECT pg_notify('entity', '{", - " \"complete\":{", - " \"created_at\":\"{{timestamp}}\",", - " \"created_by\":\"00000000-0000-0000-0000-000000000000\",", - " \"id\":\"{{uuid:contact1_id}}\",", - " \"is_primary\":true,", - " \"modified_at\":\"{{timestamp}}\",", - " \"modified_by\":\"00000000-0000-0000-0000-000000000000\",", - " \"source_id\":\"{{uuid:person_id}}\",", - " \"source_type\":\"person\",", - " \"target_id\":\"{{uuid:phone1_id}}\",", - " \"target_type\":\"phone_number\",", - " \"type\":\"contact\"", - " }", - "}')" - ], [ "INSERT INTO agreego.\"entity\" (", " \"created_at\",", @@ -1722,19 +1692,6 @@ " '00000000-0000-0000-0000-000000000000'", ")" ], - [ - "SELECT pg_notify('entity', '{", - " \"complete\":{", - " \"created_at\":\"{{timestamp}}\",", - " \"created_by\":\"00000000-0000-0000-0000-000000000000\",", - " \"id\":\"{{uuid:phone2_id}}\",", - " \"modified_at\":\"{{timestamp}}\",", - " \"modified_by\":\"00000000-0000-0000-0000-000000000000\",", - " \"number\":\"555-0002\",", - " \"type\":\"phone_number\"", - " }", - "}')" - ], [ "INSERT INTO agreego.\"entity\" (", " \"created_at\",", @@ -1796,23 +1753,6 @@ " '00000000-0000-0000-0000-000000000000'", ")" ], - [ - "SELECT pg_notify('entity', '{", - " \"complete\":{", - " \"created_at\":\"{{timestamp}}\",", - " \"created_by\":\"00000000-0000-0000-0000-000000000000\",", - " \"id\":\"{{uuid:contact2_id}}\",", - " \"is_primary\":false,", - " \"modified_at\":\"{{timestamp}}\",", - " \"modified_by\":\"00000000-0000-0000-0000-000000000000\",", - " \"source_id\":\"{{uuid:person_id}}\",", - " \"source_type\":\"person\",", - " \"target_id\":\"{{uuid:phone2_id}}\",", - " \"target_type\":\"phone_number\",", - " \"type\":\"contact\"", - " }", - "}')" - ], [ "INSERT INTO agreego.\"entity\" (", " \"created_at\",", @@ -1857,19 +1797,6 @@ " '00000000-0000-0000-0000-000000000000'", ")" ], - [ - "SELECT pg_notify('entity', '{", - " \"complete\":{", - " \"address\":\"test@example.com\",", - " \"created_at\":\"{{timestamp}}\",", - " \"created_by\":\"00000000-0000-0000-0000-000000000000\",", - " \"id\":\"{{uuid:email1_id}}\",", - " \"modified_at\":\"{{timestamp}}\",", - " \"modified_by\":\"00000000-0000-0000-0000-000000000000\",", - " \"type\":\"email_address\"", - " }", - "}')" - ], [ "INSERT INTO agreego.\"entity\" (", " \"created_at\",", @@ -1931,23 +1858,6 @@ " '00000000-0000-0000-0000-000000000000'", ")" ], - [ - "SELECT pg_notify('entity', '{", - " \"complete\":{", - " \"created_at\":\"{{timestamp}}\",", - " \"created_by\":\"00000000-0000-0000-0000-000000000000\",", - " \"id\":\"{{uuid:contact3_id}}\",", - " \"is_primary\":false,", - " \"modified_at\":\"{{timestamp}}\",", - " \"modified_by\":\"00000000-0000-0000-0000-000000000000\",", - " \"source_id\":\"{{uuid:person_id}}\",", - " \"source_type\":\"person\",", - " \"target_id\":\"{{uuid:email1_id}}\",", - " \"target_type\":\"email_address\",", - " \"type\":\"contact\"", - " }", - "}')" - ], [ "INSERT INTO agreego.change (", " changes,", @@ -1982,6 +1892,96 @@ " \"type\":\"person\"", " }", "}')" + ], + [ + "SELECT pg_notify('entity', '{", + " \"complete\":{", + " \"created_at\":\"{{timestamp}}\",", + " \"created_by\":\"00000000-0000-0000-0000-000000000000\",", + " \"id\":\"{{uuid:contact1_id}}\",", + " \"is_primary\":true,", + " \"modified_at\":\"{{timestamp}}\",", + " \"modified_by\":\"00000000-0000-0000-0000-000000000000\",", + " \"source_id\":\"{{uuid:person_id}}\",", + " \"source_type\":\"person\",", + " \"target_id\":\"{{uuid:phone1_id}}\",", + " \"target_type\":\"phone_number\",", + " \"type\":\"contact\"", + " }", + "}')" + ], + [ + "SELECT pg_notify('entity', '{", + " \"complete\":{", + " \"created_at\":\"{{timestamp}}\",", + " \"created_by\":\"00000000-0000-0000-0000-000000000000\",", + " \"id\":\"{{uuid:phone1_id}}\",", + " \"modified_at\":\"{{timestamp}}\",", + " \"modified_by\":\"00000000-0000-0000-0000-000000000000\",", + " \"number\":\"555-0001\",", + " \"type\":\"phone_number\"", + " }", + "}')" + ], + [ + "SELECT pg_notify('entity', '{", + " \"complete\":{", + " \"created_at\":\"{{timestamp}}\",", + " \"created_by\":\"00000000-0000-0000-0000-000000000000\",", + " \"id\":\"{{uuid:contact2_id}}\",", + " \"is_primary\":false,", + " \"modified_at\":\"{{timestamp}}\",", + " \"modified_by\":\"00000000-0000-0000-0000-000000000000\",", + " \"source_id\":\"{{uuid:person_id}}\",", + " \"source_type\":\"person\",", + " \"target_id\":\"{{uuid:phone2_id}}\",", + " \"target_type\":\"phone_number\",", + " \"type\":\"contact\"", + " }", + "}')" + ], + [ + "SELECT pg_notify('entity', '{", + " \"complete\":{", + " \"created_at\":\"{{timestamp}}\",", + " \"created_by\":\"00000000-0000-0000-0000-000000000000\",", + " \"id\":\"{{uuid:phone2_id}}\",", + " \"modified_at\":\"{{timestamp}}\",", + " \"modified_by\":\"00000000-0000-0000-0000-000000000000\",", + " \"number\":\"555-0002\",", + " \"type\":\"phone_number\"", + " }", + "}')" + ], + [ + "SELECT pg_notify('entity', '{", + " \"complete\":{", + " \"created_at\":\"{{timestamp}}\",", + " \"created_by\":\"00000000-0000-0000-0000-000000000000\",", + " \"id\":\"{{uuid:contact3_id}}\",", + " \"is_primary\":false,", + " \"modified_at\":\"{{timestamp}}\",", + " \"modified_by\":\"00000000-0000-0000-0000-000000000000\",", + " \"source_id\":\"{{uuid:person_id}}\",", + " \"source_type\":\"person\",", + " \"target_id\":\"{{uuid:email1_id}}\",", + " \"target_type\":\"email_address\",", + " \"type\":\"contact\"", + " }", + "}')" + ], + [ + "SELECT pg_notify('entity', '{", + " \"complete\":{", + " \"address\":\"test@example.com\",", + " \"created_at\":\"{{timestamp}}\",", + " \"created_by\":\"00000000-0000-0000-0000-000000000000\",", + " \"id\":\"{{uuid:email1_id}}\",", + " \"modified_at\":\"{{timestamp}}\",", + " \"modified_by\":\"00000000-0000-0000-0000-000000000000\",", + " \"type\":\"email_address\"", + " }", + "}')" ] ] } diff --git a/fixtures/queryer.json b/fixtures/queryer.json index 0d2bf16..7b5b747 100644 --- a/fixtures/queryer.json +++ b/fixtures/queryer.json @@ -947,6 +947,9 @@ "$eq": true, "$ne": false }, + "contacts.#.is_primary": { + "$eq": true + }, "created_at": { "$eq": "2020-01-01T00:00:00Z", "$gt": "2019-01-01T00:00:00Z", @@ -970,10 +973,6 @@ "Bob" ] }, - "last_name": { - "$eq": "%Doe%", - "$ne": "%Smith%" - }, "id": { "$eq": "00000000-0000-0000-0000-000000000001", "$in": [ @@ -983,6 +982,13 @@ "$nin": [ "00000000-0000-0000-0000-000000000002" ] + }, + "last_name": { + "$eq": "%Doe%", + "$ne": "%Smith%" + }, + "phone_numbers.#.target.number": { + "$eq": "555-1234" } }, "expect": { @@ -1037,6 +1043,7 @@ " JOIN agreego.entity t1_obj_t2_contacts_t3 ON t1_obj_t2_contacts_t3.id = t1_obj_t2_contacts_t2.id", " WHERE", " NOT t1_obj_t2_contacts_t1.archived", + " AND t1_obj_t2_contacts_t1.is_primary = ($11#>>'{}')::boolean", " AND t1_obj_t2_contacts_t1.parent_id = t1_obj_t2.id),", " 'created_at', t1_obj_t2.created_at,", " 'email_addresses',", @@ -1093,6 +1100,7 @@ " JOIN agreego.entity t1_obj_t2_phone_numbers_t3_target_t2 ON t1_obj_t2_phone_numbers_t3_target_t2.id = t1_obj_t2_phone_numbers_t3_target_t1.id", " WHERE", " NOT t1_obj_t2_phone_numbers_t3_target_t1.archived", + " AND t1_obj_t2_phone_numbers_t3_target_t1.number ILIKE $32#>>'{}'", " AND t1_obj_t2_phone_numbers_t3_target_t1.parent_id = t1_obj_t2_phone_numbers_t3.id", " ),", " 'type', t1_obj_t2_phone_numbers_t3.type", @@ -1119,26 +1127,26 @@ " AND t1_obj_t1.age NOT IN (SELECT value::numeric FROM jsonb_array_elements_text(($8#>>'{}')::jsonb))", " AND t1_obj_t2.archived = ($9#>>'{}')::boolean", " AND t1_obj_t2.archived != ($10#>>'{}')::boolean", - " AND t1_obj_t2.created_at = ($11#>>'{}')::timestamptz", - " AND t1_obj_t2.created_at > ($12#>>'{}')::timestamptz", - " AND t1_obj_t2.created_at >= ($13#>>'{}')::timestamptz", - " AND t1_obj_t2.created_at < ($14#>>'{}')::timestamptz", - " AND t1_obj_t2.created_at <= ($15#>>'{}')::timestamptz", - " AND t1_obj_t2.created_at != ($16#>>'{}')::timestamptz", - " AND t1_obj_t1.first_name ILIKE $17#>>'{}'", - " AND t1_obj_t1.first_name > ($18#>>'{}')", - " AND t1_obj_t1.first_name >= ($19#>>'{}')", - " AND t1_obj_t1.first_name IN (SELECT value FROM jsonb_array_elements_text(($20#>>'{}')::jsonb))", - " AND t1_obj_t1.first_name < ($21#>>'{}')", - " AND t1_obj_t1.first_name <= ($22#>>'{}')", - " AND t1_obj_t1.first_name NOT ILIKE $23#>>'{}'", - " AND t1_obj_t1.first_name NOT IN (SELECT value FROM jsonb_array_elements_text(($24#>>'{}')::jsonb))", - " AND t1_obj_t2.id = ($25#>>'{}')::uuid", - " AND t1_obj_t2.id IN (SELECT value::uuid FROM jsonb_array_elements_text(($26#>>'{}')::jsonb))", - " AND t1_obj_t2.id != ($27#>>'{}')::uuid", - " AND t1_obj_t2.id NOT IN (SELECT value::uuid FROM jsonb_array_elements_text(($28#>>'{}')::jsonb))", - " AND t1_obj_t1.last_name ILIKE $29#>>'{}'", - " AND t1_obj_t1.last_name NOT ILIKE $30#>>'{}'", + " AND t1_obj_t2.created_at = ($12#>>'{}')::timestamptz", + " AND t1_obj_t2.created_at > ($13#>>'{}')::timestamptz", + " AND t1_obj_t2.created_at >= ($14#>>'{}')::timestamptz", + " AND t1_obj_t2.created_at < ($15#>>'{}')::timestamptz", + " AND t1_obj_t2.created_at <= ($16#>>'{}')::timestamptz", + " AND t1_obj_t2.created_at != ($17#>>'{}')::timestamptz", + " AND t1_obj_t1.first_name ILIKE $18#>>'{}'", + " AND t1_obj_t1.first_name > ($19#>>'{}')", + " AND t1_obj_t1.first_name >= ($20#>>'{}')", + " AND t1_obj_t1.first_name IN (SELECT value FROM jsonb_array_elements_text(($21#>>'{}')::jsonb))", + " AND t1_obj_t1.first_name < ($22#>>'{}')", + " AND t1_obj_t1.first_name <= ($23#>>'{}')", + " AND t1_obj_t1.first_name NOT ILIKE $24#>>'{}'", + " AND t1_obj_t1.first_name NOT IN (SELECT value FROM jsonb_array_elements_text(($25#>>'{}')::jsonb))", + " AND t1_obj_t2.id = ($26#>>'{}')::uuid", + " AND t1_obj_t2.id IN (SELECT value::uuid FROM jsonb_array_elements_text(($27#>>'{}')::jsonb))", + " AND t1_obj_t2.id != ($28#>>'{}')::uuid", + " AND t1_obj_t2.id NOT IN (SELECT value::uuid FROM jsonb_array_elements_text(($29#>>'{}')::jsonb))", + " AND t1_obj_t1.last_name ILIKE $30#>>'{}'", + " AND t1_obj_t1.last_name NOT ILIKE $31#>>'{}'", ")" ] ] diff --git a/src/queryer/compiler.rs b/src/queryer/compiler.rs index 0308b4b..73b8342 100644 --- a/src/queryer/compiler.rs +++ b/src/queryer/compiler.rs @@ -47,7 +47,7 @@ impl SqlCompiler { // We expect the top level to typically be an Object or Array let is_stem_query = stem_path.is_some(); - let (sql, _) = self.walk_schema(target_schema, "t1", None, filter_keys, is_stem_query, 0)?; + let (sql, _) = self.walk_schema(target_schema, "t1", None, filter_keys, is_stem_query, 0, String::new())?; Ok(sql) } @@ -61,12 +61,19 @@ impl SqlCompiler { filter_keys: &[String], is_stem_query: bool, depth: usize, + current_path: String, ) -> Result<(String, String), String> { // Determine the base schema type (could be an array, object, or literal) match &schema.obj.type_ { Some(crate::database::schema::SchemaTypeOrArray::Single(t)) if t == "array" => { // Handle Arrays: if let Some(items) = &schema.obj.items { + let next_path = if current_path.is_empty() { + String::from("#") + } else { + format!("{}.#", current_path) + }; + if let Some(ref_id) = &items.obj.r#ref { if let Some(type_def) = self.db.types.get(ref_id) { return self.compile_entity_node( @@ -78,6 +85,7 @@ impl SqlCompiler { filter_keys, is_stem_query, depth, + next_path, ); } } @@ -88,6 +96,7 @@ impl SqlCompiler { filter_keys, is_stem_query, depth + 1, + next_path, )?; return Ok(( format!("(SELECT jsonb_agg({}) FROM TODO)", item_sql), @@ -121,6 +130,7 @@ impl SqlCompiler { filter_keys, is_stem_query, depth, + current_path, ); } @@ -135,6 +145,7 @@ impl SqlCompiler { filter_keys, is_stem_query, depth, + current_path, ); } return Err(format!("Unresolved $ref: {}", ref_id)); @@ -148,6 +159,7 @@ impl SqlCompiler { filter_keys, is_stem_query, depth, + current_path, ); } @@ -195,6 +207,7 @@ impl SqlCompiler { filter_keys: &[String], is_stem_query: bool, depth: usize, + current_path: String, ) -> Result<(String, String), String> { let local_ctx = format!("{}_{}", parent_alias, prop_name.unwrap_or("obj")); @@ -210,6 +223,7 @@ impl SqlCompiler { filter_keys, is_stem_query, depth, + ¤t_path, )?; let jsonb_obj_sql = if select_args.is_empty() { @@ -226,6 +240,7 @@ impl SqlCompiler { parent_alias, prop_name, filter_keys, + ¤t_path, )?; let selection = if is_array { @@ -285,6 +300,7 @@ impl SqlCompiler { filter_keys: &[String], is_stem_query: bool, depth: usize, + current_path: &str, ) -> Result, String> { let mut select_args = Vec::new(); let grouped_fields = type_def.grouped_fields.as_ref().and_then(|v| v.as_object()); @@ -310,6 +326,12 @@ impl SqlCompiler { } } + let next_path = if current_path.is_empty() { + prop_key.clone() + } else { + format!("{}.{}", current_path, prop_key) + }; + let (val_sql, val_type) = self.walk_schema( prop_schema, &owner_alias, @@ -317,6 +339,7 @@ impl SqlCompiler { filter_keys, is_stem_query, depth + 1, + next_path, )?; if val_type != "abort" { @@ -334,6 +357,7 @@ impl SqlCompiler { parent_alias: &str, prop_name: Option<&str>, filter_keys: &[String], + current_path: &str, ) -> Result, String> { let base_alias = table_aliases .get(&type_def.name) @@ -343,15 +367,32 @@ impl SqlCompiler { let mut where_clauses = Vec::new(); where_clauses.push(format!("NOT {}.archived", base_alias)); - if parent_alias == "t1" { - for (i, filter_key) in filter_keys.iter().enumerate() { - let mut parts = filter_key.split(':'); - let field_name = parts.next().unwrap_or(filter_key); - let op = parts.next().unwrap_or("$eq"); + for (i, filter_key) in filter_keys.iter().enumerate() { + let mut parts = filter_key.split(':'); + let full_field_path = parts.next().unwrap_or(filter_key); + let op = parts.next().unwrap_or("$eq"); - let mut filter_alias = base_alias.clone(); + let field_name = if current_path.is_empty() { + if full_field_path.contains('.') || full_field_path.contains('#') { + continue; + } + full_field_path + } else { + let prefix = format!("{}.", current_path); + if full_field_path.starts_with(&prefix) { + let remainder = &full_field_path[prefix.len()..]; + if remainder.contains('.') || remainder.contains('#') { + continue; + } + remainder + } else { + continue; + } + }; - if let Some(gf) = type_def.grouped_fields.as_ref().and_then(|v| v.as_object()) { + let mut filter_alias = base_alias.clone(); + + if let Some(gf) = type_def.grouped_fields.as_ref().and_then(|v| v.as_object()) { for (t_name, fields_val) in gf { if let Some(fields_arr) = fields_val.as_array() { if fields_arr.iter().any(|v| v.as_str() == Some(field_name)) { @@ -453,7 +494,6 @@ impl SqlCompiler { "{}.{} {} {}", filter_alias, field_name, sql_op, param_sql )); - } } } @@ -471,9 +511,16 @@ impl SqlCompiler { filter_keys: &[String], is_stem_query: bool, depth: usize, + current_path: String, ) -> Result<(String, String), String> { let mut build_args = Vec::new(); for (k, v) in props { + let next_path = if current_path.is_empty() { + k.clone() + } else { + format!("{}.{}", current_path, k) + }; + let (child_sql, val_type) = self.walk_schema( v, parent_alias, @@ -481,6 +528,7 @@ impl SqlCompiler { filter_keys, is_stem_query, depth + 1, + next_path, )?; if val_type == "abort" { continue;