Compare commits

..

36 Commits

Author SHA1 Message Date
263cf04ffb version: 1.0.74 2026-03-18 04:40:06 -04:00
00375c2926 more fixes 2026-03-18 04:39:48 -04:00
885b9b5e44 version: 1.0.73 2026-03-18 02:42:34 -04:00
298645ffdb queryer fixes 2026-03-18 02:42:20 -04:00
330280ba48 queryer fixes 2026-03-18 02:41:56 -04:00
02e661d219 version: 1.0.72 2026-03-17 23:10:52 -04:00
f7163e2689 version: 1.0.71 2026-03-17 22:13:55 -04:00
091007006d queryer fixes in place 2026-03-17 22:13:34 -04:00
3d66a7fc3c queryer test checkpoit 2026-03-17 18:00:36 -04:00
e1314496dd queryer test checkpoint 2026-03-17 15:06:02 -04:00
70a27b430d version: 1.0.70 2026-03-17 05:42:40 -04:00
e078b8a74b queryer alias fixes installed 2026-03-17 05:42:27 -04:00
c2c0e62c2d queryer fixes checkpoint 2026-03-17 05:12:03 -04:00
ebb97b3509 version: 1.0.69 2026-03-16 21:21:25 -04:00
5d18847f32 pgrx try catch 2026-03-16 21:21:11 -04:00
4a33e29628 version: 1.0.68 2026-03-16 19:39:34 -04:00
d8fc286e94 better error dropping 2026-03-16 19:39:24 -04:00
507dc6d780 version: 1.0.67 2026-03-16 18:06:45 -04:00
e340039a30 version: 1.0.66 2026-03-16 18:06:05 -04:00
08768e3d42 queryer fixes 2026-03-16 18:05:47 -04:00
6c9e6575ce version: 1.0.65 2026-03-16 06:07:44 -04:00
5d11c4c92c jspg query with familties fixes 2026-03-16 06:07:13 -04:00
25239d635b version: 1.0.64 2026-03-16 00:19:44 -04:00
3bec6a6102 fixed jspg_schemas returning a drop now 2026-03-16 00:19:32 -04:00
6444b300b3 version: 1.0.63 2026-03-15 23:03:15 -04:00
c529c8b8ea added jspg_schemas for mixer 2026-03-15 23:03:03 -04:00
2f15ae3d41 version: 1.0.62 2026-03-15 10:10:19 -04:00
f8528aa85e version: 1.0.61 2026-03-15 10:09:27 -04:00
b6f383e700 version: 1.0.60 2026-03-15 09:46:23 -04:00
db5183930d queryer supports subfiltering now 2026-03-15 07:49:05 -04:00
6de75ba525 merger notification process order testing 2026-03-15 07:31:14 -04:00
6632570712 merger improvements 2026-03-15 03:24:00 -04:00
d4347072f2 stripping schema nulls from stems 2026-03-15 00:13:33 -04:00
290464adc1 gjson pathing for stem paths 2026-03-13 23:35:37 -04:00
d6deaa0b0f version: 1.0.59 2026-03-13 01:17:50 -04:00
6a275e1d90 stems fixes and tests 2026-03-13 01:17:27 -04:00
28 changed files with 3533 additions and 2139 deletions

5
.vscode/extensions.json vendored Normal file
View File

@ -0,0 +1,5 @@
{
"recommendations": [
"rust-lang.rust-analyzer"
]
}

81
Cargo.lock generated
View File

