Compare commits

...

67 Commits

Author SHA1 Message Date
93b0a70718 version: 1.0.96 2026-03-27 02:28:53 -04:00
9c24f1af8f fixed issue where merge lookups with no changes were not generating a notification 2026-03-27 02:08:45 -04:00
f9cf1f837a version: 1.0.95 2026-03-27 01:18:41 -04:00
796df7763c added replaces field to merge for the notification when a lookup is successful 2026-03-27 01:18:36 -04:00
4a10833f50 version: 1.0.94 2026-03-26 23:50:03 -04:00
46fc032026 fixed merge lookup issue 2026-03-26 23:49:52 -04:00
7ec06b81cc version: 1.0.93 2026-03-26 22:28:18 -04:00
c4e8e0309f removed initial / in validator making paths consistent across validate merger and queryer 2026-03-26 22:27:59 -04:00
eb91b65e65 version: 1.0.92 2026-03-26 14:06:40 -04:00
8bf3649465 validator now uses hybrid uuid and numeric index pathing 2026-03-26 14:06:24 -04:00
9fe5a34163 version: 1.0.91 2026-03-25 21:37:15 -04:00
f5bf21eb58 fixed root array queries 2026-03-25 21:37:01 -04:00
9dcafed406 version: 1.0.90 2026-03-25 19:32:02 -04:00
ffd6c27da3 more pg try catching and error handling 2026-03-25 19:31:51 -04:00
4941dc6069 doc update 2026-03-23 19:07:45 -04:00
a8a15a82ef version: 1.0.89 2026-03-23 16:41:41 -04:00
8dcc714963 fixed target_type restrictions in where clauses 2026-03-23 16:41:34 -04:00
f87ac81f3b pre-script-fix 2026-03-23 16:34:45 -04:00
8ca9017cc4 version: 1.0.88 2026-03-23 14:37:29 -04:00
10c57e59ec fixed nested filtering syntax 2026-03-23 14:37:22 -04:00
ef4571767c version: 1.0.87 2026-03-23 12:49:36 -04:00
29bd25eaff fixed filter override for archived 2026-03-23 12:49:30 -04:00
4d9b510819 version: 1.0.86 2026-03-23 12:26:03 -04:00
3c4b1066df fixed merger with anchor test issue 2026-03-23 12:25:55 -04:00
4c59d9ba7f version: 1.0.85 2026-03-23 12:05:47 -04:00
a1038490dd tested nested merging with anchors 2026-03-23 12:05:34 -04:00
14707330a7 subschema id queryer test added 2026-03-22 05:54:31 -04:00
77bc92533c version: 1.0.84 2026-03-22 03:35:54 -04:00
4060119b01 schema ids can now contain a subschema 2026-03-22 03:35:47 -04:00
95546fe10c version: 1.0.83 2026-03-21 20:33:48 -04:00
882bdc6271 merger now requires a schema id, queryer and merger now use pre-compiled edges for O(1) relations 2026-03-21 20:33:28 -04:00
9bdb767685 version: 1.0.82 2026-03-20 18:05:43 -04:00
bdd89fe695 cleanup 2026-03-20 18:05:37 -04:00
8135d80045 cleanup 2026-03-20 18:05:18 -04:00
9255439d53 added support for root schema compiled properties for the mixer 2026-03-20 18:04:49 -04:00
9038607729 version: 1.0.81 2026-03-20 15:53:59 -04:00
9f6c27c3b8 support ad-hoc refing without entity types 2026-03-20 15:53:48 -04:00
75aac41362 version: 1.0.80 2026-03-20 06:48:19 -04:00
dbcef42401 merger fixes 2026-03-20 06:48:08 -04:00
b6c5561d2f version: 1.0.79 2026-03-20 05:58:53 -04:00
e01b778d68 jsob and test array handling improved in merger 2026-03-20 05:58:43 -04:00
6eb134c0d6 test checkpoint 2026-03-20 05:17:28 -04:00
7ccc4b7cce version: 1.0.78 2026-03-20 04:41:46 -04:00
77bfa4cd18 historical and notify respected 2026-03-20 04:41:35 -04:00
b47a5abd26 version: 1.0.77 2026-03-20 01:59:56 -04:00
fcd8310ed8 added new and old to changes and pg notify 2026-03-20 01:59:48 -04:00
31519e8447 version: 1.0.76 2026-03-18 22:04:51 -04:00
847e921b1c stems removed from queryer 2026-03-18 22:04:29 -04:00
e19e1921e5 version: 1.0.75 2026-03-18 05:03:45 -04:00
94d011e729 merger payload issue when errors 2026-03-18 05:03:35 -04:00
263cf04ffb version: 1.0.74 2026-03-18 04:40:06 -04:00
00375c2926 more fixes 2026-03-18 04:39:48 -04:00
885b9b5e44 version: 1.0.73 2026-03-18 02:42:34 -04:00
298645ffdb queryer fixes 2026-03-18 02:42:20 -04:00
330280ba48 queryer fixes 2026-03-18 02:41:56 -04:00
02e661d219 version: 1.0.72 2026-03-17 23:10:52 -04:00
f7163e2689 version: 1.0.71 2026-03-17 22:13:55 -04:00
091007006d queryer fixes in place 2026-03-17 22:13:34 -04:00
3d66a7fc3c queryer test checkpoit 2026-03-17 18:00:36 -04:00
e1314496dd queryer test checkpoint 2026-03-17 15:06:02 -04:00
70a27b430d version: 1.0.70 2026-03-17 05:42:40 -04:00
e078b8a74b queryer alias fixes installed 2026-03-17 05:42:27 -04:00
c2c0e62c2d queryer fixes checkpoint 2026-03-17 05:12:03 -04:00
ebb97b3509 version: 1.0.69 2026-03-16 21:21:25 -04:00
5d18847f32 pgrx try catch 2026-03-16 21:21:11 -04:00
4a33e29628 version: 1.0.68 2026-03-16 19:39:34 -04:00
d8fc286e94 better error dropping 2026-03-16 19:39:24 -04:00
44 changed files with 4047 additions and 2771 deletions

11
.test/tests.md Normal file
View File

@ -0,0 +1,11 @@
# 🗒️ Test Report (punc/framework)
_Generated at Wed Mar 18 05:21:40 EDT 2026_
## Summary
| Lang | Status | Tests | Passed | Failed | Duration |
| :--- | :---: | :---: | :---: | :---: | ---: |
## Results

5
.vscode/extensions.json vendored Normal file
View File

@ -0,0 +1,5 @@
{
"recommendations": [
"rust-lang.rust-analyzer"
]
}

View File

@ -39,10 +39,6 @@ crate-type = ["cdylib", "lib"]
name = "pgrx_embed_jspg"
path = "src/bin/pgrx_embed.rs"
[[bin]]
name = "ast_explore"
path = "src/bin/ast_explore.rs"
[features]
default = ["pg18"]
pg18 = ["pgrx/pg18", "pgrx-tests/pg18" ]

View File

@ -7,22 +7,29 @@
JSPG operates by deeply integrating the JSON Schema Draft 2020-12 specification directly into the Postgres session lifecycle. It is built around three core pillars:
* **Validator**: In-memory, near-instant JSON structural validation and type polymorphism routing.
* **Merger**: Automatically traverse and UPSERT deeply nested JSON graphs into normalized relational tables.
* **Queryer**: Compile JSON Schemas into static, cached SQL SPI `SELECT` plans for fetching full entities or isolated "Stems".
* **Queryer**: Compile JSON Schemas into static, cached SQL SPI `SELECT` plans for fetching full entities or isolated ad-hoc object boundaries.
### 🎯 Goals
1. **Draft 2020-12 Compliance**: Attempt to adhere to the official JSON Schema Draft 2020-12 specification.
2. **Ultra-Fast Execution**: Compile schemas into optimized in-memory validation trees and cached SQL SPIs to bypass Postgres Query Builder overheads.
3. **Connection-Bound Caching**: Leverage the PostgreSQL session lifecycle using an **Atomic Swap** pattern. Schemas are 100% frozen, completely eliminating locks during read access.
4. **Structural Inheritance**: Support object-oriented schema design via Implicit Keyword Shadowing and virtual `$family` references natively mapped to Postgres table constraints.
5. **Reactive Beats**: Provide natively generated "Stems" (isolated payload fragments) for dynamic websocket reactivity.
5. **Reactive Beats**: Provide ultra-fast natively generated flat payloads mapping directly to the Dart topological state for dynamic websocket reactivity.
### Concurrency & Threading ("Immutable Graphs")
To support high-throughput operations while allowing for runtime updates (e.g., during hot-reloading), JSPG uses an **Atomic Swap** pattern:
1. **Parser Phase**: Schema JSONs are parsed into ordered `Schema` structs.
2. **Compiler Phase**: The database iterates all parsed schemas and pre-computes native optimization maps (Descendants Map, Depths Map, Variations Map).
3. **Immutable Validator**: The `Validator` struct immutably owns the `Database` registry and all its global maps. Schemas themselves are completely frozen; `$ref` strings are resolved dynamically at runtime using pre-computed O(1) maps.
3. **Immutable AST Caching**: The `Validator` struct immutably owns the `Database` registry. Schemas themselves are frozen structurally, but utilize `OnceLock` interior mutability during the Compilation Phase to permanently cache resolved `$ref` inheritances, properties, and `compiled_edges` directly onto their AST nodes. This guarantees strict `O(1)` relationship and property validation execution at runtime without locking or recursive DB polling.
4. **Lock-Free Reads**: Incoming operations acquire a read lock just long enough to clone the `Arc` inside an `RwLock<Option<Arc<Validator>>>`, ensuring zero blocking during schema updates.
### Global API Reference
These functions operate on the global `GLOBAL_JSPG` engine instance and provide administrative boundaries:
* `jspg_setup(database jsonb) -> jsonb`: Initializes the engine. Deserializes the full database schema registry (types, enums, puncs, relations) from Postgres and compiles them into memory atomically.
* `jspg_teardown() -> jsonb`: Clears the current session's engine instance from `GLOBAL_JSPG`, resetting the cache.
* `jspg_schemas() -> jsonb`: Exports the fully compiled AST snapshot (including all inherited dependencies) out of `GLOBAL_JSPG` into standard JSON Schema representations.
---
## 2. Validator
@ -30,10 +37,7 @@ To support high-throughput operations while allowing for runtime updates (e.g.,
The Validator provides strict, schema-driven evaluation for the "Punc" architecture.
### API Reference
* `jspg_setup(database jsonb) -> jsonb`: Loads and compiles the entire registry (types, enums, puncs, relations) atomically.
* `mask_json_schema(schema_id text, instance jsonb) -> jsonb`: Validates and prunes unknown properties dynamically, returning masked data.
* `jspg_validate(schema_id text, instance jsonb) -> jsonb`: Returns boolean-like success or structured errors.
* `jspg_teardown() -> jsonb`: Clears the current session's schema cache.
* `jspg_validate(schema_id text, instance jsonb) -> jsonb`: Validates the `instance` JSON payload strictly against the constraints of the registered `schema_id`. Returns boolean-like success or structured error codes.
### Custom Features & Deviations
JSPG implements specific extensions to the Draft 2020-12 standard to support the Punc architecture's object-oriented needs while heavily optimizing for zero-runtime lookups.
@ -43,7 +47,7 @@ JSPG implements specific extensions to the Draft 2020-12 standard to support the
#### A. Polymorphism & Referencing (`$ref`, `$family`, and Native Types)
* **Native Type Discrimination (`variations`)**: Schemas defined inside a Postgres `type` are Entities. The validator securely and implicitly manages their `"type"` property. If an entity inherits from `user`, incoming JSON can safely define `{"type": "person"}` without errors, thanks to `compiled_variations` inheritance.
* **Structural Inheritance & Viral Infection (`$ref`)**: `$ref` is used exclusively for structural inheritance, *never* for union creation. A Punc request schema that `$ref`s an Entity virally inherits all physical database polymorphism rules for that target.
* **Shape Polymorphism (`$family`)**: Auto-expands polymorphic API lists based on an abstract Descendants Graph. If `{"$family": "widget"}` is used, JSPG evaluates the JSON against every schema that `$ref`s widget.
* **Shape Polymorphism (`$family`)**: Auto-expands polymorphic API lists based on an abstract **Descendants Graph**. If `{"$family": "widget"}` is used, the Validator dynamically identifies *every* schema in the registry that `$ref`s `widget` (e.g., `stock.widget`, `task.widget`) and evaluates the JSON against all of them.
* **Strict Matches & Depth Heuristic**: Polymorphic structures MUST match exactly **one** schema permutation. If multiple inherited struct permutations pass, JSPG applies the **Depth Heuristic Tie-Breaker**, selecting the candidate deepest in the inheritance tree.
#### B. Dot-Notation Schema Resolution & Database Mapping
@ -69,11 +73,14 @@ To simplify frontend form validation, format validators specifically for `uuid`,
## 3. Merger
The Merger provides an automated, high-performance graph synchronization engine via the `jspg_merge(cue JSONB)` API. It orchestrates the complex mapping of nested JSON objects into normalized Postgres relational tables, honoring all inheritance and graph constraints.
The Merger provides an automated, high-performance graph synchronization engine. It orchestrates the complex mapping of nested JSON objects into normalized Postgres relational tables, honoring all inheritance and graph constraints.
### API Reference
* `jspg_merge(schema_id text, data jsonb) -> jsonb`: Traverses the provided JSON payload according to the compiled relational map of `schema_id`. Dynamically builds and executes relational SQL UPSERT paths natively.
### Core Features
* **Caching Strategy**: The Merger leverages the `Validator`'s in-memory `Database` registry to instantly resolve Foreign Key mapping graphs. It additionally utilizes the concurrent `GLOBAL_JSPG` application memory (`DashMap`) to cache statically constructed SQL `SELECT` strings used during deduplication (`lk_`) and difference tracking calculations.
* **Caching Strategy**: The Merger leverages the native `compiled_edges` permanently cached onto the Schema AST via `OnceLock` to instantly resolve Foreign Key mapping graphs natively in absolute `O(1)` time. It additionally utilizes the concurrent `GLOBAL_JSPG` application memory (`DashMap`) to cache statically constructed SQL `SELECT` strings used during deduplication (`lk_`) and difference tracking calculations.
* **Deep Graph Merging**: The Merger walks arbitrary levels of deeply nested JSON schemas (e.g. tracking an `order`, its `customer`, and an array of its `lines`). It intelligently discovers the correct parent-to-child or child-to-parent Foreign Keys stored in the registry and automatically maps the UUIDs across the relationships during UPSERT.
* **Prefix Foreign Key Matching**: Handles scenario where multiple relations point to the same table by using database Foreign Key constraint prefixes (`fk_`). For example, if a schema has `shipping_address` and `billing_address`, the merger resolves against `fk_shipping_address_entity` vs `fk_billing_address_entity` automatically to correctly route object properties.
* **Dynamic Deduplication & Lookups**: If a nested object is provided without an `id`, the Merger utilizes Postgres `lk_` index constraints defined in the schema registry (e.g. `lk_person` mapped to `first_name` and `last_name`). It dynamically queries these unique matching constraints to discover the correct UUID to perform an UPDATE, preventing data duplication.
@ -91,7 +98,10 @@ The Merger provides an automated, high-performance graph synchronization engine
## 4. Queryer
The Queryer transforms Postgres into a pre-compiled Semantic Query Engine via the `jspg_query(schema_id text, cue jsonb)` API, designed to serve the exact shape of Punc responses directly via SQL.
The Queryer transforms Postgres into a pre-compiled Semantic Query Engine, designed to serve the exact shape of Punc responses directly via SQL.
### API Reference
* `jspg_query(schema_id text, filters jsonb) -> jsonb`: Compiles the JSON Schema AST of `schema_id` directly into pre-planned, nested multi-JOIN SQL execution trees. Processes `filters` structurally.
### Core Features
@ -103,23 +113,16 @@ The Queryer transforms Postgres into a pre-compiled Semantic Query Engine via th
* **Array Inclusion**: `{"$in": [values]}`, `{"$nin": [values]}` use native `jsonb_array_elements_text()` bindings to enforce `IN` and `NOT IN` logic without runtime SQL injection risks.
* **Text Matching (ILIKE)**: Evaluates `$eq` or `$ne` against string fields containing the `%` character natively into Postgres `ILIKE` and `NOT ILIKE` partial substring matches.
* **Type Casting**: Safely resolves dynamic combinations by casting values instantly into the physical database types mapped in the schema (e.g. parsing `uuid` bindings to `::uuid`, formatting DateTimes to `::timestamptz`, and numbers to `::numeric`).
* **Polymorphic SQL Generation (`$family`)**: Compiles `$family` properties by analyzing the **Physical Database Variations**, *not* the schema descendants.
* **The Dot Convention**: When a schema requests `$family: "target.schema"`, the compiler extracts the base type (e.g. `schema`) and looks up its Physical Table definition.
* **Multi-Table Branching**: If the Physical Table is a parent to other tables (e.g. `organization` has variations `["organization", "bot", "person"]`), the compiler generates a dynamic `CASE WHEN type = '...' THEN ...` query, expanding into `JOIN`s for each variation.
* **Single-Table Bypass**: If the Physical Table is a leaf node with only one variation (e.g. `person` has variations `["person"]`), the compiler cleanly bypasses `CASE` generation and compiles a simple `SELECT` across the base table, as all schema extensions (e.g. `light.person`, `full.person`) are guaranteed to reside in the exact same physical row.
### The Stem Engine
### Ad-Hoc Schema Promotion
Rather than over-fetching heavy Entity payloads and trimming them, Punc Framework Websockets depend on isolated subgraphs defined as **Stems**.
A `Stem` is a declaration of an **Entity Type boundary** that exists somewhere within the compiled JSON Schema graph, expressed using **`gjson` multipath syntax** (e.g., `contacts.#.phone_numbers.#`).
Because `pg_notify` (Beats) fire rigidly from physical Postgres tables (e.g. `{"type": "phone_number"}`), the Go Framework only ever needs to know: "Does the schema `with_contacts.person` contain the `phone_number` Entity anywhere inside its tree, and if so, what is the gjson path to iterate its payload?"
* **Initialization:** During startup (`jspg_stems()`), the database crawls all Schemas and maps out every physical Entity Type it references. It builds a highly optimized `HashMap<String, HashMap<String, Arc<Stem>>>` providing strictly `O(1)` memory lookups mapping `Schema ID -> { Stem Path -> Entity Type }`.
* **GJSON Pathing:** Unlike standard JSON Pointers, stems utilize `.#` array iterator syntax. The Go web server consumes this native path (e.g. `lines.#`) across the raw Postgres JSON byte payload, extracting all active UUIDs in one massive sub-millisecond sweep without unmarshaling Go ASTs.
* **Polymorphic Condition Selectors:** When trailing paths would otherwise collide because of abstract polymorphic type definitions (e.g., a `target` property bounded by a `oneOf` taking either `phone_number` or `email_address`), JSPG natively appends evaluated `gjson` type conditions into the path (e.g. `contacts.#.target#(type=="phone_number")`). This guarantees `O(1)` key uniqueness in the HashMap while retaining extreme array extraction speeds natively without runtime AST evaluation.
* **Identifier Prioritization:** When determining if a nested object boundary is an Entity, JSPG natively prioritizes defined `$id` tags over `$ref` inheritance pointers to prevent polymorphic boundaries from devolving into their generic base classes.
* **Cyclical Deduplication:** Because Punc relationships often reference back on themselves via deeply nested classes, the Stem Engine applies intelligent path deduplication. If the active `current_path` already ends with the target entity string, it traverses the inheritance properties without appending the entity to the stem path again, eliminating infinite powerset loops.
* **Relationship Path Squashing:** When calculating string paths structurally, JSPG intentionally **omits** properties natively named `target` or `source` if they belong to a native database `relationship` table override.
* **The Go Router**: The Golang Punc framework uses this exact mapping to register WebSocket Beat frequencies exclusively on the Entity types discovered.
* **The Queryer Execution**: When the Go framework asks JSPG to hydrate a partial `phone_number` stem for the `with_contacts.person` schema, instead of jumping through string paths, the SQL Compiler simply reaches into the Schema's AST using the `phone_number` Type string, pulls out exactly that entity's mapping rules, and returns a fully correlated `SELECT` block! This natively handles nested array properties injected via `oneOf` or array references efficiently bypassing runtime powerset expansion.
* **Performance:** These Stem execution structures are fully statically compiled via SPI and map perfectly to `O(1)` real-time routing logic on the application tier.
To seamlessly support deeply nested, inline Object definitions that don't declare an explicit `$id`, JSPG aggressively promotes them to standalone topological entities during the database compilation phase.
* **Hash Generation:** While evaluating the unified graph, if the compiler enters an `Object` or `Array` structure completely lacking an `$id`, it dynamically calculates a localized hash alias representing exactly its structural constraints.
* **Promotion:** This inline chunk is mathematically elevated to its own `$id` in the `db.schemas` cache registry. This guarantees that $O(1)$ WebSockets or isolated queries can natively target any arbitrary sub-object of a massive database topology directly without recursively re-parsing its parent's AST block every read.
## 5. Testing & Execution Architecture

58
LOOKUP_VERIFICATION.md Normal file
View File

@ -0,0 +1,58 @@
# The Postgres Partial Index Claiming Pattern
This document outlines the architectural strategy for securely handling the deduplication, claiming, and verification of sensitive unique identifiers (like email addresses or phone numbers) strictly through PostgreSQL without requiring "magical" logic in the JSPG `Merger`.
## The Denial of Service (DoS) Squatter Problem
If you enforce a standard `UNIQUE` constraint on an email address table:
1. Malicious User A signs up and adds `jeff.bezos@amazon.com` to their account but never verifies it.
2. The real Jeff Bezos signs up.
3. The Database blocks Jeff because the unique string already exists.
The squatter has effectively locked the legitimate owner out of the system.
## The Anti-Patterns
1. **Global Entity Flags**: Adding a global `verified` boolean to the root `entity` table forces unrelated objects (like Widgets, Invoices, Orders) to carry verification logic that doesn't belong to them.
2. **Magical Merger Logic**: Making JSPG's `Merger` aware of a specific `verified` field breaks its pure structural translation model. The Merger shouldn't need hardcoded conditional logic to know if it's allowed to update an unverified row.
## The Solution: Postgres Partial Unique Indexes
The holy grail is to defer all claiming logic natively to the database engine using a **Partial Unique Index**.
```sql
-- Remove any existing global unique constraint on address first
CREATE UNIQUE INDEX lk_email_address_verified
ON email_address (address)
WHERE verified_at IS NOT NULL;
```
### How the Lifecycle Works Natively
1. **Unverified Squatters (Isolated Rows):**
A hundred different users can send `{ "address": "jeff.bezos@amazon.com" }` through the `save_person` Punc. Because the Punc isolates them and doesn't allow setting the `verified_at` property natively (enforced by the JSON schema), the JSPG Merger inserts `NULL`.
Postgres permits all 100 `INSERT` commands to succeed because the Partial Index **ignores** rows where `verified_at IS NULL`. Every user gets their own isolated, unverified row acting as a placeholder on their contact edge.
2. **The Verification Race (The Claim):**
The real Jeff clicks his magic verification link. The backend securely executes a specific verification Punc that runs:
`UPDATE email_address SET verified_at = now() WHERE id = <jeff's-real-uuid>`
3. **The Lockout:**
Because Jeff's row now strictly satisfies `verified_at IS NOT NULL`, that exact row enters the Partial Unique Index.
If any of the other 99 squatters *ever* click their fake verification links (or if a new user tries to verify that same email), PostgreSQL hits the index and violently throws a **Unique Constraint Violation**, flawlessly blocking them. The winner has permanently claimed the slot across the entire environment!
### Periodic Cleanup
Since unverified rows are allowed to accumulate without colliding, a simple Postgres `pg_cron` job or backend worker can sweep the table nightly to prune abandoned claims and preserve storage:
```sql
DELETE FROM email_address
WHERE verified_at IS NULL
AND created_at < NOW() - INTERVAL '24 hours';
```
### Why this is the Ultimate Architecture
* The **JSPG Merger** remains mathematically pure. It doesn't know what `verified_at` is; it simply respects the database's structural limits (`O(1)` pure translation).
* **Row-Level Security (RLS)** naturally blocks users from seeing or claiming each other's unverified rows.
* You offload complex race-condition tracking entirely to the C-level PostgreSQL B-Tree indexing engine, guaranteeing absolute cluster-wide atomicity.

View File

View File

@ -142,7 +142,7 @@
"errors": [
{
"code": "CONST_VIOLATED",
"path": "/con"
"path": "con"
}
]
}

