Files
jspg/src/merger/mod.rs

941 lines
28 KiB
Rust

//! The `merger` module merges JSON payloads into Postgres by building and executing SQL (via SPI),
//! driven by the schemas, types, and relations registered on the `Database`.
pub mod cache;
use crate::database::r#type::Type;
use crate::database::Database;
use serde_json::Value;
use std::sync::Arc;
/// Merges JSON payloads into the database.
///
/// Bundles the shared [`Database`] handle with a statement cache that holds the
/// per-type SELECT templates built by `fetch_entity`.
pub struct Merger {
/// Shared database handle; used to look up schemas, types and relations and to run SQL.
pub db: Arc<Database>,
/// Cache of fetch SQL templates, keyed by entity type name.
pub cache: cache::StatementCache,
}
impl Merger {
pub fn new(db: Arc<Database>) -> Self {
Self {
db,
cache: cache::StatementCache::new(10_000),
}
}
pub fn merge(&self, schema_id: &str, data: Value) -> crate::drop::Drop {
let mut notifications_queue = Vec::new();
let target_schema = match self.db.schemas.get(schema_id) {
Some(s) => Arc::new(s.clone()),
None => {
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "MERGE_FAILED".to_string(),
message: format!("Unknown schema_id: {}", schema_id),
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: None,
context: Some(data),
schema: None,
},
}]);
}
};
let result = self.merge_internal(target_schema, data.clone(), &mut notifications_queue);
let val_resolved = match result {
Ok(val) => val,
Err(msg) => {
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "MERGE_FAILED".to_string(),
message: msg,
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: None,
context: Some(data),
schema: None,
},
}]);
}
};
// Execute the globally collected, pre-ordered notifications last!
for notify_sql in notifications_queue {
if let Err(e) = self.db.execute(&notify_sql, None) {
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
code: "MERGE_FAILED".to_string(),
message: format!("Executor Error in pre-ordered notify: {:?}", e),
details: crate::drop::ErrorDetails {
path: "".to_string(),
cause: None,
context: None,
schema: None,
},
}]);
}
}
let stripped_val = match val_resolved {
Value::Object(mut map) => {
let mut out = serde_json::Map::new();
if let Some(id) = map.remove("id") {
out.insert("id".to_string(), id);
}
Value::Object(out)
}
Value::Array(arr) => {
let mut out_arr = Vec::new();
for item in arr {
if let Value::Object(mut map) = item {
let mut out = serde_json::Map::new();
if let Some(id) = map.remove("id") {
out.insert("id".to_string(), id);
}
out_arr.push(Value::Object(out));
} else {
out_arr.push(Value::Null);
}
}
Value::Array(out_arr)
}
other => other,
};
crate::drop::Drop::success_with_val(stripped_val)
}
/// Dispatches a payload to the object or array merge routine.
///
/// Objects go to `merge_object`, arrays to `merge_array`; any scalar or `null`
/// payload is rejected with an error. SQL for notifications produced along the way
/// is appended to `notifications`.
pub(crate) fn merge_internal(
    &self,
    schema: Arc<crate::database::schema::Schema>,
    data: Value,
    notifications: &mut Vec<String>,
) -> Result<Value, String> {
    match data {
        Value::Object(fields) => self.merge_object(schema, fields, notifications),
        Value::Array(elements) => self.merge_array(schema, elements, notifications),
        Value::Null | Value::Bool(_) | Value::Number(_) | Value::String(_) => {
            Err("Invalid merge payload: root must be an Object or Array".to_string())
        }
    }
}
/// Merges every element of `items` in order and returns the merged elements as an array.
///
/// When `schema` is declared with the single type `"array"` and has an `items`
/// definition, elements are merged against that definition; otherwise they are merged
/// against `schema` itself. The first failing element aborts the whole merge.
fn merge_array(
    &self,
    schema: Arc<crate::database::schema::Schema>,
    items: Vec<Value>,
    notifications: &mut Vec<String>,
) -> Result<Value, String> {
    let declares_array = matches!(
        &schema.obj.type_,
        Some(crate::database::schema::SchemaTypeOrArray::Single(kind)) if kind == "array"
    );
    let item_schema = match &schema.obj.items {
        Some(items_def) if declares_array => items_def.clone(),
        _ => schema.clone(),
    };

    items
        .into_iter()
        .map(|item| self.merge_internal(item_schema.clone(), item, &mut *notifications))
        .collect::<Result<Vec<Value>, String>>()
        .map(Value::Array)
}
fn merge_object(
&self,
schema: Arc<crate::database::schema::Schema>,
obj: serde_json::Map<String, Value>,
notifications: &mut Vec<String>,
) -> Result<Value, String> {
let queue_start = notifications.len();
let type_name = match obj.get("type").and_then(|v| v.as_str()) {
Some(t) => t.to_string(),
None => return Err("Missing required 'type' field on object".to_string()),
};
let type_def = match self.db.types.get(&type_name) {
Some(t) => t,
None => return Err(format!("Unknown entity type: {}", type_name)),
};
let compiled_props = match schema.obj.compiled_properties.get() {
Some(props) => props,
None => return Err("Schema has no compiled properties for merging".to_string()),
};
let mut entity_fields = serde_json::Map::new();
let mut entity_objects = std::collections::BTreeMap::new();
let mut entity_arrays = std::collections::BTreeMap::new();
for (k, v) in obj {
// Always retain system and unmapped core fields natively implicitly mapped to the Postgres tables
if k == "id" || k == "type" || k == "created" {
entity_fields.insert(k.clone(), v.clone());
continue;
}
if let Some(prop_schema) = compiled_props.get(&k) {
let mut is_edge = false;
if let Some(edges) = schema.obj.compiled_edges.get() {
if edges.contains_key(&k) {
is_edge = true;
}
}
if is_edge {
let typeof_v = match &v {
Value::Object(_) => "object",
Value::Array(_) => "array",
_ => "field", // Malformed edge data?
};
if typeof_v == "object" {
entity_objects.insert(k.clone(), (v.clone(), prop_schema.clone()));
} else if typeof_v == "array" {
entity_arrays.insert(k.clone(), (v.clone(), prop_schema.clone()));
} else {
entity_fields.insert(k.clone(), v.clone());
}
} else {
// Not an edge! It's a raw Postgres column (e.g., JSONB, text[])
entity_fields.insert(k.clone(), v.clone());
}
} else if type_def.fields.contains(&k) {
entity_fields.insert(k.clone(), v.clone());
}
}
let user_id = self.db.auth_user_id()?;
let timestamp = self.db.timestamp()?;
let mut entity_change_kind = None;
let mut entity_fetched = None;
if !type_def.relationship {
let (fields, kind, fetched) =
self.stage_entity(entity_fields.clone(), type_def, &user_id, &timestamp)?;
entity_fields = fields;
entity_change_kind = kind;
entity_fetched = fetched;
}
let mut entity_response = serde_json::Map::new();
for (relation_name, (relative_val, rel_schema)) in entity_objects {
let mut relative = match relative_val {
Value::Object(m) => m,
_ => continue,
};
let relative_type_name = match relative.get("type").and_then(|v| v.as_str()) {
Some(t) => t.to_string(),
None => continue,
};
if let Some(compiled_edges) = schema.obj.compiled_edges.get() {
println!("Compiled Edges keys for relation {}: {:?}", relation_name, compiled_edges.keys().collect::<Vec<_>>());
if let Some(edge) = compiled_edges.get(&relation_name) {
println!("FOUND EDGE {} -> {:?}", relation_name, edge.constraint);
if let Some(relation) = self.db.relations.get(&edge.constraint) {
let parent_is_source = edge.forward;
if parent_is_source {
if !relative.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
relative.insert("organization_id".to_string(), org_id.clone());
}
}
let mut merged_relative = match self.merge_internal(rel_schema.clone(), Value::Object(relative), notifications)? {
Value::Object(m) => m,
_ => continue,
};
merged_relative.insert(
"type".to_string(),
Value::String(relative_type_name),
);
Self::apply_entity_relation(
&mut entity_fields,
&relation.source_columns,
&relation.destination_columns,
&merged_relative,
);
entity_response.insert(relation_name, Value::Object(merged_relative));
} else {
if !relative.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
relative.insert("organization_id".to_string(), org_id.clone());
}
}
Self::apply_entity_relation(
&mut relative,
&relation.source_columns,
&relation.destination_columns,
&entity_fields,
);
let merged_relative = match self.merge_internal(rel_schema.clone(), Value::Object(relative), notifications)? {
Value::Object(m) => m,
_ => continue,
};
entity_response.insert(relation_name, Value::Object(merged_relative));
}
}
}
}
}
if type_def.relationship {
let (fields, kind, fetched) =
self.stage_entity(entity_fields.clone(), type_def, &user_id, &timestamp)?;
entity_fields = fields;
entity_change_kind = kind;
entity_fetched = fetched;
}
self.merge_entity_fields(
entity_change_kind.as_deref().unwrap_or(""),
&type_name,
type_def,
&entity_fields,
entity_fetched.as_ref(),
)?;
for (k, v) in &entity_fields {
entity_response.insert(k.clone(), v.clone());
}
for (relation_name, (relative_val, rel_schema)) in entity_arrays {
let relative_arr = match relative_val {
Value::Array(a) => a,
_ => continue,
};
if relative_arr.is_empty() {
continue;
}
if let Some(compiled_edges) = schema.obj.compiled_edges.get() {
if let Some(edge) = compiled_edges.get(&relation_name) {
if let Some(relation) = self.db.relations.get(&edge.constraint) {
let mut relative_responses = Vec::new();
for relative_item_val in relative_arr {
if let Value::Object(mut relative_item) = relative_item_val {
if !relative_item.contains_key("organization_id") {
if let Some(org_id) = entity_fields.get("organization_id") {
relative_item.insert("organization_id".to_string(), org_id.clone());
}
}
Self::apply_entity_relation(
&mut relative_item,
&relation.source_columns,
&relation.destination_columns,
&entity_fields,
);
let mut item_schema = rel_schema.clone();
if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) = &rel_schema.obj.type_ {
if t == "array" {
if let Some(items_def) = &rel_schema.obj.items {
item_schema = items_def.clone();
}
}
}
let merged_relative =
match self.merge_internal(item_schema, Value::Object(relative_item), notifications)? {
Value::Object(m) => m,
_ => continue,
};
relative_responses.push(Value::Object(merged_relative));
}
}
entity_response.insert(relation_name, Value::Array(relative_responses));
}
}
}
}
// 7. Perform change tracking dynamically suppressing noise based on type bounds!
let notify_sql = self.merge_entity_change(
type_def,
&entity_fields,
entity_fetched.as_ref(),
entity_change_kind.as_deref(),
&user_id,
&timestamp,
)?;
if let Some(sql) = notify_sql {
notifications.insert(queue_start, sql);
}
// Produce the full tree response
let mut final_response = serde_json::Map::new();
if let Some(fetched) = entity_fetched {
for (k, v) in fetched {
final_response.insert(k, v);
}
}
for (k, v) in entity_response {
final_response.insert(k, v);
}
Ok(Value::Object(final_response))
}
/// Decides what kind of change a payload represents and prepares the fields to write.
///
/// Fetches the existing row (`fetch_entity`), diffs the payload against it
/// (`compare_entities`) and returns `(staged_fields, change_kind, fetched_row)`:
///
/// * no existing row: `Some("create")`. Staged fields are the payload's type fields plus
///   `id` (the payload's non-empty string id, otherwise a new v4 UUID), `type` and the
///   four created/modified audit columns.
/// * existing row, nothing changed: `None`. Staged fields are only `id` and `type`.
/// * existing row with changes: `Some("delete")` when the changes set `archived` to `true`,
///   otherwise `Some("update")`. Staged fields are the changed fields plus `id`, `type`,
///   `modified_by` and `modified_at`.
///
/// # Errors
/// Propagates `fetch_entity` errors, and fails if the fetched row has no `id` member.
fn stage_entity(
    &self,
    entity_fields: serde_json::Map<String, Value>,
    type_def: &crate::database::r#type::Type,
    user_id: &str,
    timestamp: &str,
) -> Result<
    (
        serde_json::Map<String, Value>,
        Option<String>,
        Option<serde_json::Map<String, Value>>,
    ),
    String,
