Compare commits

...

18 Commits

13 changed files with 834 additions and 388 deletions

View File

@ -23,6 +23,12 @@ To support high-throughput operations while allowing for runtime updates (e.g.,
3. **Immutable AST Caching**: The `Validator` struct immutably owns the `Database` registry. Schemas themselves are frozen structurally, but utilize `OnceLock` interior mutability during the Compilation Phase to permanently cache resolved `$ref` inheritances, properties, and `compiled_edges` directly onto their AST nodes. This guarantees strict `O(1)` relationship and property validation execution at runtime without locking or recursive DB polling.
4. **Lock-Free Reads**: Incoming operations acquire a read lock just long enough to clone the `Arc` inside an `RwLock<Option<Arc<Validator>>>`, ensuring zero blocking during schema updates.
### Relational Edge Resolution
When compiling nested object graphs or arrays, the JSPG engine must dynamically infer which Postgres Foreign Key constraint correctly bridges the parent to the nested schema. It utilizes a strict 3-step hierarchical resolution applied during the `OnceLock` Compilation phase:
1. **Exact Prefix Match**: If an explicitly prefixed Foreign Key (e.g. `fk_invoice_counterparty_entity` -> `prefix: "counterparty"`) directly matches the name of the requested schema property (e.g. `{"counterparty": {...}}`), it is instantly selected.
2. **Ambiguity Elimination (M:M Twin Deduction)**: If multiple explicitly prefixed relations remain (which happens by design in Many-to-Many junction tables like `contact` or `role`), the compiler uses a process of elimination. It inspects the compiled child JSON schema AST to see which of the relational prefixes the child *natively consumes* as an explicit outbound property (e.g. `contact` natively defines `{ "target": ... }`). It considers that prefix arrow "used up" by the child, and mathematically deduces that its exact twin providing reverse ownership (`"source"`) MUST be the inbound link mapping from the parent. This logic relies on `OnceLock` recursive compilation to accurately peek at child structures.
3. **Implicit Base Fallback (1:M)**: If no explicit prefix matches, and M:M deduction fails, the compiler filters for exactly one remaining relation with a `null` prefix (e.g. `fk_invoice_line_invoice` -> `prefix: null`). A `null` prefix mathematically denotes the core structural parent-child ownership edge and is safely used as a fallback.
### Global API Reference
These functions operate on the global `GLOBAL_JSPG` engine instance and provide administrative boundaries:

58
LOOKUP_VERIFICATION.md Normal file
View File

@ -0,0 +1,58 @@
# The Postgres Partial Index Claiming Pattern
This document outlines the architectural strategy for securely handling the deduplication, claiming, and verification of sensitive unique identifiers (like email addresses or phone numbers) strictly through PostgreSQL without requiring "magical" logic in the JSPG `Merger`.
## The Denial of Service (DoS) Squatter Problem
If you enforce a standard `UNIQUE` constraint on an email address table:
1. Malicious User A signs up and adds `jeff.bezos@amazon.com` to their account but never verifies it.
2. The real Jeff Bezos signs up.
3. The Database blocks Jeff because the unique string already exists.
The squatter has effectively locked the legitimate owner out of the system.
## The Anti-Patterns
1. **Global Entity Flags**: Adding a global `verified` boolean to the root `entity` table forces unrelated objects (like Widgets, Invoices, Orders) to carry verification logic that doesn't belong to them.
2. **Magical Merger Logic**: Making JSPG's `Merger` aware of a specific `verified` field breaks its pure structural translation model. The Merger shouldn't need hardcoded conditional logic to know if it's allowed to update an unverified row.
## The Solution: Postgres Partial Unique Indexes
The holy grail is to defer all claiming logic natively to the database engine using a **Partial Unique Index**.
```sql
-- Remove any existing global unique constraint on address first
CREATE UNIQUE INDEX lk_email_address_verified
ON email_address (address)
WHERE verified_at IS NOT NULL;
```
### How the Lifecycle Works Natively
1. **Unverified Squatters (Isolated Rows):**
A hundred different users can send `{ "address": "jeff.bezos@amazon.com" }` through the `save_person` Punc. Because the Punc isolates them and doesn't allow setting the `verified_at` property natively (enforced by the JSON schema), the JSPG Merger inserts `NULL`.
Postgres permits all 100 `INSERT` commands to succeed because the Partial Index **ignores** rows where `verified_at IS NULL`. Every user gets their own isolated, unverified row acting as a placeholder on their contact edge.
2. **The Verification Race (The Claim):**
The real Jeff clicks his magic verification link. The backend securely executes a specific verification Punc that runs:
`UPDATE email_address SET verified_at = now() WHERE id = <jeff's-real-uuid>`
3. **The Lockout:**
Because Jeff's row now strictly satisfies `verified_at IS NOT NULL`, that exact row enters the Partial Unique Index.
If any of the other 99 squatters *ever* click their fake verification links (or if a new user tries to verify that same email), PostgreSQL hits the index and violently throws a **Unique Constraint Violation**, flawlessly blocking them. The winner has permanently claimed the slot across the entire environment!
### Periodic Cleanup
Since unverified rows are allowed to accumulate without colliding, a simple Postgres `pg_cron` job or backend worker can sweep the table nightly to prune abandoned claims and preserve storage:
```sql
DELETE FROM email_address
WHERE verified_at IS NULL
AND created_at < NOW() - INTERVAL '24 hours';
```
### Why this is the Ultimate Architecture
* The **JSPG Merger** remains mathematically pure. It doesn't know what `verified_at` is; it simply respects the database's structural limits (`O(1)` pure translation).
* **Row-Level Security (RLS)** naturally blocks users from seeing or claiming each other's unverified rows.
* You offload complex race-condition tracking entirely to the C-level PostgreSQL B-Tree indexing engine, guaranteeing absolute cluster-wide atomicity.

0
agreego.sql Normal file
View File

View File

@ -19,7 +19,7 @@
{
"id": "22222222-2222-2222-2222-222222222222",
"type": "relation",
"constraint": "fk_order_customer",
"constraint": "fk_order_customer_person",
"source_type": "order",
"source_columns": [
"customer_id"
@ -41,8 +41,7 @@
"destination_type": "order",
"destination_columns": [
"id"
],
"prefix": "lines"
]
},
{
"id": "44444444-4444-4444-4444-444444444444",
@ -75,6 +74,20 @@
"type"
],
"prefix": "target"
},
{
"id": "66666666-6666-6666-6666-666666666666",
"type": "relation",
"constraint": "fk_entity_organization",
"source_type": "entity",
"source_columns": [
"organization_id"
],
"destination_type": "organization",
"destination_columns": [
"id"
],
"prefix": null
}
],
"types": [
@ -283,6 +296,17 @@
}
}
}
},
"email_addresses": {
"type": "array",
"items": {
"$ref": "contact",
"properties": {
"target": {
"$ref": "email_address"
}
}
}
}
}
}
@ -972,7 +996,12 @@
"LEFT JOIN agreego.\"user\" t2 ON t2.id = t1.id",
"LEFT JOIN agreego.\"organization\" t3 ON t3.id = t1.id",
"LEFT JOIN agreego.\"entity\" t4 ON t4.id = t1.id",
"WHERE \"first_name\" = 'LookupFirst' AND \"last_name\" = 'LookupLast' AND \"date_of_birth\" = '1990-01-01T00:00:00Z' AND \"pronouns\" = 'they/them'"
"WHERE (",
" \"first_name\" = 'LookupFirst'",
" AND \"last_name\" = 'LookupLast'",
" AND \"date_of_birth\" = '1990-01-01T00:00:00Z'",
" AND \"pronouns\" = 'they/them'",
")"
],
[
"UPDATE agreego.\"person\"",
@ -1039,6 +1068,177 @@
]
}
},
{
"description": "Update existing person with id (lookup)",
"action": "merge",
"data": {
"id": "33333333-3333-3333-3333-333333333333",
"type": "person",
"first_name": "LookupFirst",
"last_name": "LookupLast",
"date_of_birth": "1990-01-01T00:00:00Z",
"pronouns": "they/them",
"contact_id": "abc-contact"
},
"mocks": [
{
"id": "22222222-2222-2222-2222-222222222222",
"type": "person",
"first_name": "LookupFirst",
"last_name": "LookupLast",
"date_of_birth": "1990-01-01T00:00:00Z",
"pronouns": "they/them",
"contact_id": "old-contact"
}
],
"schema_id": "person",
"expect": {
"success": true,
"sql": [
[
"SELECT to_jsonb(t1.*) || to_jsonb(t2.*) || to_jsonb(t3.*) || to_jsonb(t4.*)",
"FROM agreego.\"person\" t1",
"LEFT JOIN agreego.\"user\" t2 ON t2.id = t1.id",
"LEFT JOIN agreego.\"organization\" t3 ON t3.id = t1.id",
"LEFT JOIN agreego.\"entity\" t4 ON t4.id = t1.id",
"WHERE",
" t1.id = '33333333-3333-3333-3333-333333333333'",
" OR (",
" \"first_name\" = 'LookupFirst'",
" AND \"last_name\" = 'LookupLast'",
" AND \"date_of_birth\" = '1990-01-01T00:00:00Z'",
" AND \"pronouns\" = 'they/them'",
" )"
],
[
"UPDATE agreego.\"person\"",
"SET",
" \"contact_id\" = 'abc-contact'",
"WHERE",
" id = '22222222-2222-2222-2222-222222222222'"
],
[
"UPDATE agreego.\"entity\"",
"SET",
" \"modified_at\" = '2026-03-10T00:00:00Z',",
" \"modified_by\" = '00000000-0000-0000-0000-000000000000'",
"WHERE",
" id = '22222222-2222-2222-2222-222222222222'"
],
[
"INSERT INTO agreego.change (",
" \"old\",",
" \"new\",",
" entity_id,",
" id,",
" kind,",
" modified_at,",
" modified_by",
")",
"VALUES (",
" '{",
" \"contact_id\":\"old-contact\"",
" }',",
" '{",
" \"contact_id\":\"abc-contact\",",
" \"type\":\"person\"",
" }',",
" '22222222-2222-2222-2222-222222222222',",
" '{{uuid}}',",
" 'update',",
" '{{timestamp}}',",
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"contact_id\":\"abc-contact\",",
" \"date_of_birth\":\"1990-01-01T00:00:00Z\",",
" \"first_name\":\"LookupFirst\",",
" \"id\":\"22222222-2222-2222-2222-222222222222\",",
" \"last_name\":\"LookupLast\",",
" \"modified_at\":\"2026-03-10T00:00:00Z\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"pronouns\":\"they/them\",",
" \"type\":\"person\"",
" },",
" \"new\":{",
" \"contact_id\":\"abc-contact\",",
" \"type\":\"person\"",
" },",
" \"old\":{",
" \"contact_id\":\"old-contact\"",
" },",
" \"replaces\":\"33333333-3333-3333-3333-333333333333\"",
" }')"
]
]
}
},
{
"description": "Replace existing person with id and no changes (lookup)",
"action": "merge",
"data": {
"id": "33333333-3333-3333-3333-333333333333",
"type": "person",
"first_name": "LookupFirst",
"last_name": "LookupLast",
"date_of_birth": "1990-01-01T00:00:00Z",
"pronouns": "they/them"
},
"mocks": [
{
"id": "22222222-2222-2222-2222-222222222222",
"type": "person",
"first_name": "LookupFirst",
"last_name": "LookupLast",
"date_of_birth": "1990-01-01T00:00:00Z",
"pronouns": "they/them",
"contact_id": "old-contact"
}
],
"schema_id": "person",
"expect": {
"success": true,
"sql": [
[
"SELECT to_jsonb(t1.*) || to_jsonb(t2.*) || to_jsonb(t3.*) || to_jsonb(t4.*)",
"FROM agreego.\"person\" t1",
"LEFT JOIN agreego.\"user\" t2 ON t2.id = t1.id",
"LEFT JOIN agreego.\"organization\" t3 ON t3.id = t1.id",
"LEFT JOIN agreego.\"entity\" t4 ON t4.id = t1.id",
"WHERE",
" t1.id = '33333333-3333-3333-3333-333333333333'",
" OR (",
" \"first_name\" = 'LookupFirst'",
" AND \"last_name\" = 'LookupLast'",
" AND \"date_of_birth\" = '1990-01-01T00:00:00Z'",
" AND \"pronouns\" = 'they/them'",
" )"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"contact_id\":\"old-contact\",",
" \"date_of_birth\":\"1990-01-01T00:00:00Z\",",
" \"first_name\":\"LookupFirst\",",
" \"id\":\"22222222-2222-2222-2222-222222222222\",",
" \"last_name\":\"LookupLast\",",
" \"modified_at\":\"2026-03-10T00:00:00Z\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"pronouns\":\"they/them\",",
" \"type\":\"person\"",
" },",
" \"new\":{",
" \"type\":\"person\"",
" },",
" \"replaces\":\"33333333-3333-3333-3333-333333333333\"",
" }')"
]
]
}
},
{
"description": "Update existing person with id (no lookup)",
"action": "merge",
@ -1484,7 +1684,7 @@
"SELECT to_jsonb(t1.*) || to_jsonb(t2.*)",
"FROM agreego.\"order\" t1",
"LEFT JOIN agreego.\"entity\" t2 ON t2.id = t1.id",
"WHERE t1.id = 'abc'"
"WHERE t1.id = 'abc' OR (\"id\" = 'abc')"
],
[
"INSERT INTO agreego.\"entity\" (",
@ -1658,16 +1858,18 @@
"type": "contact",
"is_primary": false,
"target": {
"type": "phone_number",
"number": "555-0002"
"type": "email_address",
"address": "test@example.com"
}
},
}
],
"email_addresses": [
{
"type": "contact",
"is_primary": false,
"target": {
"type": "email_address",
"address": "test@example.com"
"address": "test2@example.com"
}
}
]
@ -1759,7 +1961,10 @@
" modified_by",
") VALUES (",
" NULL,",
" '{\"number\":\"555-0001\",\"type\":\"phone_number\"}',",
" '{",
" \"number\":\"555-0001\",",
" \"type\":\"phone_number\"",
" }',",
" '{{uuid:phone1_id}}',",
" '{{uuid}}',",
" 'create',",
@ -1830,115 +2035,6 @@
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"INSERT INTO agreego.\"entity\" (",
" \"created_at\",",
" \"created_by\",",
" \"id\",",
" \"modified_at\",",
" \"modified_by\",",
" \"type\"",
") VALUES (",
" '{{timestamp}}',",
" '00000000-0000-0000-0000-000000000000',",
" '{{uuid:phone2_id}}',",
" '{{timestamp}}',",
" '00000000-0000-0000-0000-000000000000',",
" 'phone_number'",
")"
],
[
"INSERT INTO agreego.\"phone_number\" (",
" \"number\"",
") VALUES (",
" '555-0002'",
")"
],
[
"INSERT INTO agreego.change (",
" \"old\",",
" \"new\",",
" entity_id,",
" id,",
" kind,",
" modified_at,",
" modified_by",
") VALUES (",
" NULL,",
" '{",
" \"number\":\"555-0002\",",
" \"type\":\"phone_number\"",
" }',",
" '{{uuid:phone2_id}}',",
" '{{uuid}}',",
" 'create',",
" '{{timestamp}}',",
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"INSERT INTO agreego.\"entity\" (",
" \"created_at\",",
" \"created_by\",",
" \"id\",",
" \"modified_at\",",
" \"modified_by\",",
" \"type\"",
") VALUES (",
" '{{timestamp}}',",
" '00000000-0000-0000-0000-000000000000',",
" '{{uuid:contact2_id}}',",
" '{{timestamp}}',",
" '00000000-0000-0000-0000-000000000000',",
" 'contact'",
")"
],
[
"INSERT INTO agreego.\"relationship\" (",
" \"source_id\",",
" \"source_type\",",
" \"target_id\",",
" \"target_type\"",
") VALUES (",
" '{{uuid:person_id}}',",
" 'person',",
" '{{uuid:phone2_id}}',",
" 'phone_number'",
")"
],
[
"INSERT INTO agreego.\"contact\" (",
" \"is_primary\"",
") VALUES (",
" false",
")"
],
[
"INSERT INTO agreego.change (",
" \"old\",",
" \"new\",",
" entity_id,",
" id,",
" kind,",
" modified_at,",
" modified_by",
") VALUES (",
" NULL,",
" '{",
" \"is_primary\":false,",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:phone2_id}}\",",
" \"target_type\":\"phone_number\",",
" \"type\":\"contact\"",
" }',",
" '{{uuid:contact2_id}}',",
" '{{uuid}}',",
" 'create',",
" '{{timestamp}}',",
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"INSERT INTO agreego.\"entity\" (",
" \"created_at\",",
@ -1996,7 +2092,7 @@
") VALUES (",
" '{{timestamp}}',",
" '00000000-0000-0000-0000-000000000000',",
" '{{uuid:contact3_id}}',",
" '{{uuid:contact2_id}}',",
" '{{timestamp}}',",
" '00000000-0000-0000-0000-000000000000',",
" 'contact'",
@ -2041,6 +2137,115 @@
" \"target_type\":\"email_address\",",
" \"type\":\"contact\"",
" }',",
" '{{uuid:contact2_id}}',",
" '{{uuid}}',",
" 'create',",
" '{{timestamp}}',",
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"INSERT INTO agreego.\"entity\" (",
" \"created_at\",",
" \"created_by\",",
" \"id\",",
" \"modified_at\",",
" \"modified_by\",",
" \"type\"",
") VALUES (",
" '{{timestamp}}',",
" '00000000-0000-0000-0000-000000000000',",
" '{{uuid:email2_id}}',",
" '{{timestamp}}',",
" '00000000-0000-0000-0000-000000000000',",
" 'email_address'",
")"
],
[
"INSERT INTO agreego.\"email_address\" (",
" \"address\"",
") VALUES (",
" 'test2@example.com'",
")"
],
[
"INSERT INTO agreego.change (",
" \"old\",",
" \"new\",",
" entity_id,",
" id,",
" kind,",
" modified_at,",
" modified_by",
") VALUES (",
" NULL,",
" '{",
" \"address\":\"test2@example.com\",",
" \"type\":\"email_address\"",
" }',",
" '{{uuid:email2_id}}',",
" '{{uuid}}',",
" 'create',",
" '{{timestamp}}',",
" '00000000-0000-0000-0000-000000000000'",
")"
],
[
"INSERT INTO agreego.\"entity\" (",
" \"created_at\",",
" \"created_by\",",
" \"id\",",
" \"modified_at\",",
" \"modified_by\",",
" \"type\"",
") VALUES (",
" '{{timestamp}}',",
" '00000000-0000-0000-0000-000000000000',",
" '{{uuid:contact3_id}}',",
" '{{timestamp}}',",
" '00000000-0000-0000-0000-000000000000',",
" 'contact'",
")"
],
[
"INSERT INTO agreego.\"relationship\" (",
" \"source_id\",",
" \"source_type\",",
" \"target_id\",",
" \"target_type\"",
") VALUES (",
" '{{uuid:person_id}}',",
" 'person',",
" '{{uuid:email2_id}}',",
" 'email_address'",
")"
],
[
"INSERT INTO agreego.\"contact\" (",
" \"is_primary\"",
") VALUES (",
" false",
")"
],
[
"INSERT INTO agreego.change (",
" \"old\",",
" \"new\",",
" entity_id,",
" id,",
" kind,",
" modified_at,",
" modified_by",
") VALUES (",
" NULL,",
" '{",
" \"is_primary\":false,",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:email2_id}}\",",
" \"target_type\":\"email_address\",",
" \"type\":\"contact\"",
" }',",
" '{{uuid:contact3_id}}',",
" '{{uuid}}',",
" 'create',",
@ -2073,16 +2278,16 @@
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"first_name\":\"Relation\",",
" \"id\":\"{{uuid:person_id}}\",",
" \"last_name\":\"Test\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"type\":\"person\"",
" },",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"first_name\":\"Relation\",",
" \"id\":\"{{uuid:person_id}}\",",
" \"last_name\":\"Test\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"type\":\"person\"",
" },",
" \"new\":{",
" \"first_name\":\"Relation\",",
" \"last_name\":\"Test\",",
@ -2092,19 +2297,19 @@
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact1_id}}\",",
" \"is_primary\":true,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:phone1_id}}\",",
" \"target_type\":\"phone_number\",",
" \"type\":\"contact\"",
" },",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact1_id}}\",",
" \"is_primary\":true,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:phone1_id}}\",",
" \"target_type\":\"phone_number\",",
" \"type\":\"contact\"",
" },",
" \"new\":{",
" \"is_primary\":true,",
" \"source_id\":\"{{uuid:person_id}}\",",
@ -2117,15 +2322,15 @@
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:phone1_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"number\":\"555-0001\",",
" \"type\":\"phone_number\"",
" },",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:phone1_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"number\":\"555-0001\",",
" \"type\":\"phone_number\"",
" },",
" \"new\":{",
" \"number\":\"555-0001\",",
" \"type\":\"phone_number\"",
@ -2134,87 +2339,87 @@
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact2_id}}\",",
" \"is_primary\":false,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:phone2_id}}\",",
" \"target_type\":\"phone_number\",",
" \"type\":\"contact\"",
" },",
" \"new\":{",
" \"is_primary\":false,",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:phone2_id}}\",",
" \"target_type\":\"phone_number\",",
" \"type\":\"contact\"",
" }",
" }')"
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact2_id}}\",",
" \"is_primary\":false,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:email1_id}}\",",
" \"target_type\":\"email_address\",",
" \"type\":\"contact\"",
" },",
" \"new\":{",
" \"is_primary\":false,",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:email1_id}}\",",
" \"target_type\":\"email_address\",",
" \"type\":\"contact\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:phone2_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"number\":\"555-0002\",",
" \"type\":\"phone_number\"",
" },",
" \"new\":{",
" \"number\":\"555-0002\",",
" \"type\":\"phone_number\"",
" }",
" }')"
" \"complete\":{",
" \"address\":\"test@example.com\",",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:email1_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"type\":\"email_address\"",
" },",
" \"new\":{",
" \"address\":\"test@example.com\",",
" \"type\":\"email_address\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact3_id}}\",",
" \"is_primary\":false,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:email1_id}}\",",
" \"target_type\":\"email_address\",",
" \"type\":\"contact\"",
" },",
" \"new\":{",
" \"is_primary\":false,",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:email1_id}}\",",
" \"target_type\":\"email_address\",",
" \"type\":\"contact\"",
" }",
" }')"
" \"complete\":{",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:contact3_id}}\",",
" \"is_primary\":false,",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:email2_id}}\",",
" \"target_type\":\"email_address\",",
" \"type\":\"contact\"",
" },",
" \"new\":{",
" \"is_primary\":false,",
" \"source_id\":\"{{uuid:person_id}}\",",
" \"source_type\":\"person\",",
" \"target_id\":\"{{uuid:email2_id}}\",",
" \"target_type\":\"email_address\",",
" \"type\":\"contact\"",
" }",
"}')"
],
[
"SELECT pg_notify('entity', '{",
" \"complete\":{",
" \"address\":\"test@example.com\",",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:email1_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"type\":\"email_address\"",
" },",
" \"new\":{",
" \"address\":\"test@example.com\",",
" \"type\":\"email_address\"",
" }",
" }')"
" \"complete\":{",
" \"address\":\"test2@example.com\",",
" \"created_at\":\"{{timestamp}}\",",
" \"created_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"id\":\"{{uuid:email2_id}}\",",
" \"modified_at\":\"{{timestamp}}\",",
" \"modified_by\":\"00000000-0000-0000-0000-000000000000\",",
" \"type\":\"email_address\"",
" },",
" \"new\":{",
" \"address\":\"test2@example.com\",",
" \"type\":\"email_address\"",
" }",
"}')"
]
]
}