@ -55,6 +55,15 @@ version = "1.0.101"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea"
[[package]]
name = "ar_archive_writer"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7eb93bbb63b9c227414f6eb3a0adfddca591a8ce1e9b60661bb08969b87e340b"
dependencies = [
"object",
]
[[package]]
name = "async-trait"
version = "0.1.89"
@ -874,6 +883,7 @@ dependencies = [
"regex-syntax",
"serde",
"serde_json",
"sqlparser",
"url",
"uuid",
"xxhash-rust",
@ -1040,6 +1050,15 @@ dependencies = [
"objc2-core-foundation",
]
[[package]]
name = "object"
version = "0.37.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe"
dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.21.3"
@ -1377,6 +1396,16 @@ dependencies = [
"unarray",
]
[[package]]
name = "psm"
version = "0.1.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3852766467df634d74f0b2d7819bf8dc483a0eb2e3b0f50f756f9cfe8b0d18d8"
dependencies = [
"ar_archive_writer",
"cc",
]
[[package]]
name = "quick-error"
version = "1.2.3"
@ -1442,6 +1471,26 @@ dependencies = [
"rand_core",
]
[[package]]
name = "recursive"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0786a43debb760f491b1bc0269fe5e84155353c67482b9e60d0cfb596054b43e"
dependencies = [
"recursive-proc-macro-impl",
"stacker",
]
[[package]]
name = "recursive-proc-macro-impl"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76009fbe0614077fc1a2ce255e3a1881a2e3a3527097d5dc6d8212c585e7e38b"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "redox_syscall"
version = "0.5.18"
@ -1669,12 +1718,35 @@ dependencies = [
"windows-sys 0.60.2",
]
[[package]]
name = "sqlparser"
version = "0.61.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf5ea8d4d7c808e1af1cbabebca9a2abe603bcefc22294c5b95018d53200cb7"
dependencies = [
"log",
"recursive",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596"
[[package]]
name = "stacker"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d74a23609d509411d10e2176dc2a4346e3b4aea2e7b1869f19fdedbc71c013"
dependencies = [
"cc",
"cfg-if",
"libc",
"psm",
"windows-sys 0.59.0",
]
[[package]]
name = "stringprep"
version = "0.1.5"
@ -2323,6 +2395,15 @@ dependencies = [
"windows-link",
]
[[package]]
name = "windows-sys"
version = "0.59.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b"
dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-sys"
version = "0.60.2"

View File

@ -23,6 +23,7 @@ indexmap = { version = "2.13.0", features = ["serde"] }
moka = { version = "0.12.14", features = ["sync"] }
xxhash-rust = { version = "0.8.15", features = ["xxh64"] }
dashmap = "6.1.0"
sqlparser = "0.61.0"
[dev-dependencies]
pgrx-tests = "0.16.1"

View File

@ -43,7 +43,7 @@ JSPG implements specific extensions to the Draft 2020-12 standard to support the
#### A. Polymorphism & Referencing (`$ref`, `$family`, and Native Types)
* **Native Type Discrimination (`variations`)**: Schemas defined inside a Postgres `type` are Entities. The validator securely and implicitly manages their `"type"` property. If an entity inherits from `user`, incoming JSON can safely define `{"type": "person"}` without errors, thanks to `compiled_variations` inheritance.
* **Structural Inheritance & Viral Infection (`$ref`)**: `$ref` is used exclusively for structural inheritance, *never* for union creation. A Punc request schema that `$ref`s an Entity virally inherits all physical database polymorphism rules for that target.
* **Shape Polymorphism (`$family`)**: Auto-expands polymorphic API lists based on an abstract Descendants Graph. If `{"$family": "widget"}` is used, JSPG evaluates the JSON against every schema that `$ref`s widget.
* **Shape Polymorphism (`$family`)**: Auto-expands polymorphic API lists based on an abstract **Descendants Graph**. If `{"$family": "widget"}` is used, the Validator dynamically identifies *every* schema in the registry that `$ref`s `widget` (e.g., `stock.widget`, `task.widget`) and evaluates the JSON against all of them.
* **Strict Matches & Depth Heuristic**: Polymorphic structures MUST match exactly **one** schema permutation. If multiple inherited struct permutations pass, JSPG applies the **Depth Heuristic Tie-Breaker**, selecting the candidate deepest in the inheritance tree.
#### B. Dot-Notation Schema Resolution & Database Mapping
@ -80,6 +80,8 @@ The Merger provides an automated, high-performance graph synchronization engine
* **Hierarchical Table Inheritance**: The Punc system uses distributed table inheritance (e.g. `person` inherits `user` inherits `organization` inherits `entity`). The Merger splits the incoming JSON payload and performs atomic row updates across *all* relevant tables in the lineage map.
* **The Archive Paradigm**: Data is never deleted in the Punc system. The Merger securely enforces referential integrity by toggling the `archived` Boolean flag on the base `entity` table rather than issuing SQL `DELETE` commands.
* **Change Tracking & Reactivity**: The Merger diffs the incoming JSON against the existing database row (utilizing static, `DashMap`-cached `lk_` SELECT string templates). Every detected change is recorded into the `agreego.change` audit table, tracking the user mapping. It then natively uses `pg_notify` to broadcast a completely flat row-level diff out to the Go WebSocket server for O(1) routing.
* **Flat Structural Beats (Unidirectional Flow)**: The Merger purposefully DOES NOT trace or hydrate outbound Foreign Keys or nested parent structures during writes. It emits completely flat, mathematically perfect structural deltas via `pg_notify` representing only the exact Postgres rows that changed. This guarantees the write-path remains O(1) lightning fast. It is the strict responsibility of the upstream Punc Framework (the Go `Speaker`) to intercept these flat beats, evaluate them against active Websocket Schema Topologies, and dynamically issue targeted `jspg_query` reads to hydrate the exact contextual subgraphs required by listening clients.
* **Pre-Order Notification Traversal**: To support proper topological hydration on the upstream Go Framework, the Merger decouples the `pg_notify` execution from the physical database write execution. The engine collects structural changes and explicitly fires `pg_notify` SQL statements in strict **Pre-Order** (Parent -> Relations -> Children). This guarantees that WebSocket clients receive the parent entity `Beat` prior to any nested child entities, ensuring stable unidirectional data flows without hydration race conditions.
* **Many-to-Many Graph Edge Management**: Operates seamlessly with the global `agreego.relationship` table, allowing the system to represent and merge arbitrary reified M:M relationships directionally between any two entities.
* **Sparse Updates**: Empty JSON strings `""` are directly bound as explicit SQL `NULL` directives to clear data, whilst omitted (missing) properties skip UPDATE execution entirely, ensuring partial UI submissions do not wipe out sibling fields.
* **Unified Return Structure**: To eliminate UI hydration race conditions and multi-user duplication, `jspg_merge` explicitly strips the response graph and returns only the root `{ "id": "uuid" }` (or an array of IDs for list insertions). External APIs can then explicitly call read APIs to fetch the resulting graph, while the UI relies 100% implicitly on the flat `pg_notify` pipeline for reactive state synchronization.
@ -101,20 +103,28 @@ The Queryer transforms Postgres into a pre-compiled Semantic Query Engine via th
* **Array Inclusion**: `{"$in": [values]}`, `{"$nin": [values]}` use native `jsonb_array_elements_text()` bindings to enforce `IN` and `NOT IN` logic without runtime SQL injection risks.
* **Text Matching (ILIKE)**: Evaluates `$eq` or `$ne` against string fields containing the `%` character natively into Postgres `ILIKE` and `NOT ILIKE` partial substring matches.
* **Type Casting**: Safely resolves dynamic combinations by casting values instantly into the physical database types mapped in the schema (e.g. parsing `uuid` bindings to `::uuid`, formatting DateTimes to `::timestamptz`, and numbers to `::numeric`).
### 4. The Stem Engine
* **Polymorphic SQL Generation (`$family`)**: Compiles `$family` properties by analyzing the **Physical Database Variations**, *not* the schema descendants.
* **The Dot Convention**: When a schema requests `$family: "target.schema"`, the compiler extracts the base type (e.g. `schema`) and looks up its Physical Table definition.
* **Multi-Table Branching**: If the Physical Table is a parent to other tables (e.g. `organization` has variations `["organization", "bot", "person"]`), the compiler generates a dynamic `CASE WHEN type = '...' THEN ...` query, expanding into `JOIN`s for each variation.
* **Single-Table Bypass**: If the Physical Table is a leaf node with only one variation (e.g. `person` has variations `["person"]`), the compiler cleanly bypasses `CASE` generation and compiles a simple `SELECT` across the base table, as all schema extensions (e.g. `light.person`, `full.person`) are guaranteed to reside in the exact same physical row.
### The Stem Engine
Rather than over-fetching heavy Entity payloads and trimming them, Punc Framework Websockets depend on isolated subgraphs defined as **Stems**.
A `Stem` is **not a JSON Pointer** or a physical path string (like `/properties/contacts/items/phone_number`). It is simply a declaration of an **Entity Type boundary** that exists somewhere within the compiled JSON Schema graph.
A `Stem` is a declaration of an **Entity Type boundary** that exists somewhere within the compiled JSON Schema graph, expressed using **`gjson` multipath syntax** (e.g., `contacts.#.phone_numbers.#`).
Because `pg_notify` (Beats) fire rigidly from physical Postgres tables (e.g. `{"type": "phone_number"}`), the Go Framework only ever needs to know: "Does the schema `with_contacts.person` contain the `phone_number` Entity anywhere inside its tree?"
Because `pg_notify` (Beats) fire rigidly from physical Postgres tables (e.g. `{"type": "phone_number"}`), the Go Framework only ever needs to know: "Does the schema `with_contacts.person` contain the `phone_number` Entity anywhere inside its tree, and if so, what is the gjson path to iterate its payload?"
* **Initialization:** During startup (`jspg_stems()`), the database crawls all Schemas and maps out every physical Entity Type it references. It builds a flat dictionary of `Schema ID -> [Entity Types]` (e.g. `with_contacts.person -> ["person", "contact", "phone_number", "email_address"]`).
* **Relationship Path Squashing:** When calculating nested string paths structurally to discover these boundaries, JSPG intentionally **omits** properties natively named `target` or `source` if they belong to a native database `relationship` table override. This ensures paths like `phone_numbers/contact/target` correctly register their beat resolution pattern as `phone_numbers/contact/phone_number`.
* **Initialization:** During startup (`jspg_stems()`), the database crawls all Schemas and maps out every physical Entity Type it references. It builds a highly optimized `HashMap<String, HashMap<String, Arc<Stem>>>` providing strictly `O(1)` memory lookups mapping `Schema ID -> { Stem Path -> Entity Type }`.
* **GJSON Pathing:** Unlike standard JSON Pointers, stems utilize `.#` array iterator syntax. The Go web server consumes this native path (e.g. `lines.#`) across the raw Postgres JSON byte payload, extracting all active UUIDs in one massive sub-millisecond sweep without unmarshaling Go ASTs.
* **Polymorphic Condition Selectors:** When trailing paths would otherwise collide because of abstract polymorphic type definitions (e.g., a `target` property bounded by a `oneOf` taking either `phone_number` or `email_address`), JSPG natively appends evaluated `gjson` type conditions into the path (e.g. `contacts.#.target#(type=="phone_number")`). This guarantees `O(1)` key uniqueness in the HashMap while retaining extreme array extraction speeds natively without runtime AST evaluation.
* **Identifier Prioritization:** When determining if a nested object boundary is an Entity, JSPG natively prioritizes defined `$id` tags over `$ref` inheritance pointers to prevent polymorphic boundaries from devolving into their generic base classes.
* **Cyclical Deduplication:** Because Punc relationships often reference back on themselves via deeply nested classes, the Stem Engine applies intelligent path deduplication. If the active `current_path` already ends with the target entity string, it traverses the inheritance properties without appending the entity to the stem path again, eliminating infinite powerset loops.
* **Relationship Path Squashing:** When calculating string paths structurally, JSPG intentionally **omits** properties natively named `target` or `source` if they belong to a native database `relationship` table override.
* **The Go Router**: The Golang Punc framework uses this exact mapping to register WebSocket Beat frequencies exclusively on the Entity types discovered.
* **The Queryer Execution**: When the Go framework asks JSPG to hydrate a partial `phone_number` stem for the `with_contacts.person` schema, instead of jumping through string paths, the SQL Compiler simply reaches into the Schema's AST using the `phone_number` Type string, pulls out exactly that entity's mapping rules, and returns a fully correlated `SELECT` block! This natively handles nested array properties injected via `oneOf` or array references efficiently bypassing runtime powerset expansion.
* **Performance:** These Stem execution structures are fully statically compiled via SPI and map perfectly to `O(1)` real-time routing logic on the application tier.
## 5. Testing & Execution Architecture
JSPG implements a strict separation of concerns to bypass the need to boot a full PostgreSQL cluster for unit and integration testing. Because `pgrx::spi::Spi` directly links to PostgreSQL C-headers, building the library with `cargo test` on macOS natively normally results in fatal `dyld` crashes.
@ -127,7 +137,8 @@ To solve this, JSPG introduces the `DatabaseExecutor` trait inside `src/database
### Universal Test Harness (`src/tests/`)
JSPG abandons the standard `cargo pgrx test` model in favor of native OS testing for a >1000x speed increase (`~0.05s` execution).
1. **JSON Fixtures**: All core interactions are defined abstractly as JSON arrays in `fixtures/`. Each file contains suites of `TestCase` objects with an `action` flag (`validate`, `merge`, `query`).
1. **JSON Fixtures**: All core interactions are defined abstractly as JSON arrays in `fixtures/`. Each file contains suites of `TestCase` objects with an `action` flag (`compile`, `validate`, `merge`, `query`).
2. **`build.rs` Generator**: The build script traverses the JSON fixtures, extracts their structural identities, and generates standard `#[test]` blocks into `src/tests/fixtures.rs`.
3. **Modular Test Dispatcher**: The `src/tests/types/` module deserializes the abstract JSON test payloads into `Suite`, `Case`, and `Expect` data structures.
4. **Unit Context Execution**: When `cargo test` executes, the `Runner` feeds the JSON payloads directly into `case.execute(db)`. Because the tests run natively inside the module via `#cfg(test)`, the Rust compiler globally erases `pgrx` C-linkage, instantiates the `MockExecutor`, and allows for pure structural evaluation of complex database logic completely in memory.
* The `compile` action natively asserts the exact output shape of `jspg_stems`, allowing structural and relationship mapping logic to be tested purely through JSON without writing brute-force manual tests in Rust.
4. **Unit Context Execution**: When `cargo test` executes, the runner iterates the JSON payloads. Because the tests run natively inside the module via `#cfg(test)`, the Rust compiler globally erases `pgrx` C-linkage, instantiates the `MockExecutor`, and allows for pure structural evaluation of complex database logic completely in memory in parallel.

View File

@ -1213,21 +1213,6 @@
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"date_of_birth\":\"2000-01-01\",",
" \"first_name\":\"Bob\",",
" \"id\":\"{{uuid:customer_id}}\",",
" \"last_name\":\"Smith\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"type\":\"person\"",
" }",
"}')"
],
[
"INSERT INTO agreego.\"entity\" (",
" \"created_at\",",
@ -1295,6 +1280,21 @@
" \"type\":\"order\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"date_of_birth\":\"2000-01-01\",",
" \"first_name\":\"Bob\",",
" \"id\":\"{{uuid:customer_id}}\",",
" \"last_name\":\"Smith\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"type\":\"person\"",
" }",
"}')"
]
]
}
@ -1409,21 +1409,6 @@
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:line_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"order_id\":\"abc\",",
" \"price\":99.0,",
" \"product\":\"Widget\",",
" \"type\":\"order_line\"",
" }",
"}')"
],
[
"INSERT INTO agreego.change (",
" changes,",
@ -1457,6 +1442,21 @@
" \"type\":\"order\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:line_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"order_id\":\"abc\",",
" \"price\":99.0,",
" \"product\":\"Widget\",",
" \"type\":\"order_line\"",
" }",
"}')"
]
]
}
@ -1587,19 +1587,6 @@
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:phone1_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"number\":\"555-0001\",",
" \"type\":\"phone_number\"",
" }",
"}')"
],
[
"INSERT INTO agreego.\"entity\" (",
" \"created_at\",",
@ -1661,23 +1648,6 @@
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact1_id}}\",",
" \"is_primary\":true,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:phone1_id}}\",",
" \"target_type\":\"phone_number\",",
" \"type\":\"contact\"",
" }",
"}')"
],
[
"INSERT INTO agreego.\"entity\" (",
" \"created_at\",",
@ -1722,19 +1692,6 @@
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:phone2_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"number\":\"555-0002\",",
" \"type\":\"phone_number\"",
" }",
"}')"
],
[
"INSERT INTO agreego.\"entity\" (",
" \"created_at\",",
@ -1796,23 +1753,6 @@
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact2_id}}\",",
" \"is_primary\":false,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:phone2_id}}\",",
" \"target_type\":\"phone_number\",",
" \"type\":\"contact\"",
" }",
"}')"
],
[
"INSERT INTO agreego.\"entity\" (",
" \"created_at\",",
@ -1857,19 +1797,6 @@
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"address\":\"test@example.com\",",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:email1_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"type\":\"email_address\"",
" }",
"}')"
],
[
"INSERT INTO agreego.\"entity\" (",
" \"created_at\",",
@ -1931,23 +1858,6 @@
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact3_id}}\",",
" \"is_primary\":false,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:email1_id}}\",",
" \"target_type\":\"email_address\",",
" \"type\":\"contact\"",
" }",
"}')"
],
[
"INSERT INTO agreego.change (",
" changes,",
@ -1982,6 +1892,96 @@
" \"type\":\"person\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact1_id}}\",",
" \"is_primary\":true,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:phone1_id}}\",",
" \"target_type\":\"phone_number\",",
" \"type\":\"contact\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:phone1_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"number\":\"555-0001\",",
" \"type\":\"phone_number\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact2_id}}\",",
" \"is_primary\":false,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:phone2_id}}\",",
" \"target_type\":\"phone_number\",",
" \"type\":\"contact\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:phone2_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"number\":\"555-0002\",",
" \"type\":\"phone_number\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact3_id}}\",",
" \"is_primary\":false,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:email1_id}}\",",
" \"target_type\":\"email_address\",",
" \"type\":\"contact\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"address\":\"test@example.com\",",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:email1_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"type\":\"email_address\"",
" }",
"}')"
]
]
}