> {
    let type_name = type_def.name.as_str();
    let entity_fetched = self.fetch_entity(&entity_fields, type_def)?;

    // System-managed keys are never taken from the payload diff.
    let system_keys = vec![
        "id".to_string(),
        "type".to_string(),
        "created_by".to_string(),
        "modified_by".to_string(),
        "created_at".to_string(),
        "modified_at".to_string(),
    ];
    let changes = self.compare_entities(
        entity_fetched.as_ref(),
        &entity_fields,
        &type_def.fields,
        &system_keys,
    );

    let text = |s: &str| Value::String(s.to_string());

    let (staged, change_kind) = match entity_fetched.as_ref() {
        None => {
            // Keep a caller-supplied string id; otherwise mint one.
            let id = match entity_fields.get("id").and_then(|v| v.as_str()) {
                Some(id) if !id.is_empty() => id.to_string(),
                _ => uuid::Uuid::new_v4().to_string(),
            };
            let mut staged = changes;
            staged.insert("id".to_string(), Value::String(id));
            staged.insert("type".to_string(), text(type_name));
            staged.insert("created_by".to_string(), text(user_id));
            staged.insert("created_at".to_string(), text(timestamp));
            staged.insert("modified_by".to_string(), text(user_id));
            staged.insert("modified_at".to_string(), text(timestamp));
            (staged, Some("create".to_string()))
        }
        Some(fetched) => {
            // Report a row without `id` as an error rather than panicking.
            let id = fetched
                .get("id")
                .cloned()
                .ok_or_else(|| format!("Fetched {} row has no 'id' column", type_name))?;
            if changes.is_empty() {
                let mut staged = serde_json::Map::new();
                staged.insert("id".to_string(), id);
                staged.insert("type".to_string(), text(type_name));
                (staged, None)
            } else {
                let is_archived = changes
                    .get("archived")
                    .and_then(|v| v.as_bool())
                    .unwrap_or(false);
                let kind = if is_archived { "delete" } else { "update" };
                let mut staged = changes;
                staged.insert("id".to_string(), id);
                staged.insert("type".to_string(), text(type_name));
                staged.insert("modified_by".to_string(), text(user_id));
                staged.insert("modified_at".to_string(), text(timestamp));
                (staged, Some(kind.to_string()))
            }
        }
    };

    Ok((staged, change_kind, entity_fetched))
}
/// Loads the existing row for an entity, if there is one.
///
/// The row is located by `id` when the payload has an `id` member. Otherwise the type's
/// `lookup_fields` are used, but only when every one of them is present in the payload
/// and is neither `null` nor an empty string. With neither available, `Ok(None)` is
/// returned without querying.
///
/// The SELECT merges `to_jsonb` of each table in the type's `hierarchy`, joined on `id`.
/// That template is cached per type name; only the WHERE clause is built per call.
///
/// # Errors
/// Fails when the type has an empty hierarchy, the query fails, the result is not an
/// array of objects, or more than one row matches.
///
/// NOTE(review): table and column names are interpolated inside double quotes without
/// escaping; this assumes the type metadata is trusted.
fn fetch_entity(
    &self,
    entity_fields: &serde_json::Map<String, Value>,
    entity_type: &crate::database::r#type::Type,
) -> Result<Option<serde_json::Map<String, Value>>, String> {
    let id_val = entity_fields.get("id");
    let entity_type_name = entity_type.name.as_str();

    // The lookup is usable only if the type declares lookup fields and the payload
    // supplies a non-null, non-empty value for each of them.
    let lookup_complete = !entity_type.lookup_fields.is_empty()
        && entity_type
            .lookup_fields
            .iter()
            .all(|column| match entity_fields.get(column) {
                None | Some(Value::Null) => false,
                Some(Value::String(s)) => !s.is_empty(),
                Some(_) => true,
            });

    if id_val.is_none() && !lookup_complete {
        return Ok(None);
    }

    let fetch_sql_template = if let Some(cached) = self.cache.get(entity_type_name) {
        cached
    } else {
        // Report an empty hierarchy as an error rather than panicking on index 0.
        let root_table = entity_type.hierarchy.first().ok_or_else(|| {
            format!("Type {} has an empty table hierarchy", entity_type_name)
        })?;
        let mut select_list = String::from("to_jsonb(t1.*)");
        let mut join_clauses = format!("FROM agreego.\"{}\" t1", root_table);
        for (i, table_name) in entity_type.hierarchy.iter().enumerate().skip(1) {
            let t_alias = format!("t{}", i + 1);
            join_clauses.push_str(&format!(
                " LEFT JOIN agreego.\"{}\" {} ON {}.id = t1.id",
                table_name, t_alias, t_alias
            ));
            select_list.push_str(&format!(" || to_jsonb({}.*)", t_alias));
        }
        let template = format!("SELECT {} {}", select_list, join_clauses);
        self
            .cache
            .insert(entity_type_name.to_string(), template.clone());
        template
    };

    // An explicit id wins over the lookup fields. Without an id, `lookup_complete`
    // is guaranteed by the early return above.
    let where_clause = match id_val {
        Some(id) => format!("WHERE t1.id = {}", Self::quote_literal(id)),
        None => {
            let lookup_predicates: Vec<String> = entity_type
                .lookup_fields
                .iter()
                .map(|column| {
                    let literal =
                        Self::quote_literal(entity_fields.get(column).unwrap_or(&Value::Null));
                    if column == "type" {
                        // Only `type` is qualified with the `t1` alias.
                        format!("t1.\"{}\" = {}", column, literal)
                    } else {
                        format!("\"{}\" = {}", column, literal)
                    }
                })
                .collect();
            format!("WHERE {}", lookup_predicates.join(" AND "))
        }
    };

    let final_sql = format!("{} {}", fetch_sql_template, where_clause);
    match self.db.query(&final_sql, None) {
        Ok(Value::Array(table)) => {
            if table.len() > 1 {
                return Err(format!(
                    "TOO_MANY_LOOKUP_ROWS: Lookup for {} found too many existing rows",
                    entity_type_name
                ));
            }
            match table.into_iter().next() {
                None => Ok(None),
                Some(Value::Object(map)) => Ok(Some(map)),
                Some(other) => Err(format!("Expected JSON object, got: {:?}", other)),
            }
        }
        Ok(_) => Err("Expected array from query in fetch_entity".to_string()),
        Err(e) => Err(format!("SPI error in fetch_entity: {:?}", e)),
    }
}
/// Writes the staged fields of one entity to each table in its type hierarchy.
///
/// Does nothing when `change_kind` is empty. For `"create"` the hierarchy is walked in
/// reverse and one INSERT is issued per table, with `id` and `type` added when the table
/// has those columns. For `"update"` and `"delete"` the hierarchy is walked in declared
/// order and one UPDATE per table sets the staged columns belonging to that table. Tables
/// with no entry in the type's grouped fields, or with nothing to write, are skipped.
/// Columns are emitted in sorted order.
///
/// Fails when `id` is missing or not a string, when the type has no grouped fields, or
/// when a statement fails to execute.
fn merge_entity_fields(
    &self,
    change_kind: &str,
    entity_type_name: &str,
    entity_type: &crate::database::r#type::Type,
    entity_fields: &serde_json::Map<String, Value>,
    _entity_fetched: Option<&serde_json::Map<String, Value>>,
) -> Result<(), String> {
    if change_kind.is_empty() {
        return Ok(());
    }

    let id_str = entity_fields
        .get("id")
        .and_then(|v| v.as_str())
        .ok_or_else(|| "Missing 'id' for merge execution".to_string())?;
    let grouped_fields = match &entity_type.grouped_fields {
        Some(Value::Object(map)) => map,
        _ => {
            return Err(format!(
                "Grouped fields missing for type {}",
                entity_type_name
            ));
        }
    };

    let is_create = change_kind == "create";
    let is_modify = change_kind == "update" || change_kind == "delete";

    let mut tables: Vec<&String> = entity_type.hierarchy.iter().collect();
    if is_create {
        tables.reverse();
    }

    for table_name in tables {
        let table_fields: Vec<&str> =
            match grouped_fields.get(table_name.as_str()).and_then(|v| v.as_array()) {
                Some(names) => names.iter().filter_map(|v| v.as_str()).collect(),
                None => continue,
            };

        // The staged fields that belong to this table; a BTreeMap keeps them sorted.
        let mut row: std::collections::BTreeMap<String, Value> = entity_fields
            .iter()
            .filter(|(name, _)| table_fields.contains(&name.as_str()))
            .map(|(name, value)| (name.clone(), value.clone()))
            .collect();

        if is_create {
            if table_fields.contains(&"id") {
                row
                    .entry("id".to_string())
                    .or_insert_with(|| Value::String(id_str.to_string()));
            }
            if table_fields.contains(&"type") {
                row
                    .entry("type".to_string())
                    .or_insert_with(|| Value::String(entity_type_name.to_string()));
            }
            if row.is_empty() {
                continue;
            }
            let columns: Vec<String> = row.keys().map(|name| format!("\"{}\"", name)).collect();
            let values: Vec<String> = row
                .iter()
                .map(|(name, value)| Self::format_sql_value(value, name, entity_type))
                .collect();
            let sql = format!(
                "INSERT INTO agreego.\"{}\" ({}) VALUES ({})",
                table_name,
                columns.join(", "),
                values.join(", ")
            );
            self
                .db
                .execute(&sql, None)
                .map_err(|e| format!("SPI Error in INSERT: {:?}", e))?;
        } else if is_modify {
            // The key columns are never rewritten by an UPDATE.
            row.remove("id");
            row.remove("type");
            if row.is_empty() {
                continue;
            }
            let set_clauses: Vec<String> = row
                .iter()
                .map(|(name, value)| {
                    if value.as_str() == Some("") {
                        format!("\"{}\" = NULL", name)
                    } else {
                        format!(
                            "\"{}\" = {}",
                            name,
                            Self::format_sql_value(value, name, entity_type)
                        )
                    }
                })
                .collect();
            let sql = format!(
                "UPDATE agreego.\"{}\" SET {} WHERE id = {}",
                table_name,
                set_clauses.join(", "),
                Self::quote_literal(&Value::String(id_str.to_string()))
            );
            self
                .db
                .execute(&sql, None)
                .map_err(|e| format!("SPI Error in UPDATE: {:?}", e))?;
        }
    }
    Ok(())
}
fn merge_entity_change(
&self,
type_obj: &Type,
entity_fields: &serde_json::Map<String, Value>,
entity_fetched: Option<&serde_json::Map<String, Value>>,
entity_change_kind: Option<&str>,
user_id: &str,
timestamp: &str,
) -> Result<Option<String>, String> {
let change_kind = match entity_change_kind {
Some(k) => k,
None => return Ok(None),
};
let id_str = entity_fields.get("id").unwrap();
let type_name = entity_fields.get("type").unwrap();
let mut old_vals = serde_json::Map::new();
let mut new_vals = serde_json::Map::new();
let is_update = change_kind == "update" || change_kind == "delete";
if !is_update {
let system_keys = vec![
"id".to_string(),
"created_by".to_string(),
"modified_by".to_string(),
"created_at".to_string(),
"modified_at".to_string(),
];
for (k, v) in entity_fields {
if !system_keys.contains(k) {
new_vals.insert(k.clone(), v.clone());
}
}
} else {
let system_keys = vec![
"id".to_string(),
"type".to_string(),
"created_by".to_string(),
"modified_by".to_string(),
"created_at".to_string(),
"modified_at".to_string(),
];
for (k, v) in entity_fields {
if !system_keys.contains(k) {
if let Some(fetched) = entity_fetched {
let old_val = fetched.get(k).unwrap_or(&Value::Null);
if v != old_val {
new_vals.insert(k.clone(), v.clone());
old_vals.insert(k.clone(), old_val.clone());
}
}
}
}
new_vals.insert("type".to_string(), type_name.clone());
}
let mut complete = entity_fields.clone();
if is_update {
if let Some(fetched) = entity_fetched {
let mut temp = fetched.clone();
for (k, v) in entity_fields {
temp.insert(k.clone(), v.clone());
}
complete = temp;
}
}
let new_val_obj = Value::Object(new_vals);
let old_val_obj = if old_vals.is_empty() {
Value::Null
} else {
Value::Object(old_vals)
};
let mut notification = serde_json::Map::new();
notification.insert("complete".to_string(), Value::Object(complete));
notification.insert("new".to_string(), new_val_obj.clone());
if old_val_obj != Value::Null {
notification.insert("old".to_string(), old_val_obj.clone());
}
let mut notify_sql = None;
if type_obj.historical {
let change_sql = format!(
"INSERT INTO agreego.change (\"old\", \"new\", entity_id, id, kind, modified_at, modified_by) VALUES ({}, {}, {}, {}, {}, {}, {})",
Self::quote_literal(&old_val_obj),
Self::quote_literal(&new_val_obj),
Self::quote_literal(id_str),
Self::quote_literal(&Value::String(uuid::Uuid::new_v4().to_string())),
Self::quote_literal(&Value::String(change_kind.to_string())),
Self::quote_literal(&Value::String(timestamp.to_string())),
Self::quote_literal(&Value::String(user_id.to_string()))
);
self
.db
.execute(&change_sql, None)
.map_err(|e| format!("Executor Error in change: {:?}", e))?;
}
if type_obj.notify {
notify_sql = Some(format!(
"SELECT pg_notify('entity', {})",
Self::quote_literal(&Value::String(Value::Object(notification).to_string()))
));
}
Ok(notify_sql)
}
/// Returns the subset of `new_fields` that should be written.
///
/// Only keys listed in `type_fields` and not in `system_keys` are considered. Without a
/// fetched row, all of those are returned. With one, only keys whose value differs from
/// the fetched value are returned; a key missing from the fetched row compares as `null`.
fn compare_entities(
    &self,
    fetched_entity: Option<&serde_json::Map<String, Value>>,
    new_fields: &serde_json::Map<String, Value>,
    type_fields: &[String],
    system_keys: &[String],
) -> serde_json::Map<String, Value> {
    let differs = |name: &String, candidate: &Value| match fetched_entity {
        None => true,
        Some(existing) => existing.get(name).unwrap_or(&Value::Null) != candidate,
    };

    let mut changes = serde_json::Map::new();
    for (name, candidate) in new_fields {
        let eligible = type_fields.contains(name) && !system_keys.contains(name);
        if eligible && differs(name, candidate) {
            changes.insert(name.clone(), candidate.clone());
        }
    }
    changes
}
// Helper Functions
/// Copies relation key values from `destination_entity` onto `source_entity`.
///
/// Columns are paired by position: the value found under `destination_columns[i]` is
/// stored under `source_columns[i]`. Destination columns missing from
/// `destination_entity` are skipped. If the two column lists differ in length, nothing
/// is copied.
fn apply_entity_relation(
    source_entity: &mut serde_json::Map<String, Value>,
    source_columns: &[String],
    destination_columns: &[String],
    destination_entity: &serde_json::Map<String, Value>,
) {
    if source_columns.len() != destination_columns.len() {
        return;
    }
    for (source_column, destination_column) in source_columns.iter().zip(destination_columns) {
        if let Some(value) = destination_entity.get(destination_column) {
            source_entity.insert(source_column.clone(), value.clone());
        }
    }
}
/// Renders `val` as a SQL literal for column `key` of `entity_type`.
///
/// * An empty string becomes `NULL`.
/// * A JSON array bound for a Postgres array column becomes a quoted array literal
///   (`'{...}'`). A column counts as an array column when its entry in the type's
///   `field_types` starts with `_`.
/// * Everything else goes through `quote_literal`.
///
/// The array literal is built element by element, not by swapping the outer brackets of
/// the JSON text. Nested arrays become nested `{...}`, strings are double-quoted with `"`
/// and `\` backslash-escaped, and JSON objects are embedded as quoted JSON text. Flat
/// arrays of numbers, booleans and plain strings render as before.
fn format_sql_value(val: &Value, key: &str, entity_type: &Type) -> String {
    /// Appends `text` as a double-quoted array element, escaping `"` and `\`.
    fn push_quoted_element(text: &str, out: &mut String) {
        out.push('"');
        for ch in text.chars() {
            if ch == '"' || ch == '\\' {
                out.push('\\');
            }
            out.push(ch);
        }
        out.push('"');
    }

    /// Appends `items` as a Postgres array literal (`{a,b,...}`), recursing into nested arrays.
    fn push_array_literal(items: &[Value], out: &mut String) {
        out.push('{');
        for (index, item) in items.iter().enumerate() {
            if index > 0 {
                out.push(',');
            }
            match item {
                Value::Null => out.push_str("NULL"),
                Value::Bool(flag) => out.push_str(if *flag { "true" } else { "false" }),
                Value::Number(number) => out.push_str(&number.to_string()),
                Value::String(text) => push_quoted_element(text, out),
                Value::Array(inner) => push_array_literal(inner, out),
                // Objects have no array-literal form; embed their JSON text as one element.
                Value::Object(_) => push_quoted_element(&item.to_string(), out),
            }
        }
        out.push('}');
    }

    if val.as_str() == Some("") {
        return "NULL".to_string();
    }

    let is_pg_array = entity_type
        .field_types
        .as_ref()
        .and_then(|v| v.as_object())
        .and_then(|field_types_map| field_types_map.get(key))
        .and_then(|t_val| t_val.as_str())
        .map_or(false, |t_str| t_str.starts_with('_'));

    match val {
        Value::Array(items) if is_pg_array => {
            let mut literal = String::new();
            push_array_literal(items, &mut literal);
            Self::quote_literal(&Value::String(literal))
        }
        _ => Self::quote_literal(val),
    }
}
/// Renders a JSON value as a SQL literal.
///
/// * `null` and the empty string become `NULL`.
/// * Booleans become `true` / `false`.
/// * Integers are emitted verbatim, so values beyond 2^53 keep their exact digits.
///   Floats with a zero fractional part are emitted without it (`5.0` becomes `5`);
///   other floats are emitted as serialized.
/// * Non-empty strings are single-quoted with embedded `'` doubled.
/// * Arrays and objects are serialized to JSON text and quoted the same way.
///
/// NOTE(review): only single quotes are escaped, so this presumably relies on
/// `standard_conforming_strings` being on — confirm for the target database.
fn quote_literal(val: &Value) -> String {
    match val {
        Value::Null => "NULL".to_string(),
        Value::Bool(flag) => flag.to_string(),
        Value::Number(number) => {
            // Integers must not round-trip through f64, which would corrupt large values.
            if number.is_i64() || number.is_u64() {
                return number.to_string();
            }
            match number.as_f64() {
                Some(f) if f.fract() == 0.0 => f.trunc().to_string(),
                _ => number.to_string(),
            }
        }
        Value::String(text) if text.is_empty() => "NULL".to_string(),
        Value::String(text) => format!("'{}'", text.replace('\'', "''")),
        Value::Array(_) | Value::Object(_) => {
            format!("'{}'", val.to_string().replace('\'', "''"))
        }
    }
}
}