merger improvements

This commit is contained in:
2026-03-15 03:24:00 -04:00
parent d4347072f2
commit 6632570712
7 changed files with 86 additions and 3302 deletions

View File

@ -80,7 +80,7 @@ The Merger provides an automated, high-performance graph synchronization engine
* **Hierarchical Table Inheritance**: The Punc system uses distributed table inheritance (e.g. `person` inherits `user` inherits `organization` inherits `entity`). The Merger splits the incoming JSON payload and performs atomic row updates across *all* relevant tables in the lineage map. * **Hierarchical Table Inheritance**: The Punc system uses distributed table inheritance (e.g. `person` inherits `user` inherits `organization` inherits `entity`). The Merger splits the incoming JSON payload and performs atomic row updates across *all* relevant tables in the lineage map.
* **The Archive Paradigm**: Data is never deleted in the Punc system. The Merger securely enforces referential integrity by toggling the `archived` Boolean flag on the base `entity` table rather than issuing SQL `DELETE` commands. * **The Archive Paradigm**: Data is never deleted in the Punc system. The Merger securely enforces referential integrity by toggling the `archived` Boolean flag on the base `entity` table rather than issuing SQL `DELETE` commands.
* **Change Tracking & Reactivity**: The Merger diffs the incoming JSON against the existing database row (utilizing static, `DashMap`-cached `lk_` SELECT string templates). Every detected change is recorded into the `agreego.change` audit table, tracking the user mapping. It then natively uses `pg_notify` to broadcast a completely flat row-level diff out to the Go WebSocket server for O(1) routing. * **Change Tracking & Reactivity**: The Merger diffs the incoming JSON against the existing database row (utilizing static, `DashMap`-cached `lk_` SELECT string templates). Every detected change is recorded into the `agreego.change` audit table, tracking the user mapping. It then natively uses `pg_notify` to broadcast a completely flat row-level diff out to the Go WebSocket server for O(1) routing.
* **Intelligent Nested Notifications (Reactive Hydration)**: When the Merger detects that a Foreign Key column (e.g., `customer_id` on an `order`) has been explicitly modified, it automatically queries the newly pointed-to entity (e.g., `person`) and bundles its full JSON payload directly inside the parent's `pg_notify` broadcast. This guarantees that downstream caching clients (like the Dart UI) seamlessly receive the exact required subgraph to hydrate their relational widgets instantly, eliminating the need for N+1 HTTP fetches or extraneous notification lookups. This only triggers when the parent entity actively owns the relationship column and the value materially changes. * **Flat Structural Beats (Unidirectional Flow)**: The Merger purposefully DOES NOT trace or hydrate outbound Foreign Keys or nested parent structures during writes. It emits completely flat, mathematically perfect structural deltas via `pg_notify` representing only the exact Postgres rows that changed. This guarantees the write-path remains O(1) lightning fast. It is the strict responsibility of the upstream Punc Framework (the Go `Speaker`) to intercept these flat beats, evaluate them against active Websocket Schema Topologies, and dynamically issue targeted `jspg_query` reads to hydrate the exact contextual subgraphs required by listening clients.
* **Many-to-Many Graph Edge Management**: Operates seamlessly with the global `agreego.relationship` table, allowing the system to represent and merge arbitrary reified M:M relationships directionally between any two entities. * **Many-to-Many Graph Edge Management**: Operates seamlessly with the global `agreego.relationship` table, allowing the system to represent and merge arbitrary reified M:M relationships directionally between any two entities.
* **Sparse Updates**: Empty JSON strings `""` are directly bound as explicit SQL `NULL` directives to clear data, whilst omitted (missing) properties skip UPDATE execution entirely, ensuring partial UI submissions do not wipe out sibling fields. * **Sparse Updates**: Empty JSON strings `""` are directly bound as explicit SQL `NULL` directives to clear data, whilst omitted (missing) properties skip UPDATE execution entirely, ensuring partial UI submissions do not wipe out sibling fields.
* **Unified Return Structure**: To eliminate UI hydration race conditions and multi-user duplication, `jspg_merge` explicitly strips the response graph and returns only the root `{ "id": "uuid" }` (or an array of IDs for list insertions). External APIs can then explicitly call read APIs to fetch the resulting graph, while the UI relies 100% implicitly on the flat `pg_notify` pipeline for reactive state synchronization. * **Unified Return Structure**: To eliminate UI hydration race conditions and multi-user duplication, `jspg_merge` explicitly strips the response graph and returns only the root `{ "id": "uuid" }` (or an array of IDs for list insertions). External APIs can then explicitly call read APIs to fetch the resulting graph, while the UI relies 100% implicitly on the flat `pg_notify` pipeline for reactive state synchronization.