File diff suppressed because it is too large Load Diff

312
fixtures/stems.json Normal file
View File

@ -0,0 +1,312 @@
[
{
"description": "Stem Engine Unit Tests",
"database": {
"puncs": [],
"enums": [],
"relations": [
{
"id": "rel1",
"type": "relation",
"constraint": "fk_contact_entity",
"source_type": "contact",
"source_columns": [
"entity_id"
],
"destination_type": "person",
"destination_columns": [
"id"
],
"prefix": null
},
{
"id": "rel2",
"type": "relation",
"constraint": "fk_relationship_target",
"source_type": "relationship",
"source_columns": [
"target_id",
"target_type"
],
"destination_type": "entity",
"destination_columns": [
"id",
"type"
],
"prefix": "target"
}
],
"types": [
{
"name": "entity",
"hierarchy": [
"entity"
],
"schemas": [
{
"$id": "entity",
"type": "object",
"properties": {}
}
]
},
{
"name": "person",
"hierarchy": [
"person",
"entity"
],
"schemas": [
{
"$id": "person",
"$ref": "entity",
"properties": {}
}
]
},
{
"name": "email_address",
"hierarchy": [
"email_address",
"entity"
],
"schemas": [
{
"$id": "email_address",
"$ref": "entity",
"properties": {}
}
]
},
{
"name": "phone_number",
"hierarchy": [
"phone_number",
"entity"
],
"schemas": [
{
"$id": "phone_number",
"$ref": "entity",
"properties": {}
}
]
},
{
"name": "relationship",
"relationship": true,
"hierarchy": [
"relationship",
"entity"
],
"schemas": [
{
"$id": "relationship",
"$ref": "entity",
"properties": {}
}
]
},
{
"name": "contact",
"relationship": true,
"hierarchy": [
"contact",
"relationship",
"entity"
],
"schemas": [
{
"$id": "contact",
"$ref": "relationship",
"properties": {
"target": {
"oneOf": [
{
"$ref": "phone_number"
},
{
"$ref": "email_address"
}
]
}
}
}
]
},
{
"name": "save_person",
"schemas": [
{
"$id": "save_person.response",
"$ref": "person",
"properties": {
"contacts": {
"type": "array",
"items": {
"$ref": "contact"
}
}
}
}
]
}
]
},
"tests": [
{
"description": "correctly squashes deep oneOf refs through array paths",
"action": "compile",
"expect": {
"success": true,
"stems": {
"contact": {
"": {
"schema": {
"$id": "contact",
"$ref": "relationship",
"properties": {
"target": {
"oneOf": [
{
"$ref": "phone_number"
},
{
"$ref": "email_address"
}
]
}
}
},
"type": "contact"
},
"target#(type==\"email_address\")": {
"relation": "target_id",
"schema": {
"$id": "email_address",
"$ref": "entity",
"properties": {}
},
"type": "email_address"
},
"target#(type==\"phone_number\")": {
"relation": "target_id",
"schema": {
"$id": "phone_number",
"$ref": "entity",
"properties": {}
},
"type": "phone_number"
}
},
"email_address": {
"": {
"schema": {
"$id": "email_address",
"$ref": "entity",
"properties": {}
},
"type": "email_address"
}
},
"entity": {
"": {
"schema": {
"$id": "entity",
"properties": {},
"type": "object"
},
"type": "entity"
}
},
"person": {
"": {
"schema": {
"$id": "person",
"$ref": "entity",
"properties": {}
},
"type": "person"
}
},
"phone_number": {
"": {
"schema": {
"$id": "phone_number",
"$ref": "entity",
"properties": {}
},
"type": "phone_number"
}
},
"relationship": {
"": {
"schema": {
"$id": "relationship",
"$ref": "entity",
"properties": {}
},
"type": "relationship"
}
},
"save_person.response": {
"": {
"schema": {
"$id": "save_person.response",
"$ref": "person",
"properties": {
"contacts": {
"items": {
"$ref": "contact"
},
"type": "array"
}
}
},
"type": "person"
},
"contacts.#": {
"relation": "contacts_id",
"schema": {
"$id": "contact",
"$ref": "relationship",
"properties": {
"target": {
"oneOf": [
{
"$ref": "phone_number"
},
{
"$ref": "email_address"
}
]
}
}
},
"type": "contact"
},
"contacts.#.target#(type==\"email_address\")": {
"relation": "target_id",
"schema": {
"$id": "email_address",
"$ref": "entity",
"properties": {}
},
"type": "email_address"
},
"contacts.#.target#(type==\"phone_number\")": {
"relation": "target_id",
"schema": {
"$id": "phone_number",
"$ref": "entity",
"properties": {}
},
"type": "phone_number"
}
}
}
}
}
]
}
]

View File