View File

@ -48,7 +48,7 @@
"errors": [
{
"code": "TYPE_MISMATCH",
"path": "/base_prop"
"path": "base_prop"
}
]
}
@ -109,7 +109,7 @@
"errors": [
{
"code": "REQUIRED_FIELD_MISSING",
"path": "/a"
"path": "a"
}
]
}
@ -126,7 +126,7 @@
"errors": [
{
"code": "REQUIRED_FIELD_MISSING",
"path": "/b"
"path": "b"
}
]
}
@ -196,7 +196,7 @@
"errors": [
{
"code": "DEPENDENCY_FAILED",
"path": "/base_dep"
"path": "base_dep"
}
]
}
@ -214,7 +214,7 @@
"errors": [
{
"code": "DEPENDENCY_FAILED",
"path": "/child_dep"
"path": "child_dep"
}
]
}

File diff suppressed because it is too large Load Diff

214
fixtures/paths.json Normal file
View File

@ -0,0 +1,214 @@
[
{
"description": "Hybrid Array Pathing",
"database": {
"schemas": [
{
"$id": "hybrid_pathing",
"type": "object",
"properties": {
"primitives": {
"type": "array",
"items": {
"type": "string"
}
},
"ad_hoc_objects": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string"
}
},
"required": [
"name"
]
}
},
"entities": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"value": {
"type": "number",
"minimum": 10
}
}
}
},
"deep_entities": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"nested": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"flag": {
"type": "boolean"
}
}
}
}
}
}
}
}
}
]
},
"tests": [
{
"description": "happy path passes structural validation",
"data": {
"primitives": [
"a",
"b"
],
"ad_hoc_objects": [
{
"name": "obj1"
}
],
"entities": [
{
"id": "entity-1",
"value": 15
}
],
"deep_entities": [
{
"id": "parent-1",
"nested": [
{
"id": "child-1",
"flag": true
}
]
}
]
},
"schema_id": "hybrid_pathing",
"action": "validate",
"expect": {
"success": true
}
},
{
"description": "primitive arrays use numeric indexing",
"data": {
"primitives": [
"a",
123
]
},
"schema_id": "hybrid_pathing",
"action": "validate",
"expect": {
"success": false,
"errors": [
{
"code": "INVALID_TYPE",
"path": "primitives/1"
}
]
}
},
{
"description": "ad-hoc objects without ids use numeric indexing",
"data": {
"ad_hoc_objects": [
{
"name": "valid"
},
{
"age": 30
}
]
},
"schema_id": "hybrid_pathing",
"action": "validate",
"expect": {
"success": false,
"errors": [
{
"code": "REQUIRED_FIELD_MISSING",
"path": "ad_hoc_objects/1/name"
}
]
}
},
{
"description": "arrays of objects with ids use topological uuid indexing",
"data": {
"entities": [
{
"id": "entity-alpha",
"value": 20
},
{
"id": "entity-beta",
"value": 5
}
]
},
"schema_id": "hybrid_pathing",
"action": "validate",
"expect": {
"success": false,
"errors": [
{
"code": "MINIMUM_VIOLATED",
"path": "entities/entity-beta/value"
}
]
}
},
{
"description": "deeply nested entity arrays retain full topological paths",
"data": {
"deep_entities": [
{
"id": "parent-omega",
"nested": [
{
"id": "child-alpha",
"flag": true
},
{
"id": "child-beta",
"flag": "invalid-string"
}
]
}
]
},
"schema_id": "hybrid_pathing",
"action": "validate",
"expect": {
"success": false,
"errors": [
{
"code": "INVALID_TYPE",
"path": "deep_entities/parent-omega/nested/child-beta/flag"
}
]
}
}
]
}
]

File diff suppressed because it is too large Load Diff

View File

@ -677,7 +677,7 @@
"errors": [
{
"code": "TYPE_MISMATCH",
"path": "/type"
"path": "type"
}
]
}
@ -782,7 +782,7 @@
"errors": [
{
"code": "TYPE_MISMATCH",
"path": "/type"
"path": "type"
}
]
}

View File

@ -1,312 +0,0 @@
[
{
"description": "Stem Engine Unit Tests",
"database": {
"puncs": [],
"enums": [],
"relations": [
{
"id": "rel1",
"type": "relation",
"constraint": "fk_contact_entity",
"source_type": "contact",
"source_columns": [
"entity_id"
],
"destination_type": "person",
"destination_columns": [
"id"
],
"prefix": null
},
{
"id": "rel2",
"type": "relation",
"constraint": "fk_relationship_target",
"source_type": "relationship",
"source_columns": [
"target_id",
"target_type"
],
"destination_type": "entity",
"destination_columns": [
"id",
"type"
],
"prefix": "target"
}
],
"types": [
{
"name": "entity",
"hierarchy": [
"entity"
],
"schemas": [
{
"$id": "entity",
"type": "object",
"properties": {}
}
]
},
{
"name": "person",
"hierarchy": [
"person",
"entity"
],
"schemas": [
{
"$id": "person",
"$ref": "entity",
"properties": {}
}
]
},
{
"name": "email_address",
"hierarchy": [
"email_address",
"entity"
],
"schemas": [
{
"$id": "email_address",
"$ref": "entity",
"properties": {}
}
]
},
{
"name": "phone_number",
"hierarchy": [
"phone_number",
"entity"
],
"schemas": [
{
"$id": "phone_number",
"$ref": "entity",
"properties": {}
}
]
},
{
"name": "relationship",
"relationship": true,
"hierarchy": [
"relationship",
"entity"
],
"schemas": [
{
"$id": "relationship",
"$ref": "entity",
"properties": {}
}
]
},
{
"name": "contact",
"relationship": true,
"hierarchy": [
"contact",
"relationship",
"entity"
],
"schemas": [
{
"$id": "contact",
"$ref": "relationship",
"properties": {
"target": {
"oneOf": [
{
"$ref": "phone_number"
},
{
"$ref": "email_address"
}
]
}
}
}
]
},
{
"name": "save_person",
"schemas": [
{
"$id": "save_person.response",
"$ref": "person",
"properties": {
"contacts": {
"type": "array",
"items": {
"$ref": "contact"
}
}
}
}
]
}
]
},
"tests": [
{
"description": "correctly squashes deep oneOf refs through array paths",
"action": "compile",
"expect": {
"success": true,
"stems": {
"contact": {
"": {
"schema": {
"$id": "contact",
"$ref": "relationship",
"properties": {
"target": {
"oneOf": [
{
"$ref": "phone_number"
},
{
"$ref": "email_address"
}
]
}
}
},
"type": "contact"
},
"target#(type==\"email_address\")": {
"relation": "target_id",
"schema": {
"$id": "email_address",
"$ref": "entity",
"properties": {}
},
"type": "email_address"
},
"target#(type==\"phone_number\")": {
"relation": "target_id",
"schema": {
"$id": "phone_number",
"$ref": "entity",
"properties": {}
},
"type": "phone_number"
}
},
"email_address": {
"": {
"schema": {
"$id": "email_address",
"$ref": "entity",
"properties": {}
},
"type": "email_address"
}
},
"entity": {
"": {
"schema": {
"$id": "entity",
"properties": {},
"type": "object"
},
"type": "entity"
}
},
"person": {
"": {
"schema": {
"$id": "person",
"$ref": "entity",
"properties": {}
},
"type": "person"
}
},
"phone_number": {
"": {
"schema": {
"$id": "phone_number",
"$ref": "entity",
"properties": {}
},
"type": "phone_number"
}
},
"relationship": {
"": {
"schema": {
"$id": "relationship",
"$ref": "entity",
"properties": {}
},
"type": "relationship"
}
},
"save_person.response": {
"": {
"schema": {
"$id": "save_person.response",
"$ref": "person",
"properties": {
"contacts": {
"items": {
"$ref": "contact"
},
"type": "array"
}
}
},
"type": "person"
},
"contacts.#": {
"relation": "contacts_id",
"schema": {
"$id": "contact",
"$ref": "relationship",
"properties": {
"target": {
"oneOf": [
{
"$ref": "phone_number"
},
{
"$ref": "email_address"
}
]
}
}
},
"type": "contact"
},
"contacts.#.target#(type==\"email_address\")": {
"relation": "target_id",
"schema": {
"$id": "email_address",
"$ref": "entity",
"properties": {}
},
"type": "email_address"
},
"contacts.#.target#(type==\"phone_number\")": {
"relation": "target_id",
"schema": {
"$id": "phone_number",
"$ref": "entity",
"properties": {}
},
"type": "phone_number"
}
}
}
}
}
]
}
]

View File

@ -1,17 +0,0 @@
use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser;
use std::env;
fn main() {
let sql = "SELECT t1_obj_t1_addresses_t1_target_t2.archived, t1.id FROM person t1 JOIN address t1_obj_t1_addresses ON true";
let dialect = PostgreSqlDialect {};
match Parser::parse_sql(&dialect, sql) {
Ok(ast) => {
println!("{:#?}", ast);
}
Err(e) => {
println!("Error: {:?}", e);
}
}
}

7
src/database/edge.rs Normal file
View File

@ -0,0 +1,7 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct Edge {
pub constraint: String,
pub forward: bool,
}

View File

