queryer merger test progress
This commit is contained in:
@ -23,86 +23,34 @@ impl SqlCompiler {
|
||||
.get(schema_id)
|
||||
.ok_or_else(|| format!("Schema not found: {}", schema_id))?;
|
||||
|
||||
let resolved_arc;
|
||||
let target_schema = if let Some(path) = stem_path.filter(|p| !p.is_empty() && *p != "/") {
|
||||
self.resolve_stem(schema, path)?
|
||||
if let Some(stems_map) = self.db.stems.get(schema_id) {
|
||||
if let Some(stem) = stems_map.get(path) {
|
||||
resolved_arc = stem.schema.clone();
|
||||
} else {
|
||||
return Err(format!(
|
||||
"Stem entity type '{}' not found in schema '{}'",
|
||||
path, schema_id
|
||||
));
|
||||
}
|
||||
} else {
|
||||
return Err(format!(
|
||||
"Stem entity type '{}' not found in schema '{}'",
|
||||
path, schema_id
|
||||
));
|
||||
}
|
||||
resolved_arc.as_ref()
|
||||
} else {
|
||||
schema
|
||||
};
|
||||
|
||||
// 1. We expect the top level to typically be an Object or Array
|
||||
let (sql, _) = self.walk_schema(target_schema, "t1", None, filter_keys)?;
|
||||
// We expect the top level to typically be an Object or Array
|
||||
let is_stem_query = stem_path.is_some();
|
||||
let (sql, _) = self.walk_schema(target_schema, "t1", None, filter_keys, is_stem_query, 0)?;
|
||||
Ok(sql)
|
||||
}
|
||||
|
||||
fn resolve_stem<'a>(
|
||||
&'a self,
|
||||
mut schema: &'a crate::database::schema::Schema,
|
||||
path: &str,
|
||||
) -> Result<&'a crate::database::schema::Schema, String> {
|
||||
let parts: Vec<&str> = path.trim_start_matches('/').split('/').collect();
|
||||
for part in parts {
|
||||
let mut current = schema;
|
||||
let mut depth = 0;
|
||||
while let Some(r) = ¤t.obj.r#ref {
|
||||
if let Some(s) = self.db.schemas.get(r) {
|
||||
current = s;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
depth += 1;
|
||||
if depth > 20 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if current.obj.properties.is_none() && current.obj.items.is_some() {
|
||||
if let Some(items) = ¤t.obj.items {
|
||||
current = items;
|
||||
let mut depth2 = 0;
|
||||
while let Some(r) = ¤t.obj.r#ref {
|
||||
if let Some(s) = self.db.schemas.get(r) {
|
||||
current = s;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
depth2 += 1;
|
||||
if depth2 > 20 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(props) = ¤t.obj.properties {
|
||||
if let Some(next_schema) = props.get(part) {
|
||||
schema = next_schema;
|
||||
} else {
|
||||
return Err(format!("Stem part '{}' not found in schema", part));
|
||||
}
|
||||
} else {
|
||||
return Err(format!(
|
||||
"Cannot resolve stem part '{}': not an object",
|
||||
part
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let mut current = schema;
|
||||
let mut depth = 0;
|
||||
while let Some(r) = ¤t.obj.r#ref {
|
||||
if let Some(s) = self.db.schemas.get(r) {
|
||||
current = s;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
depth += 1;
|
||||
if depth > 20 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(current)
|
||||
}
|
||||
|
||||
/// Recursively walks the schema AST emitting native PostgreSQL jsonb mapping
|
||||
/// Returns a tuple of (SQL_String, Field_Type)
|
||||
fn walk_schema(
|
||||
@ -111,6 +59,8 @@ impl SqlCompiler {
|
||||
parent_alias: &str,
|
||||
prop_name_context: Option<&str>,
|
||||
filter_keys: &[String],
|
||||
is_stem_query: bool,
|
||||
depth: usize,
|
||||
) -> Result<(String, String), String> {
|
||||
// Determine the base schema type (could be an array, object, or literal)
|
||||
match &schema.obj.type_ {
|
||||
@ -119,6 +69,9 @@ impl SqlCompiler {
|
||||
if let Some(items) = &schema.obj.items {
|
||||
if let Some(ref_id) = &items.obj.r#ref {
|
||||
if let Some(type_def) = self.db.types.get(ref_id) {
|
||||
if is_stem_query && depth > 0 {
|
||||
return Ok(("".to_string(), "abort".to_string()));
|
||||
}
|
||||
return self.compile_entity_node(
|
||||
items,
|
||||
type_def,
|
||||
@ -126,11 +79,19 @@ impl SqlCompiler {
|
||||
prop_name_context,
|
||||
true,
|
||||
filter_keys,
|
||||
is_stem_query,
|
||||
depth,
|
||||
);
|
||||
}
|
||||
}
|
||||
let (item_sql, _) =
|
||||
self.walk_schema(items, parent_alias, prop_name_context, filter_keys)?;
|
||||
let (item_sql, _) = self.walk_schema(
|
||||
items,
|
||||
parent_alias,
|
||||
prop_name_context,
|
||||
filter_keys,
|
||||
is_stem_query,
|
||||
depth + 1,
|
||||
)?;
|
||||
return Ok((
|
||||
format!("(SELECT jsonb_agg({}) FROM TODO)", item_sql),
|
||||
"array".to_string(),
|
||||
@ -143,29 +104,57 @@ impl SqlCompiler {
|
||||
))
|
||||
}
|
||||
_ => {
|
||||
// Handle Objects & Direct Refs
|
||||
if let Some(ref_id) = &schema.obj.r#ref {
|
||||
// If it's a $ref, check if it points to an Entity Type
|
||||
if let Some(type_def) = self.db.types.get(ref_id) {
|
||||
return self.compile_entity_node(
|
||||
schema,
|
||||
type_def,
|
||||
parent_alias,
|
||||
prop_name_context,
|
||||
false,
|
||||
filter_keys,
|
||||
);
|
||||
// Determine if this schema represents a Database Entity
|
||||
let mut resolved_type = None;
|
||||
|
||||
// Target is generally a specific schema (e.g. 'base.person'), but it tells us what physical
|
||||
// database table hierarchy it maps to via the `schema.id` prefix/suffix convention.
|
||||
if let Some(lookup_key) = schema.obj.id.as_ref().or(schema.obj.r#ref.as_ref()) {
|
||||
let base_type_name = lookup_key.split('.').next_back().unwrap_or("").to_string();
|
||||
resolved_type = self.db.types.get(&base_type_name);
|
||||
}
|
||||
|
||||
if let Some(type_def) = resolved_type {
|
||||
if is_stem_query && depth > 0 {
|
||||
return Ok(("".to_string(), "abort".to_string()));
|
||||
}
|
||||
return self.compile_entity_node(
|
||||
schema,
|
||||
type_def,
|
||||
parent_alias,
|
||||
prop_name_context,
|
||||
false,
|
||||
filter_keys,
|
||||
is_stem_query,
|
||||
depth,
|
||||
);
|
||||
}
|
||||
|
||||
// Handle Direct Refs
|
||||
if let Some(ref_id) = &schema.obj.r#ref {
|
||||
// If it's just an ad-hoc struct ref, we should resolve it
|
||||
if let Some(target_schema) = self.db.schemas.get(ref_id) {
|
||||
return self.walk_schema(target_schema, parent_alias, prop_name_context, filter_keys);
|
||||
return self.walk_schema(
|
||||
target_schema,
|
||||
parent_alias,
|
||||
prop_name_context,
|
||||
filter_keys,
|
||||
is_stem_query,
|
||||
depth,
|
||||
);
|
||||
}
|
||||
return Err(format!("Unresolved $ref: {}", ref_id));
|
||||
}
|
||||
|
||||
// Just an inline object definition?
|
||||
if let Some(props) = &schema.obj.properties {
|
||||
return self.compile_inline_object(props, parent_alias, filter_keys);
|
||||
return self.compile_inline_object(
|
||||
props,
|
||||
parent_alias,
|
||||
filter_keys,
|
||||
is_stem_query,
|
||||
depth,
|
||||
);
|
||||
}
|
||||
|
||||
// Literal fallback
|
||||
@ -181,6 +170,27 @@ impl SqlCompiler {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_merged_properties(
|
||||
&self,
|
||||
schema: &crate::database::schema::Schema,
|
||||
) -> std::collections::BTreeMap<String, Arc<crate::database::schema::Schema>> {
|
||||
let mut props = std::collections::BTreeMap::new();
|
||||
|
||||
if let Some(ref_id) = &schema.obj.r#ref {
|
||||
if let Some(parent_schema) = self.db.schemas.get(ref_id) {
|
||||
props.extend(self.get_merged_properties(parent_schema));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(local_props) = &schema.obj.properties {
|
||||
for (k, v) in local_props {
|
||||
props.insert(k.clone(), v.clone());
|
||||
}
|
||||
}
|
||||
|
||||
props
|
||||
}
|
||||
|
||||
fn compile_entity_node(
|
||||
&self,
|
||||
schema: &crate::database::schema::Schema,
|
||||
@ -189,6 +199,8 @@ impl SqlCompiler {
|
||||
prop_name: Option<&str>,
|
||||
is_array: bool,
|
||||
filter_keys: &[String],
|
||||
is_stem_query: bool,
|
||||
depth: usize,
|
||||
) -> Result<(String, String), String> {
|
||||
// We are compiling a query block for an Entity.
|
||||
let mut select_args = Vec::new();
|
||||
@ -220,35 +232,45 @@ impl SqlCompiler {
|
||||
// grouped_fields is { "person": ["first_name", ...], "user": ["password"], ... }
|
||||
let grouped_fields = type_def.grouped_fields.as_ref().and_then(|v| v.as_object());
|
||||
|
||||
if let Some(props) = &schema.obj.properties {
|
||||
for (prop_key, prop_schema) in props {
|
||||
// Find which table owns this property
|
||||
// Find which table owns this property
|
||||
let mut owner_alias = table_aliases
|
||||
.get("entity")
|
||||
.cloned()
|
||||
.unwrap_or_else(|| format!("{}_t_err", parent_alias));
|
||||
let merged_props = self.get_merged_properties(schema);
|
||||
for (prop_key, prop_schema) in &merged_props {
|
||||
// Find which table owns this property
|
||||
// Find which table owns this property
|
||||
let mut owner_alias = table_aliases
|
||||
.get("entity")
|
||||
.cloned()
|
||||
.unwrap_or_else(|| format!("{}_t_err", parent_alias));
|
||||
|
||||
if let Some(gf) = grouped_fields {
|
||||
for (t_name, fields_val) in gf {
|
||||
if let Some(fields_arr) = fields_val.as_array() {
|
||||
if fields_arr.iter().any(|v| v.as_str() == Some(prop_key)) {
|
||||
owner_alias = table_aliases
|
||||
.get(t_name)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| parent_alias.to_string());
|
||||
break;
|
||||
}
|
||||
if let Some(gf) = grouped_fields {
|
||||
for (t_name, fields_val) in gf {
|
||||
if let Some(fields_arr) = fields_val.as_array() {
|
||||
if fields_arr.iter().any(|v| v.as_str() == Some(prop_key)) {
|
||||
owner_alias = table_aliases
|
||||
.get(t_name)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| parent_alias.to_string());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now we know `owner_alias`, e.g., `parent_t1` or `parent_t3`.
|
||||
// Walk the property to get its SQL value
|
||||
let (val_sql, _) =
|
||||
self.walk_schema(prop_schema, &owner_alias, Some(prop_key), filter_keys)?;
|
||||
select_args.push(format!("'{}', {}", prop_key, val_sql));
|
||||
}
|
||||
|
||||
// Now we know `owner_alias`, e.g., `parent_t1` or `parent_t3`.
|
||||
// Walk the property to get its SQL value
|
||||
let (val_sql, val_type) = self.walk_schema(
|
||||
prop_schema,
|
||||
&owner_alias,
|
||||
Some(prop_key),
|
||||
filter_keys,
|
||||
is_stem_query,
|
||||
depth + 1,
|
||||
)?;
|
||||
|
||||
if val_type == "abort" {
|
||||
continue;
|
||||
}
|
||||
|
||||
select_args.push(format!("'{}', {}", prop_key, val_sql));
|
||||
}
|
||||
|
||||
let jsonb_obj_sql = if select_args.is_empty() {
|
||||
@ -266,7 +288,7 @@ impl SqlCompiler {
|
||||
where_clauses.push(format!("NOT {}.archived", base_alias));
|
||||
// Filter Mapping - Only append filters if this is the ROOT table query (i.e. parent_alias is "t1")
|
||||
// Because cue.filters operates strictly on top-level root properties right now.
|
||||
if parent_alias == "t1" && prop_name.is_none() {
|
||||
if parent_alias == "t1" {
|
||||
for (i, filter_key) in filter_keys.iter().enumerate() {
|
||||
// Find which table owns this filter key
|
||||
let mut filter_alias = base_alias.clone(); // default to root table (e.g. t3 entity)
|
||||
@ -288,23 +310,36 @@ impl SqlCompiler {
|
||||
let mut is_ilike = false;
|
||||
let mut cast = "";
|
||||
|
||||
// Check schema for filter_key to determine datatype operation
|
||||
if let Some(props) = &schema.obj.properties {
|
||||
if let Some(ps) = props.get(filter_key) {
|
||||
let is_enum = ps.obj.enum_.is_some();
|
||||
if let Some(crate::database::schema::SchemaTypeOrArray::Single(t)) = &ps.obj.type_ {
|
||||
if t == "string" {
|
||||
if ps.obj.format.as_deref() == Some("uuid") {
|
||||
cast = "::uuid";
|
||||
} else if ps.obj.format.as_deref() == Some("date-time") {
|
||||
cast = "::timestamptz";
|
||||
} else if !is_enum {
|
||||
// Use PostgreSQL column type metadata for exact argument casting
|
||||
if let Some(field_types) = type_def.field_types.as_ref().and_then(|v| v.as_object()) {
|
||||
if let Some(pg_type_val) = field_types.get(filter_key) {
|
||||
if let Some(pg_type) = pg_type_val.as_str() {
|
||||
if pg_type == "uuid" {
|
||||
cast = "::uuid";
|
||||
} else if pg_type == "boolean" || pg_type == "bool" {
|
||||
cast = "::boolean";
|
||||
} else if pg_type.contains("timestamp")
|
||||
|| pg_type == "timestamptz"
|
||||
|| pg_type == "date"
|
||||
{
|
||||
cast = "::timestamptz";
|
||||
} else if pg_type == "numeric"
|
||||
|| pg_type.contains("int")
|
||||
|| pg_type == "real"
|
||||
|| pg_type == "double precision"
|
||||
{
|
||||
cast = "::numeric";
|
||||
} else if pg_type == "text" || pg_type.contains("char") {
|
||||
// Determine if this is an enum in the schema locally to avoid ILIKE on strict enums
|
||||
let mut is_enum = false;
|
||||
if let Some(props) = &schema.obj.properties {
|
||||
if let Some(ps) = props.get(filter_key) {
|
||||
is_enum = ps.obj.enum_.is_some();
|
||||
}
|
||||
}
|
||||
if !is_enum {
|
||||
is_ilike = true;
|
||||
}
|
||||
} else if t == "boolean" {
|
||||
cast = "::boolean";
|
||||
} else if t == "integer" || t == "number" {
|
||||
cast = "::numeric";
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -357,10 +392,22 @@ impl SqlCompiler {
|
||||
props: &std::collections::BTreeMap<String, std::sync::Arc<crate::database::schema::Schema>>,
|
||||
parent_alias: &str,
|
||||
filter_keys: &[String],
|
||||
is_stem_query: bool,
|
||||
depth: usize,
|
||||
) -> Result<(String, String), String> {
|
||||
let mut build_args = Vec::new();
|
||||
for (k, v) in props {
|
||||
let (child_sql, _) = self.walk_schema(v, parent_alias, Some(k), filter_keys)?;
|
||||
let (child_sql, val_type) = self.walk_schema(
|
||||
v,
|
||||
parent_alias,
|
||||
Some(k),
|
||||
filter_keys,
|
||||
is_stem_query,
|
||||
depth + 1,
|
||||
)?;
|
||||
if val_type == "abort" {
|
||||
continue;
|
||||
}
|
||||
build_args.push(format!("'{}', {}", k, child_sql));
|
||||
}
|
||||
let combined = format!("jsonb_build_object({})", build_args.join(", "));
|
||||
|
||||
@ -18,13 +18,12 @@ impl Queryer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Entrypoint to execute a dynamically compiled query based on a schema
|
||||
pub fn query(
|
||||
&self,
|
||||
schema_id: &str,
|
||||
stem_opt: Option<&str>,
|
||||
filters: Option<&serde_json::Value>,
|
||||
) -> Result<serde_json::Value, String> {
|
||||
) -> crate::drop::Drop {
|
||||
let filters_map: Option<&serde_json::Map<String, serde_json::Value>> =
|
||||
filters.and_then(|f| f.as_object());
|
||||
|
||||
@ -45,9 +44,21 @@ impl Queryer {
|
||||
} else {
|
||||
// Compile the massive base SQL string
|
||||
let compiler = compiler::SqlCompiler::new(self.db.clone());
|
||||
let compiled_sql = compiler.compile(schema_id, stem_opt, &filter_keys)?;
|
||||
self.cache.insert(cache_key.clone(), compiled_sql.clone());
|
||||
compiled_sql
|
||||
match compiler.compile(schema_id, stem_opt, &filter_keys) {
|
||||
Ok(compiled_sql) => {
|
||||
self.cache.insert(cache_key.clone(), compiled_sql.clone());
|
||||
compiled_sql
|
||||
}
|
||||
Err(e) => {
|
||||
return crate::drop::Drop::with_errors(vec![crate::drop::Error {
|
||||
code: "QUERY_COMPILATION_FAILED".to_string(),
|
||||
message: e,
|
||||
details: crate::drop::ErrorDetails {
|
||||
path: schema_id.to_string(),
|
||||
},
|
||||
}]);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// 2. Prepare the execution arguments from the filters
|
||||
@ -62,22 +73,29 @@ impl Queryer {
|
||||
}
|
||||
|
||||
// 3. Execute via Database Executor
|
||||
let fetched = match self.db.query(&sql, Some(&args)) {
|
||||
match self.db.query(&sql, Some(&args)) {
|
||||
Ok(serde_json::Value::Array(table)) => {
|
||||
if table.is_empty() {
|
||||
Ok(serde_json::Value::Null)
|
||||
crate::drop::Drop::success_with_val(serde_json::Value::Null)
|
||||
} else {
|
||||
// We expect the query to return a single JSONB column, already unpacked from row[0]
|
||||
Ok(table.first().unwrap().clone())
|
||||
crate::drop::Drop::success_with_val(table.first().unwrap().clone())
|
||||
}
|
||||
}
|
||||
Ok(other) => Err(format!(
|
||||
"Expected array from generic query, got: {:?}",
|
||||
other
|
||||
)),
|
||||
Err(e) => Err(format!("SPI error in queryer: {}", e)),
|
||||
}?;
|
||||
|
||||
Ok(fetched)
|
||||
Ok(other) => crate::drop::Drop::with_errors(vec![crate::drop::Error {
|
||||
code: "QUERY_FAILED".to_string(),
|
||||
message: format!("Expected array from generic query, got: {:?}", other),
|
||||
details: crate::drop::ErrorDetails {
|
||||
path: schema_id.to_string(),
|
||||
},
|
||||
}]),
|
||||
Err(e) => crate::drop::Drop::with_errors(vec![crate::drop::Error {
|
||||
code: "QUERY_FAILED".to_string(),
|
||||
message: format!("SPI error in queryer: {}", e),
|
||||
details: crate::drop::ErrorDetails {
|
||||
path: schema_id.to_string(),
|
||||
},
|
||||
}]),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user