validator reorg

This commit is contained in:
2026-02-26 19:17:13 -05:00
parent 960a99034a
commit e14f53e7d9
16 changed files with 501 additions and 423 deletions

79
src/entity/GEMINI.md Normal file
View File

@ -0,0 +1,79 @@
# Entity Engine (jspg)
## Overview
This document outlines the architecture for moving the complex, CPU-bound row merging (`merge_entity`) and dynamic querying (`query_entity`) functionality out of PL/pgSQL and directly into the Rust-based `jspg` extension.
By treating the `jspg` schema registry as the absolute Single Source of Truth, we can leverage Rust and the Postgres query planner (via SPI) to achieve near O(1) execution planning for deeply nested reads, complex relational writes, and partial hydration beats.
## The Problem
Historically, `agreego.merge_entity` (PL/pgSQL) handled nested writes by segmenting JSON, resolving types, searching hierarchies, and dynamically concatenating `INSERT`/`UPDATE` statements. `agreego.query_entity` was conceived to do the same for reads (handling base security, inheritance JOINs, and filtering automatically).
However, this design hits three major limitations:
1. **CPU Bound Operations**: PL/pgSQL is comparatively slow at complex string concatenation and massive JSON graph traversals.
2. **Query Planning Cache Busting**: Generating massive, dynamic SQL strings prevents Postgres from caching query plans. `EXECUTE dynamic_sql` forces the planner to re-evaluate statistics and execution paths on every function call, leading to extreme latency spikes at scale.
3. **The Hydration Beat Problem**: The Punc framework requires fetching specific UI "fragments" (e.g. just the `target` of a specific `contact` array element) to feed WebSockets. Hand-rolling CTEs for every possible sub-tree permutation to serve beats will quickly become unmaintainable.
## The Solution: Semantic Engine Database
By migrating `merge_entity` and `query_entity` to `jspg`, we turn the database into a pre-compiled Semantic Engine.
1. **Schema-to-SQL Compilation**: During the connection lifecycle (`cache_json_schemas()`), `jspg` statically analyzes the JSON Schema AST. It acts as a compiler, translating the schema layout into perfectly optimized, multi-JOIN SQL query strings for *every* node/fragment in the schema.
2. **Prepared Statements (SPI)**: `jspg` feeds these computed SQL strings into the Postgres SPI (Server Programming Interface) using `Spi::prepare()`. Postgres calculates the query execution plan *once* and caches it in memory.
3. **Instant Execution**: When a Punc needs data, `jspg` retrieves the cached PreparedStatement, securely binds binary parameters, and executes the pre-planned query instantly.
## Architecture
### 1. The `cache_json_schemas()` Expansion
The initialization function must now ingest `types` and `agreego.relation` data so the internal `Registry` holds the full Relational Graph.
During schema compilation, if a schema is associated with a database Type, it triggers the **SQL Compiler Phase**:
- It builds a table-resolution AST mapping to `JOIN` clauses based on foreign keys.
- It translates JSON schema properties to `SELECT jsonb_build_object(...)`.
- It generates static SQL for `INSERT`, `UPDATE`, and `SELECT` (including path-based fragment SELECTs).
- It calls `Spi::prepare()` to cache these plans inside the Session Context.
### 2. `agreego.query_entity` (Reads)
* **API**: `agreego.query_entity(schema_id TEXT, fragment_path TEXT, cue JSONB)`
* **Execution**:
* Rust locates the target Schema in memory.
* It uses the `fragment_path` (e.g., `/` for a full read, or `/contacts/0/target` for a hydration beat) to fetch the exact PreparedStatement.
* It binds variables (Row Level Security IDs, filtering, pagination limit/offset) parsed from the `cue`.
* SPI returns the heavily nested, pre-aggregated `JSONB` instantly.
### 3. Unified Aggregations & Computeds (Schema `query` objects)
We replace the concept of a complex string parser (PEL) with native structured JSON JSON objects using the `query` keyword.
A structured `query` block in the schema:
```json
"total": {
"type": "number",
"readOnly": true,
"query": {
"aggregate": "sum",
"source": "lines",
"field": "amount"
}
}
```
* **Frontend (Dart)**: The Go generator parses the JSON object directly and emits the native UI aggregation code (e.g. `lines.fold(...)`) for instant UI updates before the server responds.
* **Backend (jspg)**: The Rust SQL compiler natively deserializes the `query` object into an internal struct. It recognizes the `aggregate` instruction and outputs a Postgres native aggregation: `(SELECT SUM(amount) FROM agreego.invoice_line WHERE invoice_id = t1.id)` as a column in the prepared `SELECT` statement.
* **Unification**: The database-calculated value acts as the authoritative truth, synchronizing and correcting the client automatically on the resulting `beat`.
### 4. `agreego.merge_entity` (Writes)
* **API**: `agreego.merge_entity(cue JSONB)`
* **Execution**:
* Parses the incoming `cue` JSON via `serde_json` at C-like speeds.
* Recursively validates and *constructively masks* the tree against the strict schema.
* Traverses the relational graph (which is fully loaded in the `jspg` registry).
* Binds the new values directly into the cached `INSERT` or `UPDATE` SPI prepared statements for each table in the hierarchy.
* Evaluates field differences and natively uses `pg_notify` to fire atomic row-level changes for the Go Beat framework.
## Roadmap
1. **Relational Ingestion**: Update `cache_json_schemas` to pass relational metadata (`agreego.relation` rows) into the `jspg` registry cache.
2. **The SQL Compiler**: Build the AST-to-String compiler in Rust that reads properties, `$ref`s, and `$family` trees to piece together generic SQL.
3. **SPI Caching**: Integrate `Spi::prepare` into the `Validator` creation phase.
4. **Rust `merge_entity`**: Port the constructive structural extraction loop from PL/pgSQL to Rust.
5. **Rust `query_entity`**: Abstract the query runtime, mapping Punc JSON `filters` arrays to SPI-bound parameters safely.

