Files
jspg/src/lib.rs

271 lines
8.0 KiB
Rust

use pgrx::*;
pg_module_magic!();
pub mod compiler;
pub mod drop;
pub mod formats;
pub mod registry;
mod schema;
pub mod util;
mod validator;
use crate::schema::Schema;
use serde_json::{Value, json};
use std::sync::{Arc, RwLock};
lazy_static::lazy_static! {
// Process-wide slot holding the currently installed validator.
//
// Layering, from the outside in:
// - RwLock: guards replacement of the slot contents; readers only hold it long enough
//   to inspect or clone what is inside.
// - Option: the slot starts out as `None` and is `None` again after `clear_json_schemas()`.
// - Arc: a caller that cloned the handle keeps the OLD validator alive and usable even
//   if the slot is swapped or cleared in the meantime.
// - Validator: constructed from (and taking ownership of) a fully built Registry in
//   `cache_json_schemas()`.
static ref GLOBAL_VALIDATOR: RwLock<Option<Arc<validator::Validator>>> = RwLock::new(None);
}
// Rebuilds the schema cache from the supplied payloads and installs it globally.
//
// Inputs (`enums`, `types`, `puncs`): each is expected to be a JSON array of objects.
// Every object may carry a `schemas` array; each element of that array is deserialized
// into a `Schema` and added to a freshly created registry.
//
// Additionally, `types` is scanned for objects that have a string `name` and an array
// `hierarchy`. For every string ancestor found in a hierarchy, a synthetic schema with
// `$id` "<ancestor>.family" is registered whose `oneOf` lists a `$ref` to every type
// naming that ancestor.
//
// Processing is best effort: payloads that are not arrays, items lacking the expected
// keys, and schemas that fail to deserialize are skipped without reporting an error.
//
// Returns `{"response": "success"}` after the new validator has replaced the old one.
// Panics if the global lock is poisoned.
#[pg_extern(strict)]
fn cache_json_schemas(enums: JsonB, types: JsonB, puncs: JsonB) -> JsonB {
    // 1. Build the new registry locally; nothing global is touched until the swap below.
    let mut registry = registry::Registry::new();

    {
        // Collect "ancestor -> member type names" from the type hierarchies.
        // Ordered collections are used on purpose: they make both the order in which
        // family schemas are registered and the order of the `oneOf` branches stable
        // from one run to the next (hash-based collections iterate in arbitrary order).
        let mut family_map: std::collections::BTreeMap<
            String,
            std::collections::BTreeSet<String>,
        > = std::collections::BTreeMap::new();

        if let Value::Array(arr) = &types.0 {
            for item in arr {
                let name = item.get("name").and_then(Value::as_str);
                let hierarchy = item.get("hierarchy").and_then(Value::as_array);
                if let (Some(name), Some(hierarchy)) = (name, hierarchy) {
                    // Non-string hierarchy entries are ignored.
                    for ancestor in hierarchy.iter().filter_map(Value::as_str) {
                        family_map
                            .entry(ancestor.to_string())
                            .or_default()
                            .insert(name.to_string());
                    }
                }
            }
        }

        for (family_name, members) in family_map {
            let id = format!("{}.family", family_name);
            // Object union: the family schema matches any one of the member types.
            let object_refs: Vec<Value> = members
                .iter()
                .map(|member| json!({ "$ref": member }))
                .collect();
            let schema_json = json!({
                "$id": id,
                "oneOf": object_refs
            });
            if let Ok(schema) = serde_json::from_value::<Schema>(schema_json) {
                registry.add(schema);
            }
        }

        // Registers every schema found under the `schemas` key of each array element.
        // The payload is owned here, so the inner array is moved out of the item
        // (`Value::take`) rather than cloning each schema before deserializing it.
        let mut cache_items = |items: JsonB| {
            if let Value::Array(arr) = items.0 {
                for mut item in arr {
                    if let Some(Value::Array(schemas)) =
                        item.get_mut("schemas").map(Value::take)
                    {
                        for schema_val in schemas {
                            // Schemas that do not deserialize are skipped (best effort).
                            if let Ok(schema) = serde_json::from_value::<Schema>(schema_val) {
                                // Registry handles compilation
                                registry.add(schema);
                            }
                        }
                    }
                }
            }
        };

        cache_items(enums);
        cache_items(types);
        cache_items(puncs); // public/private distinction logic to come later
    }

    // 2. Wrap the finished registry in a Validator behind an Arc.
    let new_arc = Arc::new(validator::Validator::new(registry));

    // 3. Swap it in; the write lock is held only for the assignment itself.
    {
        let mut lock = GLOBAL_VALIDATOR.write().unwrap();
        *lock = Some(new_arc);
    }

    JsonB(json!({ "response": "success" }))
}
// Runs `Validator::mask` for `schema_id` against `instance` and reports the outcome
// as a serialized `Drop`.
//
// - Validator not cached yet: returns a single VALIDATOR_NOT_INITIALIZED error.
// - `mask` returns Ok and the result is valid: returns a success drop carrying the
//   instance as left by `mask` (which receives it by mutable reference).
// - `mask` returns Ok but the result is invalid: returns the collected validation errors.
// - `mask` returns Err (e.g. schema not found): returns that single error.
//
// Panics if the global lock is poisoned or if the drop cannot be serialized.
#[pg_extern(strict, parallel_safe)]
fn mask_json_schema(schema_id: &str, instance: JsonB) -> JsonB {
    // 1. Acquire a snapshot of the validator; the read lock is released right away.
    let validator_arc = {
        let lock = GLOBAL_VALIDATOR.read().unwrap();
        lock.clone()
    };

    // 2. Mask without holding the lock.
    if let Some(validator) = validator_arc {
        // `instance` is owned and not used afterwards, so its value is moved and
        // masked in place instead of deep-copying the whole document first.
        let mut masked_instance = instance.0;
        match validator.mask(schema_id, &mut masked_instance) {
            Ok(result) => {
                if result.is_valid() {
                    // Valid: hand back the masked instance.
                    let drop = crate::drop::Drop::success_with_val(masked_instance);
                    JsonB(serde_json::to_value(drop).unwrap())
                } else {
                    // Invalid: translate schema validation errors into drop errors.
                    let errors: Vec<crate::drop::Error> = result
                        .errors
                        .into_iter()
                        .map(|e| crate::drop::Error {
                            punc: None,
                            code: e.code,
                            message: e.message,
                            details: crate::drop::ErrorDetails { path: e.path },
                        })
                        .collect();
                    let drop = crate::drop::Drop::with_errors(errors);
                    JsonB(serde_json::to_value(drop).unwrap())
                }
            }
            Err(e) => {
                // Schema not found or another fatal error.
                let error = crate::drop::Error {
                    punc: None,
                    code: e.code,
                    message: e.message,
                    details: crate::drop::ErrorDetails { path: e.path },
                };
                let drop = crate::drop::Drop::with_errors(vec![error]);
                JsonB(serde_json::to_value(drop).unwrap())
            }
        }
    } else {
        JsonB(json!({
            "punc": null,
            "errors": [{
                "code": "VALIDATOR_NOT_INITIALIZED",
                "message": "JSON Schemas have not been cached yet. Run cache_json_schemas()",
                "details": { "path": "" }
            }]
        }))
    }
}
#[pg_extern(strict, parallel_safe)]
fn validate_json_schema(schema_id: &str, instance: JsonB) -> JsonB {
// 1. Acquire Snapshot
let validator_arc = {
let lock = GLOBAL_VALIDATOR.read().unwrap();
lock.clone()
};
// 2. Validate (Lock-Free)
if let Some(validator) = validator_arc {
match validator.validate(schema_id, &instance.0) {
Ok(result) => {
if result.is_valid() {
let drop = crate::drop::Drop::success();
JsonB(serde_json::to_value(drop).unwrap())
} else {
let errors: Vec<crate::drop::Error> = result
.errors
.into_iter()
.map(|e| crate::drop::Error {
punc: None,
code: e.code,
message: e.message,
details: crate::drop::ErrorDetails { path: e.path },
})
.collect();
let drop = crate::drop::Drop::with_errors(errors);
JsonB(serde_json::to_value(drop).unwrap())
}
}
Err(e) => {
let error = crate::drop::Error {
punc: None,
code: e.code,
message: e.message,
details: crate::drop::ErrorDetails { path: e.path },
};
let drop = crate::drop::Drop::with_errors(vec![error]);
JsonB(serde_json::to_value(drop).unwrap())
}
}
} else {
JsonB(json!({
"punc": null,
"errors": [{
"code": "VALIDATOR_NOT_INITIALIZED",
"message": "JSON Schemas have not been cached yet. Run cache_json_schemas()",
"details": { "path": "" }
}]
}))
}
}
// Reports whether `schema_id` is known to the currently cached validator.
//
// Returns false when no validator is cached. Otherwise the lookup is probed by
// validating a JSON `null` against `schema_id`: only an error whose code is
// "SCHEMA_NOT_FOUND" counts as "not cached"; any other outcome (including a failed
// validation of the null instance) means the schema exists.
//
// Panics if the global lock is poisoned.
#[pg_extern(strict, parallel_safe)]
fn json_schema_cached(schema_id: &str) -> bool {
    // Snapshot the validator and release the read lock before probing, matching how
    // the validate/mask entry points work (the lock is not held during validation).
    let validator_arc = {
        let lock = GLOBAL_VALIDATOR.read().unwrap();
        lock.clone()
    };

    match validator_arc {
        Some(validator) => !matches!(
            validator.validate(schema_id, &serde_json::Value::Null),
            Err(e) if e.code == "SCHEMA_NOT_FOUND"
        ),
        None => false,
    }
}
// Empties the global validator slot so that no schemas are cached afterwards.
// Always returns `{"response": "success"}`. Panics if the global lock is poisoned.
#[pg_extern(strict)]
fn clear_json_schemas() -> JsonB {
    let mut slot = GLOBAL_VALIDATOR.write().unwrap();
    // Remove whatever validator handle was stored (if any) and release it right here.
    drop(slot.take());
    JsonB(json!({ "response": "success" }))
}
// Reports whether a validator is currently cached: `status` is "active" when the
// global slot holds one and "empty" otherwise. Panics if the global lock is poisoned.
#[pg_extern(strict, parallel_safe)]
fn show_json_schemas() -> JsonB {
    let status = if GLOBAL_VALIDATOR.read().unwrap().is_some() {
        "active"
    } else {
        "empty"
    };
    JsonB(json!({ "response": "success", "status": status }))
}
// Test module, compiled only for test builds or when the `pg_test` feature is enabled.
// The test code itself lives in `tests.rs` and is textually included into this module.
#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
use pgrx::prelude::*;
include!("tests.rs");
}
// Test-only configuration hooks (presumably picked up by the pgrx test harness —
// confirm against the pgrx testing documentation).
#[cfg(test)]
pub mod pg_test {
    // One-time initialization shared by all tests; nothing is needed at present.
    pub fn setup(_options: Vec<&str>) {}

    // Extra `postgresql.conf` settings the tests require; currently none.
    pub fn postgresql_conf_options() -> Vec<&'static str> {
        Vec::new()
    }
}
}