View File

@ -1159,7 +1159,22 @@
" 'id', t1_obj_t3.id,", " 'id', t1_obj_t3.id,",
" 'is_primary', t1_obj_t1.is_primary,", " 'is_primary', t1_obj_t1.is_primary,",
" 'name', t1_obj_t3.name,", " 'name', t1_obj_t3.name,",
" 'type', t1_obj_t3.type)", " 'target',",
" (SELECT jsonb_build_object(",
" 'archived', t1_obj_t3_target_t2.archived,",
" 'created_at', t1_obj_t3_target_t2.created_at,",
" 'id', t1_obj_t3_target_t2.id,",
" 'name', t1_obj_t3_target_t2.name,",
" 'number', t1_obj_t3_target_t1.number,",
" 'type', t1_obj_t3_target_t2.type",
" )",
" FROM agreego.phone_number t1_obj_t3_target_t1",
" JOIN agreego.entity t1_obj_t3_target_t2 ON t1_obj_t3_target_t2.id = t1_obj_t3_target_t1.id",
" WHERE",
" NOT t1_obj_t3_target_t1.archived",
" AND t1_obj_t3_target_t1.parent_id = t1_obj_t3.id),",
" 'type', t1_obj_t3.type",
")",
"FROM agreego.contact t1_obj_t1", "FROM agreego.contact t1_obj_t1",
"JOIN agreego.relationship t1_obj_t2 ON t1_obj_t2.id = t1_obj_t1.id", "JOIN agreego.relationship t1_obj_t2 ON t1_obj_t2.id = t1_obj_t1.id",
"JOIN agreego.entity t1_obj_t3 ON t1_obj_t3.id = t1_obj_t2.id", "JOIN agreego.entity t1_obj_t3 ON t1_obj_t3.id = t1_obj_t2.id",

File diff suppressed because it is too large Load Diff

View File