View File

@ -27,7 +27,9 @@
{
"$id": "get_orders.response",
"type": "array",
"items": { "$ref": "light.order" }
"items": {
"$ref": "light.order"
}
}
]
}
@ -69,7 +71,7 @@
{
"id": "22222222-2222-2222-2222-222222222222",
"type": "relation",
"constraint": "fk_order_customer",
"constraint": "fk_order_customer_person",
"source_type": "order",
"source_columns": [
"customer_id"
@ -80,6 +82,22 @@
],
"prefix": "customer"
},
{
"id": "22222222-2222-2222-2222-222222222227",
"type": "relation",
"constraint": "fk_order_counterparty_entity",
"source_type": "order",
"source_columns": [
"counterparty_id",
"counterparty_type"
],
"destination_type": "entity",
"destination_columns": [
"id",
"type"
],
"prefix": "counterparty"
},
{
"id": "33333333-3333-3333-3333-333333333333",
"type": "relation",
@ -91,8 +109,7 @@
"destination_type": "order",
"destination_columns": [
"id"
],
"prefix": "lines"
]
}
],
"types": [
@ -713,14 +730,18 @@
"created_by",
"modified_at",
"modified_by",
"archived"
"archived",
"counterparty_id",
"counterparty_type"
],
"grouped_fields": {
"order": [
"id",
"type",
"total",
"customer_id"
"customer_id",
"counterparty_id",
"counterparty_type"
],
"entity": [
"id",
@ -748,7 +769,9 @@
"created_at": "timestamptz",
"created_by": "uuid",
"modified_at": "timestamptz",
"modified_by": "uuid"
"modified_by": "uuid",
"counterparty_id": "uuid",
"counterparty_type": "text"
},
"variations": [
"order"

2
flows

Submodule flows updated: a7b0f5dc4d...4d61e13e00

View File

@ -45,7 +45,7 @@ impl MockExecutor {
#[cfg(test)]
impl DatabaseExecutor for MockExecutor {
fn query(&self, sql: &str, _args: Option<&[Value]>) -> Result<Value, String> {
println!("DEBUG SQL QUERY: {}", sql);
println!("JSPG_SQL: {}", sql);
MOCK_STATE.with(|state| {
let mut s = state.borrow_mut();
s.captured_queries.push(sql.to_string());
@ -66,7 +66,7 @@ impl DatabaseExecutor for MockExecutor {
}
fn execute(&self, sql: &str, _args: Option<&[Value]>) -> Result<(), String> {
println!("DEBUG SQL EXECUTE: {}", sql);
println!("JSPG_SQL: {}", sql);
MOCK_STATE.with(|state| {
let mut s = state.borrow_mut();
s.captured_queries.push(sql.to_string());
@ -124,42 +124,23 @@ fn parse_and_match_mocks(sql: &str, mocks: &[Value]) -> Option<Vec<Value>> {
return None;
};
// 2. Extract WHERE conditions
let mut conditions = Vec::new();
// 2. Extract WHERE conditions string
let mut where_clause = String::new();
if let Some(where_idx) = sql_upper.find(" WHERE ") {
let mut where_end = sql_upper.find(" ORDER BY ").unwrap_or(sql.len());
let mut where_end = sql_upper.find(" ORDER BY ").unwrap_or(sql_upper.len());
if let Some(limit_idx) = sql_upper.find(" LIMIT ") {
if limit_idx < where_end {
where_end = limit_idx;
}
}
let where_clause = &sql[where_idx + 7..where_end];
let and_regex = Regex::new(r"(?i)\s+AND\s+").ok()?;
let parts = and_regex.split(where_clause);
for part in parts {
if let Some(eq_idx) = part.find('=') {
let left = part[..eq_idx]
.trim()
.split('.')
.last()
.unwrap_or("")
.trim_matches('"');
let right = part[eq_idx + 1..].trim().trim_matches('\'');
conditions.push((left.to_string(), right.to_string()));
} else if part.to_uppercase().contains(" IS NULL") {
let left = part[..part.to_uppercase().find(" IS NULL").unwrap()]
.trim()
.split('.')
.last()
.unwrap_or("")
.replace('"', ""); // Remove quotes explicitly
conditions.push((left, "null".to_string()));
}
}
where_clause = sql[where_idx + 7..where_end].to_string();
}
// 3. Find matching mocks
let mut matches = Vec::new();
let or_regex = Regex::new(r"(?i)\s+OR\s+").ok()?;
let and_regex = Regex::new(r"(?i)\s+AND\s+").ok()?;
for mock in mocks {
if let Some(mock_obj) = mock.as_object() {
if let Some(t) = mock_obj.get("type") {
@ -168,25 +149,66 @@ fn parse_and_match_mocks(sql: &str, mocks: &[Value]) -> Option<Vec<Value>> {
}
}
let mut matches_all = true;
for (k, v) in &conditions {
let mock_val_str = match mock_obj.get(k) {
Some(Value::String(s)) => s.clone(),
Some(Value::Number(n)) => n.to_string(),
Some(Value::Bool(b)) => b.to_string(),
Some(Value::Null) => "null".to_string(),
_ => {
matches_all = false;
break;
if where_clause.is_empty() {
matches.push(mock.clone());
continue;
}
let or_parts = or_regex.split(&where_clause);
let mut any_branch_matched = false;
for or_part in or_parts {
let branch_str = or_part.replace('(', "").replace(')', "");
let mut branch_matches = true;
for part in and_regex.split(&branch_str) {
if let Some(eq_idx) = part.find('=') {
let left = part[..eq_idx]
.trim()
.split('.')
.last()
.unwrap_or("")
.trim_matches('"');
let right = part[eq_idx + 1..].trim().trim_matches('\'');
let mock_val_str = match mock_obj.get(left) {
Some(Value::String(s)) => s.clone(),
Some(Value::Number(n)) => n.to_string(),
Some(Value::Bool(b)) => b.to_string(),
Some(Value::Null) => "null".to_string(),
_ => "".to_string(),
};
if mock_val_str != right {
branch_matches = false;
break;
}
} else if part.to_uppercase().contains(" IS NULL") {
let left = part[..part.to_uppercase().find(" IS NULL").unwrap()]
.trim()
.split('.')
.last()
.unwrap_or("")
.trim_matches('"');
let mock_val_str = match mock_obj.get(left) {
Some(Value::Null) => "null".to_string(),
_ => "".to_string(),
};
if mock_val_str != "null" {
branch_matches = false;
break;
}
}
};
if mock_val_str != *v {
matches_all = false;
}
if branch_matches {
any_branch_matched = true;
break;
}
}
if matches_all {
if any_branch_matched {
matches.push(mock.clone());
}
}

View File

@ -79,9 +79,9 @@ impl DatabaseExecutor for SpiExecutor {
}
}
pgrx::debug1!("JSPG_SQL: {}", sql);
self.transact(|| {
Spi::connect(|client| {
pgrx::notice!("JSPG_SQL: {}", sql);
match client.select(sql, Some(args_with_oid.len() as i64), &args_with_oid) {
Ok(tup_table) => {
let mut results = Vec::new();
@ -110,9 +110,9 @@ impl DatabaseExecutor for SpiExecutor {
}
}
pgrx::debug1!("JSPG_SQL: {}", sql);
self.transact(|| {
Spi::connect_mut(|client| {
pgrx::notice!("JSPG_SQL: {}", sql);
match client.update(sql, Some(args_with_oid.len() as i64), &args_with_oid) {
Ok(_) => Ok(()),
Err(e) => Err(format!("SPI Execution Failure: {}", e)),

View File

@ -497,6 +497,10 @@ impl Schema {
Ok(())
}
/// Dynamically infers and compiles all structural database relationships between this Schema
/// and its nested children. This functions recursively traverses the JSON Schema abstract syntax
/// tree, identifies physical PostgreSQL table boundaries, and locks the resulting relation
/// constraint paths directly onto the `compiled_edges` map in O(1) memory.
pub fn compile_edges(
&self,
db: &crate::database::Database,
@ -504,19 +508,25 @@ impl Schema {
props: &std::collections::BTreeMap<String, std::sync::Arc<Schema>>,
) -> std::collections::BTreeMap<String, crate::database::edge::Edge> {
let mut schema_edges = std::collections::BTreeMap::new();
// Determine the physical Database Table Name this schema structurally represents
// Plucks the polymorphic discriminator via dot-notation (e.g. extracting "person" from "full.person")
let mut parent_type_name = None;
if let Some(family) = &self.obj.family {
parent_type_name = Some(family.split('.').next_back().unwrap_or(family).to_string());
} else if let Some(identifier) = self.obj.identifier() {
parent_type_name = Some(identifier);
parent_type_name = Some(identifier.split('.').next_back().unwrap_or(&identifier).to_string());
}
if let Some(p_type) = parent_type_name {
// Proceed only if the resolved table physically exists within the Postgres Type hierarchy
if db.types.contains_key(&p_type) {
// Iterate over all discovered schema boundaries mapped inside the object
for (prop_name, prop_schema) in props {
let mut child_type_name = None;
let mut target_schema = prop_schema.clone();
// Structurally unpack the inner target entity if the object maps to an array list
if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) =
&prop_schema.obj.type_
{
@ -527,24 +537,29 @@ impl Schema {
}
}
// Determine the physical Postgres table backing the nested child schema recursively
if let Some(family) = &target_schema.obj.family {
child_type_name = Some(family.split('.').next_back().unwrap_or(family).to_string());
} else if let Some(ref_id) = target_schema.obj.identifier() {
child_type_name = Some(ref_id);
child_type_name = Some(ref_id.split('.').next_back().unwrap_or(&ref_id).to_string());
} else if let Some(arr) = &target_schema.obj.one_of {
if let Some(first) = arr.first() {
if let Some(ref_id) = first.obj.identifier() {
child_type_name = Some(ref_id);
child_type_name = Some(ref_id.split('.').next_back().unwrap_or(&ref_id).to_string());
}
}
}
if let Some(c_type) = child_type_name {
if db.types.contains_key(&c_type) {
// Ensure the child Schema's AST has accurately compiled its own physical property keys so we can
// inject them securely for Many-to-Many Twin Deduction disambiguation matching.
target_schema.compile(db, visited);
if let Some(compiled_target_props) = target_schema.obj.compiled_properties.get() {
let keys_for_ambiguity: Vec<String> =
compiled_target_props.keys().cloned().collect();
// Interrogate the Database catalog graph to discover the exact Foreign Key Constraint connecting the components
if let Some((relation, is_forward)) =
resolve_relation(db, &p_type, &c_type, prop_name, Some(&keys_for_ambiguity))
{
@ -566,6 +581,8 @@ impl Schema {
}
}
/// Inspects the Postgres pg_constraint relations catalog to securely identify
/// the precise Foreign Key connecting a parent and child hierarchy path.
pub(crate) fn resolve_relation<'a>(
db: &'a crate::database::Database,
parent_type: &str,
@ -573,6 +590,8 @@ pub(crate) fn resolve_relation<'a>(
prop_name: &str,
relative_keys: Option<&Vec<String>>,
) -> Option<(&'a crate::database::relation::Relation, bool)> {
// Enforce graph locality by ensuring we don't accidentally crawl to pure structural entity boundaries
if parent_type == "entity" && child_type == "entity" {
return None;
}
@ -583,6 +602,9 @@ pub(crate) fn resolve_relation<'a>(
let mut matching_rels = Vec::new();
let mut directions = Vec::new();
// Scour the complete catalog for any Edge matching the inheritance scope of the two objects
// This automatically binds polymorphic structures (e.g. recognizing a relationship targeting User
// also natively binds instances specifically typed as Person).
for rel in db.relations.values() {
let is_forward = p_def.hierarchy.contains(&rel.source_type)
&& c_def.hierarchy.contains(&rel.destination_type);
@ -598,10 +620,12 @@ pub(crate) fn resolve_relation<'a>(
}
}
// Abort relation discovery early if no hierarchical inheritance match was found
if matching_rels.is_empty() {
return None;
}
// Ideal State: The objects only share a solitary structural relation, resolving ambiguity instantly.
if matching_rels.len() == 1 {
return Some((matching_rels[0], directions[0]));
}
@ -609,6 +633,8 @@ pub(crate) fn resolve_relation<'a>(
let mut chosen_idx = 0;
let mut resolved = false;
// Exact Prefix Disambiguation: Determine if the database specifically names this constraint
// directly mapping to the JSON Schema property name (e.g., `fk_{child}_{property_name}`)
for (i, rel) in matching_rels.iter().enumerate() {
if let Some(prefix) = &rel.prefix {
if prop_name.starts_with(prefix)
@ -622,18 +648,55 @@ pub(crate) fn resolve_relation<'a>(
}
}
// Complex Subgraph Resolution: The database contains multiple equally explicit foreign key constraints
// linking these objects (such as pointing to `source` and `target` in Many-to-Many junction models).
if !resolved && relative_keys.is_some() {
// Twin Deduction Pass 1: We inspect the exact properties structurally defined inside the compiled payload
// to observe which explicit relation arrow the child payload natively consumes.
let keys = relative_keys.unwrap();
let mut missing_prefix_ids = Vec::new();
let mut consumed_rel_idx = None;
for (i, rel) in matching_rels.iter().enumerate() {
if let Some(prefix) = &rel.prefix {
if !keys.contains(prefix) {
missing_prefix_ids.push(i);
if keys.contains(prefix) {
consumed_rel_idx = Some(i);
break; // Found the routing edge explicitly consumed by the schema payload
}
}
}
if missing_prefix_ids.len() == 1 {
chosen_idx = missing_prefix_ids[0];
// Twin Deduction Pass 2: Knowing which arrow points outbound, we can mathematically deduce its twin
// providing the reverse ownership on the same junction boundary must be the incoming Edge to the parent.
if let Some(used_idx) = consumed_rel_idx {
let used_rel = matching_rels[used_idx];
let mut twin_ids = Vec::new();
for (i, rel) in matching_rels.iter().enumerate() {
if i != used_idx
&& rel.source_type == used_rel.source_type
&& rel.destination_type == used_rel.destination_type
&& rel.prefix.is_some()
{
twin_ids.push(i);
}
}
if twin_ids.len() == 1 {
chosen_idx = twin_ids[0];
resolved = true;
}
}
}
// Implicit Base Fallback: If no complex explicit paths resolve, but exactly one relation
// sits entirely naked (without a constraint prefix), it must be the core structural parent ownership.
if !resolved {
let mut null_prefix_ids = Vec::new();
for (i, rel) in matching_rels.iter().enumerate() {
if rel.prefix.is_none() {
null_prefix_ids.push(i);
}
}
if null_prefix_ids.len() == 1 {
chosen_idx = null_prefix_ids[0];
}
}

View File

@ -3,8 +3,8 @@
pub mod cache;
use crate::database::r#type::Type;
use crate::database::Database;
use crate::database::r#type::Type;
use serde_json::Value;
use std::sync::Arc;
@ -25,19 +25,19 @@ impl Merger {
let mut notifications_queue = Vec::new();
let target_schema = match self.db.schemas.get(schema_id) {
Some(s) => Arc::new(s.clone()),
None => {
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "MERGE_FAILED".to_string(),
message: format!("Unknown schema_id: {}", schema_id),
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: None,
context: Some(data),
schema: None,
},
}]);
}
Some(s) => Arc::new(s.clone()),
None => {
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "MERGE_FAILED".to_string(),
message: format!("Unknown schema_id: {}", schema_id),
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: None,
context: Some(data),
schema: None,
},
}]);
}
};
let result = self.merge_internal(target_schema, data.clone(), &mut notifications_queue);
@ -50,18 +50,24 @@ impl Merger {
let mut final_cause = None;
if let Ok(Value::Object(map)) = serde_json::from_str::<Value>(&msg) {
if let (Some(Value::String(e_msg)), Some(Value::String(e_code))) = (map.get("error"), map.get("code")) {
if let (Some(Value::String(e_msg)), Some(Value::String(e_code))) =
(map.get("error"), map.get("code"))
{
final_message = e_msg.clone();
final_code = e_code.clone();
let mut cause_parts = Vec::new();
if let Some(Value::String(d)) = map.get("detail") {
if !d.is_empty() { cause_parts.push(d.clone()); }
if !d.is_empty() {
cause_parts.push(d.clone());
}
}
if let Some(Value::String(h)) = map.get("hint") {
if !h.is_empty() { cause_parts.push(h.clone()); }
if !h.is_empty() {
cause_parts.push(h.clone());
}
}
if !cause_parts.is_empty() {
final_cause = Some(cause_parts.join("\n"));
final_cause = Some(cause_parts.join("\n"));
}
}
}
@ -144,11 +150,11 @@ impl Merger {
) -> Result<Value, String> {
let mut item_schema = schema.clone();
if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) = &schema.obj.type_ {
if t == "array" {
if let Some(items_def) = &schema.obj.items {
item_schema = items_def.clone();
}
if t == "array" {
if let Some(items_def) = &schema.obj.items {
item_schema = items_def.clone();
}
}
}
let mut resolved_items = Vec::new();
@ -178,8 +184,8 @@ impl Merger {
};
let compiled_props = match schema.obj.compiled_properties.get() {
Some(props) => props,
None => return Err("Schema has no compiled properties for merging".to_string()),
Some(props) => props,
None => return Err("Schema has no compiled properties for merging".to_string()),
};
let mut entity_fields = serde_json::Map::new();
@ -189,37 +195,37 @@ impl Merger {
for (k, v) in obj {
// Always retain system and unmapped core fields natively implicitly mapped to the Postgres tables
if k == "id" || k == "type" || k == "created" {
entity_fields.insert(k.clone(), v.clone());
continue;
entity_fields.insert(k.clone(), v.clone());
continue;
}
if let Some(prop_schema) = compiled_props.get(&k) {
let mut is_edge = false;
if let Some(edges) = schema.obj.compiled_edges.get() {
if edges.contains_key(&k) {
is_edge = true;
}
}
if is_edge {
let typeof_v = match &v {
Value::Object(_) => "object",
Value::Array(_) => "array",
_ => "field", // Malformed edge data?
};
if typeof_v == "object" {
entity_objects.insert(k.clone(), (v.clone(), prop_schema.clone()));
} else if typeof_v == "array" {
entity_arrays.insert(k.clone(), (v.clone(), prop_schema.clone()));
} else {
entity_fields.insert(k.clone(), v.clone());
}
} else {
// Not an edge! It's a raw Postgres column (e.g., JSONB, text[])
entity_fields.insert(k.clone(), v.clone());
}
} else if type_def.fields.contains(&k) {
let mut is_edge = false;
if let Some(edges) = schema.obj.compiled_edges.get() {
if edges.contains_key(&k) {
is_edge = true;
}
}
if is_edge {
let typeof_v = match &v {
Value::Object(_) => "object",
Value::Array(_) => "array",
_ => "field", // Malformed edge data?
};
if typeof_v == "object" {
entity_objects.insert(k.clone(), (v.clone(), prop_schema.clone()));
} else if typeof_v == "array" {
entity_arrays.insert(k.clone(), (v.clone(), prop_schema.clone()));
} else {
entity_fields.insert(k.clone(), v.clone());
}
} else {
// Not an edge! It's a raw Postgres column (e.g., JSONB, text[])
entity_fields.insert(k.clone(), v.clone());
}
} else if type_def.fields.contains(&k) {
entity_fields.insert(k.clone(), v.clone());
}
}
@ -228,13 +234,15 @@ impl Merger {
let mut entity_change_kind = None;
let mut entity_fetched = None;
let mut entity_replaces = None;
if !type_def.relationship {
let (fields, kind, fetched) =
let (fields, kind, fetched, replaces) =
self.stage_entity(entity_fields.clone(), type_def, &user_id, &timestamp)?;
entity_fields = fields;
entity_change_kind = kind;
entity_fetched = fetched;
entity_replaces = replaces;
}
let mut entity_response = serde_json::Map::new();
@ -251,12 +259,10 @@ impl Merger {
};
if let Some(compiled_edges) = schema.obj.compiled_edges.get() {
println!("Compiled Edges keys for relation {}: {:?}", relation_name, compiled_edges.keys().collect::<Vec<_>>());
if let Some(edge) = compiled_edges.get(&relation_name) {
println!("FOUND EDGE {} -> {:?}", relation_name, edge.constraint);
if let Some(relation) = self.db.relations.get(&edge.constraint) {
let parent_is_source = edge.forward;
if parent_is_source {
if !relative.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
@ -264,15 +270,16 @@ impl Merger {
}
}
let mut merged_relative = match self.merge_internal(rel_schema.clone(), Value::Object(relative), notifications)? {
let mut merged_relative = match self.merge_internal(
rel_schema.clone(),
Value::Object(relative),
notifications,
)? {
Value::Object(m) => m,
_ => continue,
};
merged_relative.insert(
"type".to_string(),
Value::String(relative_type_name),
);
merged_relative.insert("type".to_string(), Value::String(relative_type_name));
Self::apply_entity_relation(
&mut entity_fields,
@ -295,7 +302,11 @@ impl Merger {
&entity_fields,
);
let merged_relative = match self.merge_internal(rel_schema.clone(), Value::Object(relative), notifications)? {
let merged_relative = match self.merge_internal(
rel_schema.clone(),
Value::Object(relative),
notifications,
)? {
Value::Object(m) => m,
_ => continue,
};
@ -308,11 +319,12 @@ impl Merger {
}
if type_def.relationship {
let (fields, kind, fetched) =
let (fields, kind, fetched, replaces) =
self.stage_entity(entity_fields.clone(), type_def, &user_id, &timestamp)?;
entity_fields = fields;
entity_change_kind = kind;
entity_fetched = fetched;
entity_replaces = replaces;
}
self.merge_entity_fields(
@ -357,19 +369,24 @@ impl Merger {
);
let mut item_schema = rel_schema.clone();
if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) = &rel_schema.obj.type_ {
if t == "array" {
if let Some(items_def) = &rel_schema.obj.items {
item_schema = items_def.clone();
}
if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) =
&rel_schema.obj.type_
{
if t == "array" {
if let Some(items_def) = &rel_schema.obj.items {
item_schema = items_def.clone();
}
}
}
let merged_relative =
match self.merge_internal(item_schema, Value::Object(relative_item), notifications)? {
Value::Object(m) => m,
_ => continue,
};
let merged_relative = match self.merge_internal(
item_schema,
Value::Object(relative_item),
notifications,
)? {
Value::Object(m) => m,
_ => continue,
};
relative_responses.push(Value::Object(merged_relative));
}
@ -388,6 +405,7 @@ impl Merger {
entity_change_kind.as_deref(),
&user_id,
&timestamp,
entity_replaces.as_deref(),
)?;
if let Some(sql) = notify_sql {
@ -419,6 +437,7 @@ impl Merger {
serde_json::Map<String, Value>,
Option<String>,
Option<serde_json::Map<String, Value>>,
Option<String>,
),
String,
> {
@ -428,8 +447,8 @@ impl Merger {
// An anchor is STRICTLY a struct containing merely an `id` and `type`.
// We aggressively bypass Database SPI `SELECT` fetches because there are no primitive
// mutations to apply to the row. PostgreSQL inherently protects relationships via Foreign Keys downstream.
let is_anchor = entity_fields.len() == 2
&& entity_fields.contains_key("id")
let is_anchor = entity_fields.len() == 2
&& entity_fields.contains_key("id")
&& entity_fields.contains_key("type");
let has_valid_id = entity_fields
@ -438,11 +457,22 @@ impl Merger {
.map_or(false, |s| !s.is_empty());
if is_anchor && has_valid_id {
return Ok((entity_fields, None, None));
return Ok((entity_fields, None, None, None));
}
let entity_fetched = self.fetch_entity(&entity_fields, type_def)?;
let mut replaces_id = None;
if let Some(ref fetched_row) = entity_fetched {
let provided_id = entity_fields.get("id").and_then(|v| v.as_str());
let fetched_id = fetched_row.get("id").and_then(|v| v.as_str());
if let (Some(pid), Some(fid)) = (provided_id, fetched_id) {
if !pid.is_empty() && pid != fid {
replaces_id = Some(pid.to_string());
}
}
}
let system_keys = vec![
"id".to_string(),
"type".to_string(),
@ -492,7 +522,7 @@ impl Merger {
);
entity_fields = new_fields;
} else if changes.is_empty() {
} else if changes.is_empty() && replaces_id.is_none() {
let mut new_fields = serde_json::Map::new();
new_fields.insert(
"id".to_string(),
@ -508,6 +538,8 @@ impl Merger {
.unwrap_or(false);
entity_change_kind = if is_archived {
Some("delete".to_string())
} else if changes.is_empty() && replaces_id.is_some() {
Some("replace".to_string())
} else {
Some("update".to_string())
};
@ -530,7 +562,12 @@ impl Merger {
entity_fields = new_fields;
}
Ok((entity_fields, entity_change_kind, entity_fetched))
Ok((
entity_fields,
entity_change_kind,
entity_fetched,
replaces_id,
))
}
fn fetch_entity(
@ -585,11 +622,14 @@ impl Merger {
template
};
let where_clause = if let Some(id) = id_val {
format!("WHERE t1.id = {}", Self::quote_literal(id))
} else if lookup_complete {
let mut lookup_predicates = Vec::new();
let mut where_parts = Vec::new();
if let Some(id) = id_val {
where_parts.push(format!("t1.id = {}", Self::quote_literal(id)));
}
if lookup_complete {
let mut lookup_predicates = Vec::new();
for column in &entity_type.lookup_fields {
let val = entity_fields.get(column).unwrap_or(&Value::Null);
if column == "type" {
@ -598,10 +638,14 @@ impl Merger {
lookup_predicates.push(format!("\"{}\" = {}", column, Self::quote_literal(val)));
}
}
format!("WHERE {}", lookup_predicates.join(" AND "))
} else {
where_parts.push(format!("({})", lookup_predicates.join(" AND ")));
}
if where_parts.is_empty() {
return Ok(None);
};
}
let where_clause = format!("WHERE {}", where_parts.join(" OR "));
let final_sql = format!("{} {}", fetch_sql_template, where_clause);
@ -710,9 +754,7 @@ impl Merger {
columns.join(", "),
values.join(", ")
);
self
.db
.execute(&sql, None)?;
self.db.execute(&sql, None)?;
} else if change_kind == "update" || change_kind == "delete" {
entity_pairs.remove("id");
entity_pairs.remove("type");
@ -744,9 +786,7 @@ impl Merger {
set_clauses.join(", "),
Self::quote_literal(&Value::String(id_str.to_string()))
);
self
.db
.execute(&sql, None)?;
self.db.execute(&sql, None)?;
}
}
@ -761,6 +801,7 @@ impl Merger {
entity_change_kind: Option<&str>,
user_id: &str,
timestamp: &str,
replaces_id: Option<&str>,
) -> Result<Option<String>, String> {
let change_kind = match entity_change_kind {
Some(k) => k,
@ -772,9 +813,9 @@ impl Merger {
let mut old_vals = serde_json::Map::new();
let mut new_vals = serde_json::Map::new();
let is_update = change_kind == "update" || change_kind == "delete";
let exists = change_kind == "update" || change_kind == "delete" || change_kind == "replace";
if !is_update {
if !exists {
let system_keys = vec![
"id".to_string(),
"created_by".to_string(),
@ -811,7 +852,7 @@ impl Merger {
}
let mut complete = entity_fields.clone();
if is_update {
if exists {
if let Some(fetched) = entity_fetched {
let mut temp = fetched.clone();
for (k, v) in entity_fields {
@ -831,13 +872,17 @@ impl Merger {
let mut notification = serde_json::Map::new();
notification.insert("complete".to_string(), Value::Object(complete));
notification.insert("new".to_string(), new_val_obj.clone());
if old_val_obj != Value::Null {
notification.insert("old".to_string(), old_val_obj.clone());
notification.insert("old".to_string(), old_val_obj.clone());
}
if let Some(rep) = replaces_id {
notification.insert("replaces".to_string(), Value::String(rep.to_string()));
}
let mut notify_sql = None;
if type_obj.historical {
if type_obj.historical && change_kind != "replace" {
let change_sql = format!(
"INSERT INTO agreego.change (\"old\", \"new\", entity_id, id, kind, modified_at, modified_by) VALUES ({}, {}, {}, {}, {}, {}, {})",
Self::quote_literal(&old_val_obj),

View File

@ -67,7 +67,10 @@ impl<'a> Compiler<'a> {
if let Some(items) = &node.schema.obj.items {
let mut resolved_type = None;
if let Some(family_target) = items.obj.family.as_ref() {
let base_type_name = family_target.split('.').next_back().unwrap_or(family_target);
let base_type_name = family_target
.split('.')
.next_back()
.unwrap_or(family_target);
resolved_type = self.db.types.get(base_type_name);
} else if let Some(base_type_name) = items.obj.identifier() {
resolved_type = self.db.types.get(&base_type_name);
@ -89,7 +92,10 @@ impl<'a> Compiler<'a> {
}
// 3. Fallback for root execution of standalone non-entity arrays
Err("Cannot compile a root array without a valid entity reference or table mapped via `items`.".to_string())
Err(
"Cannot compile a root array without a valid entity reference or table mapped via `items`."
.to_string(),
)
}
fn compile_reference(&mut self, node: Node<'a>) -> Result<(String, String), String> {
@ -452,7 +458,6 @@ impl<'a> Compiler<'a> {
},
};
let (val_sql, val_type) = self.compile_node(child_node)?;
if val_type != "abort" {
@ -515,7 +520,13 @@ impl<'a> Compiler<'a> {
// Determine if the property schema resolves to a physical Database Entity
let mut bound_type_name = None;
if let Some(family_target) = prop_schema.obj.family.as_ref() {
bound_type_name = Some(family_target.split('.').next_back().unwrap_or(family_target).to_string());
bound_type_name = Some(
family_target
.split('.')
.next_back()
.unwrap_or(family_target)
.to_string(),
);
} else if let Some(lookup_key) = prop_schema.obj.identifier() {
bound_type_name = Some(lookup_key);
}
@ -536,7 +547,10 @@ impl<'a> Compiler<'a> {
}
if let Some(col) = poly_col {
if let Some(alias) = type_aliases.get(table_to_alias).or_else(|| type_aliases.get(&node.parent_alias)) {
if let Some(alias) = type_aliases
.get(table_to_alias)
.or_else(|| type_aliases.get(&node.parent_alias))
{
where_clauses.push(format!("{}.{} = '{}'", alias, col, type_name));
}
}
@ -710,8 +724,6 @@ impl<'a> Compiler<'a> {
) -> Result<(), String> {
if let Some(prop_ref) = &node.property_name {
let prop = prop_ref.as_str();
println!("DEBUG: Eval prop: {}", prop);
let mut parent_relation_alias = node.parent_alias.clone();
let mut child_relation_alias = base_alias.to_string();

View File

@ -8596,3 +8596,15 @@ fn test_merger_0_10() {
let path = format!("{}/fixtures/merger.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 10).unwrap();
}
#[test]
fn test_merger_0_11() {
let path = format!("{}/fixtures/merger.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 11).unwrap();
}
#[test]
fn test_merger_0_12() {
let path = format!("{}/fixtures/merger.json", env!("CARGO_MANIFEST_DIR"));
crate::tests::runner::run_test_case(&path, 0, 12).unwrap();
}

View File

@ -1 +1 @@
1.0.93
1.0.101