@ -24,20 +24,28 @@ impl DatabaseExecutor for SpiExecutor {
}
}
Spi::connect(|client| {
match client.select(sql, Some(args_with_oid.len() as i64), &args_with_oid) {
Ok(tup_table) => {
let mut results = Vec::new();
for row in tup_table {
if let Ok(Some(jsonb)) = row.get::<pgrx::JsonB>(1) {
results.push(jsonb.0);
pgrx::PgTryBuilder::new(|| {
Spi::connect(|client| {
pgrx::notice!("JSPG_SQL: {}", sql);
match client.select(sql, Some(args_with_oid.len() as i64), &args_with_oid) {
Ok(tup_table) => {
let mut results = Vec::new();
for row in tup_table {
if let Ok(Some(jsonb)) = row.get::<pgrx::JsonB>(1) {
results.push(jsonb.0);
}
}
Ok(Value::Array(results))
}
Ok(Value::Array(results))
Err(e) => Err(format!("SPI Query Fetch Failure: {}", e)),
}
Err(e) => Err(format!("SPI Query Fetch Failure: {}", e)),
}
})
})
.catch_others(|cause| {
pgrx::warning!("JSPG Caught Native Postgres Error: {:?}", cause);
Err(format!("{:?}", cause))
})
.execute()
}
fn execute(&self, sql: &str, args: Option<&[Value]>) -> Result<(), String> {
@ -52,12 +60,20 @@ impl DatabaseExecutor for SpiExecutor {
}
}
Spi::connect_mut(|client| {
match client.update(sql, Some(args_with_oid.len() as i64), &args_with_oid) {
Ok(_) => Ok(()),
Err(e) => Err(format!("SPI Execution Failure: {}", e)),
}
pgrx::PgTryBuilder::new(|| {
Spi::connect_mut(|client| {
pgrx::notice!("JSPG_SQL: {}", sql);
match client.update(sql, Some(args_with_oid.len() as i64), &args_with_oid) {
Ok(_) => Ok(()),
Err(e) => Err(format!("SPI Execution Failure: {}", e)),
}
})
})
.catch_others(|cause| {
pgrx::warning!("JSPG Caught Native Postgres Error: {:?}", cause);
Err(format!("{:?}", cause))
})
.execute()
}
fn auth_user_id(&self) -> Result<String, String> {

View File

@ -32,7 +32,7 @@ pub struct Database {
pub enums: HashMap<String, Enum>,
pub types: HashMap<String, Type>,
pub puncs: HashMap<String, Punc>,
pub relations: HashMap<String, Relation>,
pub relations: Vec<Relation>,
pub schemas: HashMap<String, Schema>,
// Map of Schema ID -> { Entity Type -> Target Subschema Arc }
pub stems: HashMap<String, HashMap<String, Arc<Stem>>>,
@ -46,7 +46,7 @@ impl Database {
let mut db = Self {
enums: HashMap::new(),
types: HashMap::new(),
relations: HashMap::new(),
relations: Vec::new(),
puncs: HashMap::new(),
schemas: HashMap::new(),
stems: HashMap::new(),
@ -78,7 +78,11 @@ impl Database {
for item in arr {
match serde_json::from_value::<Relation>(item.clone()) {
Ok(def) => {
db.relations.insert(def.constraint.clone(), def);
if db.types.contains_key(&def.source_type)
&& db.types.contains_key(&def.destination_type)
{
db.relations.push(def);
}
}
Err(e) => println!("DATABASE RELATION PARSE FAILED: {:?}", e),
}
@ -137,7 +141,6 @@ impl Database {
self.executor.timestamp()
}
/// Organizes the graph of the database, compiling regex, format functions, and caching relationships.
pub fn compile(&mut self) -> Result<(), crate::drop::Drop> {
self.collect_schemas();
self.collect_depths();
@ -226,6 +229,79 @@ impl Database {
self.descendants = descendants;
}
pub fn get_relation(
&self,
parent_type: &str,
child_type: &str,
prop_name: &str,
relative_keys: Option<&Vec<String>>,
) -> Option<(&Relation, bool)> {
if parent_type == "entity" && child_type == "entity" {
return None; // Ignore entity <-> entity generic fallbacks, they aren't useful edges
}
let p_def = self.types.get(parent_type)?;
let c_def = self.types.get(child_type)?;
let mut matching_rels = Vec::new();
let mut directions = Vec::new();
for rel in &self.relations {
let is_forward = p_def.hierarchy.contains(&rel.source_type)
&& c_def.hierarchy.contains(&rel.destination_type);
let is_reverse = p_def.hierarchy.contains(&rel.destination_type)
&& c_def.hierarchy.contains(&rel.source_type);
if is_forward {
matching_rels.push(rel);
directions.push(true);
} else if is_reverse {
matching_rels.push(rel);
directions.push(false);
}
}
if matching_rels.is_empty() {
return None;
}
if matching_rels.len() == 1 {
return Some((matching_rels[0], directions[0]));
}
let mut chosen_idx = 0;
let mut resolved = false;
// Reduce ambiguity with prefix
for (i, rel) in matching_rels.iter().enumerate() {
if let Some(prefix) = &rel.prefix {
if prefix == prop_name {
chosen_idx = i;
resolved = true;
break;
}
}
}
// Reduce ambiguity by checking if relative payload OMITS the prefix (M:M heuristic)
if !resolved && relative_keys.is_some() {
let keys = relative_keys.unwrap();
let mut missing_prefix_ids = Vec::new();
for (i, rel) in matching_rels.iter().enumerate() {
if let Some(prefix) = &rel.prefix {
if !keys.contains(prefix) {
missing_prefix_ids.push(i);
}
}
}
if missing_prefix_ids.len() == 1 {
chosen_idx = missing_prefix_ids[0];
}
}
Some((matching_rels[chosen_idx], directions[chosen_idx]))
}
fn collect_descendants_recursively(
target: &str,
direct_refs: &HashMap<String, Vec<String>>,
@ -265,11 +341,12 @@ impl Database {
String::from(""),
None,
None,
false,
&mut inner_map,
Vec::new(),
&mut errors,
);
if !inner_map.is_empty() {
println!("SCHEMA: {} STEMS: {:?}", schema_id, inner_map.keys());
db_stems.insert(schema_id, inner_map);
}
}
@ -287,24 +364,20 @@ impl Database {
db: &Database,
root_schema_id: &str,
schema: &Schema,
mut current_path: String,
current_path: String,
parent_type: Option<String>,
property_name: Option<String>,
is_polymorphic: bool,
inner_map: &mut HashMap<String, Arc<Stem>>,
seen_entities: Vec<String>,
errors: &mut Vec<crate::drop::Error>,
) {
let mut is_entity = false;
let mut entity_type = String::new();
let mut examine_id = None;
if let Some(ref r) = schema.obj.r#ref {
examine_id = Some(r.clone());
} else if let Some(ref id) = schema.obj.id {
examine_id = Some(id.clone());
}
if let Some(target) = examine_id {
let parts: Vec<&str> = target.split('.').collect();
// First check if the Schema's $id is a native Database Type
if let Some(ref id) = schema.obj.id {
let parts: Vec<&str> = id.split('.').collect();
if let Some(last_seg) = parts.last() {
if db.types.contains_key(*last_seg) {
is_entity = true;
@ -313,52 +386,59 @@ impl Database {
}
}
// If not found via $id, check the $ref pointer
// This allows ad-hoc schemas (like `save_person.response`) to successfully adopt the Type of what they $ref
if !is_entity {
if let Some(ref r) = schema.obj.r#ref {
let parts: Vec<&str> = r.split('.').collect();
if let Some(last_seg) = parts.last() {
if db.types.contains_key(*last_seg) {
is_entity = true;
entity_type = last_seg.to_string();
}
}
}
}
if is_entity {
if seen_entities.contains(&entity_type) {
return; // Break cyclical schemas!
}
}
let mut relation_col = None;
if is_entity {
if let (Some(pt), Some(prop)) = (&parent_type, &property_name) {
let expected_col = format!("{}_id", prop);
let mut found = false;
for rel in db.relations.values() {
if (rel.source_type == *pt && rel.destination_type == entity_type)
|| (rel.source_type == entity_type && rel.destination_type == *pt)
{
if rel.source_columns.contains(&expected_col) {
relation_col = Some(expected_col.clone());
found = true;
break;
}
if let Some((rel, _)) = db.get_relation(pt, &entity_type, prop, None) {
if rel.source_columns.contains(&expected_col) {
relation_col = Some(expected_col.clone());
found = true;
}
}
if !found {
relation_col = Some(expected_col);
}
}
let mut final_path = current_path.clone();
if is_polymorphic && !final_path.is_empty() && !final_path.ends_with(&entity_type) {
if final_path.ends_with(".#") {
final_path = format!("{}(type==\"{}\")", final_path, entity_type);
} else {
final_path = format!("{}#(type==\"{}\")", final_path, entity_type);
}
}
let stem = Stem {
r#type: entity_type.clone(),
relation: relation_col,
schema: Arc::new(schema.clone()),
};
let mut branch_path = current_path.clone();
if !current_path.is_empty() {
branch_path = format!("{}/{}", current_path, entity_type);
}
if inner_map.contains_key(&branch_path) {
errors.push(crate::drop::Error {
code: "STEM_COLLISION".to_string(),
message: format!("The stem path `{}` resolves to multiple Entity boundaries. This usually occurs during un-wrapped $family or oneOf polymorphic schemas where multiple Entities are directly assigned to the same property. To fix this, encapsulate the polymorphic branch.", branch_path),
details: crate::drop::ErrorDetails {
path: root_schema_id.to_string(),
},
});
}
inner_map.insert(branch_path.clone(), Arc::new(stem));
// Update current_path for structural children
current_path = branch_path;
inner_map.insert(final_path, Arc::new(stem));
}
let next_parent = if is_entity {
@ -367,33 +447,22 @@ impl Database {
parent_type.clone()
};
let pass_seen = if is_entity {
let mut ns = seen_entities.clone();
ns.push(entity_type.clone());
ns
} else {
seen_entities.clone()
};
// Properties branch
if let Some(props) = &schema.obj.properties {
for (k, v) in props {
// Bypass target and source properties if we are in a relationship
if let Some(parent_str) = &next_parent {
if let Some(pt) = db.types.get(parent_str) {
if pt.relationship && (k == "target" || k == "source") {
Self::discover_stems(
db,
root_schema_id,
v,
current_path.clone(),
next_parent.clone(),
Some(k.clone()),
inner_map,
errors,
);
continue;
}
}
}
// Standard Property Pathing
let next_path = if current_path.is_empty() {
k.clone()
} else {
format!("{}/{}", current_path, k)
format!("{}.{}", current_path, k)
};
Self::discover_stems(
@ -403,7 +472,9 @@ impl Database {
next_path,
next_parent.clone(),
Some(k.clone()),
false,
inner_map,
pass_seen.clone(),
errors,
);
}
@ -411,18 +482,47 @@ impl Database {
// Array Item branch
if let Some(items) = &schema.obj.items {
let next_path = if current_path.is_empty() {
String::from("#")
} else {
format!("{}.#", current_path)
};
Self::discover_stems(
db,
root_schema_id,
items,
current_path.clone(),
next_path,
next_parent.clone(),
property_name.clone(),
false,
inner_map,
pass_seen.clone(),
errors,
);
}
// Follow external reference if we didn't just crawl local properties
if schema.obj.properties.is_none() && schema.obj.items.is_none() && schema.obj.one_of.is_none()
{
if let Some(ref r) = schema.obj.r#ref {
if let Some(target_schema) = db.schemas.get(r) {
Self::discover_stems(
db,
root_schema_id,
target_schema,
current_path.clone(),
next_parent.clone(),
property_name.clone(),
is_polymorphic,
inner_map,
seen_entities.clone(),
errors,
);
}
}
}
// Polymorphism branch
if let Some(arr) = &schema.obj.one_of {
for v in arr {
@ -433,7 +533,9 @@ impl Database {
current_path.clone(),
next_parent.clone(),
property_name.clone(),
true,
inner_map,
pass_seen.clone(),
errors,
);
}
@ -447,7 +549,9 @@ impl Database {
current_path.clone(),
next_parent.clone(),
property_name.clone(),
is_polymorphic,
inner_map,
pass_seen.clone(),
errors,
);
}

View File

@ -5,7 +5,6 @@ use std::sync::Arc;
// Schema mirrors the Go Punc Generator's schema struct for consistency.
// It is an order-preserving representation of a JSON Schema.
pub fn deserialize_some<'de, D>(deserializer: D) -> Result<Option<Value>, D::Error>
where
D: serde::Deserializer<'de>,
@ -13,117 +12,159 @@ where
let v = Value::deserialize(deserializer)?;
Ok(Some(v))
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SchemaObject {
// Core Schema Keywords
#[serde(rename = "$id")]
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(rename = "$ref")]
#[serde(skip_serializing_if = "Option::is_none")]
pub r#ref: Option<String>,
/*
Note: The `Ref` field in the Go struct is a pointer populated by the linker.
In Rust, we might handle this differently (e.g., separate lookup or Rc/Arc),
so we omit the direct recursive `Ref` field for now and rely on `ref_string`.
*/
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub title: Option<String>,
#[serde(default)] // Allow missing type
#[serde(rename = "type")]
#[serde(skip_serializing_if = "Option::is_none")]
pub type_: Option<SchemaTypeOrArray>, // Handles string or array of strings
// Object Keywords
#[serde(skip_serializing_if = "Option::is_none")]
pub properties: Option<BTreeMap<String, Arc<Schema>>>,
#[serde(rename = "patternProperties")]
#[serde(skip_serializing_if = "Option::is_none")]
pub pattern_properties: Option<BTreeMap<String, Arc<Schema>>>,
#[serde(rename = "additionalProperties")]
#[serde(skip_serializing_if = "Option::is_none")]
pub additional_properties: Option<Arc<Schema>>,
#[serde(rename = "$family")]
#[serde(skip_serializing_if = "Option::is_none")]
pub family: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub required: Option<Vec<String>>,
// dependencies can be schema dependencies or property dependencies
#[serde(skip_serializing_if = "Option::is_none")]
pub dependencies: Option<BTreeMap<String, Dependency>>,
// Array Keywords
#[serde(rename = "items")]
#[serde(skip_serializing_if = "Option::is_none")]
pub items: Option<Arc<Schema>>,
#[serde(rename = "prefixItems")]
#[serde(skip_serializing_if = "Option::is_none")]
pub prefix_items: Option<Vec<Arc<Schema>>>,
// String Validation
#[serde(rename = "minLength")]
#[serde(skip_serializing_if = "Option::is_none")]
pub min_length: Option<f64>,
#[serde(rename = "maxLength")]
#[serde(skip_serializing_if = "Option::is_none")]
pub max_length: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub pattern: Option<String>,
// Array Validation
#[serde(rename = "minItems")]
#[serde(skip_serializing_if = "Option::is_none")]
pub min_items: Option<f64>,
#[serde(rename = "maxItems")]
#[serde(skip_serializing_if = "Option::is_none")]
pub max_items: Option<f64>,
#[serde(rename = "uniqueItems")]
#[serde(skip_serializing_if = "Option::is_none")]
pub unique_items: Option<bool>,
#[serde(rename = "contains")]
#[serde(skip_serializing_if = "Option::is_none")]
pub contains: Option<Arc<Schema>>,
#[serde(rename = "minContains")]
#[serde(skip_serializing_if = "Option::is_none")]
pub min_contains: Option<f64>,
#[serde(rename = "maxContains")]
#[serde(skip_serializing_if = "Option::is_none")]
pub max_contains: Option<f64>,
// Object Validation
#[serde(rename = "minProperties")]
#[serde(skip_serializing_if = "Option::is_none")]
pub min_properties: Option<f64>,
#[serde(rename = "maxProperties")]
#[serde(skip_serializing_if = "Option::is_none")]
pub max_properties: Option<f64>,
#[serde(rename = "propertyNames")]
#[serde(skip_serializing_if = "Option::is_none")]
pub property_names: Option<Arc<Schema>>,
// Numeric Validation
#[serde(skip_serializing_if = "Option::is_none")]
pub format: Option<String>,
#[serde(rename = "enum")]
#[serde(skip_serializing_if = "Option::is_none")]
pub enum_: Option<Vec<Value>>, // `enum` is a reserved keyword in Rust
#[serde(
default,
rename = "const",
deserialize_with = "crate::database::schema::deserialize_some"
)]
#[serde(skip_serializing_if = "Option::is_none")]
pub const_: Option<Value>,
// Numeric Validation
#[serde(rename = "multipleOf")]
#[serde(skip_serializing_if = "Option::is_none")]
pub multiple_of: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub minimum: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub maximum: Option<f64>,
#[serde(rename = "exclusiveMinimum")]
#[serde(skip_serializing_if = "Option::is_none")]
pub exclusive_minimum: Option<f64>,
#[serde(rename = "exclusiveMaximum")]
#[serde(skip_serializing_if = "Option::is_none")]
pub exclusive_maximum: Option<f64>,
// Combining Keywords
#[serde(rename = "allOf")]
#[serde(skip_serializing_if = "Option::is_none")]
pub all_of: Option<Vec<Arc<Schema>>>,
#[serde(rename = "oneOf")]
#[serde(skip_serializing_if = "Option::is_none")]
pub one_of: Option<Vec<Arc<Schema>>>,
#[serde(rename = "not")]
#[serde(skip_serializing_if = "Option::is_none")]
pub not: Option<Arc<Schema>>,
#[serde(rename = "if")]
#[serde(skip_serializing_if = "Option::is_none")]
pub if_: Option<Arc<Schema>>,
#[serde(rename = "then")]
#[serde(skip_serializing_if = "Option::is_none")]
pub then_: Option<Arc<Schema>>,
#[serde(rename = "else")]
#[serde(skip_serializing_if = "Option::is_none")]
pub else_: Option<Arc<Schema>>,
// Custom Vocabularies
#[serde(skip_serializing_if = "Option::is_none")]
pub form: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub display: Option<Vec<String>>,
#[serde(rename = "enumNames")]
#[serde(skip_serializing_if = "Option::is_none")]
pub enum_names: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub control: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub actions: Option<BTreeMap<String, Action>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub computer: Option<String>,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub extensible: Option<bool>,
#[serde(skip)]
@ -331,7 +372,9 @@ pub enum SchemaTypeOrArray {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Action {
#[serde(skip_serializing_if = "Option::is_none")]
pub navigate: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub punc: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@ -8,10 +8,5 @@ pub struct Stem {
#[serde(skip_serializing_if = "Option::is_none")]
pub relation: Option<String>,
// The actual database schema node mapping for
// O(1) jump table execution for queryer.
//
// Automatically skipped from `jspg_stems()` JSON payload output.
#[serde(skip)]
pub schema: Arc<Schema>,
}

View File

@ -67,6 +67,10 @@ pub struct Error {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ErrorDetails {
pub path: String,
// Extensions can be added here (package, cause, etc)
// For now, validator only provides path
#[serde(skip_serializing_if = "Option::is_none")]
pub cause: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub context: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub schema: Option<String>,
}

View File

@ -1,79 +0,0 @@
# Entity Engine (jspg)
## Overview
This document outlines the architecture for moving the complex, CPU-bound row merging (`merge_entity`) and dynamic querying (`query_entity`) functionality out of PL/pgSQL and directly into the Rust-based `jspg` extension.
By treating the `jspg` schema registry as the absolute Single Source of Truth, we can leverage Rust and the Postgres query planner (via SPI) to achieve near O(1) execution planning for deeply nested reads, complex relational writes, and partial hydration beats.
## The Problem
Historically, `agreego.merge_entity` (PL/pgSQL) handled nested writes by segmenting JSON, resolving types, searching hierarchies, and dynamically concatenating `INSERT`/`UPDATE` statements. `agreego.query_entity` was conceived to do the same for reads (handling base security, inheritance JOINs, and filtering automatically).
However, this design hits three major limitations:
1. **CPU Bound Operations**: PL/pgSQL is comparatively slow at complex string concatenation and massive JSON graph traversals.
2. **Query Planning Cache Busting**: Generating massive, dynamic SQL strings prevents Postgres from caching query plans. `EXECUTE dynamic_sql` forces the planner to re-evaluate statistics and execution paths on every function call, leading to extreme latency spikes at scale.
3. **The Hydration Beat Problem**: The Punc framework requires fetching specific UI "fragments" (e.g. just the `target` of a specific `contact` array element) to feed WebSockets. Hand-rolling CTEs for every possible sub-tree permutation to serve beats will quickly become unmaintainable.
## The Solution: Semantic Engine Database
By migrating `merge_entity` and `query_entity` to `jspg`, we turn the database into a pre-compiled Semantic Engine.
1. **Schema-to-SQL Compilation**: During the connection lifecycle (`cache_json_schemas()`), `jspg` statically analyzes the JSON Schema AST. It acts as a compiler, translating the schema layout into perfectly optimized, multi-JOIN SQL query strings for *every* node/fragment in the schema.
2. **Prepared Statements (SPI)**: `jspg` feeds these computed SQL strings into the Postgres SPI (Server Programming Interface) using `Spi::prepare()`. Postgres calculates the query execution plan *once* and caches it in memory.
3. **Instant Execution**: When a Punc needs data, `jspg` retrieves the cached PreparedStatement, securely binds binary parameters, and executes the pre-planned query instantly.
## Architecture
### 1. The `cache_json_schemas()` Expansion
The initialization function must now ingest `types` and `agreego.relation` data so the internal `Registry` holds the full Relational Graph.
During schema compilation, if a schema is associated with a database Type, it triggers the **SQL Compiler Phase**:
- It builds a table-resolution AST mapping to `JOIN` clauses based on foreign keys.
- It translates JSON schema properties to `SELECT jsonb_build_object(...)`.
- It generates static SQL for `INSERT`, `UPDATE`, and `SELECT` (including path-based fragment SELECTs).
- It calls `Spi::prepare()` to cache these plans inside the Session Context.
### 2. `agreego.query_entity` (Reads)
* **API**: `agreego.query_entity(schema_id TEXT, fragment_path TEXT, cue JSONB)`
* **Execution**:
* Rust locates the target Schema in memory.
* It uses the `fragment_path` (e.g., `/` for a full read, or `/contacts/0/target` for a hydration beat) to fetch the exact PreparedStatement.
* It binds variables (Row Level Security IDs, filtering, pagination limit/offset) parsed from the `cue`.
* SPI returns the heavily nested, pre-aggregated `JSONB` instantly.
### 3. Unified Aggregations & Computeds (Schema `query` objects)
We replace the concept of a complex string parser (PEL) with native structured JSON JSON objects using the `query` keyword.
A structured `query` block in the schema:
```json
"total": {
"type": "number",
"readOnly": true,
"query": {
"aggregate": "sum",
"source": "lines",
"field": "amount"
}
}
```
* **Frontend (Dart)**: The Go generator parses the JSON object directly and emits the native UI aggregation code (e.g. `lines.fold(...)`) for instant UI updates before the server responds.
* **Backend (jspg)**: The Rust SQL compiler natively deserializes the `query` object into an internal struct. It recognizes the `aggregate` instruction and outputs a Postgres native aggregation: `(SELECT SUM(amount) FROM agreego.invoice_line WHERE invoice_id = t1.id)` as a column in the prepared `SELECT` statement.
* **Unification**: The database-calculated value acts as the authoritative truth, synchronizing and correcting the client automatically on the resulting `beat`.
### 4. `agreego.merge_entity` (Writes)
* **API**: `agreego.merge_entity(cue JSONB)`
* **Execution**:
* Parses the incoming `cue` JSON via `serde_json` at C-like speeds.
* Recursively validates and *constructively masks* the tree against the strict schema.
* Traverses the relational graph (which is fully loaded in the `jspg` registry).
* Binds the new values directly into the cached `INSERT` or `UPDATE` SPI prepared statements for each table in the hierarchy.
* Evaluates field differences and natively uses `pg_notify` to fire atomic row-level changes for the Go Beat framework.
## Roadmap
1. **Relational Ingestion**: Update `cache_json_schemas` to pass relational metadata (`agreego.relation` rows) into the `jspg` registry cache.
2. **The SQL Compiler**: Build the AST-to-String compiler in Rust that reads properties, `$ref`s, and `$family` trees to piece together generic SQL.
3. **SPI Caching**: Integrate `Spi::prepare` into the `Validator` creation phase.
4. **Rust `merge_entity`**: Port the constructive structural extraction loop from PL/pgSQL to Rust.
5. **Rust `query_entity`**: Abstract the query runtime, mapping Punc JSON `filters` arrays to SPI-bound parameters safely.

View File

@ -31,6 +31,9 @@ fn jspg_failure() -> JsonB {
message: "JSPG extension has not been initialized via jspg_setup".to_string(),
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: None,
context: None,
schema: None,
},
};
let drop = crate::drop::Drop::with_errors(vec![error]);
@ -111,9 +114,7 @@ pub fn jspg_validate(schema_id: &str, instance: JsonB) -> JsonB {
}
#[cfg_attr(not(test), pg_extern)]
pub fn jspg_stems() -> JsonB {
use serde_json::{Map, Value};
pub fn jspg_schemas() -> JsonB {
let engine_opt = {
let lock = GLOBAL_JSPG.read().unwrap();
lock.clone()
@ -121,9 +122,30 @@ pub fn jspg_stems() -> JsonB {
match engine_opt {
Some(engine) => {
JsonB(serde_json::to_value(&engine.database.stems).unwrap_or(Value::Object(Map::new())))
let schemas_json = serde_json::to_value(&engine.database.schemas)
.unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
let drop = crate::drop::Drop::success_with_val(schemas_json);
JsonB(serde_json::to_value(drop).unwrap())
}
None => JsonB(Value::Object(Map::new())),
None => jspg_failure(),
}
}
#[cfg_attr(not(test), pg_extern)]
pub fn jspg_stems() -> JsonB {
let engine_opt = {
let lock = GLOBAL_JSPG.read().unwrap();
lock.clone()
};
match engine_opt {
Some(engine) => {
let stems_json = serde_json::to_value(&engine.database.stems)
.unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
let drop = crate::drop::Drop::success_with_val(stems_json);
JsonB(serde_json::to_value(drop).unwrap())
}
None => jspg_failure(),
}
}

View File

@ -21,63 +21,102 @@ impl Merger {
}
pub fn merge(&self, data: Value) -> crate::drop::Drop {
match self.merge_internal(data) {
Ok(val) => {
let stripped_val = match val {
Value::Object(mut map) => {
let mut notifications_queue = Vec::new();
let result = self.merge_internal(data, &mut notifications_queue);
let val_resolved = match result {
Ok(val) => val,
Err(msg) => {
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "MERGE_FAILED".to_string(),
message: msg,
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: None,
context: None,
schema: None,
},
}]);
}
};
// Execute the globally collected, pre-ordered notifications last!
for notify_sql in notifications_queue {
if let Err(e) = self.db.execute(&notify_sql, None) {
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "MERGE_FAILED".to_string(),
message: format!("Executor Error in pre-ordered notify: {:?}", e),
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: None,
context: None,
schema: None,
},
}]);
}
}
let stripped_val = match val_resolved {
Value::Object(mut map) => {
let mut out = serde_json::Map::new();
if let Some(id) = map.remove("id") {
out.insert("id".to_string(), id);
}
Value::Object(out)
}
Value::Array(arr) => {
let mut out_arr = Vec::new();
for item in arr {
if let Value::Object(mut map) = item {
let mut out = serde_json::Map::new();
if let Some(id) = map.remove("id") {
out.insert("id".to_string(), id);
}
Value::Object(out)
out_arr.push(Value::Object(out));
} else {
out_arr.push(Value::Null);
}
Value::Array(arr) => {
let mut out_arr = Vec::new();
for item in arr {
if let Value::Object(mut map) = item {
let mut out = serde_json::Map::new();
if let Some(id) = map.remove("id") {
out.insert("id".to_string(), id);
}
out_arr.push(Value::Object(out));
} else {
out_arr.push(Value::Null);
}
}
Value::Array(out_arr)
}
other => other,
};
crate::drop::Drop::success_with_val(stripped_val)
}
Value::Array(out_arr)
}
Err(msg) => crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "MERGE_FAILED".to_string(),
message: msg,
details: crate::drop::ErrorDetails {
path: "".to_string(),
},
}]),
}
other => other,
};
crate::drop::Drop::success_with_val(stripped_val)
}
pub(crate) fn merge_internal(&self, data: Value) -> Result<Value, String> {
pub(crate) fn merge_internal(
&self,
data: Value,
notifications: &mut Vec<String>,
) -> Result<Value, String> {
match data {
Value::Array(items) => self.merge_array(items),
Value::Object(map) => self.merge_object(map),
Value::Array(items) => self.merge_array(items, notifications),
Value::Object(map) => self.merge_object(map, notifications),
_ => Err("Invalid merge payload: root must be an Object or Array".to_string()),
}
}
fn merge_array(&self, items: Vec<Value>) -> Result<Value, String> {
fn merge_array(
&self,
items: Vec<Value>,
notifications: &mut Vec<String>,
) -> Result<Value, String> {
let mut resolved_items = Vec::new();
for item in items {
let resolved = self.merge_internal(item)?;
let resolved = self.merge_internal(item, notifications)?;
resolved_items.push(resolved);
}
Ok(Value::Array(resolved_items))
}
fn merge_object(&self, obj: serde_json::Map<String, Value>) -> Result<Value, String> {
fn merge_object(
&self,
obj: serde_json::Map<String, Value>,
notifications: &mut Vec<String>,
) -> Result<Value, String> {
let queue_start = notifications.len();
let type_name = match obj.get("type").and_then(|v| v.as_str()) {
Some(t) => t.to_string(),
None => return Err("Missing required 'type' field on object".to_string()),
@ -134,10 +173,23 @@ impl Merger {
_ => continue,
};
let relative_relation = self.get_entity_relation(type_def, &relative, &relation_name)?;
// Attempt to extract relative object type name
let relative_type_name = match relative.get("type").and_then(|v| v.as_str()) {
Some(t) => t,
None => continue,
};
if let Some(relation) = relative_relation {
let parent_is_source = type_def.hierarchy.contains(&relation.source_type);
let relative_keys: Vec<String> = relative.keys().cloned().collect();
// Call central Database O(1) graph logic
let relative_relation = self.db.get_relation(
&type_def.name,
relative_type_name,
&relation_name,
Some(&relative_keys),
);
if let Some((relation, parent_is_source)) = relative_relation {
if parent_is_source {
// Parent holds FK to Child. Child MUST be generated FIRST.
@ -147,7 +199,7 @@ impl Merger {
}
}
let merged_relative = match self.merge_internal(Value::Object(relative))? {
let merged_relative = match self.merge_internal(Value::Object(relative), notifications)? {
Value::Object(m) => m,
_ => continue,
};
@ -174,7 +226,7 @@ impl Merger {
&entity_fields,
);
let merged_relative = match self.merge_internal(Value::Object(relative))? {
let merged_relative = match self.merge_internal(Value::Object(relative), notifications)? {
Value::Object(m) => m,
_ => continue,
};
@ -223,9 +275,23 @@ impl Merger {
_ => continue,
};
let relative_relation = self.get_entity_relation(type_def, first_relative, &relation_name)?;
// Attempt to extract relative object type name
let relative_type_name = match first_relative.get("type").and_then(|v| v.as_str()) {
Some(t) => t,
None => continue,
};
if let Some(relation) = relative_relation {
let relative_keys: Vec<String> = first_relative.keys().cloned().collect();
// Call central Database O(1) graph logic
let relative_relation = self.db.get_relation(
&type_def.name,
relative_type_name,
&relation_name,
Some(&relative_keys),
);
if let Some((relation, _)) = relative_relation {
let mut relative_responses = Vec::new();
for relative_item_val in relative_arr {
if let Value::Object(mut relative_item) = relative_item_val {
@ -242,10 +308,11 @@ impl Merger {
&entity_fields,
);
let merged_relative = match self.merge_internal(Value::Object(relative_item))? {
Value::Object(m) => m,
_ => continue,
};
let merged_relative =
match self.merge_internal(Value::Object(relative_item), notifications)? {
Value::Object(m) => m,
_ => continue,
};
relative_responses.push(Value::Object(merged_relative));
}
@ -255,7 +322,7 @@ impl Merger {
}
// 7. Perform change tracking
self.merge_entity_change(
let notify_sql = self.merge_entity_change(
&entity_fields,
entity_fetched.as_ref(),
entity_change_kind.as_deref(),
@ -263,6 +330,10 @@ impl Merger {
&timestamp,
)?;
if let Some(sql) = notify_sql {
notifications.insert(queue_start, sql);
}
// Produce the full tree response
let mut final_response = serde_json::Map::new();
if let Some(fetched) = entity_fetched {
@ -614,10 +685,10 @@ impl Merger {
entity_change_kind: Option<&str>,
user_id: &str,
timestamp: &str,
) -> Result<(), String> {
) -> Result<Option<String>, String> {
let change_kind = match entity_change_kind {
Some(k) => k,
None => return Ok(()),
None => return Ok(None),
};
let id_str = entity_fields.get("id").unwrap();
@ -697,12 +768,8 @@ impl Merger {
.db
.execute(&change_sql, None)
.map_err(|e| format!("Executor Error in change: {:?}", e))?;
self
.db
.execute(&notify_sql, None)
.map_err(|e| format!("Executor Error in notify: {:?}", e))?;
Ok(())
Ok(Some(notify_sql))
}
fn compare_entities(
@ -736,101 +803,7 @@ impl Merger {
changes
}
fn reduce_entity_relations(
&self,
mut matching_relations: Vec<crate::database::relation::Relation>,
relative: &serde_json::Map<String, Value>,
relation_name: &str,
) -> Result<Option<crate::database::relation::Relation>, String> {
if matching_relations.is_empty() {
return Ok(None);
}
if matching_relations.len() == 1 {
return Ok(Some(matching_relations.pop().unwrap()));
}
let exact_match: Vec<_> = matching_relations
.iter()
.filter(|r| r.prefix.as_deref() == Some(relation_name))
.cloned()
.collect();
if exact_match.len() == 1 {
return Ok(Some(exact_match.into_iter().next().unwrap()));
}
matching_relations.retain(|r| {
if let Some(prefix) = &r.prefix {
!relative.contains_key(prefix)
} else {
true
}
});
if matching_relations.len() == 1 {
Ok(Some(matching_relations.pop().unwrap()))
} else {
let constraints: Vec<_> = matching_relations
.iter()
.map(|r| r.constraint.clone())
.collect();
Err(format!(
"AMBIGUOUS_TYPE_RELATIONS: Could not reduce ambiguous type relations: {}",
constraints.join(", ")
))
}
}
fn get_entity_relation(
&self,
entity_type: &crate::database::r#type::Type,
relative: &serde_json::Map<String, Value>,
relation_name: &str,
) -> Result<Option<crate::database::relation::Relation>, String> {
let relative_type_name = match relative.get("type").and_then(|v| v.as_str()) {
Some(t) => t,
None => return Ok(None),
};
let relative_type = match self.db.types.get(relative_type_name) {
Some(t) => t,
None => return Ok(None),
};
let mut relative_relations: Vec<crate::database::relation::Relation> = Vec::new();
for r in self.db.relations.values() {
if r.source_type != "entity" && r.destination_type != "entity" {
let condition1 = relative_type.hierarchy.contains(&r.source_type)
&& entity_type.hierarchy.contains(&r.destination_type);
let condition2 = entity_type.hierarchy.contains(&r.source_type)
&& relative_type.hierarchy.contains(&r.destination_type);
if condition1 || condition2 {
relative_relations.push(r.clone());
}
}
}
let mut relative_relation =
self.reduce_entity_relations(relative_relations, relative, relation_name)?;
if relative_relation.is_none() {
let mut poly_relations: Vec<crate::database::relation::Relation> = Vec::new();
for r in self.db.relations.values() {
if r.destination_type == "entity" {
let condition1 = relative_type.hierarchy.contains(&r.source_type);
let condition2 = entity_type.hierarchy.contains(&r.source_type);
if condition1 || condition2 {
poly_relations.push(r.clone());
}
}
}
relative_relation = self.reduce_entity_relations(poly_relations, relative, relation_name)?;
}
Ok(relative_relation)
}
// Helper Functions
fn apply_entity_relation(
source_entity: &mut serde_json::Map<String, Value>,

File diff suppressed because it is too large Load Diff

View File

@ -32,9 +32,12 @@ impl Queryer {
Err(msg) => {
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "FILTER_PARSE_FAILED".to_string(),
message: msg,
message: msg.clone(),
details: crate::drop::ErrorDetails {
path: schema_id.to_string(),
path: "".to_string(), // filters apply to the root query
cause: Some(msg),
context: filters.map(|f| vec![f.to_string()]),
schema: Some(schema_id.to_string()),
},
}]);
}
@ -94,7 +97,13 @@ impl Queryer {
return Ok(cached_sql.value().clone());
}
let compiler = compiler::SqlCompiler::new(self.db.clone());
let compiler = compiler::Compiler {
db: &self.db,
filter_keys: filter_keys,
is_stem_query: stem_opt.is_some(),
alias_counter: 0,
};
match compiler.compile(schema_id, stem_opt, filter_keys) {
Ok(compiled_sql) => {
self
@ -104,9 +113,12 @@ impl Queryer {
}
Err(e) => Err(crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "QUERY_COMPILATION_FAILED".to_string(),
message: e,
message: e.clone(),
details: crate::drop::ErrorDetails {
path: schema_id.to_string(),
path: "".to_string(),
cause: Some(e),
context: None,
schema: Some(schema_id.to_string()),
},
}])),
}
@ -130,14 +142,20 @@ impl Queryer {
code: "QUERY_FAILED".to_string(),
message: format!("Expected array from generic query, got: {:?}", other),
details: crate::drop::ErrorDetails {
path: schema_id.to_string(),
path: "".to_string(),
cause: Some(format!("Expected array, got {}", other)),
context: Some(vec![sql.to_string()]),
schema: Some(schema_id.to_string()),
},
}]),
Err(e) => crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "QUERY_FAILED".to_string(),
message: format!("SPI error in queryer: {}", e),
details: crate::drop::ErrorDetails {
path: schema_id.to_string(),
path: "".to_string(),
cause: Some(format!("SPI error in queryer: {}", e)),
context: Some(vec![sql.to_string()]),
schema: Some(schema_id.to_string()),
},
}]),
}

View File

@ -1469,6 +1469,12 @@ fn test_queryer_0_9() {
crate::tests::runner::run_test_case(&path, 0, 9).unwrap();
}
#[test]
fn test_queryer_0_10() {
let path = format!("{}/fixtures/queryer.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 10).unwrap();
}
#[test]
fn test_not_0_0() {
let path = format!("{}/fixtures/not.json", env!("CARGO_MANIFEST_DIR"));
@ -3443,6 +3449,12 @@ fn test_if_then_else_13_1() {
crate::tests::runner::run_test_case(&path, 13, 1).unwrap();
}
#[test]
fn test_stems_0_0() {
let path = format!("{}/fixtures/stems.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 0).unwrap();
}
#[test]
fn test_empty_string_0_0() {
let path = format!("{}/fixtures/emptyString.json", env!("CARGO_MANIFEST_DIR"));

View File

@ -49,6 +49,25 @@ fn test_library_api() {
})
);
// 3. Validate jspg_schemas
let schemas_drop = jspg_schemas();
assert_eq!(
schemas_drop.0,
json!({
"type": "drop",
"response": {
"test_schema": {
"$id": "test_schema",
"type": "object",
"properties": {
"name": { "type": "string" }
},
"required": ["name"]
}
}
})
);
// 4. Validate Happy Path
let happy_drop = jspg_validate("test_schema", JsonB(json!({"name": "Neo"})));
assert_eq!(

View File

@ -1,19 +1,10 @@
use crate::tests::types::Suite;
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
use std::sync::{Arc, OnceLock, RwLock};
#[derive(Debug, Deserialize)]
pub struct TestSuite {
#[allow(dead_code)]
pub description: String,
pub database: serde_json::Value,
pub tests: Vec<TestCase>,
}
use crate::tests::types::TestCase;
use serde_json::Value;
pub fn deserialize_some<'de, D>(deserializer: D) -> Result<Option<Value>, D::Error>
where
D: serde::Deserializer<'de>,
@ -23,7 +14,7 @@ where
}
// Type alias for easier reading
type CompiledSuite = Arc<Vec<(TestSuite, Arc<crate::database::Database>)>>;
type CompiledSuite = Arc<Vec<(Suite, Arc<crate::database::Database>)>>;
// Global cache mapping filename -> Vector of (Parsed JSON suite, Compiled Database)
static CACHE: OnceLock<RwLock<HashMap<String, CompiledSuite>>> = OnceLock::new();
@ -46,7 +37,7 @@ fn get_cached_file(path: &str) -> CompiledSuite {
} else {
let content =
fs::read_to_string(path).unwrap_or_else(|_| panic!("Failed to read file: {}", path));
let suites: Vec<TestSuite> = serde_json::from_str(&content)
let suites: Vec<Suite> = serde_json::from_str(&content)
.unwrap_or_else(|e| panic!("Failed to parse JSON in {}: {}", path, e));
let mut compiled_suites = Vec::new();
@ -97,6 +88,16 @@ pub fn run_test_case(path: &str, suite_idx: usize, case_idx: usize) -> Result<()
// 4. Run Tests
match test.action.as_str() {
"compile" => {
let result = test.run_compile(db.clone());
if let Err(e) = result {
println!("TEST COMPILE ERROR FOR '{}': {}", test.description, e);
failures.push(format!(
"[{}] Compile Test '{}' failed. Error: {}",
group.description, test.description, e
));
}
}
"validate" => {
let result = test.run_validate(db.clone());
if let Err(e) = result {

View File

@ -1,11 +1,11 @@
use super::expect::ExpectBlock;
use super::expect::Expect;
use crate::database::Database;
use serde::Deserialize;
use serde_json::Value;
use std::sync::Arc;
#[derive(Debug, Deserialize)]
pub struct TestCase {
pub struct Case {
pub description: String,
#[serde(default = "default_action")]
@ -30,24 +30,47 @@ pub struct TestCase {
#[serde(default)]
pub mocks: Option<serde_json::Value>,
pub expect: Option<ExpectBlock>,
pub expect: Option<Expect>,
}
fn default_action() -> String {
"validate".to_string()
}
impl TestCase {
pub fn execute(&self, db: Arc<Database>) -> Result<(), String> {
match self.action.as_str() {
"validate" => self.run_validate(db),
"merge" => self.run_merge(db),
"query" => self.run_query(db),
_ => Err(format!(
"Unknown action '{}' for test '{}'",
self.action, self.description
)),
impl Case {
pub fn run_compile(&self, db: Arc<Database>) -> Result<(), String> {
let expected_success = self.expect.as_ref().map(|e| e.success).unwrap_or(false);
// We assume db has already been setup and compiled successfully by runner.rs's `jspg_setup`
// We just need to check if there are compilation errors vs expected success
let got_success = true; // Setup ensures success unless setup fails, which runner handles
if expected_success != got_success {
return Err(format!(
"Expected success: {}, Got: {}",
expected_success, got_success
));
}
// Assert stems
if let Some(expect) = &self.expect {
if let Some(expected_stems) = &expect.stems {
// Convert the Db stems (HashMap<String, HashMap<String, Arc<Stem>>>) to matching JSON shape
let db_stems_json = serde_json::to_value(&db.stems).unwrap();
let expect_stems_json = serde_json::to_value(expected_stems).unwrap();
if db_stems_json != expect_stems_json {
let expected_pretty = serde_json::to_string_pretty(&expect_stems_json).unwrap();
let got_pretty = serde_json::to_string_pretty(&db_stems_json).unwrap();
return Err(format!(
"Stem validation failed.\nExpected:\n{}\n\nGot:\n{}",
expected_pretty, got_pretty
));
}
}
}
Ok(())
}
pub fn run_validate(&self, db: Arc<Database>) -> Result<(), String> {
@ -115,6 +138,7 @@ impl TestCase {
))
} else if let Some(expect) = &self.expect {
let queries = db.executor.get_queries();
expect.assert_pattern(&queries)?;
expect.assert_sql(&queries)
} else {
Ok(())
@ -153,6 +177,7 @@ impl TestCase {
))
} else if let Some(expect) = &self.expect {
let queries = db.executor.get_queries();
expect.assert_pattern(&queries)?;
expect.assert_sql(&queries)
} else {
Ok(())

View File

@ -0,0 +1,22 @@
pub mod pattern;
pub mod sql;
use serde::Deserialize;
use std::collections::HashMap;
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum SqlExpectation {
Single(String),
Multi(Vec<String>),
}
#[derive(Debug, Deserialize)]
pub struct Expect {
pub success: bool,
pub result: Option<serde_json::Value>,
pub errors: Option<Vec<serde_json::Value>>,
pub stems: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
#[serde(default)]
pub sql: Option<Vec<SqlExpectation>>,
}

View File

@ -1,29 +1,13 @@
use super::Expect;
use regex::Regex;
use serde::Deserialize;
use std::collections::HashMap;
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum SqlExpectation {
Single(String),
Multi(Vec<String>),
}
#[derive(Debug, Deserialize)]
pub struct ExpectBlock {
pub success: bool,
pub result: Option<serde_json::Value>,
pub errors: Option<Vec<serde_json::Value>>,
#[serde(default)]
pub sql: Option<Vec<SqlExpectation>>,
}
impl ExpectBlock {
impl Expect {
/// Advanced SQL execution assertion algorithm ported from `assert.go`.
/// This compares two arrays of strings, one containing {{uuid:name}} or {{timestamp}} placeholders,
/// and the other containing actual executed database queries. It ensures that placeholder UUIDs
/// are consistently mapped to the same actual UUIDs across all lines, and strictly validates line-by-line sequences.
pub fn assert_sql(&self, actual: &[String]) -> Result<(), String> {
pub fn assert_pattern(&self, actual: &[String]) -> Result<(), String> {
let patterns = match &self.sql {
Some(s) => s,
None => return Ok(()),
@ -75,8 +59,8 @@ impl ExpectBlock {
let aline = clean_str(aline_raw);
let pattern_str_raw = match pattern_expect {
SqlExpectation::Single(s) => s.clone(),
SqlExpectation::Multi(m) => m.join(" "),
super::SqlExpectation::Single(s) => s.clone(),
super::SqlExpectation::Multi(m) => m.join(" "),
};
let pattern_str = clean_str(&pattern_str_raw);

View File

@ -0,0 +1,206 @@
use super::Expect;
use sqlparser::ast::{Expr, Query, SelectItem, Statement, TableFactor};
use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser;
use std::collections::HashSet;
impl Expect {
pub fn assert_sql(&self, actual: &[String]) -> Result<(), String> {
for query in actual {
if let Err(e) = Self::validate_semantic_sql(query) {
return Err(e);
}
}
Ok(())
}
pub fn validate_semantic_sql(sql: &str) -> Result<(), String> {
let dialect = PostgreSqlDialect {};
let statements = match Parser::parse_sql(&dialect, sql) {
Ok(s) => s,
Err(e) => return Err(format!("SQL Syntax Error: {}\nSQL: {}", e, sql)),
};
for statement in statements {
Self::validate_statement(&statement, sql)?;
}
Ok(())
}
fn validate_statement(stmt: &Statement, original_sql: &str) -> Result<(), String> {
match stmt {
Statement::Query(query) => Self::validate_query(query, &HashSet::new(), original_sql)?,
Statement::Insert(insert) => {
if let Some(query) = &insert.source {
Self::validate_query(query, &HashSet::new(), original_sql)?
}
}
Statement::Update(update) => {
if let Some(expr) = &update.selection {
Self::validate_expr(expr, &HashSet::new(), original_sql)?;
}
}
Statement::Delete(delete) => {
if let Some(expr) = &delete.selection {
Self::validate_expr(expr, &HashSet::new(), original_sql)?;
}
}
_ => {}
}
Ok(())
}
fn validate_query(
query: &Query,
available_aliases: &HashSet<String>,
original_sql: &str,
) -> Result<(), String> {
if let sqlparser::ast::SetExpr::Select(select) = &*query.body {
Self::validate_select(&select, available_aliases, original_sql)?;
}
Ok(())
}
fn validate_select(
select: &sqlparser::ast::Select,
parent_aliases: &HashSet<String>,
original_sql: &str,
) -> Result<(), String> {
let mut available_aliases = parent_aliases.clone();
// 1. Collect all declared table aliases in the FROM clause and JOINs
for table_with_joins in &select.from {
Self::collect_aliases_from_table_factor(&table_with_joins.relation, &mut available_aliases);
for join in &table_with_joins.joins {
Self::collect_aliases_from_table_factor(&join.relation, &mut available_aliases);
}
}
// 2. Validate all SELECT projection fields
for projection in &select.projection {
if let SelectItem::UnnamedExpr(expr) | SelectItem::ExprWithAlias { expr, .. } = projection {
Self::validate_expr(expr, &available_aliases, original_sql)?;
}
}
// 3. Validate ON conditions in joins
for table_with_joins in &select.from {
for join in &table_with_joins.joins {
if let sqlparser::ast::JoinOperator::Inner(sqlparser::ast::JoinConstraint::On(expr))
| sqlparser::ast::JoinOperator::LeftOuter(sqlparser::ast::JoinConstraint::On(expr))
| sqlparser::ast::JoinOperator::RightOuter(sqlparser::ast::JoinConstraint::On(expr))
| sqlparser::ast::JoinOperator::FullOuter(sqlparser::ast::JoinConstraint::On(expr))
| sqlparser::ast::JoinOperator::Join(sqlparser::ast::JoinConstraint::On(expr)) =
&join.join_operator
{
Self::validate_expr(expr, &available_aliases, original_sql)?;
}
}
}
// 4. Validate WHERE conditions
if let Some(selection) = &select.selection {
Self::validate_expr(selection, &available_aliases, original_sql)?;
}
Ok(())
}
fn collect_aliases_from_table_factor(tf: &TableFactor, aliases: &mut HashSet<String>) {
match tf {
TableFactor::Table { name, alias, .. } => {
if let Some(table_alias) = alias {
aliases.insert(table_alias.name.value.clone());
} else if let Some(last) = name.0.last() {
match last {
sqlparser::ast::ObjectNamePart::Identifier(i) => {
aliases.insert(i.value.clone());
}
_ => {}
}
}
}
TableFactor::Derived {
subquery,
alias: Some(table_alias),
..
} => {
aliases.insert(table_alias.name.value.clone());
// A derived table is technically a nested scope which is opaque outside, but for pure semantic checks
// its internal contents should be validated purely within its own scope (not leaking external aliases in, usually)
// but Postgres allows lateral correlation. We will validate its interior with an empty scope.
let _ = Self::validate_query(subquery, &HashSet::new(), "");
}
_ => {}
}
}
fn validate_expr(
expr: &Expr,
available_aliases: &HashSet<String>,
sql: &str,
) -> Result<(), String> {
match expr {
Expr::CompoundIdentifier(idents) => {
if idents.len() == 2 {
let alias = &idents[0].value;
if !available_aliases.is_empty() && !available_aliases.contains(alias) {
return Err(format!(
"Semantic Error: Orchestrated query referenced table alias '{}' but it was not declared in the query's FROM/JOIN clauses.\nAvailable aliases: {:?}\nSQL: {}",
alias, available_aliases, sql
));
}
} else if idents.len() > 2 {
let alias = &idents[1].value; // In form schema.table.column, 'table' is idents[1]
if !available_aliases.is_empty() && !available_aliases.contains(alias) {
return Err(format!(
"Semantic Error: Orchestrated query referenced table '{}' but it was not mapped.\nAvailable aliases: {:?}\nSQL: {}",
alias, available_aliases, sql
));
}
}
}
Expr::Subquery(subquery) => Self::validate_query(subquery, available_aliases, sql)?,
Expr::Exists { subquery, .. } => Self::validate_query(subquery, available_aliases, sql)?,
Expr::InSubquery {
expr: e, subquery, ..
} => {
Self::validate_expr(e, available_aliases, sql)?;
Self::validate_query(subquery, available_aliases, sql)?;
}
Expr::BinaryOp { left, right, .. } => {
Self::validate_expr(left, available_aliases, sql)?;
Self::validate_expr(right, available_aliases, sql)?;
}
Expr::IsFalse(e)
| Expr::IsNotFalse(e)
| Expr::IsTrue(e)
| Expr::IsNotTrue(e)
| Expr::IsNull(e)
| Expr::IsNotNull(e)
| Expr::InList { expr: e, .. }
| Expr::Nested(e)
| Expr::UnaryOp { expr: e, .. }
| Expr::Cast { expr: e, .. }
| Expr::Like { expr: e, .. }
| Expr::ILike { expr: e, .. }
| Expr::AnyOp { left: e, .. }
| Expr::AllOp { left: e, .. } => {
Self::validate_expr(e, available_aliases, sql)?;
}
Expr::Function(func) => {
if let sqlparser::ast::FunctionArguments::List(args) = &func.args {
if let Some(sqlparser::ast::FunctionArg::Unnamed(
sqlparser::ast::FunctionArgExpr::Expr(e),
)) = args.args.get(0)
{
Self::validate_expr(e, available_aliases, sql)?;
}
}
}
_ => {}
}
Ok(())
}
}

View File

@ -2,6 +2,6 @@ pub mod case;
pub mod expect;
pub mod suite;
pub use case::TestCase;
pub use expect::ExpectBlock;
pub use suite::TestSuite;
pub use case::Case;
pub use expect::Expect;
pub use suite::Suite;

View File

@ -1,10 +1,10 @@
use super::case::TestCase;
use super::case::Case;
use serde::Deserialize;
#[derive(Debug, Deserialize)]
pub struct TestSuite {
pub struct Suite {
#[allow(dead_code)]
pub description: String,
pub database: serde_json::Value,
pub tests: Vec<TestCase>,
pub tests: Vec<Case>,
}

View File

@ -67,7 +67,12 @@ impl Validator {
.map(|e| crate::drop::Error {
code: e.code,
message: e.message,
details: crate::drop::ErrorDetails { path: e.path },
details: crate::drop::ErrorDetails {
path: e.path,
cause: None,
context: None,
schema: None,
},
})
.collect();
crate::drop::Drop::with_errors(errors)
@ -76,7 +81,12 @@ impl Validator {
Err(e) => crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: e.code,
message: e.message,
details: crate::drop::ErrorDetails { path: e.path },
details: crate::drop::ErrorDetails {
path: e.path,
cause: None,
context: None,
schema: None,
},
}]),
}
} else {
@ -84,7 +94,10 @@ impl Validator {
code: "SCHEMA_NOT_FOUND".to_string(),
message: format!("Schema {} not found", schema_id),
details: crate::drop::ErrorDetails {
path: "".to_string(),
path: "/".to_string(),
cause: None,
context: None,
schema: None,
},
}])
}

View File

@ -1 +1 @@
1.0.58
1.0.74