View File

@ -2,20 +2,8 @@ use pgrx::*;
pg_module_magic!();
pub mod compiler;
pub mod drop;
pub mod formats;
pub mod registry;
mod schema;
pub mod util;
mod validator;
pub mod context;
pub mod error;
pub mod instance;
pub mod result;
pub(crate) mod rules;
pub mod validator;
use serde_json::json;
use std::sync::{Arc, RwLock};

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
use crate::schema::Schema;
use crate::validator::schema::Schema;
use regex::Regex;
use serde_json::Value;
// use std::collections::HashMap;
@ -26,7 +26,7 @@ impl Compiler {
fn compile_formats_and_regexes(schema: &mut Schema) {
// 1. Compile Format
if let Some(format_str) = &schema.format {
if let Some(fmt) = crate::formats::FORMATS.get(format_str.as_str()) {
if let Some(fmt) = crate::validator::formats::FORMATS.get(format_str.as_str()) {
schema.compiled_format = Some(CompiledFormat::Func(fmt.func));
}
}
@ -64,13 +64,13 @@ impl Compiler {
if let Some(deps) = schema.dependencies.take() {
for (key, dep) in deps {
match dep {
crate::schema::Dependency::Props(props) => {
crate::validator::schema::Dependency::Props(props) => {
schema
.dependent_required
.get_or_insert_with(std::collections::BTreeMap::new)
.insert(key, props);
}
crate::schema::Dependency::Schema(sub_schema) => {
crate::validator::schema::Dependency::Schema(sub_schema) => {
schema
.dependent_schemas
.get_or_insert_with(std::collections::BTreeMap::new)
@ -86,7 +86,7 @@ impl Compiler {
// Compile self
if let Some(format_str) = &schema.format {
if let Some(fmt) = crate::formats::FORMATS.get(format_str.as_str()) {
if let Some(fmt) = crate::validator::formats::FORMATS.get(format_str.as_str()) {
schema.compiled_format = Some(CompiledFormat::Func(fmt.func));
}
}
@ -167,7 +167,7 @@ impl Compiler {
/// Recursively traverses the schema tree to build the local registry index.
fn compile_index(
schema: &Arc<Schema>,
registry: &mut crate::registry::Registry,
registry: &mut crate::validator::registry::Registry,
parent_base: Option<String>,
pointer: json_pointer::JsonPointer<String, Vec<String>>,
) {
@ -355,7 +355,7 @@ impl Compiler {
}
// 2. Build ID/Pointer Index
let mut registry = crate::registry::Registry::new();
let mut registry = crate::validator::registry::Registry::new();
// We need a temporary Arc to satisfy compile_index recursion
// But we are modifying root_schema.

View File

@ -1,8 +1,8 @@
use crate::error::ValidationError;
use crate::instance::ValidationInstance;
use crate::result::ValidationResult;
use crate::schema::Schema;
use crate::validator::schema::Schema;
use crate::validator::Validator;
use crate::validator::error::ValidationError;
use crate::validator::instance::ValidationInstance;
use crate::validator::result::ValidationResult;
use std::collections::HashSet;
pub struct ValidationContext<'a, I: ValidationInstance<'a>> {
@ -87,7 +87,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
if let Some(id) = &self.schema.obj.id {
let current_base = self.scope.last().map(|s| s.as_str()).unwrap_or("");
let mut new_base = id.clone();
let mut new_base = id.clone().to_string();
if !current_base.is_empty() {
if let Ok(base_url) = url::Url::parse(current_base) {
if let Ok(joined) = base_url.join(id) {

View File

@ -1,10 +1,21 @@
pub use crate::context::ValidationContext;
pub use crate::error::ValidationError;
pub use crate::instance::{MutableInstance, ReadOnlyInstance};
pub use crate::result::ValidationResult;
pub mod compiler;
pub mod context;
pub mod error;
pub mod formats;
pub mod instance;
pub mod registry;
pub mod result;
pub mod rules;
pub mod schema;
pub mod util;
use crate::registry::Registry;
use crate::schema::Schema;
pub use context::ValidationContext;
pub use error::ValidationError;
pub use instance::{MutableInstance, ReadOnlyInstance};
pub use result::ValidationResult;
use crate::validator::registry::Registry;
use crate::validator::schema::Schema;
use serde_json::Value;
use std::collections::HashSet;
use std::sync::Arc;
@ -57,7 +68,7 @@ impl Validator {
"oneOf": object_refs
});
if let Ok(schema) = serde_json::from_value::<Schema>(schema_json) {
let compiled = crate::compiler::Compiler::compile(schema, None);
let compiled = crate::validator::compiler::Compiler::compile(schema, None);
families.insert(family_name, compiled);
}
}
@ -98,7 +109,7 @@ impl Validator {
"boolean" => val.is_boolean(),
"string" => val.is_string(),
"number" => val.is_number(),
"integer" => crate::util::is_integer(val),
"integer" => crate::validator::util::is_integer(val),
"object" => val.is_object(),
"array" => val.is_array(),
_ => true,
@ -124,7 +135,7 @@ impl Validator {
let joined_str = joined.to_string();
if let Some(indexrs) = &root.obj.compiled_registry {
if let Some(s) = indexrs.schemas.get(&joined_str) {
return Some((ResolvedRef::Local(s.as_ref()), joined_str));
return Some((ResolvedRef::Local(s.as_ref() as &Schema), joined_str));
}
}
@ -133,7 +144,7 @@ impl Validator {
if decoded_str != joined_str {
if let Some(indexrs) = &root.obj.compiled_registry {
if let Some(s) = indexrs.schemas.get(&decoded_str) {
return Some((ResolvedRef::Local(s.as_ref()), decoded_str));
return Some((ResolvedRef::Local(s.as_ref() as &Schema), decoded_str));
}
}
}
@ -149,7 +160,7 @@ impl Validator {
if let Some(indexrs) = &root.obj.compiled_registry {
if let Some(s) = indexrs.schemas.get(&joined_str) {
return Some((ResolvedRef::Local(s.as_ref()), joined_str));
return Some((ResolvedRef::Local(s.as_ref() as &Schema), joined_str));
}
}
@ -158,7 +169,7 @@ impl Validator {
if decoded_str != joined_str {
if let Some(indexrs) = &root.obj.compiled_registry {
if let Some(s) = indexrs.schemas.get(&decoded_str) {
return Some((ResolvedRef::Local(s.as_ref()), decoded_str));
return Some((ResolvedRef::Local(s.as_ref() as &Schema), decoded_str));
}
}
}

View File

@ -1,4 +1,4 @@
use crate::schema::Schema;
use crate::validator::schema::Schema;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::sync::RwLock;
@ -21,13 +21,13 @@ impl Registry {
}
}
pub fn add(&mut self, schema: crate::schema::Schema) {
pub fn add(&mut self, schema: crate::validator::schema::Schema) {
let id = schema
.obj
.id
.clone()
.expect("Schema must have an $id to be registered");
let compiled = crate::compiler::Compiler::compile(schema, Some(id.clone()));
let compiled = crate::validator::compiler::Compiler::compile(schema, Some(id.clone()));
self.schemas.insert(id, compiled);
}

View File

@ -1,4 +1,4 @@
use crate::error::ValidationError;
use crate::validator::error::ValidationError;
use std::collections::HashSet;
#[derive(Debug, Default, Clone, serde::Serialize)]

View File

@ -2,10 +2,10 @@ use regex::Regex;
use serde_json::Value;
use std::collections::HashSet;
use crate::context::ValidationContext;
use crate::error::ValidationError;
use crate::instance::ValidationInstance;
use crate::result::ValidationResult;
use crate::validator::context::ValidationContext;
use crate::validator::error::ValidationError;
use crate::validator::instance::ValidationInstance;
use crate::validator::result::ValidationResult;
use crate::validator::{ResolvedRef, Validator};
impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
@ -113,7 +113,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
if ref_string == "#" {
let mut new_overrides = self.overrides.clone();
if let Some(props) = &self.schema.properties {
new_overrides.extend(props.keys().cloned());
new_overrides.extend(props.keys().map(|k| k.to_string()));
}
let derived = self.derive(
@ -157,7 +157,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
let mut new_overrides = self.overrides.clone();
if let Some(props) = &self.schema.properties {
new_overrides.extend(props.keys().cloned());
new_overrides.extend(props.keys().map(|k| k.to_string()));
}
let target_ctx = ValidationContext::new(
@ -219,7 +219,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
if let Some(indexrs) = &self.root.obj.compiled_registry {
if let Some(s) = indexrs.schemas.get(&key) {
if s.obj.dynamic_anchor.as_deref() == Some(anchor) {
resolved_target = Some((ResolvedRef::Local(s.as_ref()), key.clone()));
resolved_target = Some((ResolvedRef::Local(s.as_ref()), key.to_string()));
break;
}
}
@ -232,7 +232,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
if s.obj.dynamic_anchor.as_deref() == Some(anchor) {
resolved_target = Some((
ResolvedRef::Global(compiled.as_ref(), s.as_ref()),
key.clone(),
key.to_string(),
));
break;
}
@ -246,7 +246,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
if s.obj.dynamic_anchor.as_deref() == Some(anchor) {
resolved_target = Some((
ResolvedRef::Global(compiled.as_ref(), s.as_ref()),
key.clone(),
key.to_string(),
));
break;
}
@ -279,7 +279,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
let scope_to_pass = if let Some(ref tid) = target_schema.obj.id {
let mut new_scope = effective_scope.clone();
new_scope.push(tid.clone());
new_scope.push(tid.to_string());
new_scope
} else {
if !resource_base.is_empty() && resource_base != current_base_resolved {
@ -293,7 +293,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
let mut new_overrides = self.overrides.clone();
if let Some(props) = &self.schema.properties {
new_overrides.extend(props.keys().cloned());
new_overrides.extend(props.keys().map(|k| k.to_string()));
}
let target_ctx = ValidationContext::new(
@ -343,7 +343,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
let current = self.instance.as_value();
if let Some(ref type_) = self.schema.type_ {
match type_ {
crate::schema::SchemaTypeOrArray::Single(t) => {
crate::validator::schema::SchemaTypeOrArray::Single(t) => {
if !Validator::check_type(t, current) {
result.errors.push(ValidationError {
code: "INVALID_TYPE".to_string(),
@ -352,7 +352,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
});
}
}
crate::schema::SchemaTypeOrArray::Multiple(types) => {
crate::validator::schema::SchemaTypeOrArray::Multiple(types) => {
let mut valid = false;
for t in types {
if Validator::check_type(t, current) {
@ -372,7 +372,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
}
if let Some(ref const_val) = self.schema.const_ {
if !crate::util::equals(current, const_val) {
if !crate::validator::util::equals(current, const_val) {
result.errors.push(ValidationError {
code: "CONST_VIOLATED".to_string(),
message: "Value does not match const".to_string(),
@ -390,7 +390,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
if let Some(ref enum_vals) = self.schema.enum_ {
let mut found = false;
for val in enum_vals {
if crate::util::equals(current, val) {
if crate::validator::util::equals(current, val) {
found = true;
break;
}
@ -451,7 +451,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
}
}
if let Some(multiple_of) = self.schema.multiple_of {
let val = num / multiple_of;
let val: f64 = num / multiple_of;
if (val - val.round()).abs() > f64::EPSILON {
result.errors.push(ValidationError {
code: "MULTIPLE_OF_VIOLATED".to_string(),
@ -510,7 +510,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
let current = self.instance.as_value();
if let Some(ref compiled_fmt) = self.schema.compiled_format {
match compiled_fmt {
crate::compiler::CompiledFormat::Func(f) => {
crate::validator::compiler::CompiledFormat::Func(f) => {
let should = if let Some(s) = current.as_str() {
!s.is_empty()
} else {
@ -526,7 +526,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
}
}
}
crate::compiler::CompiledFormat::Regex(re) => {
crate::validator::compiler::CompiledFormat::Regex(re) => {
if let Some(s) = current.as_str() {
if !re.is_match(s) {
result.errors.push(ValidationError {
@ -630,7 +630,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
);
let item_res = derived.validate()?;
result.merge(item_res);
result.evaluated_keys.insert(key.clone());
result.evaluated_keys.insert(key.to_string());
}
}
}
@ -656,7 +656,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
);
let item_res = derived.validate()?;
result.merge(item_res);
result.evaluated_keys.insert(key.clone());
result.evaluated_keys.insert(key.to_string());
}
}
}
@ -667,7 +667,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
for (key, _) in obj {
let mut locally_matched = false;
if let Some(props) = &self.schema.properties {
if props.contains_key(key) {
if props.contains_key(&key.to_string()) {
locally_matched = true;
}
}
@ -700,7 +700,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
);
let item_res = derived.validate()?;
result.merge(item_res);
result.evaluated_keys.insert(key.clone());
result.evaluated_keys.insert(key.to_string());
}
}
}
@ -709,7 +709,7 @@ impl<'a, I: ValidationInstance<'a>> ValidationContext<'a, I> {
if let Some(ref property_names) = self.schema.property_names {
for key in obj.keys() {
let _new_path = format!("{}/propertyNames/{}", self.path, key);
let val_str = Value::String(key.clone());
let val_str = Value::String(key.to_string());
let ctx = ValidationContext::new(
self.validator,

View File

@ -95,7 +95,7 @@ pub struct SchemaObject {
#[serde(
default,
rename = "const",
deserialize_with = "crate::util::deserialize_some"
deserialize_with = "crate::validator::util::deserialize_some"
)]
pub const_: Option<Value>,
@ -138,13 +138,13 @@ pub struct SchemaObject {
// Compiled Fields (Hidden from JSON/Serde)
#[serde(skip)]
pub compiled_format: Option<crate::compiler::CompiledFormat>,
pub compiled_format: Option<crate::validator::compiler::CompiledFormat>,
#[serde(skip)]
pub compiled_pattern: Option<crate::compiler::CompiledRegex>,
pub compiled_pattern: Option<crate::validator::compiler::CompiledRegex>,
#[serde(skip)]
pub compiled_pattern_properties: Option<Vec<(crate::compiler::CompiledRegex, Arc<Schema>)>>,
pub compiled_pattern_properties: Option<Vec<(crate::validator::compiler::CompiledRegex, Arc<Schema>)>>,
#[serde(skip)]
pub compiled_registry: Option<Arc<crate::registry::Registry>>,
pub compiled_registry: Option<Arc<crate::validator::registry::Registry>>,
}
#[derive(Debug, Clone, Serialize)]

View File

@ -25,7 +25,7 @@ struct TestCase {
expected: Option<serde_json::Value>,
}
// use crate::registry::REGISTRY; // No longer used directly for tests!
// use crate::validator::registry::REGISTRY; // No longer used directly for tests!
use crate::validator::Validator;
use serde_json::Value;
@ -60,7 +60,7 @@ pub fn run_test_file_at_index(path: &str, index: usize) -> Result<(), String> {
// 3. Register root 'schemas' if present (generic test support)
// Some tests use a raw 'schema' or 'schemas' field at the group level
if let Some(schema_val) = &group.schema {
match serde_json::from_value::<crate::schema::Schema>(schema_val.clone()) {
match serde_json::from_value::<crate::validator::schema::Schema>(schema_val.clone()) {
Ok(mut schema) => {
let id_clone = schema.obj.id.clone();
if id_clone.is_some() {
@ -197,7 +197,7 @@ pub fn run_test_file(path: &str) -> Result<(), String> {
// Register main 'schema' if present (Standard style)
if let Some(ref schema_val) = group.schema {
let mut schema: crate::schema::Schema =
let mut schema: crate::validator::schema::Schema =
serde_json::from_value(schema_val.clone()).expect("Failed to parse test schema");
// If schema has no ID, assign unique_id and use add() or manual insert?