@ -5,7 +5,6 @@ use std::sync::Arc;
// Schema mirrors the Go Punc Generator's schema struct for consistency. // Schema mirrors the Go Punc Generator's schema struct for consistency.
// It is an order-preserving representation of a JSON Schema. // It is an order-preserving representation of a JSON Schema.
pub fn deserialize_some<'de, D>(deserializer: D) -> Result<Option<Value>, D::Error> pub fn deserialize_some<'de, D>(deserializer: D) -> Result<Option<Value>, D::Error>
where where
D: serde::Deserializer<'de>, D: serde::Deserializer<'de>,
@ -13,117 +12,159 @@ where
let v = Value::deserialize(deserializer)?; let v = Value::deserialize(deserializer)?;
Ok(Some(v)) Ok(Some(v))
} }
#[derive(Debug, Clone, Serialize, Deserialize, Default)] #[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SchemaObject { pub struct SchemaObject {
// Core Schema Keywords // Core Schema Keywords
#[serde(rename = "$id")] #[serde(rename = "$id")]
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<String>, pub id: Option<String>,
#[serde(rename = "$ref")] #[serde(rename = "$ref")]
#[serde(skip_serializing_if = "Option::is_none")]
pub r#ref: Option<String>, pub r#ref: Option<String>,
/* #[serde(skip_serializing_if = "Option::is_none")]
Note: The `Ref` field in the Go struct is a pointer populated by the linker.
In Rust, we might handle this differently (e.g., separate lookup or Rc/Arc),
so we omit the direct recursive `Ref` field for now and rely on `ref_string`.
*/
pub description: Option<String>, pub description: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub title: Option<String>, pub title: Option<String>,
#[serde(default)] // Allow missing type #[serde(default)] // Allow missing type
#[serde(rename = "type")] #[serde(rename = "type")]
#[serde(skip_serializing_if = "Option::is_none")]
pub type_: Option<SchemaTypeOrArray>, // Handles string or array of strings pub type_: Option<SchemaTypeOrArray>, // Handles string or array of strings
// Object Keywords // Object Keywords
#[serde(skip_serializing_if = "Option::is_none")]
pub properties: Option<BTreeMap<String, Arc<Schema>>>, pub properties: Option<BTreeMap<String, Arc<Schema>>>,
#[serde(rename = "patternProperties")] #[serde(rename = "patternProperties")]
#[serde(skip_serializing_if = "Option::is_none")]
pub pattern_properties: Option<BTreeMap<String, Arc<Schema>>>, pub pattern_properties: Option<BTreeMap<String, Arc<Schema>>>,
#[serde(rename = "additionalProperties")] #[serde(rename = "additionalProperties")]
#[serde(skip_serializing_if = "Option::is_none")]
pub additional_properties: Option<Arc<Schema>>, pub additional_properties: Option<Arc<Schema>>,
#[serde(rename = "$family")] #[serde(rename = "$family")]
#[serde(skip_serializing_if = "Option::is_none")]
pub family: Option<String>, pub family: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub required: Option<Vec<String>>, pub required: Option<Vec<String>>,
// dependencies can be schema dependencies or property dependencies // dependencies can be schema dependencies or property dependencies
#[serde(skip_serializing_if = "Option::is_none")]
pub dependencies: Option<BTreeMap<String, Dependency>>, pub dependencies: Option<BTreeMap<String, Dependency>>,
// Array Keywords // Array Keywords
#[serde(rename = "items")] #[serde(rename = "items")]
#[serde(skip_serializing_if = "Option::is_none")]
pub items: Option<Arc<Schema>>, pub items: Option<Arc<Schema>>,
#[serde(rename = "prefixItems")] #[serde(rename = "prefixItems")]
#[serde(skip_serializing_if = "Option::is_none")]
pub prefix_items: Option<Vec<Arc<Schema>>>, pub prefix_items: Option<Vec<Arc<Schema>>>,
// String Validation // String Validation
#[serde(rename = "minLength")] #[serde(rename = "minLength")]
#[serde(skip_serializing_if = "Option::is_none")]
pub min_length: Option<f64>, pub min_length: Option<f64>,
#[serde(rename = "maxLength")] #[serde(rename = "maxLength")]
#[serde(skip_serializing_if = "Option::is_none")]
pub max_length: Option<f64>, pub max_length: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub pattern: Option<String>, pub pattern: Option<String>,
// Array Validation // Array Validation
#[serde(rename = "minItems")] #[serde(rename = "minItems")]
#[serde(skip_serializing_if = "Option::is_none")]
pub min_items: Option<f64>, pub min_items: Option<f64>,
#[serde(rename = "maxItems")] #[serde(rename = "maxItems")]
#[serde(skip_serializing_if = "Option::is_none")]
pub max_items: Option<f64>, pub max_items: Option<f64>,
#[serde(rename = "uniqueItems")] #[serde(rename = "uniqueItems")]
#[serde(skip_serializing_if = "Option::is_none")]
pub unique_items: Option<bool>, pub unique_items: Option<bool>,
#[serde(rename = "contains")] #[serde(rename = "contains")]
#[serde(skip_serializing_if = "Option::is_none")]
pub contains: Option<Arc<Schema>>, pub contains: Option<Arc<Schema>>,
#[serde(rename = "minContains")] #[serde(rename = "minContains")]
#[serde(skip_serializing_if = "Option::is_none")]
pub min_contains: Option<f64>, pub min_contains: Option<f64>,
#[serde(rename = "maxContains")] #[serde(rename = "maxContains")]
#[serde(skip_serializing_if = "Option::is_none")]
pub max_contains: Option<f64>, pub max_contains: Option<f64>,
// Object Validation // Object Validation
#[serde(rename = "minProperties")] #[serde(rename = "minProperties")]
#[serde(skip_serializing_if = "Option::is_none")]
pub min_properties: Option<f64>, pub min_properties: Option<f64>,
#[serde(rename = "maxProperties")] #[serde(rename = "maxProperties")]
#[serde(skip_serializing_if = "Option::is_none")]
pub max_properties: Option<f64>, pub max_properties: Option<f64>,
#[serde(rename = "propertyNames")] #[serde(rename = "propertyNames")]
#[serde(skip_serializing_if = "Option::is_none")]
pub property_names: Option<Arc<Schema>>, pub property_names: Option<Arc<Schema>>,
// Numeric Validation // Numeric Validation
#[serde(skip_serializing_if = "Option::is_none")]
pub format: Option<String>, pub format: Option<String>,
#[serde(rename = "enum")] #[serde(rename = "enum")]
#[serde(skip_serializing_if = "Option::is_none")]
pub enum_: Option<Vec<Value>>, // `enum` is a reserved keyword in Rust pub enum_: Option<Vec<Value>>, // `enum` is a reserved keyword in Rust
#[serde( #[serde(
default, default,
rename = "const", rename = "const",
deserialize_with = "crate::database::schema::deserialize_some" deserialize_with = "crate::database::schema::deserialize_some"
)] )]
#[serde(skip_serializing_if = "Option::is_none")]
pub const_: Option<Value>, pub const_: Option<Value>,
// Numeric Validation // Numeric Validation
#[serde(rename = "multipleOf")] #[serde(rename = "multipleOf")]
#[serde(skip_serializing_if = "Option::is_none")]
pub multiple_of: Option<f64>, pub multiple_of: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub minimum: Option<f64>, pub minimum: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub maximum: Option<f64>, pub maximum: Option<f64>,
#[serde(rename = "exclusiveMinimum")] #[serde(rename = "exclusiveMinimum")]
#[serde(skip_serializing_if = "Option::is_none")]
pub exclusive_minimum: Option<f64>, pub exclusive_minimum: Option<f64>,
#[serde(rename = "exclusiveMaximum")] #[serde(rename = "exclusiveMaximum")]
#[serde(skip_serializing_if = "Option::is_none")]
pub exclusive_maximum: Option<f64>, pub exclusive_maximum: Option<f64>,
// Combining Keywords // Combining Keywords
#[serde(rename = "allOf")] #[serde(rename = "allOf")]
#[serde(skip_serializing_if = "Option::is_none")]
pub all_of: Option<Vec<Arc<Schema>>>, pub all_of: Option<Vec<Arc<Schema>>>,
#[serde(rename = "oneOf")] #[serde(rename = "oneOf")]
#[serde(skip_serializing_if = "Option::is_none")]
pub one_of: Option<Vec<Arc<Schema>>>, pub one_of: Option<Vec<Arc<Schema>>>,
#[serde(rename = "not")] #[serde(rename = "not")]
#[serde(skip_serializing_if = "Option::is_none")]
pub not: Option<Arc<Schema>>, pub not: Option<Arc<Schema>>,
#[serde(rename = "if")] #[serde(rename = "if")]
#[serde(skip_serializing_if = "Option::is_none")]
pub if_: Option<Arc<Schema>>, pub if_: Option<Arc<Schema>>,
#[serde(rename = "then")] #[serde(rename = "then")]
#[serde(skip_serializing_if = "Option::is_none")]
pub then_: Option<Arc<Schema>>, pub then_: Option<Arc<Schema>>,
#[serde(rename = "else")] #[serde(rename = "else")]
#[serde(skip_serializing_if = "Option::is_none")]
pub else_: Option<Arc<Schema>>, pub else_: Option<Arc<Schema>>,
// Custom Vocabularies // Custom Vocabularies
#[serde(skip_serializing_if = "Option::is_none")]
pub form: Option<Vec<String>>, pub form: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub display: Option<Vec<String>>, pub display: Option<Vec<String>>,
#[serde(rename = "enumNames")] #[serde(rename = "enumNames")]
#[serde(skip_serializing_if = "Option::is_none")]
pub enum_names: Option<Vec<String>>, pub enum_names: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub control: Option<String>, pub control: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub actions: Option<BTreeMap<String, Action>>, pub actions: Option<BTreeMap<String, Action>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub computer: Option<String>, pub computer: Option<String>,
#[serde(default)] #[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub extensible: Option<bool>, pub extensible: Option<bool>,
#[serde(skip)] #[serde(skip)]
@ -331,7 +372,9 @@ pub enum SchemaTypeOrArray {
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Action { pub struct Action {
#[serde(skip_serializing_if = "Option::is_none")]
pub navigate: Option<String>, pub navigate: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub punc: Option<String>, pub punc: Option<String>,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]

View File

@ -112,8 +112,6 @@ pub fn jspg_validate(schema_id: &str, instance: JsonB) -> JsonB {
#[cfg_attr(not(test), pg_extern)] #[cfg_attr(not(test), pg_extern)]
pub fn jspg_stems() -> JsonB { pub fn jspg_stems() -> JsonB {
use serde_json::Value;
let engine_opt = { let engine_opt = {
let lock = GLOBAL_JSPG.read().unwrap(); let lock = GLOBAL_JSPG.read().unwrap();
lock.clone() lock.clone()

View File

@ -69,9 +69,6 @@ impl SqlCompiler {
if let Some(items) = &schema.obj.items { if let Some(items) = &schema.obj.items {
if let Some(ref_id) = &items.obj.r#ref { if let Some(ref_id) = &items.obj.r#ref {
if let Some(type_def) = self.db.types.get(ref_id) { if let Some(type_def) = self.db.types.get(ref_id) {
if is_stem_query && depth > 0 {
return Ok(("".to_string(), "abort".to_string()));
}
return self.compile_entity_node( return self.compile_entity_node(
items, items,
type_def, type_def,
@ -115,9 +112,6 @@ impl SqlCompiler {
} }
if let Some(type_def) = resolved_type { if let Some(type_def) = resolved_type {
if is_stem_query && depth > 0 {
return Ok(("".to_string(), "abort".to_string()));
}
return self.compile_entity_node( return self.compile_entity_node(
schema, schema,
type_def, type_def,

File diff suppressed because it is too large Load Diff