@ -124,42 +124,23 @@ fn parse_and_match_mocks(sql: &str, mocks: &[Value]) -> Option<Vec<Value>> {
return None;
};
// 2. Extract WHERE conditions
let mut conditions = Vec::new();
// 2. Extract WHERE conditions string
let mut where_clause = String::new();
if let Some(where_idx) = sql_upper.find(" WHERE ") {
let mut where_end = sql_upper.find(" ORDER BY ").unwrap_or(sql.len());
let mut where_end = sql_upper.find(" ORDER BY ").unwrap_or(sql_upper.len());
if let Some(limit_idx) = sql_upper.find(" LIMIT ") {
if limit_idx < where_end {
where_end = limit_idx;
}
}
let where_clause = &sql[where_idx + 7..where_end];
let and_regex = Regex::new(r"(?i)\s+AND\s+").ok()?;
let parts = and_regex.split(where_clause);
for part in parts {
if let Some(eq_idx) = part.find('=') {
let left = part[..eq_idx]
.trim()
.split('.')
.last()
.unwrap_or("")
.trim_matches('"');
let right = part[eq_idx + 1..].trim().trim_matches('\'');
conditions.push((left.to_string(), right.to_string()));
} else if part.to_uppercase().contains(" IS NULL") {
let left = part[..part.to_uppercase().find(" IS NULL").unwrap()]
.trim()
.split('.')
.last()
.unwrap_or("")
.replace('"', ""); // Remove quotes explicitly
conditions.push((left, "null".to_string()));
}
}
where_clause = sql[where_idx + 7..where_end].to_string();
}
// 3. Find matching mocks
let mut matches = Vec::new();
let or_regex = Regex::new(r"(?i)\s+OR\s+").ok()?;
let and_regex = Regex::new(r"(?i)\s+AND\s+").ok()?;
for mock in mocks {
if let Some(mock_obj) = mock.as_object() {
if let Some(t) = mock_obj.get("type") {
@ -168,25 +149,66 @@ fn parse_and_match_mocks(sql: &str, mocks: &[Value]) -> Option<Vec<Value>> {
}
}
let mut matches_all = true;
for (k, v) in &conditions {
let mock_val_str = match mock_obj.get(k) {
Some(Value::String(s)) => s.clone(),
Some(Value::Number(n)) => n.to_string(),
Some(Value::Bool(b)) => b.to_string(),
Some(Value::Null) => "null".to_string(),
_ => {
matches_all = false;
break;
if where_clause.is_empty() {
matches.push(mock.clone());
continue;
}
let or_parts = or_regex.split(&where_clause);
let mut any_branch_matched = false;
for or_part in or_parts {
let branch_str = or_part.replace('(', "").replace(')', "");
let mut branch_matches = true;
for part in and_regex.split(&branch_str) {
if let Some(eq_idx) = part.find('=') {
let left = part[..eq_idx]
.trim()
.split('.')
.last()
.unwrap_or("")
.trim_matches('"');
let right = part[eq_idx + 1..].trim().trim_matches('\'');
let mock_val_str = match mock_obj.get(left) {
Some(Value::String(s)) => s.clone(),
Some(Value::Number(n)) => n.to_string(),
Some(Value::Bool(b)) => b.to_string(),
Some(Value::Null) => "null".to_string(),
_ => "".to_string(),
};
if mock_val_str != right {
branch_matches = false;
break;
}
} else if part.to_uppercase().contains(" IS NULL") {
let left = part[..part.to_uppercase().find(" IS NULL").unwrap()]
.trim()
.split('.')
.last()
.unwrap_or("")
.trim_matches('"');
let mock_val_str = match mock_obj.get(left) {
Some(Value::Null) => "null".to_string(),
_ => "".to_string(),
};
if mock_val_str != "null" {
branch_matches = false;
break;
}
}
};
if mock_val_str != *v {
matches_all = false;
}
if branch_matches {
any_branch_matched = true;
break;
}
}
if matches_all {
if any_branch_matched {
matches.push(mock.clone());
}
}

View File

@ -9,6 +9,61 @@ impl SpiExecutor {
pub fn new() -> Self {
Self {}
}
fn transact<F, R>(&self, f: F) -> Result<R, String>
where
F: FnOnce() -> Result<R, String>,
{
unsafe {
let oldcontext = pgrx::pg_sys::CurrentMemoryContext;
let oldowner = pgrx::pg_sys::CurrentResourceOwner;
pgrx::pg_sys::BeginInternalSubTransaction(std::ptr::null());
pgrx::pg_sys::MemoryContextSwitchTo(oldcontext);
let runner = std::panic::AssertUnwindSafe(move || {
let res = f();
pgrx::pg_sys::ReleaseCurrentSubTransaction();
pgrx::pg_sys::MemoryContextSwitchTo(oldcontext);
pgrx::pg_sys::CurrentResourceOwner = oldowner;
res
});
pgrx::PgTryBuilder::new(runner)
.catch_rust_panic(|cause| {
pgrx::pg_sys::RollbackAndReleaseCurrentSubTransaction();
pgrx::pg_sys::MemoryContextSwitchTo(oldcontext);
pgrx::pg_sys::CurrentResourceOwner = oldowner;
// Rust panics are fatal bugs, not validation errors. Rethrow so they bubble up.
cause.rethrow()
})
.catch_others(|cause| {
pgrx::pg_sys::RollbackAndReleaseCurrentSubTransaction();
pgrx::pg_sys::MemoryContextSwitchTo(oldcontext);
pgrx::pg_sys::CurrentResourceOwner = oldowner;
let error_msg = match &cause {
pgrx::pg_sys::panic::CaughtError::PostgresError(e)
| pgrx::pg_sys::panic::CaughtError::ErrorReport(e) => {
let json_err = serde_json::json!({
"error": e.message(),
"code": format!("{:?}", e.sql_error_code()),
"detail": e.detail(),
"hint": e.hint()
});
json_err.to_string()
}
_ => format!("{:?}", cause),
};
pgrx::warning!("JSPG Caught Native Postgres Error: {}", error_msg);
Err(error_msg)
})
.execute()
}
}
}
impl DatabaseExecutor for SpiExecutor {
@ -24,19 +79,22 @@ impl DatabaseExecutor for SpiExecutor {
}
}
Spi::connect(|client| {
match client.select(sql, Some(args_with_oid.len() as i64), &args_with_oid) {
Ok(tup_table) => {
let mut results = Vec::new();
for row in tup_table {
if let Ok(Some(jsonb)) = row.get::<pgrx::JsonB>(1) {
results.push(jsonb.0);
self.transact(|| {
Spi::connect(|client| {
pgrx::notice!("JSPG_SQL: {}", sql);
match client.select(sql, Some(args_with_oid.len() as i64), &args_with_oid) {
Ok(tup_table) => {
let mut results = Vec::new();
for row in tup_table {
if let Ok(Some(jsonb)) = row.get::<pgrx::JsonB>(1) {
results.push(jsonb.0);
}
}
Ok(Value::Array(results))
}
Ok(Value::Array(results))
Err(e) => Err(format!("SPI Query Fetch Failure: {}", e)),
}
Err(e) => Err(format!("SPI Query Fetch Failure: {}", e)),
}
})
})
}
@ -52,45 +110,52 @@ impl DatabaseExecutor for SpiExecutor {
}
}
Spi::connect_mut(|client| {
match client.update(sql, Some(args_with_oid.len() as i64), &args_with_oid) {
Ok(_) => Ok(()),
Err(e) => Err(format!("SPI Execution Failure: {}", e)),
}
self.transact(|| {
Spi::connect_mut(|client| {
pgrx::notice!("JSPG_SQL: {}", sql);
match client.update(sql, Some(args_with_oid.len() as i64), &args_with_oid) {
Ok(_) => Ok(()),
Err(e) => Err(format!("SPI Execution Failure: {}", e)),
}
})
})
}
fn auth_user_id(&self) -> Result<String, String> {
Spi::connect(|client| {
let mut tup_table = client
.select(
"SELECT COALESCE(current_setting('auth.user_id', true), 'ffffffff-ffff-ffff-ffff-ffffffffffff')",
None,
&[],
)
.map_err(|e| format!("SPI Select Error: {}", e))?;
self.transact(|| {
Spi::connect(|client| {
let mut tup_table = client
.select(
"SELECT COALESCE(current_setting('auth.user_id', true), 'ffffffff-ffff-ffff-ffff-ffffffffffff')",
None,
&[],
)
.map_err(|e| format!("SPI Select Error: {}", e))?;
let row = tup_table
.next()
.ok_or("No user id setting returned from context".to_string())?;
let user_id: Option<String> = row.get(1).map_err(|e| e.to_string())?;
let row = tup_table
.next()
.ok_or("No user id setting returned from context".to_string())?;
let user_id: Option<String> = row.get(1).map_err(|e| e.to_string())?;
user_id.ok_or("Missing user_id".to_string())
user_id.ok_or("Missing user_id".to_string())
})
})
}
fn timestamp(&self) -> Result<String, String> {
Spi::connect(|client| {
let mut tup_table = client
.select("SELECT clock_timestamp()::text", None, &[])
.map_err(|e| format!("SPI Select Error: {}", e))?;
self.transact(|| {
Spi::connect(|client| {
let mut tup_table = client
.select("SELECT clock_timestamp()::text", None, &[])
.map_err(|e| format!("SPI Select Error: {}", e))?;
let row = tup_table
.next()
.ok_or("No clock timestamp returned".to_string())?;
let timestamp: Option<String> = row.get(1).map_err(|e| e.to_string())?;
let row = tup_table
.next()
.ok_or("No clock timestamp returned".to_string())?;
let timestamp: Option<String> = row.get(1).map_err(|e| e.to_string())?;
timestamp.ok_or("Missing timestamp".to_string())
timestamp.ok_or("Missing timestamp".to_string())
})
})
}
}

View File

@ -1,3 +1,4 @@
pub mod edge;
pub mod r#enum;
pub mod executors;
pub mod formats;
@ -18,14 +19,11 @@ use executors::pgrx::SpiExecutor;
#[cfg(test)]
use executors::mock::MockExecutor;
pub mod stem;
use punc::Punc;
use relation::Relation;
use schema::Schema;
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use stem::Stem;
use r#type::Type;
pub struct Database {
@ -34,8 +32,6 @@ pub struct Database {
pub puncs: HashMap<String, Punc>,
pub relations: HashMap<String, Relation>,
pub schemas: HashMap<String, Schema>,
// Map of Schema ID -> { Entity Type -> Target Subschema Arc }
pub stems: HashMap<String, HashMap<String, Arc<Stem>>>,
pub descendants: HashMap<String, Vec<String>>,
pub depths: HashMap<String, usize>,
pub executor: Box<dyn DatabaseExecutor + Send + Sync>,
@ -49,7 +45,6 @@ impl Database {
relations: HashMap::new(),
puncs: HashMap::new(),
schemas: HashMap::new(),
stems: HashMap::new(),
descendants: HashMap::new(),
depths: HashMap::new(),
#[cfg(not(test))]
@ -78,9 +73,24 @@ impl Database {
for item in arr {
match serde_json::from_value::<Relation>(item.clone()) {
Ok(def) => {
db.relations.insert(def.constraint.clone(), def);
if db.types.contains_key(&def.source_type)
&& db.types.contains_key(&def.destination_type)
{
db.relations.insert(def.constraint.clone(), def);
}
}
Err(e) => {
return Err(crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "DATABASE_RELATION_PARSE_FAILED".to_string(),
message: format!("Failed to parse database relation: {}", e),
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: None,
context: None,
schema: None,
},
}]));
}
Err(e) => println!("DATABASE RELATION PARSE FAILED: {:?}", e),
}
}
}
@ -137,40 +147,68 @@ impl Database {
self.executor.timestamp()
}
/// Organizes the graph of the database, compiling regex, format functions, and caching relationships.
pub fn compile(&mut self) -> Result<(), crate::drop::Drop> {
self.collect_schemas();
let mut harvested = Vec::new();
for schema in self.schemas.values_mut() {
if let Err(msg) = schema.collect_schemas(None, &mut harvested) {
return Err(crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "SCHEMA_VALIDATION_FAILED".to_string(),
message: msg,
details: crate::drop::ErrorDetails { path: "".to_string(), cause: None, context: None, schema: None },
}]));
}
}
self.schemas.extend(harvested);
if let Err(msg) = self.collect_schemas() {
return Err(crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "SCHEMA_VALIDATION_FAILED".to_string(),
message: msg,
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: None,
context: None,
schema: None,
},
}]));
}
self.collect_depths();
self.collect_descendants();
self.compile_schemas();
self.collect_stems()?;
// Mathematically evaluate all property inheritances, formats, schemas, and foreign key edges topographically over OnceLocks
let mut visited = std::collections::HashSet::new();
for schema in self.schemas.values() {
schema.compile(self, &mut visited);
}
Ok(())
}
fn collect_schemas(&mut self) {
fn collect_schemas(&mut self) -> Result<(), String> {
let mut to_insert = Vec::new();
// Pass 1: Extract all Schemas structurally off top level definitions into the master registry.
// Validate every node recursively via string filters natively!
for type_def in self.types.values() {
for mut schema in type_def.schemas.clone() {
schema.harvest(&mut to_insert);
schema.collect_schemas(None, &mut to_insert)?;
}
}
for punc_def in self.puncs.values() {
for mut schema in punc_def.schemas.clone() {
schema.harvest(&mut to_insert);
schema.collect_schemas(None, &mut to_insert)?;
}
}
for enum_def in self.enums.values() {
for mut schema in enum_def.schemas.clone() {
schema.harvest(&mut to_insert);
schema.collect_schemas(None, &mut to_insert)?;
}
}
for (id, schema) in to_insert {
self.schemas.insert(id, schema);
}
Ok(())
}
fn collect_depths(&mut self) {
@ -228,8 +266,8 @@ impl Database {
fn collect_descendants_recursively(
target: &str,
direct_refs: &HashMap<String, Vec<String>>,
descendants: &mut HashSet<String>,
direct_refs: &std::collections::HashMap<String, Vec<String>>,
descendants: &mut std::collections::HashSet<String>,
) {
if let Some(children) = direct_refs.get(target) {
for child in children {
@ -239,249 +277,4 @@ impl Database {
}
}
}
fn compile_schemas(&mut self) {
// Pass 3: compile_internals across pure structure
let schema_ids: Vec<String> = self.schemas.keys().cloned().collect();
for id in schema_ids {
if let Some(schema) = self.schemas.get_mut(&id) {
schema.compile_internals();
}
}
}
fn collect_stems(&mut self) -> Result<(), crate::drop::Drop> {
let mut db_stems: HashMap<String, HashMap<String, Arc<Stem>>> = HashMap::new();
let mut errors: Vec<crate::drop::Error> = Vec::new();
let schema_ids: Vec<String> = self.schemas.keys().cloned().collect();
for schema_id in schema_ids {
if let Some(schema) = self.schemas.get(&schema_id) {
let mut inner_map = HashMap::new();
Self::discover_stems(
self,
&schema_id,
schema,
String::from(""),
None,
None,
false,
&mut inner_map,
Vec::new(),
&mut errors,
);
if !inner_map.is_empty() {
db_stems.insert(schema_id, inner_map);
}
}
}
self.stems = db_stems;
if !errors.is_empty() {
return Err(crate::drop::Drop::with_errors(errors));
}
Ok(())
}
fn discover_stems(
db: &Database,
root_schema_id: &str,
schema: &Schema,
current_path: String,
parent_type: Option<String>,
property_name: Option<String>,
is_polymorphic: bool,
inner_map: &mut HashMap<String, Arc<Stem>>,
seen_entities: Vec<String>,
errors: &mut Vec<crate::drop::Error>,
) {
let mut is_entity = false;
let mut entity_type = String::new();
// First check if the Schema's $id is a native Database Type
if let Some(ref id) = schema.obj.id {
let parts: Vec<&str> = id.split('.').collect();
if let Some(last_seg) = parts.last() {
if db.types.contains_key(*last_seg) {
is_entity = true;
entity_type = last_seg.to_string();
}
}
}
// If not found via $id, check the $ref pointer
// This allows ad-hoc schemas (like `save_person.response`) to successfully adopt the Type of what they $ref
if !is_entity {
if let Some(ref r) = schema.obj.r#ref {
let parts: Vec<&str> = r.split('.').collect();
if let Some(last_seg) = parts.last() {
if db.types.contains_key(*last_seg) {
is_entity = true;
entity_type = last_seg.to_string();
}
}
}
}
if is_entity {
if seen_entities.contains(&entity_type) {
return; // Break cyclical schemas!
}
}
let mut relation_col = None;
if is_entity {
if let (Some(pt), Some(prop)) = (&parent_type, &property_name) {
let expected_col = format!("{}_id", prop);
let mut found = false;
for rel in db.relations.values() {
if (rel.source_type == *pt && rel.destination_type == entity_type)
|| (rel.source_type == entity_type && rel.destination_type == *pt)
{
if rel.source_columns.contains(&expected_col) {
relation_col = Some(expected_col.clone());
found = true;
break;
}
}
}
if !found {
relation_col = Some(expected_col);
}
}
let mut final_path = current_path.clone();
if is_polymorphic && !final_path.is_empty() && !final_path.ends_with(&entity_type) {
if final_path.ends_with(".#") {
final_path = format!("{}(type==\"{}\")", final_path, entity_type);
} else {
final_path = format!("{}#(type==\"{}\")", final_path, entity_type);
}
}
let stem = Stem {
r#type: entity_type.clone(),
relation: relation_col,
schema: Arc::new(schema.clone()),
};
inner_map.insert(final_path, Arc::new(stem));
}
let next_parent = if is_entity {
Some(entity_type.clone())
} else {
parent_type.clone()
};
let pass_seen = if is_entity {
let mut ns = seen_entities.clone();
ns.push(entity_type.clone());
ns
} else {
seen_entities.clone()
};
// Properties branch
if let Some(props) = &schema.obj.properties {
for (k, v) in props {
// Standard Property Pathing
let next_path = if current_path.is_empty() {
k.clone()
} else {
format!("{}.{}", current_path, k)
};
Self::discover_stems(
db,
root_schema_id,
v,
next_path,
next_parent.clone(),
Some(k.clone()),
false,
inner_map,
pass_seen.clone(),
errors,
);
}
}
// Array Item branch
if let Some(items) = &schema.obj.items {
let next_path = if current_path.is_empty() {
String::from("#")
} else {
format!("{}.#", current_path)
};
Self::discover_stems(
db,
root_schema_id,
items,
next_path,
next_parent.clone(),
property_name.clone(),
false,
inner_map,
pass_seen.clone(),
errors,
);
}
// Follow external reference if we didn't just crawl local properties
if schema.obj.properties.is_none() && schema.obj.items.is_none() && schema.obj.one_of.is_none()
{
if let Some(ref r) = schema.obj.r#ref {
if let Some(target_schema) = db.schemas.get(r) {
Self::discover_stems(
db,
root_schema_id,
target_schema,
current_path.clone(),
next_parent.clone(),
property_name.clone(),
is_polymorphic,
inner_map,
seen_entities.clone(),
errors,
);
}
}
}
// Polymorphism branch
if let Some(arr) = &schema.obj.one_of {
for v in arr {
Self::discover_stems(
db,
root_schema_id,
v.as_ref(),
current_path.clone(),
next_parent.clone(),
property_name.clone(),
true,
inner_map,
pass_seen.clone(),
errors,
);
}
}
if let Some(arr) = &schema.obj.all_of {
for v in arr {
Self::discover_stems(
db,
root_schema_id,
v.as_ref(),
current_path.clone(),
next_parent.clone(),
property_name.clone(),
is_polymorphic,
inner_map,
pass_seen.clone(),
errors,
);
}
}
}
}

View File

@ -2,6 +2,26 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::OnceLock;
pub fn serialize_once_lock<T: serde::Serialize, S: serde::Serializer>(
lock: &OnceLock<T>,
serializer: S,
) -> Result<S::Ok, S::Error> {
if let Some(val) = lock.get() {
val.serialize(serializer)
} else {
serializer.serialize_none()
}
}
pub fn is_once_lock_map_empty<K, V>(lock: &OnceLock<std::collections::BTreeMap<K, V>>) -> bool {
lock.get().map_or(true, |m| m.is_empty())
}
pub fn is_once_lock_vec_empty<T>(lock: &OnceLock<Vec<T>>) -> bool {
lock.get().map_or(true, |v| v.is_empty())
}
// Schema mirrors the Go Punc Generator's schema struct for consistency.
// It is an order-preserving representation of a JSON Schema.
@ -167,12 +187,27 @@ pub struct SchemaObject {
#[serde(skip_serializing_if = "Option::is_none")]
pub extensible: Option<bool>,
#[serde(rename = "compiledProperties")]
#[serde(skip_deserializing)]
#[serde(skip_serializing_if = "crate::database::schema::is_once_lock_vec_empty")]
#[serde(serialize_with = "crate::database::schema::serialize_once_lock")]
pub compiled_property_names: OnceLock<Vec<String>>,
#[serde(skip)]
pub compiled_format: Option<CompiledFormat>,
pub compiled_properties: OnceLock<BTreeMap<String, Arc<Schema>>>,
#[serde(rename = "compiledEdges")]
#[serde(skip_deserializing)]
#[serde(skip_serializing_if = "crate::database::schema::is_once_lock_map_empty")]
#[serde(serialize_with = "crate::database::schema::serialize_once_lock")]
pub compiled_edges: OnceLock<BTreeMap<String, crate::database::edge::Edge>>,
#[serde(skip)]
pub compiled_pattern: Option<CompiledRegex>,
pub compiled_format: OnceLock<CompiledFormat>,
#[serde(skip)]
pub compiled_pattern_properties: Option<Vec<(CompiledRegex, Arc<Schema>)>>,
pub compiled_pattern: OnceLock<CompiledRegex>,
#[serde(skip)]
pub compiled_pattern_properties: OnceLock<Vec<(CompiledRegex, Arc<Schema>)>>,
}
/// Represents a compiled format validator
@ -216,19 +251,37 @@ impl std::ops::DerefMut for Schema {
}
impl Schema {
pub fn compile_internals(&mut self) {
self.map_children(|child| child.compile_internals());
if let Some(format_str) = &self.obj.format
&& let Some(fmt) = crate::database::formats::FORMATS.get(format_str.as_str())
{
self.obj.compiled_format = Some(crate::database::schema::CompiledFormat::Func(fmt.func));
pub fn compile(
&self,
db: &crate::database::Database,
visited: &mut std::collections::HashSet<String>,
) {
if self.obj.compiled_properties.get().is_some() {
return;
}
if let Some(pattern_str) = &self.obj.pattern
&& let Ok(re) = regex::Regex::new(pattern_str)
{
self.obj.compiled_pattern = Some(crate::database::schema::CompiledRegex(re));
if let Some(id) = &self.obj.id {
if !visited.insert(id.clone()) {
return; // Break cyclical resolution
}
}
if let Some(format_str) = &self.obj.format {
if let Some(fmt) = crate::database::formats::FORMATS.get(format_str.as_str()) {
let _ = self
.obj
.compiled_format
.set(crate::database::schema::CompiledFormat::Func(fmt.func));
}
}
if let Some(pattern_str) = &self.obj.pattern {
if let Ok(re) = regex::Regex::new(pattern_str) {
let _ = self
.obj
.compiled_pattern
.set(crate::database::schema::CompiledRegex(re));
}
}
if let Some(pattern_props) = &self.obj.pattern_properties {
@ -239,73 +292,352 @@ impl Schema {
}
}
if !compiled.is_empty() {
self.obj.compiled_pattern_properties = Some(compiled);
let _ = self.obj.compiled_pattern_properties.set(compiled);
}
}
let mut props = std::collections::BTreeMap::new();
// 1. Resolve INHERITANCE dependencies first
if let Some(ref_id) = &self.obj.r#ref {
if let Some(parent) = db.schemas.get(ref_id) {
parent.compile(db, visited);
if let Some(p_props) = parent.obj.compiled_properties.get() {
props.extend(p_props.clone());
}
}
}
if let Some(all_of) = &self.obj.all_of {
for ao in all_of {
ao.compile(db, visited);
if let Some(ao_props) = ao.obj.compiled_properties.get() {
props.extend(ao_props.clone());
}
}
}
if let Some(then_schema) = &self.obj.then_ {
then_schema.compile(db, visited);
if let Some(t_props) = then_schema.obj.compiled_properties.get() {
props.extend(t_props.clone());
}
}
if let Some(else_schema) = &self.obj.else_ {
else_schema.compile(db, visited);
if let Some(e_props) = else_schema.obj.compiled_properties.get() {
props.extend(e_props.clone());
}
}
// 2. Add local properties
if let Some(local_props) = &self.obj.properties {
for (k, v) in local_props {
props.insert(k.clone(), v.clone());
}
}
// 3. Set the OnceLock!
let _ = self.obj.compiled_properties.set(props.clone());
let mut names: Vec<String> = props.keys().cloned().collect();
names.sort();
let _ = self.obj.compiled_property_names.set(names);
// 4. Compute Edges natively
let schema_edges = self.compile_edges(db, visited, &props);
let _ = self.obj.compiled_edges.set(schema_edges);
// 5. Build our inline children properties recursively NOW! (Depth-first search)
if let Some(local_props) = &self.obj.properties {
for child in local_props.values() {
child.compile(db, visited);
}
}
if let Some(items) = &self.obj.items {
items.compile(db, visited);
}
if let Some(pattern_props) = &self.obj.pattern_properties {
for child in pattern_props.values() {
child.compile(db, visited);
}
}
if let Some(additional_props) = &self.obj.additional_properties {
additional_props.compile(db, visited);
}
if let Some(one_of) = &self.obj.one_of {
for child in one_of {
child.compile(db, visited);
}
}
if let Some(arr) = &self.obj.prefix_items {
for child in arr {
child.compile(db, visited);
}
}
if let Some(child) = &self.obj.not {
child.compile(db, visited);
}
if let Some(child) = &self.obj.contains {
child.compile(db, visited);
}
if let Some(child) = &self.obj.property_names {
child.compile(db, visited);
}
if let Some(child) = &self.obj.if_ {
child.compile(db, visited);
}
if let Some(id) = &self.obj.id {
visited.remove(id);
}
}
pub fn harvest(&mut self, to_insert: &mut Vec<(String, Schema)>) {
#[allow(unused_variables)]
fn validate_identifier(id: &str, field_name: &str) -> Result<(), String> {
#[cfg(not(test))]
for c in id.chars() {
if !c.is_ascii_lowercase() && !c.is_ascii_digit() && c != '_' && c != '.' {
return Err(format!("Invalid character '{}' in JSON Schema '{}' property: '{}'. Identifiers must exclusively contain [a-z0-9_.]", c, field_name, id));
}
}
Ok(())
}
pub fn collect_schemas(
&mut self,
tracking_path: Option<String>,
to_insert: &mut Vec<(String, Schema)>,
) -> Result<(), String> {
if let Some(id) = &self.obj.id {
Self::validate_identifier(id, "$id")?;
to_insert.push((id.clone(), self.clone()));
}
self.map_children(|child| child.harvest(to_insert));
if let Some(r#ref) = &self.obj.r#ref {
Self::validate_identifier(r#ref, "$ref")?;
}
if let Some(family) = &self.obj.family {
Self::validate_identifier(family, "$family")?;
}
// Is this schema an inline ad-hoc composition?
// Meaning it has a tracking context, lacks an explicit $id, but extends an Entity ref with explicit properties!
if self.obj.id.is_none() && self.obj.r#ref.is_some() && self.obj.properties.is_some() {
if let Some(ref path) = tracking_path {
to_insert.push((path.clone(), self.clone()));
}
}
// Provide the path origin to children natively, prioritizing the explicit `$id` boundary if one exists
let origin_path = self.obj.id.clone().or(tracking_path);
self.collect_child_schemas(origin_path, to_insert)?;
Ok(())
}
pub fn map_children<F>(&mut self, mut f: F)
where
F: FnMut(&mut Schema),
{
pub fn collect_child_schemas(
&mut self,
origin_path: Option<String>,
to_insert: &mut Vec<(String, Schema)>,
) -> Result<(), String> {
if let Some(props) = &mut self.obj.properties {
for v in props.values_mut() {
for (k, v) in props.iter_mut() {
let mut inner = (**v).clone();
f(&mut inner);
let next_path = origin_path.as_ref().map(|o| format!("{}/{}", o, k));
inner.collect_schemas(next_path, to_insert)?;
*v = Arc::new(inner);
}
}
if let Some(pattern_props) = &mut self.obj.pattern_properties {
for v in pattern_props.values_mut() {
for (k, v) in pattern_props.iter_mut() {
let mut inner = (**v).clone();
f(&mut inner);
let next_path = origin_path.as_ref().map(|o| format!("{}/{}", o, k));
inner.collect_schemas(next_path, to_insert)?;
*v = Arc::new(inner);
}
}
let mut map_arr = |arr: &mut Vec<Arc<Schema>>| {
let mut map_arr = |arr: &mut Vec<Arc<Schema>>| -> Result<(), String> {
for v in arr.iter_mut() {
let mut inner = (**v).clone();
f(&mut inner);
inner.collect_schemas(origin_path.clone(), to_insert)?;
*v = Arc::new(inner);
}
Ok(())
};
if let Some(arr) = &mut self.obj.prefix_items {
map_arr(arr);
}
if let Some(arr) = &mut self.obj.all_of {
map_arr(arr);
}
if let Some(arr) = &mut self.obj.one_of {
map_arr(arr);
}
if let Some(arr) = &mut self.obj.prefix_items { map_arr(arr)?; }
if let Some(arr) = &mut self.obj.all_of { map_arr(arr)?; }
if let Some(arr) = &mut self.obj.one_of { map_arr(arr)?; }
let mut map_opt = |opt: &mut Option<Arc<Schema>>| {
let mut map_opt = |opt: &mut Option<Arc<Schema>>, pass_path: bool| -> Result<(), String> {
if let Some(v) = opt {
let mut inner = (**v).clone();
f(&mut inner);
let next = if pass_path { origin_path.clone() } else { None };
inner.collect_schemas(next, to_insert)?;
*v = Arc::new(inner);
}
Ok(())
};
map_opt(&mut self.obj.additional_properties);
map_opt(&mut self.obj.items);
map_opt(&mut self.obj.contains);
map_opt(&mut self.obj.property_names);
map_opt(&mut self.obj.not);
map_opt(&mut self.obj.if_);
map_opt(&mut self.obj.then_);
map_opt(&mut self.obj.else_);
map_opt(&mut self.obj.additional_properties, false)?;
// `items` absolutely must inherit the EXACT property path assigned to the Array wrapper!
// This allows nested Arrays enclosing bare Entity structs to correctly register as the boundary mapping.
map_opt(&mut self.obj.items, true)?;
map_opt(&mut self.obj.not, false)?;
map_opt(&mut self.obj.contains, false)?;
map_opt(&mut self.obj.property_names, false)?;
map_opt(&mut self.obj.if_, false)?;
map_opt(&mut self.obj.then_, false)?;
map_opt(&mut self.obj.else_, false)?;
Ok(())
}
pub fn compile_edges(
&self,
db: &crate::database::Database,
visited: &mut std::collections::HashSet<String>,
props: &std::collections::BTreeMap<String, std::sync::Arc<Schema>>,
) -> std::collections::BTreeMap<String, crate::database::edge::Edge> {
let mut schema_edges = std::collections::BTreeMap::new();
let mut parent_type_name = None;
if let Some(family) = &self.obj.family {
parent_type_name = Some(family.split('.').next_back().unwrap_or(family).to_string());
} else if let Some(identifier) = self.obj.identifier() {
parent_type_name = Some(identifier);
}
if let Some(p_type) = parent_type_name {
if db.types.contains_key(&p_type) {
for (prop_name, prop_schema) in props {
let mut child_type_name = None;
let mut target_schema = prop_schema.clone();
if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) =
&prop_schema.obj.type_
{
if t == "array" {
if let Some(items) = &prop_schema.obj.items {
target_schema = items.clone();
}
}
}
if let Some(family) = &target_schema.obj.family {
child_type_name = Some(family.split('.').next_back().unwrap_or(family).to_string());
} else if let Some(ref_id) = target_schema.obj.identifier() {
child_type_name = Some(ref_id);
} else if let Some(arr) = &target_schema.obj.one_of {
if let Some(first) = arr.first() {
if let Some(ref_id) = first.obj.identifier() {
child_type_name = Some(ref_id);
}
}
}
if let Some(c_type) = child_type_name {
if db.types.contains_key(&c_type) {
target_schema.compile(db, visited);
if let Some(compiled_target_props) = target_schema.obj.compiled_properties.get() {
let keys_for_ambiguity: Vec<String> =
compiled_target_props.keys().cloned().collect();
if let Some((relation, is_forward)) =
resolve_relation(db, &p_type, &c_type, prop_name, Some(&keys_for_ambiguity))
{
schema_edges.insert(
prop_name.clone(),
crate::database::edge::Edge {
constraint: relation.constraint.clone(),
forward: is_forward,
},
);
}
}
}
}
}
}
}
schema_edges
}
}
pub(crate) fn resolve_relation<'a>(
db: &'a crate::database::Database,
parent_type: &str,
child_type: &str,
prop_name: &str,
relative_keys: Option<&Vec<String>>,
) -> Option<(&'a crate::database::relation::Relation, bool)> {
if parent_type == "entity" && child_type == "entity" {
return None;
}
let p_def = db.types.get(parent_type)?;
let c_def = db.types.get(child_type)?;
let mut matching_rels = Vec::new();
let mut directions = Vec::new();
for rel in db.relations.values() {
let is_forward = p_def.hierarchy.contains(&rel.source_type)
&& c_def.hierarchy.contains(&rel.destination_type);
let is_reverse = p_def.hierarchy.contains(&rel.destination_type)
&& c_def.hierarchy.contains(&rel.source_type);
if is_forward {
matching_rels.push(rel);
directions.push(true);
} else if is_reverse {
matching_rels.push(rel);
directions.push(false);
}
}
if matching_rels.is_empty() {
return None;
}
if matching_rels.len() == 1 {
return Some((matching_rels[0], directions[0]));
}
let mut chosen_idx = 0;
let mut resolved = false;
for (i, rel) in matching_rels.iter().enumerate() {
if let Some(prefix) = &rel.prefix {
if prop_name.starts_with(prefix)
|| prefix.starts_with(prop_name)
|| prefix.replace("_", "") == prop_name.replace("_", "")
{
chosen_idx = i;
resolved = true;
break;
}
}
}
if !resolved && relative_keys.is_some() {
let keys = relative_keys.unwrap();
let mut missing_prefix_ids = Vec::new();
for (i, rel) in matching_rels.iter().enumerate() {
if let Some(prefix) = &rel.prefix {
if !keys.contains(prefix) {
missing_prefix_ids.push(i);
}
}
}
if missing_prefix_ids.len() == 1 {
chosen_idx = missing_prefix_ids[0];
}
}
Some((matching_rels[chosen_idx], directions[chosen_idx]))
}
impl<'de> Deserialize<'de> for Schema {
@ -363,6 +695,16 @@ impl<'de> Deserialize<'de> for Schema {
}
}
impl SchemaObject {
pub fn identifier(&self) -> Option<String> {
if let Some(lookup_key) = self.id.as_ref().or(self.r#ref.as_ref()) {
Some(lookup_key.split('.').next_back().unwrap_or("").to_string())
} else {
None
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum SchemaTypeOrArray {

View File

@ -1,12 +0,0 @@
use crate::database::schema::Schema;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stem {
pub r#type: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub relation: Option<String>,
pub schema: Arc<Schema>,
}

View File

@ -15,6 +15,8 @@ pub struct Type {
#[serde(default)]
pub historical: bool,
#[serde(default)]
pub notify: bool,
#[serde(default)]
pub sensitive: bool,
#[serde(default)]
pub ownable: bool,

View File

@ -67,6 +67,10 @@ pub struct Error {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ErrorDetails {
pub path: String,
// Extensions can be added here (package, cause, etc)
// For now, validator only provides path
#[serde(skip_serializing_if = "Option::is_none")]
pub cause: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub context: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub schema: Option<String>,
}

View File

@ -31,6 +31,9 @@ fn jspg_failure() -> JsonB {
message: "JSPG extension has not been initialized via jspg_setup".to_string(),
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: None,
context: None,
schema: None,
},
};
let drop = crate::drop::Drop::with_errors(vec![error]);
@ -57,7 +60,7 @@ pub fn jspg_setup(database: JsonB) -> JsonB {
}
#[cfg_attr(not(test), pg_extern)]
pub fn jspg_merge(data: JsonB) -> JsonB {
pub fn jspg_merge(schema_id: &str, data: JsonB) -> JsonB {
// Try to acquire a read lock to get a clone of the Engine Arc
let engine_opt = {
let lock = GLOBAL_JSPG.read().unwrap();
@ -66,7 +69,7 @@ pub fn jspg_merge(data: JsonB) -> JsonB {
match engine_opt {
Some(engine) => {
let drop = engine.merger.merge(data.0);
let drop = engine.merger.merge(schema_id, data.0);
JsonB(serde_json::to_value(drop).unwrap())
}
None => jspg_failure(),
@ -74,7 +77,7 @@ pub fn jspg_merge(data: JsonB) -> JsonB {
}
#[cfg_attr(not(test), pg_extern)]
pub fn jspg_query(schema_id: &str, stem: Option<&str>, filters: Option<JsonB>) -> JsonB {
pub fn jspg_query(schema_id: &str, filters: Option<JsonB>) -> JsonB {
let engine_opt = {
let lock = GLOBAL_JSPG.read().unwrap();
lock.clone()
@ -84,7 +87,7 @@ pub fn jspg_query(schema_id: &str, stem: Option<&str>, filters: Option<JsonB>) -
Some(engine) => {
let drop = engine
.queryer
.query(schema_id, stem, filters.as_ref().map(|f| &f.0));
.query(schema_id, filters.as_ref().map(|f| &f.0));
JsonB(serde_json::to_value(drop).unwrap())
}
None => jspg_failure(),
@ -128,24 +131,6 @@ pub fn jspg_schemas() -> JsonB {
}
}
#[cfg_attr(not(test), pg_extern)]
pub fn jspg_stems() -> JsonB {
let engine_opt = {
let lock = GLOBAL_JSPG.read().unwrap();
lock.clone()
};
match engine_opt {
Some(engine) => {
let stems_json = serde_json::to_value(&engine.database.stems)
.unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
let drop = crate::drop::Drop::success_with_val(stems_json);
JsonB(serde_json::to_value(drop).unwrap())
}
None => jspg_failure(),
}
}
#[cfg_attr(not(test), pg_extern(strict))]
pub fn jspg_teardown() -> JsonB {
let mut lock = GLOBAL_JSPG.write().unwrap();

View File

@ -3,6 +3,7 @@
pub mod cache;
use crate::database::r#type::Type;
use crate::database::Database;
use serde_json::Value;
use std::sync::Arc;
@ -20,22 +21,59 @@ impl Merger {
}
}
pub fn merge(&self, data: Value) -> crate::drop::Drop {
let mut val_resolved = Value::Null;
pub fn merge(&self, schema_id: &str, data: Value) -> crate::drop::Drop {
let mut notifications_queue = Vec::new();
let result = self.merge_internal(data, &mut notifications_queue);
let target_schema = match self.db.schemas.get(schema_id) {
Some(s) => Arc::new(s.clone()),
None => {
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "MERGE_FAILED".to_string(),
message: format!("Unknown schema_id: {}", schema_id),
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: None,
context: Some(data),
schema: None,
},
}]);
}
};
match result {
Ok(val) => {
val_resolved = val;
}
let result = self.merge_internal(target_schema, data.clone(), &mut notifications_queue);
let val_resolved = match result {
Ok(val) => val,
Err(msg) => {
let mut final_code = "MERGE_FAILED".to_string();
let mut final_message = msg.clone();
let mut final_cause = None;
if let Ok(Value::Object(map)) = serde_json::from_str::<Value>(&msg) {
if let (Some(Value::String(e_msg)), Some(Value::String(e_code))) = (map.get("error"), map.get("code")) {
final_message = e_msg.clone();
final_code = e_code.clone();
let mut cause_parts = Vec::new();
if let Some(Value::String(d)) = map.get("detail") {
if !d.is_empty() { cause_parts.push(d.clone()); }
}
if let Some(Value::String(h)) = map.get("hint") {
if !h.is_empty() { cause_parts.push(h.clone()); }
}
if !cause_parts.is_empty() {
final_cause = Some(cause_parts.join("\n"));
}
}
}
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "MERGE_FAILED".to_string(),
message: msg,
code: final_code,
message: final_message,
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: final_cause,
context: Some(data),
schema: None,
},
}]);
}
@ -49,6 +87,9 @@ impl Merger {
message: format!("Executor Error in pre-ordered notify: {:?}", e),
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: None,
context: None,
schema: None,
},
}]);
}
@ -82,24 +123,48 @@ impl Merger {
crate::drop::Drop::success_with_val(stripped_val)
}
pub(crate) fn merge_internal(&self, data: Value, notifications: &mut Vec<String>) -> Result<Value, String> {
pub(crate) fn merge_internal(
&self,
schema: Arc<crate::database::schema::Schema>,
data: Value,
notifications: &mut Vec<String>,
) -> Result<Value, String> {
match data {
Value::Array(items) => self.merge_array(items, notifications),
Value::Object(map) => self.merge_object(map, notifications),
Value::Array(items) => self.merge_array(schema, items, notifications),
Value::Object(map) => self.merge_object(schema, map, notifications),
_ => Err("Invalid merge payload: root must be an Object or Array".to_string()),
}
}
fn merge_array(&self, items: Vec<Value>, notifications: &mut Vec<String>) -> Result<Value, String> {
fn merge_array(
&self,
schema: Arc<crate::database::schema::Schema>,
items: Vec<Value>,
notifications: &mut Vec<String>,
) -> Result<Value, String> {
let mut item_schema = schema.clone();
if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) = &schema.obj.type_ {
if t == "array" {
if let Some(items_def) = &schema.obj.items {
item_schema = items_def.clone();
}
}
}
let mut resolved_items = Vec::new();
for item in items {
let resolved = self.merge_internal(item, notifications)?;
let resolved = self.merge_internal(item_schema.clone(), item, notifications)?;
resolved_items.push(resolved);
}
Ok(Value::Array(resolved_items))
}
fn merge_object(&self, obj: serde_json::Map<String, Value>, notifications: &mut Vec<String>) -> Result<Value, String> {
fn merge_object(
&self,
schema: Arc<crate::database::schema::Schema>,
obj: serde_json::Map<String, Value>,
notifications: &mut Vec<String>,
) -> Result<Value, String> {
let queue_start = notifications.len();
let type_name = match obj.get("type").and_then(|v| v.as_str()) {
@ -112,25 +177,49 @@ impl Merger {
None => return Err(format!("Unknown entity type: {}", type_name)),
};
// 1. Segment the entity: fields in type_def.fields are database fields, others are relationships
let compiled_props = match schema.obj.compiled_properties.get() {
Some(props) => props,
None => return Err("Schema has no compiled properties for merging".to_string()),
};
let mut entity_fields = serde_json::Map::new();
let mut entity_objects = serde_json::Map::new();
let mut entity_arrays = serde_json::Map::new();
let mut entity_objects = std::collections::BTreeMap::new();
let mut entity_arrays = std::collections::BTreeMap::new();
for (k, v) in obj {
let is_field = type_def.fields.contains(&k) || k == "created";
let typeof_v = match &v {
Value::Object(_) => "object",
Value::Array(_) => "array",
_ => "other",
};
// Always retain system and unmapped core fields natively implicitly mapped to the Postgres tables
if k == "id" || k == "type" || k == "created" {
entity_fields.insert(k.clone(), v.clone());
continue;
}
if is_field {
entity_fields.insert(k, v);
} else if typeof_v == "object" {
entity_objects.insert(k, v);
} else if typeof_v == "array" {
entity_arrays.insert(k, v);
if let Some(prop_schema) = compiled_props.get(&k) {
let mut is_edge = false;
if let Some(edges) = schema.obj.compiled_edges.get() {
if edges.contains_key(&k) {
is_edge = true;
}
}
if is_edge {
let typeof_v = match &v {
Value::Object(_) => "object",
Value::Array(_) => "array",
_ => "field", // Malformed edge data?
};
if typeof_v == "object" {
entity_objects.insert(k.clone(), (v.clone(), prop_schema.clone()));
} else if typeof_v == "array" {
entity_arrays.insert(k.clone(), (v.clone(), prop_schema.clone()));
} else {
entity_fields.insert(k.clone(), v.clone());
}
} else {
// Not an edge! It's a raw Postgres column (e.g., JSONB, text[])
entity_fields.insert(k.clone(), v.clone());
}
} else if type_def.fields.contains(&k) {
entity_fields.insert(k.clone(), v.clone());
}
}
@ -139,85 +228,96 @@ impl Merger {
let mut entity_change_kind = None;
let mut entity_fetched = None;
let mut entity_replaces = None;
// 2. Pre-stage the entity (for non-relationships)
if !type_def.relationship {
let (fields, kind, fetched) =
let (fields, kind, fetched, replaces) =
self.stage_entity(entity_fields.clone(), type_def, &user_id, &timestamp)?;
entity_fields = fields;
entity_change_kind = kind;
entity_fetched = fetched;
entity_replaces = replaces;
}
let mut entity_response = serde_json::Map::new();
// 3. Handle related objects
for (relation_name, relative_val) in entity_objects {
for (relation_name, (relative_val, rel_schema)) in entity_objects {
let mut relative = match relative_val {
Value::Object(m) => m,
_ => continue,
};
let relative_relation = self.get_entity_relation(type_def, &relative, &relation_name)?;
let relative_type_name = match relative.get("type").and_then(|v| v.as_str()) {
Some(t) => t.to_string(),
None => continue,
};
if let Some(relation) = relative_relation {
let parent_is_source = type_def.hierarchy.contains(&relation.source_type);
if let Some(compiled_edges) = schema.obj.compiled_edges.get() {
println!("Compiled Edges keys for relation {}: {:?}", relation_name, compiled_edges.keys().collect::<Vec<_>>());
if let Some(edge) = compiled_edges.get(&relation_name) {
println!("FOUND EDGE {} -> {:?}", relation_name, edge.constraint);
if let Some(relation) = self.db.relations.get(&edge.constraint) {
let parent_is_source = edge.forward;
if parent_is_source {
// Parent holds FK to Child. Child MUST be generated FIRST.
if !relative.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
relative.insert("organization_id".to_string(), org_id.clone());
if parent_is_source {
if !relative.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
relative.insert("organization_id".to_string(), org_id.clone());
}
}
let mut merged_relative = match self.merge_internal(rel_schema.clone(), Value::Object(relative), notifications)? {
Value::Object(m) => m,
_ => continue,
};
merged_relative.insert(
"type".to_string(),
Value::String(relative_type_name),
);
Self::apply_entity_relation(
&mut entity_fields,
&relation.source_columns,
&relation.destination_columns,
&merged_relative,
);
entity_response.insert(relation_name, Value::Object(merged_relative));
} else {
if !relative.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
relative.insert("organization_id".to_string(), org_id.clone());
}
}
Self::apply_entity_relation(
&mut relative,
&relation.source_columns,
&relation.destination_columns,
&entity_fields,
);
let merged_relative = match self.merge_internal(rel_schema.clone(), Value::Object(relative), notifications)? {
Value::Object(m) => m,
_ => continue,
};
entity_response.insert(relation_name, Value::Object(merged_relative));
}
}
let merged_relative = match self.merge_internal(Value::Object(relative), notifications)? {
Value::Object(m) => m,
_ => continue,
};
Self::apply_entity_relation(
&mut entity_fields,
&relation.source_columns,
&relation.destination_columns,
&merged_relative,
);
entity_response.insert(relation_name, Value::Object(merged_relative));
} else {
// Child holds FK back to Parent.
if !relative.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
relative.insert("organization_id".to_string(), org_id.clone());
}
}
Self::apply_entity_relation(
&mut relative,
&relation.source_columns,
&relation.destination_columns,
&entity_fields,
);
let merged_relative = match self.merge_internal(Value::Object(relative), notifications)? {
Value::Object(m) => m,
_ => continue,
};
entity_response.insert(relation_name, Value::Object(merged_relative));
}
}
}
// 4. Post-stage the entity (for relationships)
if type_def.relationship {
let (fields, kind, fetched) =
let (fields, kind, fetched, replaces) =
self.stage_entity(entity_fields.clone(), type_def, &user_id, &timestamp)?;
entity_fields = fields;
entity_change_kind = kind;
entity_fetched = fetched;
entity_replaces = replaces;
}
// 5. Process the main entity fields
self.merge_entity_fields(
entity_change_kind.as_deref().unwrap_or(""),
&type_name,
@ -226,13 +326,11 @@ impl Merger {
entity_fetched.as_ref(),
)?;
// Add main entity fields to response
for (k, v) in &entity_fields {
entity_response.insert(k.clone(), v.clone());
}
// 6. Handle related arrays
for (relation_name, relative_val) in entity_arrays {
for (relation_name, (relative_val, rel_schema)) in entity_arrays {
let relative_arr = match relative_val {
Value::Array(a) => a,
_ => continue,
@ -242,49 +340,58 @@ impl Merger {
continue;
}
let first_relative = match &relative_arr[0] {
Value::Object(m) => m,
_ => continue,
};
if let Some(compiled_edges) = schema.obj.compiled_edges.get() {
if let Some(edge) = compiled_edges.get(&relation_name) {
if let Some(relation) = self.db.relations.get(&edge.constraint) {
let mut relative_responses = Vec::new();
for relative_item_val in relative_arr {
if let Value::Object(mut relative_item) = relative_item_val {
if !relative_item.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
relative_item.insert("organization_id".to_string(), org_id.clone());
}
}
let relative_relation = self.get_entity_relation(type_def, first_relative, &relation_name)?;
Self::apply_entity_relation(
&mut relative_item,
&relation.source_columns,
&relation.destination_columns,
&entity_fields,
);
if let Some(relation) = relative_relation {
let mut relative_responses = Vec::new();
for relative_item_val in relative_arr {
if let Value::Object(mut relative_item) = relative_item_val {
if !relative_item.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
relative_item.insert("organization_id".to_string(), org_id.clone());
let mut item_schema = rel_schema.clone();
if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) = &rel_schema.obj.type_ {
if t == "array" {
if let Some(items_def) = &rel_schema.obj.items {
item_schema = items_def.clone();
}
}
}
let merged_relative =
match self.merge_internal(item_schema, Value::Object(relative_item), notifications)? {
Value::Object(m) => m,
_ => continue,
};
relative_responses.push(Value::Object(merged_relative));
}
}
Self::apply_entity_relation(
&mut relative_item,
&relation.source_columns,
&relation.destination_columns,
&entity_fields,
);
let merged_relative = match self.merge_internal(Value::Object(relative_item), notifications)? {
Value::Object(m) => m,
_ => continue,
};
relative_responses.push(Value::Object(merged_relative));
entity_response.insert(relation_name, Value::Array(relative_responses));
}
}
entity_response.insert(relation_name, Value::Array(relative_responses));
}
}
// 7. Perform change tracking
// 7. Perform change tracking dynamically suppressing noise based on type bounds!
let notify_sql = self.merge_entity_change(
type_def,
&entity_fields,
entity_fetched.as_ref(),
entity_change_kind.as_deref(),
&user_id,
&timestamp,
entity_replaces.as_deref(),
)?;
if let Some(sql) = notify_sql {
@ -316,13 +423,42 @@ impl Merger {
serde_json::Map<String, Value>,
Option<String>,
Option<serde_json::Map<String, Value>>,
Option<String>,
),
String,
> {
let type_name = type_def.name.as_str();
// 🚀 Anchor Short-Circuit Optimization
// An anchor is STRICTLY a struct containing merely an `id` and `type`.
// We aggressively bypass Database SPI `SELECT` fetches because there are no primitive
// mutations to apply to the row. PostgreSQL inherently protects relationships via Foreign Keys downstream.
let is_anchor = entity_fields.len() == 2
&& entity_fields.contains_key("id")
&& entity_fields.contains_key("type");
let has_valid_id = entity_fields
.get("id")
.and_then(|v| v.as_str())
.map_or(false, |s| !s.is_empty());
if is_anchor && has_valid_id {
return Ok((entity_fields, None, None, None));
}
let entity_fetched = self.fetch_entity(&entity_fields, type_def)?;
let mut replaces_id = None;
if let Some(ref fetched_row) = entity_fetched {
let provided_id = entity_fields.get("id").and_then(|v| v.as_str());
let fetched_id = fetched_row.get("id").and_then(|v| v.as_str());
if let (Some(pid), Some(fid)) = (provided_id, fetched_id) {
if !pid.is_empty() && pid != fid {
replaces_id = Some(pid.to_string());
}
}
}
let system_keys = vec![
"id".to_string(),
"type".to_string(),
@ -372,7 +508,7 @@ impl Merger {
);
entity_fields = new_fields;
} else if changes.is_empty() {
} else if changes.is_empty() && replaces_id.is_none() {
let mut new_fields = serde_json::Map::new();
new_fields.insert(
"id".to_string(),
@ -388,6 +524,8 @@ impl Merger {
.unwrap_or(false);
entity_change_kind = if is_archived {
Some("delete".to_string())
} else if changes.is_empty() && replaces_id.is_some() {
Some("replace".to_string())
} else {
Some("update".to_string())
};
@ -410,7 +548,7 @@ impl Merger {
entity_fields = new_fields;
}
Ok((entity_fields, entity_change_kind, entity_fetched))
Ok((entity_fields, entity_change_kind, entity_fetched, replaces_id))
}
fn fetch_entity(
@ -465,11 +603,14 @@ impl Merger {
template
};
let where_clause = if let Some(id) = id_val {
format!("WHERE t1.id = {}", Self::quote_literal(id))
} else if lookup_complete {
let mut lookup_predicates = Vec::new();
let mut where_parts = Vec::new();
if let Some(id) = id_val {
where_parts.push(format!("t1.id = {}", Self::quote_literal(id)));
}
if lookup_complete {
let mut lookup_predicates = Vec::new();
for column in &entity_type.lookup_fields {
let val = entity_fields.get(column).unwrap_or(&Value::Null);
if column == "type" {
@ -478,10 +619,14 @@ impl Merger {
lookup_predicates.push(format!("\"{}\" = {}", column, Self::quote_literal(val)));
}
}
format!("WHERE {}", lookup_predicates.join(" AND "))
} else {
where_parts.push(format!("({})", lookup_predicates.join(" AND ")));
}
if where_parts.is_empty() {
return Ok(None);
};
}
let where_clause = format!("WHERE {}", where_parts.join(" OR "));
let final_sql = format!("{} {}", fetch_sql_template, where_clause);
@ -577,11 +722,7 @@ impl Merger {
for key in &sorted_keys {
columns.push(format!("\"{}\"", key));
let val = entity_pairs.get(key).unwrap();
if val.as_str() == Some("") {
values.push("NULL".to_string());
} else {
values.push(Self::quote_literal(val));
}
values.push(Self::format_sql_value(val, key, entity_type));
}
if columns.is_empty() {
@ -596,8 +737,7 @@ impl Merger {
);
self
.db
.execute(&sql, None)
.map_err(|e| format!("SPI Error in INSERT: {:?}", e))?;
.execute(&sql, None)?;
} else if change_kind == "update" || change_kind == "delete" {
entity_pairs.remove("id");
entity_pairs.remove("type");
@ -615,7 +755,11 @@ impl Merger {
if val.as_str() == Some("") {
set_clauses.push(format!("\"{}\" = NULL", key));
} else {
set_clauses.push(format!("\"{}\" = {}", key, Self::quote_literal(val)));
set_clauses.push(format!(
"\"{}\" = {}",
key,
Self::format_sql_value(val, key, entity_type)
));
}
}
@ -627,8 +771,7 @@ impl Merger {
);
self
.db
.execute(&sql, None)
.map_err(|e| format!("SPI Error in UPDATE: {:?}", e))?;
.execute(&sql, None)?;
}
}
@ -637,11 +780,13 @@ impl Merger {
fn merge_entity_change(
&self,
type_obj: &Type,
entity_fields: &serde_json::Map<String, Value>,
entity_fetched: Option<&serde_json::Map<String, Value>>,
entity_change_kind: Option<&str>,
user_id: &str,
timestamp: &str,
replaces_id: Option<&str>,
) -> Result<Option<String>, String> {
let change_kind = match entity_change_kind {
Some(k) => k,
@ -651,10 +796,11 @@ impl Merger {
let id_str = entity_fields.get("id").unwrap();
let type_name = entity_fields.get("type").unwrap();
let mut changes = serde_json::Map::new();
let is_update = change_kind == "update" || change_kind == "delete";
let mut old_vals = serde_json::Map::new();
let mut new_vals = serde_json::Map::new();
let exists = change_kind == "update" || change_kind == "delete" || change_kind == "replace";
if !is_update {
if !exists {
let system_keys = vec![
"id".to_string(),
"created_by".to_string(),
@ -664,7 +810,7 @@ impl Merger {
];
for (k, v) in entity_fields {
if !system_keys.contains(k) {
changes.insert(k.clone(), v.clone());
new_vals.insert(k.clone(), v.clone());
}
}
} else {
@ -681,16 +827,17 @@ impl Merger {
if let Some(fetched) = entity_fetched {
let old_val = fetched.get(k).unwrap_or(&Value::Null);
if v != old_val {
changes.insert(k.clone(), v.clone());
new_vals.insert(k.clone(), v.clone());
old_vals.insert(k.clone(), old_val.clone());
}
}
}
}
changes.insert("type".to_string(), type_name.clone());
new_vals.insert("type".to_string(), type_name.clone());
}
let mut complete = entity_fields.clone();
if is_update {
if exists {
if let Some(fetched) = entity_fetched {
let mut temp = fetched.clone();
for (k, v) in entity_fields {
@ -700,33 +847,49 @@ impl Merger {
}
}
let new_val_obj = Value::Object(new_vals);
let old_val_obj = if old_vals.is_empty() {
Value::Null
} else {
Value::Object(old_vals)
};
let mut notification = serde_json::Map::new();
notification.insert("complete".to_string(), Value::Object(complete));
if is_update {
notification.insert("changes".to_string(), Value::Object(changes.clone()));
notification.insert("new".to_string(), new_val_obj.clone());
if old_val_obj != Value::Null {
notification.insert("old".to_string(), old_val_obj.clone());
}
let change_sql = format!(
"INSERT INTO agreego.change (changes, entity_id, id, kind, modified_at, modified_by) VALUES ({}, {}, {}, {}, {}, {})",
Self::quote_literal(&Value::Object(changes)),
Self::quote_literal(id_str),
Self::quote_literal(&Value::String(uuid::Uuid::new_v4().to_string())),
Self::quote_literal(&Value::String(change_kind.to_string())),
Self::quote_literal(&Value::String(timestamp.to_string())),
Self::quote_literal(&Value::String(user_id.to_string()))
);
if let Some(rep) = replaces_id {
notification.insert("replaces".to_string(), Value::String(rep.to_string()));
}
let notify_sql = format!(
"SELECT pg_notify('entity', {})",
Self::quote_literal(&Value::String(Value::Object(notification).to_string()))
);
let mut notify_sql = None;
if type_obj.historical && change_kind != "replace" {
let change_sql = format!(
"INSERT INTO agreego.change (\"old\", \"new\", entity_id, id, kind, modified_at, modified_by) VALUES ({}, {}, {}, {}, {}, {}, {})",
Self::quote_literal(&old_val_obj),
Self::quote_literal(&new_val_obj),
Self::quote_literal(id_str),
Self::quote_literal(&Value::String(uuid::Uuid::new_v4().to_string())),
Self::quote_literal(&Value::String(change_kind.to_string())),
Self::quote_literal(&Value::String(timestamp.to_string())),
Self::quote_literal(&Value::String(user_id.to_string()))
);
self
.db
.execute(&change_sql, None)
.map_err(|e| format!("Executor Error in change: {:?}", e))?;
self.db.execute(&change_sql, None)?;
}
Ok(Some(notify_sql))
if type_obj.notify {
notify_sql = Some(format!(
"SELECT pg_notify('entity', {})",
Self::quote_literal(&Value::String(Value::Object(notification).to_string()))
));
}
Ok(notify_sql)
}
fn compare_entities(
@ -760,101 +923,7 @@ impl Merger {
changes
}
fn reduce_entity_relations(
&self,
mut matching_relations: Vec<crate::database::relation::Relation>,
relative: &serde_json::Map<String, Value>,
relation_name: &str,
) -> Result<Option<crate::database::relation::Relation>, String> {
if matching_relations.is_empty() {
return Ok(None);
}
if matching_relations.len() == 1 {
return Ok(Some(matching_relations.pop().unwrap()));
}
let exact_match: Vec<_> = matching_relations
.iter()
.filter(|r| r.prefix.as_deref() == Some(relation_name))
.cloned()
.collect();
if exact_match.len() == 1 {
return Ok(Some(exact_match.into_iter().next().unwrap()));
}
matching_relations.retain(|r| {
if let Some(prefix) = &r.prefix {
!relative.contains_key(prefix)
} else {
true
}
});
if matching_relations.len() == 1 {
Ok(Some(matching_relations.pop().unwrap()))
} else {
let constraints: Vec<_> = matching_relations
.iter()
.map(|r| r.constraint.clone())
.collect();
Err(format!(
"AMBIGUOUS_TYPE_RELATIONS: Could not reduce ambiguous type relations: {}",
constraints.join(", ")
))
}
}
fn get_entity_relation(
&self,
entity_type: &crate::database::r#type::Type,
relative: &serde_json::Map<String, Value>,
relation_name: &str,
) -> Result<Option<crate::database::relation::Relation>, String> {
let relative_type_name = match relative.get("type").and_then(|v| v.as_str()) {
Some(t) => t,
None => return Ok(None),
};
let relative_type = match self.db.types.get(relative_type_name) {
Some(t) => t,
None => return Ok(None),
};
let mut relative_relations: Vec<crate::database::relation::Relation> = Vec::new();
for r in self.db.relations.values() {
if r.source_type != "entity" && r.destination_type != "entity" {
let condition1 = relative_type.hierarchy.contains(&r.source_type)
&& entity_type.hierarchy.contains(&r.destination_type);
let condition2 = entity_type.hierarchy.contains(&r.source_type)
&& relative_type.hierarchy.contains(&r.destination_type);
if condition1 || condition2 {
relative_relations.push(r.clone());
}
}
}
let mut relative_relation =
self.reduce_entity_relations(relative_relations, relative, relation_name)?;
if relative_relation.is_none() {
let mut poly_relations: Vec<crate::database::relation::Relation> = Vec::new();
for r in self.db.relations.values() {
if r.destination_type == "entity" {
let condition1 = relative_type.hierarchy.contains(&r.source_type);
let condition2 = entity_type.hierarchy.contains(&r.source_type);
if condition1 || condition2 {
poly_relations.push(r.clone());
}
}
}
relative_relation = self.reduce_entity_relations(poly_relations, relative, relation_name)?;
}
Ok(relative_relation)
}
// Helper Functions
fn apply_entity_relation(
source_entity: &mut serde_json::Map<String, Value>,
@ -872,6 +941,34 @@ impl Merger {
}
}
fn format_sql_value(val: &Value, key: &str, entity_type: &Type) -> String {
if val.as_str() == Some("") {
return "NULL".to_string();
}
let mut is_pg_array = false;
if let Some(field_types_map) = entity_type.field_types.as_ref().and_then(|v| v.as_object()) {
if let Some(t_val) = field_types_map.get(key) {
if let Some(t_str) = t_val.as_str() {
if t_str.starts_with('_') {
is_pg_array = true;
}
}
}
}
if is_pg_array && val.is_array() {
let mut s = val.to_string();
if s.starts_with('[') && s.ends_with(']') {
s.replace_range(0..1, "{");
s.replace_range(s.len() - 1..s.len(), "}");
}
Self::quote_literal(&Value::String(s))
} else {
Self::quote_literal(val)
}
}
fn quote_literal(val: &Value) -> String {
match val {
Value::Null => "NULL".to_string(),

File diff suppressed because it is too large Load Diff

View File

@ -21,7 +21,6 @@ impl Queryer {
pub fn query(
&self,
schema_id: &str,
stem_opt: Option<&str>,
filters: Option<&serde_json::Value>,
) -> crate::drop::Drop {
let filters_map = filters.and_then(|f| f.as_object());
@ -32,19 +31,21 @@ impl Queryer {
Err(msg) => {
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "FILTER_PARSE_FAILED".to_string(),
message: msg,
message: msg.clone(),
details: crate::drop::ErrorDetails {
path: schema_id.to_string(),
path: "".to_string(), // filters apply to the root query
cause: Some(msg),
context: filters.cloned(),
schema: Some(schema_id.to_string()),
},
}]);
}
};
let stem_key = stem_opt.unwrap_or("/");
let cache_key = format!("{}(Stem:{}):{}", schema_id, stem_key, filter_keys.join(","));
let cache_key = format!("{}:{}", schema_id, filter_keys.join(","));
// 2. Fetch from cache or compile
let sql = match self.get_or_compile_sql(&cache_key, schema_id, stem_opt, &filter_keys) {
let sql = match self.get_or_compile_sql(&cache_key, schema_id, &filter_keys) {
Ok(sql) => sql,
Err(drop) => return drop,
};
@ -53,6 +54,45 @@ impl Queryer {
self.execute_sql(schema_id, &sql, &args)
}
fn extract_filters(
prefix: String,
val: &serde_json::Value,
entries: &mut Vec<(String, serde_json::Value)>,
) -> Result<(), String> {
if let Some(obj) = val.as_object() {
let mut is_op_obj = false;
if let Some(first_key) = obj.keys().next() {
if first_key.starts_with('$') {
is_op_obj = true;
}
}
if is_op_obj {
for (op, op_val) in obj {
if !op.starts_with('$') {
return Err(format!("Filter operator must start with '$', got: {}", op));
}
entries.push((format!("{}:{}", prefix, op), op_val.clone()));
}
} else {
for (k, v) in obj {
let next_prefix = if prefix.is_empty() {
k.clone()
} else {
format!("{}/{}", prefix, k)
};
Self::extract_filters(next_prefix, v, entries)?;
}
}
} else {
return Err(format!(
"Filter for path '{}' must be an operator object like {{$eq: ...}} or a nested map.",
prefix
));
}
Ok(())
}
fn parse_filter_entries(
&self,
filters_map: Option<&serde_json::Map<String, serde_json::Value>>,
@ -60,19 +100,7 @@ impl Queryer {
let mut filter_entries: Vec<(String, serde_json::Value)> = Vec::new();
if let Some(fm) = filters_map {
for (key, val) in fm {
if let Some(obj) = val.as_object() {
for (op, op_val) in obj {
if !op.starts_with('$') {
return Err(format!("Filter operator must start with '$', got: {}", op));
}
filter_entries.push((format!("{}:{}", key, op), op_val.clone()));
}
} else {
return Err(format!(
"Filter for field '{}' must be an object with operators like $eq, $in, etc.",
key
));
}
Self::extract_filters(key.clone(), val, &mut filter_entries)?;
}
}
filter_entries.sort_by(|a, b| a.0.cmp(&b.0));
@ -87,15 +115,19 @@ impl Queryer {
&self,
cache_key: &str,
schema_id: &str,
stem_opt: Option<&str>,
filter_keys: &[String],
) -> Result<String, crate::drop::Drop> {
if let Some(cached_sql) = self.cache.get(cache_key) {
return Ok(cached_sql.value().clone());
}
let compiler = compiler::SqlCompiler::new(self.db.clone());
match compiler.compile(schema_id, stem_opt, filter_keys) {
let compiler = compiler::Compiler {
db: &self.db,
filter_keys: filter_keys,
alias_counter: 0,
};
match compiler.compile(schema_id, filter_keys) {
Ok(compiled_sql) => {
self
.cache
@ -104,9 +136,12 @@ impl Queryer {
}
Err(e) => Err(crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "QUERY_COMPILATION_FAILED".to_string(),
message: e,
message: e.clone(),
details: crate::drop::ErrorDetails {
path: schema_id.to_string(),
path: "".to_string(),
cause: Some(e),
context: None,
schema: Some(schema_id.to_string()),
},
}])),
}
@ -130,14 +165,20 @@ impl Queryer {
code: "QUERY_FAILED".to_string(),
message: format!("Expected array from generic query, got: {:?}", other),
details: crate::drop::ErrorDetails {
path: schema_id.to_string(),
path: "".to_string(),
cause: Some(format!("Expected array, got {}", other)),
context: Some(serde_json::json!([sql])),
schema: Some(schema_id.to_string()),
},
}]),
Err(e) => crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "QUERY_FAILED".to_string(),
message: format!("SPI error in queryer: {}", e),
details: crate::drop::ErrorDetails {
path: schema_id.to_string(),
path: "".to_string(),
cause: Some(format!("SPI error in queryer: {}", e)),
context: Some(serde_json::json!([sql])),
schema: Some(schema_id.to_string()),
},
}]),
}

View File

@ -1463,18 +1463,6 @@ fn test_queryer_0_8() {
crate::tests::runner::run_test_case(&path, 0, 8).unwrap();
}
#[test]
fn test_queryer_0_9() {
let path = format!("{}/fixtures/queryer.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 9).unwrap();
}
#[test]
fn test_queryer_0_10() {
let path = format!("{}/fixtures/queryer.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 10).unwrap();
}
#[test]
fn test_not_0_0() {
let path = format!("{}/fixtures/not.json", env!("CARGO_MANIFEST_DIR"));
@ -2939,6 +2927,36 @@ fn test_minimum_1_6() {
crate::tests::runner::run_test_case(&path, 1, 6).unwrap();
}
#[test]
fn test_paths_0_0() {
let path = format!("{}/fixtures/paths.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 0).unwrap();
}
#[test]
fn test_paths_0_1() {
let path = format!("{}/fixtures/paths.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 1).unwrap();
}
#[test]
fn test_paths_0_2() {
let path = format!("{}/fixtures/paths.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 2).unwrap();
}
#[test]
fn test_paths_0_3() {
let path = format!("{}/fixtures/paths.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 3).unwrap();
}
#[test]
fn test_paths_0_4() {
let path = format!("{}/fixtures/paths.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 4).unwrap();
}
#[test]
fn test_one_of_0_0() {
let path = format!("{}/fixtures/oneOf.json", env!("CARGO_MANIFEST_DIR"));
@ -3449,12 +3467,6 @@ fn test_if_then_else_13_1() {
crate::tests::runner::run_test_case(&path, 13, 1).unwrap();
}
#[test]
fn test_stems_0_0() {
let path = format!("{}/fixtures/stems.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 0).unwrap();
}
#[test]
fn test_empty_string_0_0() {
let path = format!("{}/fixtures/emptyString.json", env!("CARGO_MANIFEST_DIR"));
@ -8566,3 +8578,33 @@ fn test_merger_0_7() {
let path = format!("{}/fixtures/merger.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 7).unwrap();
}
#[test]
fn test_merger_0_8() {
let path = format!("{}/fixtures/merger.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 8).unwrap();
}
#[test]
fn test_merger_0_9() {
let path = format!("{}/fixtures/merger.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 9).unwrap();
}
#[test]
fn test_merger_0_10() {
let path = format!("{}/fixtures/merger.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 10).unwrap();
}
#[test]
fn test_merger_0_11() {
let path = format!("{}/fixtures/merger.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 11).unwrap();
}
#[test]
fn test_merger_0_12() {
let path = format!("{}/fixtures/merger.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 12).unwrap();
}

View File

@ -2,7 +2,6 @@ use crate::*;
pub mod runner;
pub mod types;
use serde_json::json;
pub mod sql_validator;
// Database module tests moved to src/database/executors/mock.rs
@ -11,7 +10,7 @@ fn test_library_api() {
// 1. Initially, schemas are not cached.
// Expected uninitialized drop format: errors + null response
let uninitialized_drop = jspg_validate("test_schema", JsonB(json!({})));
let uninitialized_drop = jspg_validate("source_schema", JsonB(json!({})));
assert_eq!(
uninitialized_drop.0,
json!({
@ -28,17 +27,44 @@ fn test_library_api() {
let db_json = json!({
"puncs": [],
"enums": [],
"relations": [],
"types": [{
"schemas": [{
"$id": "test_schema",
"type": "object",
"properties": {
"name": { "type": "string" }
},
"required": ["name"]
}]
}]
"relations": [
{
"id": "11111111-1111-1111-1111-111111111111",
"type": "relation",
"constraint": "fk_test_target",
"source_type": "source_schema",
"source_columns": ["target_id"],
"destination_type": "target_schema",
"destination_columns": ["id"],
"prefix": "target"
}
],
"types": [
{
"name": "source_schema",
"hierarchy": ["source_schema", "entity"],
"schemas": [{
"$id": "source_schema",
"type": "object",
"properties": {
"name": { "type": "string" },
"target": { "$ref": "target_schema" }
},
"required": ["name"]
}]
},
{
"name": "target_schema",
"hierarchy": ["target_schema", "entity"],
"schemas": [{
"$id": "target_schema",
"type": "object",
"properties": {
"value": { "type": "number" }
}
}]
}
]
});
let cache_drop = jspg_setup(JsonB(db_json));
@ -57,20 +83,39 @@ fn test_library_api() {
json!({
"type": "drop",
"response": {
"test_schema": {
"$id": "test_schema",
"source_schema": {
"$id": "source_schema",
"type": "object",
"properties": {
"name": { "type": "string" }
"name": { "type": "string" },
"target": {
"$ref": "target_schema",
"compiledProperties": ["value"]
}
},
"required": ["name"]
"required": ["name"],
"compiledProperties": ["name", "target"],
"compiledEdges": {
"target": {
"constraint": "fk_test_target",
"forward": true
}
}
},
"target_schema": {
"$id": "target_schema",
"type": "object",
"properties": {
"value": { "type": "number" }
},
"compiledProperties": ["value"]
}
}
})
);
// 4. Validate Happy Path
let happy_drop = jspg_validate("test_schema", JsonB(json!({"name": "Neo"})));
let happy_drop = jspg_validate("source_schema", JsonB(json!({"name": "Neo"})));
assert_eq!(
happy_drop.0,
json!({
@ -80,7 +125,7 @@ fn test_library_api() {
);
// 5. Validate Unhappy Path
let unhappy_drop = jspg_validate("test_schema", JsonB(json!({"wrong": "data"})));
let unhappy_drop = jspg_validate("source_schema", JsonB(json!({"wrong": "data"})));
assert_eq!(
unhappy_drop.0,
json!({
@ -89,12 +134,12 @@ fn test_library_api() {
{
"code": "REQUIRED_FIELD_MISSING",
"message": "Missing name",
"details": { "path": "/name" }
"details": { "path": "name" }
},
{
"code": "STRICT_PROPERTY_VIOLATION",
"message": "Unexpected property 'wrong'",
"details": { "path": "/wrong" }
"details": { "path": "wrong" }
}
]
})

View File

@ -1,19 +1,10 @@
use crate::tests::types::Suite;
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
use std::sync::{Arc, OnceLock, RwLock};
#[derive(Debug, Deserialize)]
pub struct TestSuite {
#[allow(dead_code)]
pub description: String,
pub database: serde_json::Value,
pub tests: Vec<TestCase>,
}
use crate::tests::types::TestCase;
use serde_json::Value;
pub fn deserialize_some<'de, D>(deserializer: D) -> Result<Option<Value>, D::Error>
where
D: serde::Deserializer<'de>,
@ -23,7 +14,7 @@ where
}
// Type alias for easier reading
type CompiledSuite = Arc<Vec<(TestSuite, Arc<crate::database::Database>)>>;
type CompiledSuite = Arc<Vec<(Suite, Arc<crate::database::Database>)>>;
// Global cache mapping filename -> Vector of (Parsed JSON suite, Compiled Database)
static CACHE: OnceLock<RwLock<HashMap<String, CompiledSuite>>> = OnceLock::new();
@ -46,7 +37,7 @@ fn get_cached_file(path: &str) -> CompiledSuite {
} else {
let content =
fs::read_to_string(path).unwrap_or_else(|_| panic!("Failed to read file: {}", path));
let suites: Vec<TestSuite> = serde_json::from_str(&content)
let suites: Vec<Suite> = serde_json::from_str(&content)
.unwrap_or_else(|e| panic!("Failed to parse JSON in {}: {}", path, e));
let mut compiled_suites = Vec::new();

View File

@ -1,156 +0,0 @@
use sqlparser::ast::{
Expr, Join, JoinConstraint, JoinOperator, Query, Select, SelectItem, SetExpr, Statement,
TableFactor, TableWithJoins, Ident,
};
use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser;
use std::collections::HashSet;
pub fn validate_semantic_sql(sql: &str) -> Result<(), String> {
let dialect = PostgreSqlDialect {};
let statements = match Parser::parse_sql(&dialect, sql) {
Ok(s) => s,
Err(e) => return Err(format!("SQL Syntax Error: {}\nSQL: {}", e, sql)),
};
for statement in statements {
validate_statement(&statement, sql)?;
}
Ok(())
}
fn validate_statement(stmt: &Statement, original_sql: &str) -> Result<(), String> {
match stmt {
Statement::Query(query) => validate_query(query, original_sql)?,
Statement::Insert(insert) => {
if let Some(query) = &insert.source {
validate_query(query, original_sql)?
}
}
Statement::Update(update) => {
if let Some(expr) = &update.selection {
validate_expr(expr, &HashSet::new(), original_sql)?;
}
}
Statement::Delete(delete) => {
if let Some(expr) = &delete.selection {
validate_expr(expr, &HashSet::new(), original_sql)?;
}
}
_ => {}
}
Ok(())
}
fn validate_query(query: &Query, original_sql: &str) -> Result<(), String> {
if let SetExpr::Select(select) = &*query.body {
validate_select(select, original_sql)?;
}
Ok(())
}
fn validate_select(select: &Select, original_sql: &str) -> Result<(), String> {
let mut available_aliases = HashSet::new();
// 1. Collect all declared table aliases in the FROM clause and JOINs
for table_with_joins in &select.from {
collect_aliases_from_table_factor(&table_with_joins.relation, &mut available_aliases);
for join in &table_with_joins.joins {
collect_aliases_from_table_factor(&join.relation, &mut available_aliases);
}
}
// 2. Validate all SELECT projection fields
for projection in &select.projection {
if let SelectItem::UnnamedExpr(expr) | SelectItem::ExprWithAlias { expr, .. } = projection {
validate_expr(expr, &available_aliases, original_sql)?;
}
}
// 3. Validate ON conditions in joins
for table_with_joins in &select.from {
for join in &table_with_joins.joins {
if let JoinOperator::Inner(JoinConstraint::On(expr))
| JoinOperator::LeftOuter(JoinConstraint::On(expr))
| JoinOperator::RightOuter(JoinConstraint::On(expr))
| JoinOperator::FullOuter(JoinConstraint::On(expr))
| JoinOperator::Join(JoinConstraint::On(expr)) = &join.join_operator
{
validate_expr(expr, &available_aliases, original_sql)?;
}
}
}
// 4. Validate WHERE conditions
if let Some(selection) = &select.selection {
validate_expr(selection, &available_aliases, original_sql)?;
}
Ok(())
}
fn collect_aliases_from_table_factor(tf: &TableFactor, aliases: &mut HashSet<String>) {
match tf {
TableFactor::Table { name, alias, .. } => {
if let Some(table_alias) = alias {
aliases.insert(table_alias.name.value.clone());
} else if let Some(last) = name.0.last() {
match last {
sqlparser::ast::ObjectNamePart::Identifier(i) => {
aliases.insert(i.value.clone());
}
_ => {}
}
}
}
TableFactor::Derived { alias: Some(table_alias), .. } => {
aliases.insert(table_alias.name.value.clone());
}
_ => {}
}
}
fn validate_expr(expr: &Expr, available_aliases: &HashSet<String>, sql: &str) -> Result<(), String> {
match expr {
Expr::CompoundIdentifier(idents) => {
if idents.len() == 2 {
let alias = &idents[0].value;
if !available_aliases.is_empty() && !available_aliases.contains(alias) {
return Err(format!(
"Semantic Error: Orchestrated query referenced table alias '{}' but it was not declared in the query's FROM/JOIN clauses.\nAvailable aliases: {:?}\nSQL: {}",
alias, available_aliases, sql
));
}
} else if idents.len() > 2 {
let alias = &idents[1].value; // In form schema.table.column, 'table' is idents[1]
if !available_aliases.is_empty() && !available_aliases.contains(alias) {
return Err(format!(
"Semantic Error: Orchestrated query referenced table '{}' but it was not mapped.\nAvailable aliases: {:?}\nSQL: {}",
alias, available_aliases, sql
));
}
}
}
Expr::BinaryOp { left, right, .. } => {
validate_expr(left, available_aliases, sql)?;
validate_expr(right, available_aliases, sql)?;
}
Expr::IsFalse(e) | Expr::IsNotFalse(e) | Expr::IsTrue(e) | Expr::IsNotTrue(e)
| Expr::IsNull(e) | Expr::IsNotNull(e) | Expr::InList { expr: e, .. }
| Expr::Nested(e) | Expr::UnaryOp { expr: e, .. } | Expr::Cast { expr: e, .. }
| Expr::Like { expr: e, .. } | Expr::ILike { expr: e, .. } | Expr::AnyOp { left: e, .. }
| Expr::AllOp { left: e, .. } => {
validate_expr(e, available_aliases, sql)?;
}
Expr::Function(func) => {
if let sqlparser::ast::FunctionArguments::List(args) = &func.args {
if let Some(sqlparser::ast::FunctionArg::Unnamed(sqlparser::ast::FunctionArgExpr::Expr(e))) = args.args.get(0) {
validate_expr(e, available_aliases, sql)?;
}
}
}
_ => {}
}
Ok(())
}

View File

@ -1,11 +1,11 @@
use super::expect::ExpectBlock;
use super::expect::Expect;
use crate::database::Database;
use serde::Deserialize;
use serde_json::Value;
use std::sync::Arc;
#[derive(Debug, Deserialize)]
pub struct TestCase {
pub struct Case {
pub description: String,
#[serde(default = "default_action")]
@ -16,9 +16,6 @@ pub struct TestCase {
pub schema_id: String,
// For Query
#[serde(default)]
pub stem: Option<String>,
#[serde(default)]
pub filters: Option<serde_json::Value>,
@ -30,15 +27,15 @@ pub struct TestCase {
#[serde(default)]
pub mocks: Option<serde_json::Value>,
pub expect: Option<ExpectBlock>,
pub expect: Option<Expect>,
}
fn default_action() -> String {
"validate".to_string()
}
impl TestCase {
pub fn run_compile(&self, db: Arc<Database>) -> Result<(), String> {
impl Case {
pub fn run_compile(&self, _db: Arc<Database>) -> Result<(), String> {
let expected_success = self.expect.as_ref().map(|e| e.success).unwrap_or(false);
// We assume db has already been setup and compiled successfully by runner.rs's `jspg_setup`
@ -52,24 +49,6 @@ impl TestCase {
));
}
// Assert stems
if let Some(expect) = &self.expect {
if let Some(expected_stems) = &expect.stems {
// Convert the Db stems (HashMap<String, HashMap<String, Arc<Stem>>>) to matching JSON shape
let db_stems_json = serde_json::to_value(&db.stems).unwrap();
let expect_stems_json = serde_json::to_value(expected_stems).unwrap();
if db_stems_json != expect_stems_json {
let expected_pretty = serde_json::to_string_pretty(&expect_stems_json).unwrap();
let got_pretty = serde_json::to_string_pretty(&db_stems_json).unwrap();
return Err(format!(
"Stem validation failed.\nExpected:\n{}\n\nGot:\n{}",
expected_pretty, got_pretty
));
}
}
}
Ok(())
}
@ -120,7 +99,7 @@ impl TestCase {
let merger = Merger::new(db.clone());
let test_data = self.data.clone().unwrap_or(Value::Null);
let result = merger.merge(test_data);
let result = merger.merge(&self.schema_id, test_data);
let expected_success = self.expect.as_ref().map(|e| e.success).unwrap_or(false);
let got_success = result.errors.is_empty();
@ -138,6 +117,7 @@ impl TestCase {
))
} else if let Some(expect) = &self.expect {
let queries = db.executor.get_queries();
expect.assert_pattern(&queries)?;
expect.assert_sql(&queries)
} else {
Ok(())
@ -157,8 +137,7 @@ impl TestCase {
use crate::queryer::Queryer;
let queryer = Queryer::new(db.clone());
let stem_opt = self.stem.as_deref();
let result = queryer.query(&self.schema_id, stem_opt, self.filters.as_ref());
let result = queryer.query(&self.schema_id, self.filters.as_ref());
let expected_success = self.expect.as_ref().map(|e| e.success).unwrap_or(false);
let got_success = result.errors.is_empty();
@ -176,6 +155,7 @@ impl TestCase {
))
} else if let Some(expect) = &self.expect {
let queries = db.executor.get_queries();
expect.assert_pattern(&queries)?;
expect.assert_sql(&queries)
} else {
Ok(())

View File

@ -0,0 +1,20 @@
pub mod pattern;
pub mod sql;
use serde::Deserialize;
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum SqlExpectation {
Single(String),
Multi(Vec<String>),
}
#[derive(Debug, Deserialize)]
pub struct Expect {
pub success: bool,
pub result: Option<serde_json::Value>,
pub errors: Option<Vec<serde_json::Value>>,
#[serde(default)]
pub sql: Option<Vec<SqlExpectation>>,
}

View File

@ -1,30 +1,13 @@
use super::Expect;
use regex::Regex;
use serde::Deserialize;
use std::collections::HashMap;
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum SqlExpectation {
Single(String),
Multi(Vec<String>),
}
#[derive(Debug, Deserialize)]
pub struct ExpectBlock {
pub success: bool,
pub result: Option<serde_json::Value>,
pub errors: Option<Vec<serde_json::Value>>,
pub stems: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
#[serde(default)]
pub sql: Option<Vec<SqlExpectation>>,
}
impl ExpectBlock {
impl Expect {
/// Advanced SQL execution assertion algorithm ported from `assert.go`.
/// This compares two arrays of strings, one containing {{uuid:name}} or {{timestamp}} placeholders,
/// and the other containing actual executed database queries. It ensures that placeholder UUIDs
/// are consistently mapped to the same actual UUIDs across all lines, and strictly validates line-by-line sequences.
pub fn assert_sql(&self, actual: &[String]) -> Result<(), String> {
pub fn assert_pattern(&self, actual: &[String]) -> Result<(), String> {
let patterns = match &self.sql {
Some(s) => s,
None => return Ok(()),
@ -39,12 +22,6 @@ impl ExpectBlock {
));
}
for query in actual {
if let Err(e) = crate::tests::sql_validator::validate_semantic_sql(query) {
return Err(e);
}
}
let ws_re = Regex::new(r"\s+").unwrap();
let types = HashMap::from([
@ -82,8 +59,8 @@ impl ExpectBlock {
let aline = clean_str(aline_raw);
let pattern_str_raw = match pattern_expect {
SqlExpectation::Single(s) => s.clone(),
SqlExpectation::Multi(m) => m.join(" "),
super::SqlExpectation::Single(s) => s.clone(),
super::SqlExpectation::Multi(m) => m.join(" "),
};
let pattern_str = clean_str(&pattern_str_raw);

View File

@ -0,0 +1,206 @@
use super::Expect;
use sqlparser::ast::{Expr, Query, SelectItem, Statement, TableFactor};
use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser;
use std::collections::HashSet;
impl Expect {
pub fn assert_sql(&self, actual: &[String]) -> Result<(), String> {
for query in actual {
if let Err(e) = Self::validate_semantic_sql(query) {
return Err(e);
}
}
Ok(())
}
pub fn validate_semantic_sql(sql: &str) -> Result<(), String> {
let dialect = PostgreSqlDialect {};
let statements = match Parser::parse_sql(&dialect, sql) {
Ok(s) => s,
Err(e) => return Err(format!("SQL Syntax Error: {}\nSQL: {}", e, sql)),
};
for statement in statements {
Self::validate_statement(&statement, sql)?;
}
Ok(())
}
fn validate_statement(stmt: &Statement, original_sql: &str) -> Result<(), String> {
match stmt {
Statement::Query(query) => Self::validate_query(query, &HashSet::new(), original_sql)?,
Statement::Insert(insert) => {
if let Some(query) = &insert.source {
Self::validate_query(query, &HashSet::new(), original_sql)?
}
}
Statement::Update(update) => {
if let Some(expr) = &update.selection {
Self::validate_expr(expr, &HashSet::new(), original_sql)?;
}
}
Statement::Delete(delete) => {
if let Some(expr) = &delete.selection {
Self::validate_expr(expr, &HashSet::new(), original_sql)?;
}
}
_ => {}
}
Ok(())
}
fn validate_query(
query: &Query,
available_aliases: &HashSet<String>,
original_sql: &str,
) -> Result<(), String> {
if let sqlparser::ast::SetExpr::Select(select) = &*query.body {
Self::validate_select(&select, available_aliases, original_sql)?;
}
Ok(())
}
fn validate_select(
select: &sqlparser::ast::Select,
parent_aliases: &HashSet<String>,
original_sql: &str,
) -> Result<(), String> {
let mut available_aliases = parent_aliases.clone();
// 1. Collect all declared table aliases in the FROM clause and JOINs
for table_with_joins in &select.from {
Self::collect_aliases_from_table_factor(&table_with_joins.relation, &mut available_aliases);
for join in &table_with_joins.joins {
Self::collect_aliases_from_table_factor(&join.relation, &mut available_aliases);
}
}
// 2. Validate all SELECT projection fields
for projection in &select.projection {
if let SelectItem::UnnamedExpr(expr) | SelectItem::ExprWithAlias { expr, .. } = projection {
Self::validate_expr(expr, &available_aliases, original_sql)?;
}
}
// 3. Validate ON conditions in joins
for table_with_joins in &select.from {
for join in &table_with_joins.joins {
if let sqlparser::ast::JoinOperator::Inner(sqlparser::ast::JoinConstraint::On(expr))
| sqlparser::ast::JoinOperator::LeftOuter(sqlparser::ast::JoinConstraint::On(expr))
| sqlparser::ast::JoinOperator::RightOuter(sqlparser::ast::JoinConstraint::On(expr))
| sqlparser::ast::JoinOperator::FullOuter(sqlparser::ast::JoinConstraint::On(expr))
| sqlparser::ast::JoinOperator::Join(sqlparser::ast::JoinConstraint::On(expr)) =
&join.join_operator
{
Self::validate_expr(expr, &available_aliases, original_sql)?;
}
}
}
// 4. Validate WHERE conditions
if let Some(selection) = &select.selection {
Self::validate_expr(selection, &available_aliases, original_sql)?;
}
Ok(())
}
fn collect_aliases_from_table_factor(tf: &TableFactor, aliases: &mut HashSet<String>) {
match tf {
TableFactor::Table { name, alias, .. } => {
if let Some(table_alias) = alias {
aliases.insert(table_alias.name.value.clone());
} else if let Some(last) = name.0.last() {
match last {
sqlparser::ast::ObjectNamePart::Identifier(i) => {
aliases.insert(i.value.clone());
}
_ => {}
}
}
}
TableFactor::Derived {
subquery,
alias: Some(table_alias),
..
} => {
aliases.insert(table_alias.name.value.clone());
// A derived table is technically a nested scope which is opaque outside, but for pure semantic checks
// its internal contents should be validated purely within its own scope (not leaking external aliases in, usually)
// but Postgres allows lateral correlation. We will validate its interior with an empty scope.
let _ = Self::validate_query(subquery, &HashSet::new(), "");
}
_ => {}
}
}
fn validate_expr(
expr: &Expr,
available_aliases: &HashSet<String>,
sql: &str,
) -> Result<(), String> {
match expr {
Expr::CompoundIdentifier(idents) => {
if idents.len() == 2 {
let alias = &idents[0].value;
if !available_aliases.is_empty() && !available_aliases.contains(alias) {
return Err(format!(
"Semantic Error: Orchestrated query referenced table alias '{}' but it was not declared in the query's FROM/JOIN clauses.\nAvailable aliases: {:?}\nSQL: {}",
alias, available_aliases, sql
));
}
} else if idents.len() > 2 {
let alias = &idents[1].value; // In form schema.table.column, 'table' is idents[1]
if !available_aliases.is_empty() && !available_aliases.contains(alias) {
return Err(format!(
"Semantic Error: Orchestrated query referenced table '{}' but it was not mapped.\nAvailable aliases: {:?}\nSQL: {}",
alias, available_aliases, sql
));
}
}
}
Expr::Subquery(subquery) => Self::validate_query(subquery, available_aliases, sql)?,
Expr::Exists { subquery, .. } => Self::validate_query(subquery, available_aliases, sql)?,
Expr::InSubquery {
expr: e, subquery, ..
} => {
Self::validate_expr(e, available_aliases, sql)?;
Self::validate_query(subquery, available_aliases, sql)?;
}
Expr::BinaryOp { left, right, .. } => {
Self::validate_expr(left, available_aliases, sql)?;
Self::validate_expr(right, available_aliases, sql)?;
}
Expr::IsFalse(e)
| Expr::IsNotFalse(e)
| Expr::IsTrue(e)
| Expr::IsNotTrue(e)
| Expr::IsNull(e)
| Expr::IsNotNull(e)
| Expr::InList { expr: e, .. }
| Expr::Nested(e)
| Expr::UnaryOp { expr: e, .. }
| Expr::Cast { expr: e, .. }
| Expr::Like { expr: e, .. }
| Expr::ILike { expr: e, .. }
| Expr::AnyOp { left: e, .. }
| Expr::AllOp { left: e, .. } => {
Self::validate_expr(e, available_aliases, sql)?;
}
Expr::Function(func) => {
if let sqlparser::ast::FunctionArguments::List(args) = &func.args {
if let Some(sqlparser::ast::FunctionArg::Unnamed(
sqlparser::ast::FunctionArgExpr::Expr(e),
)) = args.args.get(0)
{
Self::validate_expr(e, available_aliases, sql)?;
}
}
}
_ => {}
}
Ok(())
}
}

View File

@ -2,6 +2,6 @@ pub mod case;
pub mod expect;
pub mod suite;
pub use case::TestCase;
pub use expect::ExpectBlock;
pub use suite::TestSuite;
pub use case::Case;
pub use expect::Expect;
pub use suite::Suite;

View File

@ -1,10 +1,10 @@
use super::case::TestCase;
use super::case::Case;
use serde::Deserialize;
#[derive(Debug, Deserialize)]
pub struct TestSuite {
pub struct Suite {
#[allow(dead_code)]
pub description: String,
pub database: serde_json::Value,
pub tests: Vec<TestCase>,
pub tests: Vec<Case>,
}

View File

@ -41,6 +41,14 @@ impl<'a> ValidationContext<'a> {
}
}
pub fn join_path(&self, key: &str) -> String {
if self.path.is_empty() {
key.to_string()
} else {
format!("{}/{}", self.path, key)
}
}
pub fn derive(
&self,
schema: &'a Schema,

View File

@ -67,7 +67,12 @@ impl Validator {
.map(|e| crate::drop::Error {
code: e.code,
message: e.message,
details: crate::drop::ErrorDetails { path: e.path },
details: crate::drop::ErrorDetails {
path: e.path,
cause: None,
context: None,
schema: None,
},
})
.collect();
crate::drop::Drop::with_errors(errors)
@ -76,7 +81,12 @@ impl Validator {
Err(e) => crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: e.code,
message: e.message,
details: crate::drop::ErrorDetails { path: e.path },
details: crate::drop::ErrorDetails {
path: e.path,
cause: None,
context: None,
schema: None,
},
}]),
}
} else {
@ -84,7 +94,10 @@ impl Validator {
code: "SCHEMA_NOT_FOUND".to_string(),
message: format!("Schema {} not found", schema_id),
details: crate::drop::ErrorDetails {
path: "".to_string(),
path: "/".to_string(),
cause: None,
context: None,
schema: None,
},
}])
}

View File

@ -91,12 +91,17 @@ impl<'a> ValidationContext<'a> {
if let Some(ref prefix) = self.schema.prefix_items {
for (i, sub_schema) in prefix.iter().enumerate() {
if i < len {
let path = format!("{}/{}", self.path, i);
if let Some(child_instance) = arr.get(i) {
let mut item_path = self.join_path(&i.to_string());
if let Some(obj) = child_instance.as_object() {
if let Some(id_str) = obj.get("id").and_then(|v| v.as_str()) {
item_path = self.join_path(id_str);
}
}
let derived = self.derive(
sub_schema,
child_instance,
&path,
&item_path,
HashSet::new(),
self.extensible,
false,
@ -112,12 +117,17 @@ impl<'a> ValidationContext<'a> {
if let Some(ref items_schema) = self.schema.items {
for i in validation_index..len {
let path = format!("{}/{}", self.path, i);
if let Some(child_instance) = arr.get(i) {
let mut item_path = self.join_path(&i.to_string());
if let Some(obj) = child_instance.as_object() {
if let Some(id_str) = obj.get("id").and_then(|v| v.as_str()) {
item_path = self.join_path(id_str);
}
}
let derived = self.derive(
items_schema,
child_instance,
&path,
&item_path,
HashSet::new(),
self.extensible,
false,

View File

@ -44,7 +44,7 @@ impl<'a> ValidationContext<'a> {
result.errors.push(ValidationError {
code: "STRICT_PROPERTY_VIOLATION".to_string(),
message: format!("Unexpected property '{}'", key),
path: format!("{}/{}", self.path, key),
path: self.join_path(key),
});
}
}
@ -53,10 +53,18 @@ impl<'a> ValidationContext<'a> {
if let Some(arr) = self.instance.as_array() {
for i in 0..arr.len() {
if !result.evaluated_indices.contains(&i) {
let mut item_path = self.join_path(&i.to_string());
if let Some(child_instance) = arr.get(i) {
if let Some(obj) = child_instance.as_object() {
if let Some(id_str) = obj.get("id").and_then(|v| v.as_str()) {
item_path = self.join_path(id_str);
}
}
}
result.errors.push(ValidationError {
code: "STRICT_ITEM_VIOLATION".to_string(),
message: format!("Unexpected item at index {}", i),
path: format!("{}/{}", self.path, i),
path: item_path,
});
}
}

View File

@ -8,7 +8,7 @@ impl<'a> ValidationContext<'a> {
result: &mut ValidationResult,
) -> Result<bool, ValidationError> {
let current = self.instance;
if let Some(ref compiled_fmt) = self.schema.compiled_format {
if let Some(compiled_fmt) = self.schema.compiled_format.get() {
match compiled_fmt {
crate::database::schema::CompiledFormat::Func(f) => {
let should = if let Some(s) = current.as_str() {

View File

@ -13,28 +13,37 @@ impl<'a> ValidationContext<'a> {
) -> Result<bool, ValidationError> {
let current = self.instance;
if let Some(obj) = current.as_object() {
// Entity Bound Implicit Type Validation
if let Some(lookup_key) = self.schema.id.as_ref().or(self.schema.r#ref.as_ref()) {
let base_type_name = lookup_key.split('.').next_back().unwrap_or("").to_string();
if let Some(type_def) = self.db.types.get(&base_type_name)
&& let Some(type_val) = obj.get("type")
// Entity implicit type validation
if let Some(schema_identifier) = self.schema.identifier() {
// Kick in if the data object has a type field
if let Some(type_val) = obj.get("type")
&& let Some(type_str) = type_val.as_str()
{
if type_def.variations.contains(type_str) {
// Ensure it passes strict mode
result.evaluated_keys.insert("type".to_string());
// Check if the identifier is a global type name
if let Some(type_def) = self.db.types.get(&schema_identifier) {
// Ensure the instance type is a variation of the global type
if type_def.variations.contains(type_str) {
// Ensure it passes strict mode
result.evaluated_keys.insert("type".to_string());
} else {
result.errors.push(ValidationError {
code: "CONST_VIOLATED".to_string(), // Aligning with original const override errors
message: format!(
"Type '{}' is not a valid descendant for this entity bound schema",
type_str
),
path: self.join_path("type"),
});
}
} else {
result.errors.push(ValidationError {
code: "CONST_VIOLATED".to_string(), // Aligning with original const override errors
message: format!(
"Type '{}' is not a valid descendant for this entity bound schema",
type_str
),
path: format!("{}/type", self.path),
});
// Ad-Hoc schemas natively use strict schema discriminator strings instead of variation inheritance
if type_str == schema_identifier.as_str() {
result.evaluated_keys.insert("type".to_string());
}
}
}
}
if let Some(min) = self.schema.min_properties
&& (obj.len() as f64) < min
{
@ -44,6 +53,7 @@ impl<'a> ValidationContext<'a> {
path: self.path.to_string(),
});
}
if let Some(max) = self.schema.max_properties
&& (obj.len() as f64) > max
{
@ -53,13 +63,14 @@ impl<'a> ValidationContext<'a> {
path: self.path.to_string(),
});
}
if let Some(ref req) = self.schema.required {
for field in req {
if !obj.contains_key(field) {
result.errors.push(ValidationError {
code: "REQUIRED_FIELD_MISSING".to_string(),
message: format!("Missing {}", field),
path: format!("{}/{}", self.path, field),
path: self.join_path(field),
});
}
}
@ -98,7 +109,7 @@ impl<'a> ValidationContext<'a> {
}
if let Some(child_instance) = obj.get(key) {
let new_path = format!("{}/{}", self.path, key);
let new_path = self.join_path(key);
let is_ref = sub_schema.r#ref.is_some();
let next_extensible = if is_ref { false } else { self.extensible };
@ -114,10 +125,9 @@ impl<'a> ValidationContext<'a> {
// Entity Bound Implicit Type Interception
if key == "type"
&& let Some(lookup_key) = sub_schema.id.as_ref().or(sub_schema.r#ref.as_ref())
&& let Some(schema_bound) = sub_schema.identifier()
{
let base_type_name = lookup_key.split('.').next_back().unwrap_or("").to_string();
if let Some(type_def) = self.db.types.get(&base_type_name)
if let Some(type_def) = self.db.types.get(&schema_bound)
&& let Some(instance_type) = child_instance.as_str()
&& type_def.variations.contains(instance_type)
{
@ -133,11 +143,11 @@ impl<'a> ValidationContext<'a> {
}
}
if let Some(ref compiled_pp) = self.schema.compiled_pattern_properties {
if let Some(compiled_pp) = self.schema.compiled_pattern_properties.get() {
for (compiled_re, sub_schema) in compiled_pp {
for (key, child_instance) in obj {
if compiled_re.0.is_match(key) {
let new_path = format!("{}/{}", self.path, key);
let new_path = self.join_path(key);
let is_ref = sub_schema.r#ref.is_some();
let next_extensible = if is_ref { false } else { self.extensible };
@ -165,7 +175,7 @@ impl<'a> ValidationContext<'a> {
{
locally_matched = true;
}
if !locally_matched && let Some(ref compiled_pp) = self.schema.compiled_pattern_properties
if !locally_matched && let Some(compiled_pp) = self.schema.compiled_pattern_properties.get()
{
for (compiled_re, _) in compiled_pp {
if compiled_re.0.is_match(key) {
@ -176,7 +186,7 @@ impl<'a> ValidationContext<'a> {
}
if !locally_matched {
let new_path = format!("{}/{}", self.path, key);
let new_path = self.join_path(key);
let is_ref = additional_schema.r#ref.is_some();
let next_extensible = if is_ref { false } else { self.extensible };
@ -197,7 +207,7 @@ impl<'a> ValidationContext<'a> {
if let Some(ref property_names) = self.schema.property_names {
for key in obj.keys() {
let _new_path = format!("{}/propertyNames/{}", self.path, key);
let _new_path = self.join_path(&format!("propertyNames/{}", key));
let val_str = Value::String(key.to_string());
let ctx = ValidationContext::new(

View File

@ -28,7 +28,7 @@ impl<'a> ValidationContext<'a> {
path: self.path.to_string(),
});
}
if let Some(ref compiled_re) = self.schema.compiled_pattern {
if let Some(compiled_re) = self.schema.compiled_pattern.get() {
if !compiled_re.0.is_match(s) {
result.errors.push(ValidationError {
code: "PATTERN_VIOLATED".to_string(),

View File

@ -1 +1 @@
1.0.67
1.0.96