stems fixes and tests

This commit is contained in:
2026-03-13 01:17:27 -04:00
parent 797a0a5460
commit 6a275e1d90
9 changed files with 304 additions and 115 deletions

View File

@ -265,6 +265,7 @@ impl Database {
String::from(""),
None,
None,
true,
&mut inner_map,
&mut errors,
);
@ -290,21 +291,16 @@ impl Database {
mut current_path: String,
parent_type: Option<String>,
property_name: Option<String>,
is_root: bool,
inner_map: &mut HashMap<String, Arc<Stem>>,
errors: &mut Vec<crate::drop::Error>,
) {
let mut is_entity = false;
let mut entity_type = String::new();
let mut examine_id = None;
if let Some(ref r) = schema.obj.r#ref {
examine_id = Some(r.clone());
} else if let Some(ref id) = schema.obj.id {
examine_id = Some(id.clone());
}
if let Some(target) = examine_id {
let parts: Vec<&str> = target.split('.').collect();
// First check if the Schema's $id is a native Database Type
if let Some(ref id) = schema.obj.id {
let parts: Vec<&str> = id.split('.').collect();
if let Some(last_seg) = parts.last() {
if db.types.contains_key(*last_seg) {
is_entity = true;
@ -313,6 +309,20 @@ impl Database {
}
}
// If not found via $id, check the $ref pointer
// This allows ad-hoc schemas (like `save_person.response`) to successfully adopt the Type of what they $ref
if !is_entity {
if let Some(ref r) = schema.obj.r#ref {
let parts: Vec<&str> = r.split('.').collect();
if let Some(last_seg) = parts.last() {
if db.types.contains_key(*last_seg) {
is_entity = true;
entity_type = last_seg.to_string();
}
}
}
}
let mut relation_col = None;
if is_entity {
if let (Some(pt), Some(prop)) = (&parent_type, &property_name) {
@ -340,22 +350,37 @@ impl Database {
schema: Arc::new(schema.clone()),
};
let mut branch_path = current_path.clone();
if !current_path.is_empty() {
branch_path = format!("{}/{}", current_path, entity_type);
}
let mut branch_path = if is_root {
String::new()
} else if current_path.is_empty() {
entity_type.clone()
} else {
format!("{}/{}", current_path, entity_type)
};
if inner_map.contains_key(&branch_path) {
errors.push(crate::drop::Error {
code: "STEM_COLLISION".to_string(),
message: format!("The stem path `{}` resolves to multiple Entity boundaries. This usually occurs during un-wrapped $family or oneOf polymorphic schemas where multiple Entities are directly assigned to the same property. To fix this, encapsulate the polymorphic branch.", branch_path),
details: crate::drop::ErrorDetails {
path: root_schema_id.to_string(),
},
});
}
// DEDUPLICATION: If we just recursed into the EXACT same entity type definition,
// do not append again and do not re-register the stem.
let already_registered =
if current_path == entity_type || current_path.ends_with(&format!("/{}", entity_type)) {
branch_path = current_path.clone();
true
} else {
false
};
inner_map.insert(branch_path.clone(), Arc::new(stem));
if !already_registered {
if inner_map.contains_key(&branch_path) {
errors.push(crate::drop::Error {
code: "STEM_COLLISION".to_string(),
message: format!("The stem path `{}` resolves to multiple Entity boundaries. This usually occurs during un-wrapped $family or oneOf polymorphic schemas where multiple Entities are directly assigned to the same property. To fix this, encapsulate the polymorphic branch.", branch_path),
details: crate::drop::ErrorDetails {
path: root_schema_id.to_string(),
},
});
}
inner_map.insert(branch_path.clone(), Arc::new(stem));
}
// Update current_path for structural children
current_path = branch_path;
@ -381,6 +406,7 @@ impl Database {
current_path.clone(),
next_parent.clone(),
Some(k.clone()),
false,
inner_map,
errors,
);
@ -403,6 +429,7 @@ impl Database {
next_path,
next_parent.clone(),
Some(k.clone()),
false,
inner_map,
errors,
);
@ -418,11 +445,32 @@ impl Database {
current_path.clone(),
next_parent.clone(),
property_name.clone(),
false, // Arrays themselves aren't polymorphic branches, their items might be
inner_map,
errors,
);
}
// Follow external reference if we didn't just crawl local properties
if schema.obj.properties.is_none() && schema.obj.items.is_none() && schema.obj.one_of.is_none()
{
if let Some(ref r) = schema.obj.r#ref {
if let Some(target_schema) = db.schemas.get(r) {
Self::discover_stems(
db,
root_schema_id,
target_schema,
current_path.clone(),
next_parent.clone(),
property_name.clone(),
false,
inner_map,
errors,
);
}
}
}
// Polymorphism branch
if let Some(arr) = &schema.obj.one_of {
for v in arr {
@ -433,6 +481,7 @@ impl Database {
current_path.clone(),
next_parent.clone(),
property_name.clone(),
false,
inner_map,
errors,
);
@ -447,6 +496,7 @@ impl Database {
current_path.clone(),
next_parent.clone(),
property_name.clone(),
false,
inner_map,
errors,
);

View File

@ -1,79 +0,0 @@
# Entity Engine (jspg)
## Overview
This document outlines the architecture for moving the complex, CPU-bound row merging (`merge_entity`) and dynamic querying (`query_entity`) functionality out of PL/pgSQL and directly into the Rust-based `jspg` extension.
By treating the `jspg` schema registry as the absolute Single Source of Truth, we can leverage Rust and the Postgres query planner (via SPI) to achieve near O(1) execution planning for deeply nested reads, complex relational writes, and partial hydration beats.
## The Problem
Historically, `agreego.merge_entity` (PL/pgSQL) handled nested writes by segmenting JSON, resolving types, searching hierarchies, and dynamically concatenating `INSERT`/`UPDATE` statements. `agreego.query_entity` was conceived to do the same for reads (handling base security, inheritance JOINs, and filtering automatically).
However, this design hits three major limitations:
1. **CPU Bound Operations**: PL/pgSQL is comparatively slow at complex string concatenation and massive JSON graph traversals.
2. **Query Planning Cache Busting**: Generating massive, dynamic SQL strings prevents Postgres from caching query plans. `EXECUTE dynamic_sql` forces the planner to re-evaluate statistics and execution paths on every function call, leading to extreme latency spikes at scale.
3. **The Hydration Beat Problem**: The Punc framework requires fetching specific UI "fragments" (e.g. just the `target` of a specific `contact` array element) to feed WebSockets. Hand-rolling CTEs for every possible sub-tree permutation to serve beats will quickly become unmaintainable.
## The Solution: Semantic Engine Database
By migrating `merge_entity` and `query_entity` to `jspg`, we turn the database into a pre-compiled Semantic Engine.
1. **Schema-to-SQL Compilation**: During the connection lifecycle (`cache_json_schemas()`), `jspg` statically analyzes the JSON Schema AST. It acts as a compiler, translating the schema layout into perfectly optimized, multi-JOIN SQL query strings for *every* node/fragment in the schema.
2. **Prepared Statements (SPI)**: `jspg` feeds these computed SQL strings into the Postgres SPI (Server Programming Interface) using `Spi::prepare()`. Postgres calculates the query execution plan *once* and caches it in memory.
3. **Instant Execution**: When a Punc needs data, `jspg` retrieves the cached PreparedStatement, securely binds binary parameters, and executes the pre-planned query instantly.
## Architecture
### 1. The `cache_json_schemas()` Expansion
The initialization function must now ingest `types` and `agreego.relation` data so the internal `Registry` holds the full Relational Graph.
During schema compilation, if a schema is associated with a database Type, it triggers the **SQL Compiler Phase**:
- It builds a table-resolution AST mapping to `JOIN` clauses based on foreign keys.
- It translates JSON schema properties to `SELECT jsonb_build_object(...)`.
- It generates static SQL for `INSERT`, `UPDATE`, and `SELECT` (including path-based fragment SELECTs).
- It calls `Spi::prepare()` to cache these plans inside the Session Context.
### 2. `agreego.query_entity` (Reads)
* **API**: `agreego.query_entity(schema_id TEXT, fragment_path TEXT, cue JSONB)`
* **Execution**:
* Rust locates the target Schema in memory.
* It uses the `fragment_path` (e.g., `/` for a full read, or `/contacts/0/target` for a hydration beat) to fetch the exact PreparedStatement.
* It binds variables (Row Level Security IDs, filtering, pagination limit/offset) parsed from the `cue`.
* SPI returns the heavily nested, pre-aggregated `JSONB` instantly.
### 3. Unified Aggregations & Computeds (Schema `query` objects)
We replace the concept of a complex string parser (PEL) with native structured JSON objects using the `query` keyword.
A structured `query` block in the schema:
```json
"total": {
"type": "number",
"readOnly": true,
"query": {
"aggregate": "sum",
"source": "lines",
"field": "amount"
}
}
```
* **Frontend (Dart)**: The Go generator parses the JSON object directly and emits the native UI aggregation code (e.g. `lines.fold(...)`) for instant UI updates before the server responds.
* **Backend (jspg)**: The Rust SQL compiler natively deserializes the `query` object into an internal struct. It recognizes the `aggregate` instruction and outputs a Postgres native aggregation: `(SELECT SUM(amount) FROM agreego.invoice_line WHERE invoice_id = t1.id)` as a column in the prepared `SELECT` statement.
* **Unification**: The database-calculated value acts as the authoritative truth, synchronizing and correcting the client automatically on the resulting `beat`.
### 4. `agreego.merge_entity` (Writes)
* **API**: `agreego.merge_entity(cue JSONB)`
* **Execution**:
* Parses the incoming `cue` JSON via `serde_json` at C-like speeds.
* Recursively validates and *constructively masks* the tree against the strict schema.
* Traverses the relational graph (which is fully loaded in the `jspg` registry).
* Binds the new values directly into the cached `INSERT` or `UPDATE` SPI prepared statements for each table in the hierarchy.
* Evaluates field differences and natively uses `pg_notify` to fire atomic row-level changes for the Go Beat framework.
## Roadmap
1. **Relational Ingestion**: Update `cache_json_schemas` to pass relational metadata (`agreego.relation` rows) into the `jspg` registry cache.
2. **The SQL Compiler**: Build the AST-to-String compiler in Rust that reads properties, `$ref`s, and `$family` trees to piece together generic SQL.
3. **SPI Caching**: Integrate `Spi::prepare` into the `Validator` creation phase.
4. **Rust `merge_entity`**: Port the constructive structural extraction loop from PL/pgSQL to Rust.
5. **Rust `query_entity`**: Abstract the query runtime, mapping Punc JSON `filters` arrays to SPI-bound parameters safely.

View File

@ -205,7 +205,7 @@ impl SqlCompiler {
let local_ctx = format!("{}_{}", parent_alias, prop_name.unwrap_or("obj"));
// 1. Build FROM clauses and table aliases
let (mut table_aliases, from_clauses) = self.build_hierarchy_from_clauses(type_def, &local_ctx);
let (table_aliases, from_clauses) = self.build_hierarchy_from_clauses(type_def, &local_ctx);
// 2. Map properties and build jsonb_build_object args
let select_args = self.map_properties_to_aliases(
@ -225,7 +225,7 @@ impl SqlCompiler {
};
// 3. Build WHERE clauses
let mut where_clauses = self.build_filter_where_clauses(
let where_clauses = self.build_filter_where_clauses(
schema,
type_def,
&table_aliases,

View File

@ -3443,6 +3443,12 @@ fn test_if_then_else_13_1() {
crate::tests::runner::run_test_case(&path, 13, 1).unwrap();
}
#[test]
fn test_stems_0_0() {
    // Suite 0, case 0 of the `stems.json` fixture. The fixture path is built from the
    // crate's manifest directory, captured at compile time.
    let manifest_dir = env!("CARGO_MANIFEST_DIR");
    let fixture = format!("{}/fixtures/stems.json", manifest_dir);

    // Any error reported by the runner fails the test via the panic from `unwrap`.
    let outcome = crate::tests::runner::run_test_case(&fixture, 0, 0);
    outcome.unwrap();
}
#[test]
fn test_empty_string_0_0() {
let path = format!("{}/fixtures/emptyString.json", env!("CARGO_MANIFEST_DIR"));

View File

@ -97,6 +97,16 @@ pub fn run_test_case(path: &str, suite_idx: usize, case_idx: usize) -> Result<()
// 4. Run Tests
match test.action.as_str() {
"compile" => {
let result = test.run_compile(db.clone());
if let Err(e) = result {
println!("TEST COMPILE ERROR FOR '{}': {}", test.description, e);
failures.push(format!(
"[{}] Compile Test '{}' failed. Error: {}",
group.description, test.description, e
));
}
}
"validate" => {
let result = test.run_validate(db.clone());
if let Err(e) = result {

View File

@ -38,16 +38,39 @@ fn default_action() -> String {
}
impl TestCase {
pub fn execute(&self, db: Arc<Database>) -> Result<(), String> {
match self.action.as_str() {
"validate" => self.run_validate(db),
"merge" => self.run_merge(db),
"query" => self.run_query(db),
_ => Err(format!(
"Unknown action '{}' for test '{}'",
self.action, self.description
)),
/// Runs a `compile` test case against an already-built database.
///
/// Two checks are performed:
/// 1. The expected `success` flag (treated as `false` when the case has no `expect`
///    block) must match the observed outcome, which is always `true` here.
/// 2. If the `expect` block carries `stems`, both `db.stems` and the expectation are
///    converted to `serde_json::Value` and compared for equality.
///
/// # Errors
/// Returns a descriptive message when the success flag or the stems differ, or when
/// either side cannot be converted to JSON. Conversion failures are returned rather
/// than panicking so the caller can report them like any other test failure.
pub fn run_compile(&self, db: Arc<Database>) -> Result<(), String> {
    let expected_success = self.expect.as_ref().map_or(false, |e| e.success);

    // We assume db has already been set up and compiled successfully by runner.rs's
    // `jspg_setup`; a setup failure is handled by the runner before this point, so the
    // observed outcome is always a success.
    let got_success = true;
    if expected_success != got_success {
        return Err(format!(
            "Expected success: {}, Got: {}",
            expected_success, got_success
        ));
    }

    // Assert stems (only when the test case declares an expectation for them).
    if let Some(expected_stems) = self.expect.as_ref().and_then(|e| e.stems.as_ref()) {
        // Bring both sides to the same JSON shape so they can be compared structurally.
        let db_stems_json = serde_json::to_value(&db.stems)
            .map_err(|e| format!("Failed to serialize database stems: {}", e))?;
        let expect_stems_json = serde_json::to_value(expected_stems)
            .map_err(|e| format!("Failed to serialize expected stems: {}", e))?;

        if db_stems_json != expect_stems_json {
            // `{:#}` on a `serde_json::Value` pretty-prints it, which keeps the
            // mismatch report readable without another fallible call.
            return Err(format!(
                "Stem validation failed.\nExpected:\n{:#}\n\nGot:\n{:#}",
                expect_stems_json, db_stems_json
            ));
        }
    }

    Ok(())
}
pub fn run_validate(&self, db: Arc<Database>) -> Result<(), String> {

View File

@ -14,6 +14,7 @@ pub struct ExpectBlock {
pub success: bool,
pub result: Option<serde_json::Value>,
pub errors: Option<Vec<serde_json::Value>>,
pub stems: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
#[serde(default)]
pub sql: Option<Vec<SqlExpectation>>,
}