diff --git a/Cargo.lock b/Cargo.lock
index 22ec582536069..10a3e88fdb236 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2170,6 +2170,7 @@ dependencies = [
  "datafusion-execution",
  "datafusion-expr",
  "datafusion-functions-aggregate-common",
+ "datafusion-functions-nested",
  "datafusion-physical-expr",
  "datafusion-physical-expr-adapter",
  "datafusion-physical-expr-common",
@@ -2182,6 +2183,7 @@ dependencies = [
  "object_store",
  "parking_lot",
  "parquet",
+ "tempfile",
  "tokio",
 ]
diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml
index a5f6f56ac6f33..4d958613aaac9 100644
--- a/datafusion/datasource-parquet/Cargo.toml
+++ b/datafusion/datasource-parquet/Cargo.toml
@@ -56,6 +56,8 @@ tokio = { workspace = true }
 
 [dev-dependencies]
 chrono = { workspace = true }
+datafusion-functions-nested = { workspace = true }
+tempfile = { workspace = true }
 
 # Note: add additional linter rules in lib.rs.
 # Rust does not support workspace + new linter rules in subcrates yet
diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs
index eb4cc9e9ad5a3..d7e92f70afa99 100644
--- a/datafusion/datasource-parquet/src/mod.rs
+++ b/datafusion/datasource-parquet/src/mod.rs
@@ -32,6 +32,7 @@ mod row_filter;
 mod row_group_filter;
 mod sort;
 pub mod source;
+mod supported_predicates;
 mod writer;
 
 pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index ba3b29be40d74..d41008a9b47c3 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -58,6 +58,11 @@
 //! 8. Build the `RowFilter` with the sorted predicates followed by
 //!    the unsorted predicates. Within each partition, predicates are
 //!    still be sorted by size.
+//!
+//! List-aware predicates (for example, `array_has`, `array_has_all`, and
+//! `array_has_any`) can be evaluated directly during Parquet decoding. Struct
+//! columns and other nested projections that are not explicitly supported will
+//! continue to be evaluated after the batches are materialized.
 
 use std::cmp::Ordering;
 use std::collections::BTreeSet;
@@ -70,6 +75,7 @@ use arrow::record_batch::RecordBatch;
 use parquet::arrow::ProjectionMask;
 use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
 use parquet::file::metadata::ParquetMetaData;
+use parquet::schema::types::SchemaDescriptor;
 
 use datafusion_common::Result;
 use datafusion_common::cast::as_boolean_array;
@@ -81,6 +87,7 @@ use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
 use datafusion_physical_plan::metrics;
 
 use super::ParquetFileMetrics;
+use super::supported_predicates::supports_list_predicates;
 
 /// A "compiled" predicate passed to `ParquetRecordBatchStream` to perform
 /// row-level filtering during parquet decoding.
@@ -91,12 +98,14 @@ use super::ParquetFileMetrics;
 ///
 /// An expression can be evaluated as a `DatafusionArrowPredicate` if it:
 /// * Does not reference any projected columns
-/// * Does not reference columns with non-primitive types (e.g. structs / lists)
+/// * References either primitive columns or list columns used by
+///   supported predicates (such as `array_has_all` or NULL checks). Struct
+///   columns are still evaluated after decoding.
 #[derive(Debug)]
 pub(crate) struct DatafusionArrowPredicate {
     /// the filter expression
     physical_expr: Arc<dyn PhysicalExpr>,
-    /// Path to the columns in the parquet schema required to evaluate the
+    /// Path to the leaf columns in the parquet schema required to evaluate the
     /// expression
     projection_mask: ProjectionMask,
     /// how many rows were filtered out by this predicate
@@ -121,9 +130,12 @@ impl DatafusionArrowPredicate {
 
         Ok(Self {
             physical_expr,
-            projection_mask: ProjectionMask::roots(
+            // Use leaf indices: when nested columns are involved, we must specify
+            // leaf (primitive) column indices in the Parquet schema so the decoder
+            // can properly project and filter nested structures.
+            projection_mask: ProjectionMask::leaves(
                 metadata.file_metadata().schema_descr(),
-                candidate.projection,
+                candidate.projection.leaf_indices.iter().copied(),
             ),
             rows_pruned,
             rows_matched,
@@ -177,12 +189,19 @@ pub(crate) struct FilterCandidate {
     /// Can this filter use an index (e.g. a page index) to prune rows?
     can_use_index: bool,
     /// Column indices into the parquet file schema required to evaluate this filter.
-    projection: Vec<usize>,
+    projection: LeafProjection,
     /// The Arrow schema containing only the columns required by this filter,
     /// projected from the file's Arrow schema.
     filter_schema: SchemaRef,
 }
 
+/// Projection for a filter expression, expressed in Parquet leaf-column coordinates.
+#[derive(Debug, Clone)]
+struct LeafProjection {
+    /// Leaf column indices in the Parquet schema descriptor.
+    leaf_indices: Vec<usize>,
+}
+
 /// Helper to build a `FilterCandidate`.
 ///
 /// This will do several things:
@@ -212,23 +231,29 @@ impl FilterCandidateBuilder {
     /// * `Ok(None)` if the expression cannot be used as an ArrowFilter
     /// * `Err(e)` if an error occurs while building the candidate
     pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
-        let Some(required_column_indices) =
-            pushdown_columns(&self.expr, &self.file_schema)?
+        let Some(required_columns) = pushdown_columns(&self.expr, &self.file_schema)?
         else {
            return Ok(None);
        };
 
-        let projected_schema =
-            Arc::new(self.file_schema.project(&required_column_indices)?);
+        let root_indices: Vec<_> =
+            required_columns.required_columns.into_iter().collect();
+        let leaf_indices = leaf_indices_for_roots(
+            &root_indices,
+            metadata.file_metadata().schema_descr(),
+            required_columns.nested,
+        );
 
-        let required_bytes = size_of_columns(&required_column_indices, metadata)?;
-        let can_use_index = columns_sorted(&required_column_indices, metadata)?;
+        let projected_schema = Arc::new(self.file_schema.project(&root_indices)?);
+
+        let required_bytes = size_of_columns(&leaf_indices, metadata)?;
+        let can_use_index = columns_sorted(&leaf_indices, metadata)?;
 
         Ok(Some(FilterCandidate {
             expr: self.expr,
             required_bytes,
             can_use_index,
-            projection: required_column_indices,
+            projection: LeafProjection { leaf_indices },
             filter_schema: projected_schema,
         }))
     }
@@ -238,7 +263,8 @@ impl FilterCandidateBuilder {
 /// prevent the expression from being pushed down to the parquet decoder.
 ///
 /// An expression cannot be pushed down if it references:
-/// - Non-primitive columns (like structs or lists)
+/// - Unsupported nested columns (structs or list fields that are not covered by
+///   the supported predicate set)
 /// - Columns that don't exist in the file schema
 struct PushdownChecker<'schema> {
     /// Does the expression require any non-primitive columns (like structs)?
@@ -247,34 +273,78 @@ struct PushdownChecker<'schema> {
     projected_columns: bool,
     /// Indices into the file schema of columns required to evaluate the expression.
     required_columns: BTreeSet<usize>,
+    /// Tracks the nested column behavior found during traversal.
+    nested_behavior: NestedColumnSupport,
+    /// Whether nested list columns are supported by the predicate semantics.
+    allow_list_columns: bool,
     /// The Arrow schema of the parquet file.
     file_schema: &'schema Schema,
 }
 
 impl<'schema> PushdownChecker<'schema> {
-    fn new(file_schema: &'schema Schema) -> Self {
+    fn new(file_schema: &'schema Schema, allow_list_columns: bool) -> Self {
         Self {
             non_primitive_columns: false,
             projected_columns: false,
             required_columns: BTreeSet::default(),
+            nested_behavior: NestedColumnSupport::PrimitiveOnly,
+            allow_list_columns,
             file_schema,
         }
     }
 
     fn check_single_column(&mut self, column_name: &str) -> Option<TreeNodeRecursion> {
-        if let Ok(idx) = self.file_schema.index_of(column_name) {
-            self.required_columns.insert(idx);
-            if DataType::is_nested(self.file_schema.field(idx).data_type()) {
-                self.non_primitive_columns = true;
+        let idx = match self.file_schema.index_of(column_name) {
+            Ok(idx) => idx,
+            Err(_) => {
+                // Column does not exist in the file schema, so we can't push this down.
+                self.projected_columns = true;
                 return Some(TreeNodeRecursion::Jump);
             }
+        };
+
+        self.required_columns.insert(idx);
+        let data_type = self.file_schema.field(idx).data_type();
+
+        if DataType::is_nested(data_type) {
+            self.handle_nested_type(data_type)
+        } else {
+            None
+        }
+    }
+
+    /// Determines whether a nested data type can be pushed down to Parquet decoding.
+    ///
+    /// Returns `Some(TreeNodeRecursion::Jump)` if the nested type prevents pushdown,
+    /// `None` if the type is supported and pushdown can continue.
+    fn handle_nested_type(&mut self, data_type: &DataType) -> Option<TreeNodeRecursion> {
+        if self.is_nested_type_supported(data_type) {
+            // Update to ListsSupported if we haven't encountered unsupported types yet
+            if self.nested_behavior == NestedColumnSupport::PrimitiveOnly {
+                self.nested_behavior = NestedColumnSupport::ListsSupported;
+            }
+            None
         } else {
-            // Column does not exist in the file schema, so we can't push this down.
-            self.projected_columns = true;
-            return Some(TreeNodeRecursion::Jump);
+            // Block pushdown for unsupported nested types:
+            // - Structs (regardless of predicate support)
+            // - Lists without supported predicates
+            self.nested_behavior = NestedColumnSupport::Unsupported;
+            self.non_primitive_columns = true;
+            Some(TreeNodeRecursion::Jump)
         }
+    }
 
-        None
+    /// Checks if a nested data type is supported for list column pushdown.
+    ///
+    /// List columns are only supported if:
+    /// 1. The data type is a list variant (List, LargeList, or FixedSizeList)
+    /// 2. The expression contains supported list predicates (e.g., array_has_all)
+    fn is_nested_type_supported(&self, data_type: &DataType) -> bool {
+        let is_list = matches!(
+            data_type,
+            DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _)
+        );
+        self.allow_list_columns && is_list
     }
 
     #[inline]
@@ -297,34 +367,137 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
     }
 }
 
+/// Describes the nested column behavior for filter pushdown.
+///
+/// This enum makes explicit the different states a predicate can be in
+/// with respect to nested column handling during Parquet decoding.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum NestedColumnSupport {
+    /// Expression references only primitive (non-nested) columns.
+    /// These can always be pushed down to the Parquet decoder.
+    PrimitiveOnly,
+    /// Expression references list columns with supported predicates
+    /// (e.g., array_has, array_has_all, IS NULL).
+    /// These can be pushed down to the Parquet decoder.
+    ListsSupported,
+    /// Expression references unsupported nested types (e.g., structs)
+    /// or list columns without supported predicates.
+    /// These cannot be pushed down and must be evaluated after decoding.
+    Unsupported,
+}
+
+#[derive(Debug)]
+struct PushdownColumns {
+    required_columns: BTreeSet<usize>,
+    nested: NestedColumnSupport,
+}
+
 /// Checks if a given expression can be pushed down to the parquet decoder.
 ///
-/// Returns `Some(column_indices)` if the expression can be pushed down,
-/// where `column_indices` are the indices into the file schema of all columns
+/// Returns `Some(PushdownColumns)` if the expression can be pushed down,
+/// where the struct contains the indices into the file schema of all columns
 /// required to evaluate the expression.
 ///
 /// Returns `None` if the expression cannot be pushed down (e.g., references
-/// non-primitive types or columns not in the file).
+/// unsupported nested types or columns not in the file).
 fn pushdown_columns(
     expr: &Arc<dyn PhysicalExpr>,
     file_schema: &Schema,
-) -> Result<Option<Vec<usize>>> {
-    let mut checker = PushdownChecker::new(file_schema);
+) -> Result<Option<PushdownColumns>> {
+    let allow_list_columns = supports_list_predicates(expr);
+    let mut checker = PushdownChecker::new(file_schema, allow_list_columns);
 
     expr.visit(&mut checker)?;
 
-    Ok((!checker.prevents_pushdown())
-        .then_some(checker.required_columns.into_iter().collect()))
+    Ok((!checker.prevents_pushdown()).then_some(PushdownColumns {
+        required_columns: checker.required_columns,
+        nested: checker.nested_behavior,
+    }))
+}
+
+fn leaf_indices_for_roots(
+    root_indices: &[usize],
+    schema_descr: &SchemaDescriptor,
+    nested: NestedColumnSupport,
+) -> Vec<usize> {
+    // Root and leaf indices only coincide when the expression is primitive-only
+    // and the file schema itself is flat (every root column is a single leaf);
+    // otherwise a primitive column that follows a nested column has a larger
+    // leaf index than root index.
+    if nested == NestedColumnSupport::PrimitiveOnly
+        && schema_descr.num_columns() == schema_descr.root_schema().get_fields().len()
+    {
+        return root_indices.to_vec();
+    }
+
+    // Otherwise map each required root column to all of its leaf columns in the
+    // Parquet schema descriptor.
+    let root_set: BTreeSet<_> = root_indices.iter().copied().collect();
+
+    (0..schema_descr.num_columns())
+        .filter(|leaf_idx| {
+            root_set.contains(&schema_descr.get_column_root_idx(*leaf_idx))
+        })
+        .collect()
 }
 
 /// Checks if a predicate expression can be pushed down to the parquet decoder.
 ///
 /// Returns `true` if all columns referenced by the expression:
 /// - Exist in the provided schema
-/// - Are primitive types (not structs, lists, etc.)
+/// - Are primitive types OR list columns used with supported predicates
+///   (e.g., `array_has`, `array_has_all`, `array_has_any`, IS NULL, IS NOT NULL)
+/// - Are not struct columns (struct columns always prevent pushdown)
 ///
 /// # Arguments
 /// * `expr` - The filter expression to check
 /// * `file_schema` - The Arrow schema of the parquet file (or table schema when
 ///   the file schema is not yet available during planning)
+///
+/// # Examples
+///
+/// Primitive column filters can be pushed down:
+/// ```ignore
+/// use datafusion_expr::{col, Expr};
+/// use datafusion_common::ScalarValue;
+/// use arrow::datatypes::{DataType, Field, Schema};
+/// use std::sync::Arc;
+///
+/// let schema = Arc::new(Schema::new(vec![
+///     Field::new("age", DataType::Int32, false),
+/// ]));
+///
+/// // Primitive filter: can be pushed down
+/// let expr = col("age").gt(Expr::Literal(ScalarValue::Int32(Some(30)), None));
+/// let expr = logical2physical(&expr, &schema);
+/// assert!(can_expr_be_pushed_down_with_schemas(&expr, &schema));
+/// ```
+///
+/// Struct column filters cannot be pushed down:
+/// ```ignore
+/// use arrow::datatypes::Fields;
+///
+/// let schema = Arc::new(Schema::new(vec![
+///     Field::new("person", DataType::Struct(
+///         Fields::from(vec![Field::new("name", DataType::Utf8, true)])
+///     ), true),
+/// ]));
+///
+/// // Struct filter: cannot be pushed down
+/// let expr = col("person").is_not_null();
+/// let expr = logical2physical(&expr, &schema);
+/// assert!(!can_expr_be_pushed_down_with_schemas(&expr, &schema));
+/// ```
+///
+/// List column filters with supported predicates can be pushed down:
+/// ```ignore
+/// use datafusion_functions_nested::expr_fn::{array_has_all, make_array};
+///
+/// let schema = Arc::new(Schema::new(vec![
+///     Field::new("tags", DataType::List(
+///         Arc::new(Field::new("item", DataType::Utf8, true))
+///     ), true),
+/// ]));
+///
+/// // Array filter with supported predicate: can be pushed down
+/// let expr = array_has_all(col("tags"), make_array(vec![
+///     Expr::Literal(ScalarValue::Utf8(Some("rust".to_string())), None)
+/// ]));
+/// let expr = logical2physical(&expr, &schema);
+/// assert!(can_expr_be_pushed_down_with_schemas(&expr, &schema));
+/// ```
 pub fn can_expr_be_pushed_down_with_schemas(
     expr: &Arc<dyn PhysicalExpr>,
     file_schema: &Schema,
@@ -335,7 +508,7 @@ pub fn can_expr_be_pushed_down_with_schemas(
     }
 }
 
-/// Calculate the total compressed size of all `Column`'s required for
+/// Calculate the total compressed size of all leaf columns required for
 /// predicate `Expr`.
 ///
 /// This value represents the total amount of IO required to evaluate the
@@ -464,21 +637,27 @@ mod test {
     use super::*;
     use datafusion_common::ScalarValue;
+    use arrow::array::{ListBuilder, StringBuilder};
     use arrow::datatypes::{Field, TimeUnit::Nanosecond};
     use datafusion_expr::{Expr, col};
+    use datafusion_functions_nested::expr_fn::{
+        array_has, array_has_all, array_has_any, make_array,
+    };
     use datafusion_physical_expr::planner::logical2physical;
     use datafusion_physical_expr_adapter::{
         DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
     };
-    use datafusion_physical_plan::metrics::{Count, Time};
+    use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, Time};
+    use parquet::arrow::ArrowWriter;
     use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
     use parquet::arrow::parquet_to_arrow_schema;
     use parquet::file::reader::{FileReader, SerializedFileReader};
+    use tempfile::NamedTempFile;
 
-    // We should ignore predicate that read non-primitive columns
+    // List predicates used by the decoder should be accepted for pushdown
     #[test]
-    fn test_filter_candidate_builder_ignore_complex_types() {
+    fn test_filter_candidate_builder_supports_list_types() {
         let testdata = datafusion_common::test_util::parquet_test_data();
         let file = std::fs::File::open(format!("{testdata}/list_columns.parquet"))
             .expect("opening file");
@@ -496,11 +675,16 @@ mod test {
         let table_schema = Arc::new(table_schema.clone());
 
+        let list_index = table_schema
+            .index_of("int64_list")
+            .expect("list column should exist");
+
         let candidate = FilterCandidateBuilder::new(expr, table_schema)
             .build(metadata)
-            .expect("building candidate");
+            .expect("building candidate")
+            .expect("list pushdown should be supported");
 
-        assert!(candidate.is_none());
+        assert_eq!(candidate.projection.leaf_indices, vec![list_index]);
     }
 
     #[test]
@@ -590,14 +774,193 @@ mod test {
     }
 
     #[test]
-    fn nested_data_structures_prevent_pushdown() {
+    fn struct_data_structures_prevent_pushdown() {
+        let table_schema = Arc::new(Schema::new(vec![Field::new(
+            "struct_col",
+            DataType::Struct(
+                vec![Arc::new(Field::new("a", DataType::Int32, true))].into(),
+            ),
+            true,
+        )]));
+
+        let expr = col("struct_col").is_not_null();
+        let expr = logical2physical(&expr, &table_schema);
+
+        assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
+    }
+
+    #[test]
+    fn mixed_primitive_and_struct_prevents_pushdown() {
+        // Even when a predicate contains both primitive and unsupported nested columns,
+        // the entire predicate should not be pushed down because the struct column
+        // cannot be evaluated during Parquet decoding.
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "struct_col",
+                DataType::Struct(
+                    vec![Arc::new(Field::new("a", DataType::Int32, true))].into(),
+                ),
+                true,
+            ),
+            Field::new("int_col", DataType::Int32, false),
+        ]));
+
+        // Expression: (struct_col IS NOT NULL) AND (int_col = 5)
+        // Even though int_col is primitive, the presence of struct_col in the
+        // conjunction should prevent pushdown of the entire expression.
+        let expr = col("struct_col")
+            .is_not_null()
+            .and(col("int_col").eq(Expr::Literal(ScalarValue::Int32(Some(5)), None)));
+        let expr = logical2physical(&expr, &table_schema);
+
+        // The entire expression should not be pushed down
+        assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
+
+        // However, just the int_col predicate alone should be pushable
+        let expr_int_only =
+            col("int_col").eq(Expr::Literal(ScalarValue::Int32(Some(5)), None));
+        let expr_int_only = logical2physical(&expr_int_only, &table_schema);
+        assert!(can_expr_be_pushed_down_with_schemas(
+            &expr_int_only,
+            &table_schema
+        ));
+    }
+
+    #[test]
+    fn nested_lists_allow_pushdown_checks() {
         let table_schema = Arc::new(get_lists_table_schema());
 
         let expr = col("utf8_list").is_not_null();
         let expr = logical2physical(&expr, &table_schema);
 
         check_expression_can_evaluate_against_schema(&expr, &table_schema);
 
-        assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
+        assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
+    }
+
+    #[test]
+    fn array_has_all_pushdown_filters_rows() {
+        // Test array_has_all: checks if array contains all of ["c"]
+        // Rows with "c": row 1 and row 2
+        let expr = array_has_all(
+            col("letters"),
+            make_array(vec![Expr::Literal(
+                ScalarValue::Utf8(Some("c".to_string())),
+                None,
+            )]),
+        );
+        test_array_predicate_pushdown("array_has_all", expr, 1, 2);
+    }
+
+    /// Helper function to test array predicate pushdown functionality.
+    ///
+    /// Creates a Parquet file with a list column, applies the given predicate,
+    /// and verifies that rows are correctly filtered during decoding.
+    fn test_array_predicate_pushdown(
+        func_name: &str,
+        predicate_expr: Expr,
+        expected_pruned: usize,
+        expected_matched: usize,
+    ) {
+        let item_field = Arc::new(Field::new("item", DataType::Utf8, true));
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "letters",
+            DataType::List(item_field),
+            true,
+        )]));
+
+        let mut builder = ListBuilder::new(StringBuilder::new());
+        // Row 0: ["a", "b"]
+        builder.values().append_value("a");
+        builder.values().append_value("b");
+        builder.append(true);
+
+        // Row 1: ["c"]
+        builder.values().append_value("c");
+        builder.append(true);
+
+        // Row 2: ["c", "d"]
+        builder.values().append_value("c");
+        builder.values().append_value("d");
+        builder.append(true);
+
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])
+                .expect("record batch");
+
+        let file = NamedTempFile::new().expect("temp file");
+        let mut writer =
+            ArrowWriter::try_new(file.reopen().unwrap(), schema, None).expect("writer");
+        writer.write(&batch).expect("write batch");
+        writer.close().expect("close writer");
+
+        let reader_file = file.reopen().expect("reopen file");
+        let parquet_reader_builder =
+            ParquetRecordBatchReaderBuilder::try_new(reader_file)
+                .expect("reader builder");
+        let metadata = parquet_reader_builder.metadata().clone();
+        let file_schema = parquet_reader_builder.schema().clone();
+
+        let expr = logical2physical(&predicate_expr, &file_schema);
+
+        let metrics = ExecutionPlanMetricsSet::new();
+        let file_metrics =
+            ParquetFileMetrics::new(0, &format!("{func_name}.parquet"), &metrics);
+
+        let row_filter =
+            build_row_filter(&expr, &file_schema, &metadata, false, &file_metrics)
+                .expect("building row filter")
+                .expect("row filter should exist");
+
+        let reader = parquet_reader_builder
+            .with_row_filter(row_filter)
+            .build()
+            .expect("build reader");
+
+        let mut total_rows = 0;
+        for batch in reader {
+            let batch = batch.expect("record batch");
+            total_rows += batch.num_rows();
+        }
+
+        assert_eq!(
+            file_metrics.pushdown_rows_pruned.value(),
+            expected_pruned,
+            "{func_name}: expected {expected_pruned} pruned rows"
+        );
+        assert_eq!(
+            file_metrics.pushdown_rows_matched.value(),
+            expected_matched,
+            "{func_name}: expected {expected_matched} matched rows"
+        );
+        assert_eq!(
+            total_rows, expected_matched,
+            "{func_name}: expected {expected_matched} total rows"
+        );
+    }
+
+    #[test]
+    fn array_has_pushdown_filters_rows() {
+        // Test array_has: checks if "c" is in the array
+        // Rows with "c": row 1 and row 2
+        let expr = array_has(
+            col("letters"),
+            Expr::Literal(ScalarValue::Utf8(Some("c".to_string())), None),
+        );
+        test_array_predicate_pushdown("array_has", expr, 1, 2);
+    }
+
+    #[test]
+    fn array_has_any_pushdown_filters_rows() {
+        // Test array_has_any: checks if array contains any of ["a", "d"]
+        // Row 0 has "a", row 2 has "d" - both should match
+        let expr = array_has_any(
+            col("letters"),
+            make_array(vec![
+                Expr::Literal(ScalarValue::Utf8(Some("a".to_string())), None),
+                Expr::Literal(ScalarValue::Utf8(Some("d".to_string())), None),
+            ]),
+        );
+        test_array_predicate_pushdown("array_has_any", expr, 1, 2);
     }
 
     #[test]
diff --git a/datafusion/datasource-parquet/src/supported_predicates.rs b/datafusion/datasource-parquet/src/supported_predicates.rs
new file mode 100644
index 0000000000000..a7ae1539dea30
--- /dev/null
+++ b/datafusion/datasource-parquet/src/supported_predicates.rs
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Registry of physical expressions that support nested list column pushdown
+//! to the Parquet decoder.
+//!
+//! This module provides a trait-based approach for determining which predicates
+//! can be safely evaluated on nested list columns during Parquet decoding.
+
+use std::sync::Arc;
+
+use datafusion_physical_expr::expressions::{IsNotNullExpr, IsNullExpr};
+use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr};
+
+/// Trait for physical expressions that support list column pushdown during
+/// Parquet decoding.
+///
+/// This trait provides a type-safe mechanism for identifying expressions that
+/// can be safely pushed down to the Parquet decoder for evaluation on nested
+/// list columns.
+///
+/// # Implementation Notes
+///
+/// Rather than requiring every expression type to implement this trait, the
+/// implementation on `dyn PhysicalExpr` checks the concrete expression against
+/// a fixed allow-list of supported operations (NULL checks and a small set of
+/// array functions), so expression types defined in other crates are covered.
+///
+/// # Examples
+///
+/// ```ignore
+/// use datafusion_physical_expr::PhysicalExpr;
+///
+/// // `SupportsListPushdown` is defined in this (crate-private) module.
+/// let expr: Arc<dyn PhysicalExpr> = ...;
+/// if expr.supports_list_pushdown() {
+///     // Can safely push down to Parquet decoder
+/// }
+/// ```
+pub trait SupportsListPushdown {
+    /// Returns `true` if this expression supports list column pushdown.
+    fn supports_list_pushdown(&self) -> bool;
+}
+
+/// Implementation for `dyn PhysicalExpr` trait objects.
+///
+/// This delegates to helper predicates that check whether the concrete
+/// expression type is one of the supported operations, so it works with
+/// expression types defined in external crates without requiring them to
+/// implement the trait themselves.
+impl SupportsListPushdown for dyn PhysicalExpr {
+    fn supports_list_pushdown(&self) -> bool {
+        is_null_check(self) || is_supported_scalar_function(self)
+    }
+}
+
+/// Checks if an expression is a NULL or NOT NULL check.
+///
+/// These checks are universally supported for all column types.
+fn is_null_check(expr: &dyn PhysicalExpr) -> bool {
+    expr.as_any().downcast_ref::<IsNullExpr>().is_some()
+        || expr.as_any().downcast_ref::<IsNotNullExpr>().is_some()
+}
+
+/// Checks if an expression is a scalar function registered for list pushdown.
+///
+/// Returns `true` if the expression is a `ScalarFunctionExpr` whose function
+/// is in the allow-list of supported operations.
+fn is_supported_scalar_function(expr: &dyn PhysicalExpr) -> bool {
+    expr.as_any()
+        .downcast_ref::<ScalarFunctionExpr>()
+        .is_some_and(|fun| {
+            // Allow-list of verified array functions
+            matches!(fun.name(), "array_has" | "array_has_all" | "array_has_any")
+        })
+}
+
+/// Checks whether the given physical expression contains a supported nested
+/// predicate (for example, `array_has_all`).
+///
+/// This function recursively traverses the expression tree to determine if
+/// any node contains predicates that support list column pushdown to the
+/// Parquet decoder.
+///
+/// # Supported predicates
+///
+/// - `IS NULL` and `IS NOT NULL` checks on any column type
+/// - Array functions: `array_has`, `array_has_all`, `array_has_any`
+///
+/// # Returns
+///
+/// `true` if the expression or any of its children contain supported predicates.
+pub fn supports_list_predicates(expr: &Arc<dyn PhysicalExpr>) -> bool {
+    expr.supports_list_pushdown()
+        || expr
+            .children()
+            .iter()
+            .any(|child| supports_list_predicates(child))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_null_check_detection() {
+        use datafusion_physical_expr::expressions::Column;
+
+        let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("test", 0));
+        assert!(!is_null_check(col_expr.as_ref()));
+
+        // IsNullExpr and IsNotNullExpr detection requires actual instances
+        // which need schema setup - tested in integration tests
+    }
+
+    #[test]
+    fn test_supported_scalar_functions() {
+        use datafusion_physical_expr::expressions::Column;
+
+        let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("test", 0));
+
+        // Non-function expressions should return false
+        assert!(!is_supported_scalar_function(col_expr.as_ref()));
+
+        // Testing with actual ScalarFunctionExpr requires function setup
+        // and is better suited for integration tests
+    }
+}
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index 8bb79d576990e..aa94e2e2f2c04 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -563,3 +563,114 @@ ORDER BY start_timestamp, trace_id
 LIMIT 1;
 ----
 2024-10-01T00:00:00
+
+###
+# Array function predicate pushdown tests
+# These tests verify that array_has, array_has_all, and array_has_any predicates
+# are correctly pushed down to the DataSourceExec node
+###
+
+# Create test data with array columns
+statement ok
+COPY (
+    SELECT 1 as id, ['rust', 'performance'] as tags
+    UNION ALL
+    SELECT 2 as id, ['python', 'javascript'] as tags
+    UNION ALL
+    SELECT 3 as id, ['rust', 'webassembly'] as tags
+)
+TO 'test_files/scratch/parquet_filter_pushdown/array_data/data.parquet';
+
+statement ok
+CREATE EXTERNAL TABLE array_test STORED AS PARQUET LOCATION 'test_files/scratch/parquet_filter_pushdown/array_data/';
+
+statement ok
+SET datafusion.execution.parquet.pushdown_filters = true;
+
+# Test array_has predicate pushdown
+query I?
+SELECT id, tags FROM array_test WHERE array_has(tags, 'rust') ORDER BY id;
+----
+1 [rust, performance]
+3 [rust, webassembly]
+
+query TT
+EXPLAIN SELECT id, tags FROM array_test WHERE array_has(tags, 'rust') ORDER BY id;
+----
+logical_plan
+01)Sort: array_test.id ASC NULLS LAST
+02)--Filter: array_has(array_test.tags, Utf8("rust"))
+03)----TableScan: array_test projection=[id, tags], partial_filters=[array_has(array_test.tags, Utf8("rust"))]
+physical_plan
+01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=array_has(tags@1, rust)
+
+# Test array_has_all predicate pushdown
+query I?
+SELECT id, tags FROM array_test WHERE array_has_all(tags, ['rust', 'performance']) ORDER BY id;
+----
+1 [rust, performance]
+
+query TT
+EXPLAIN SELECT id, tags FROM array_test WHERE array_has_all(tags, ['rust', 'performance']) ORDER BY id;
+----
+logical_plan
+01)Sort: array_test.id ASC NULLS LAST
+02)--Filter: array_has_all(array_test.tags, List([rust, performance]))
+03)----TableScan: array_test projection=[id, tags], partial_filters=[array_has_all(array_test.tags, List([rust, performance]))]
+physical_plan
+01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=array_has_all(tags@1, [rust, performance])
+
+# Test array_has_any predicate pushdown
+query I?
+SELECT id, tags FROM array_test WHERE array_has_any(tags, ['python', 'go']) ORDER BY id;
+----
+2 [python, javascript]
+
+query TT
+EXPLAIN SELECT id, tags FROM array_test WHERE array_has_any(tags, ['python', 'go']) ORDER BY id;
+----
+logical_plan
+01)Sort: array_test.id ASC NULLS LAST
+02)--Filter: array_has_any(array_test.tags, List([python, go]))
+03)----TableScan: array_test projection=[id, tags], partial_filters=[array_has_any(array_test.tags, List([python, go]))]
+physical_plan
+01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=array_has_any(tags@1, [python, go])
+
+# Test complex predicate with OR
+query I?
+SELECT id, tags FROM array_test WHERE array_has_all(tags, ['rust']) OR array_has_any(tags, ['python', 'go']) ORDER BY id;
+----
+1 [rust, performance]
+2 [python, javascript]
+3 [rust, webassembly]
+
+query TT
+EXPLAIN SELECT id, tags FROM array_test WHERE array_has_all(tags, ['rust']) OR array_has_any(tags, ['python', 'go']) ORDER BY id;
+----
+logical_plan
+01)Sort: array_test.id ASC NULLS LAST
+02)--Filter: array_has_all(array_test.tags, List([rust])) OR array_has_any(array_test.tags, List([python, go]))
+03)----TableScan: array_test projection=[id, tags], partial_filters=[array_has_all(array_test.tags, List([rust])) OR array_has_any(array_test.tags, List([python, go]))]
+physical_plan
+01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=array_has_all(tags@1, [rust]) OR array_has_any(tags@1, [python, go])
+
+# Test array function with other predicates
+query I?
+SELECT id, tags FROM array_test WHERE id > 1 AND array_has(tags, 'rust') ORDER BY id;
+----
+3 [rust, webassembly]
+
+query TT
+EXPLAIN SELECT id, tags FROM array_test WHERE id > 1 AND array_has(tags, 'rust') ORDER BY id;
+----
+logical_plan
+01)Sort: array_test.id ASC NULLS LAST
+02)--Filter: array_test.id > Int64(1) AND array_has(array_test.tags, Utf8("rust"))
+03)----TableScan: array_test projection=[id, tags], partial_filters=[array_test.id > Int64(1), array_has(array_test.tags, Utf8("rust"))]
+physical_plan
+01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=id@0 > 1 AND array_has(tags@1, rust), pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[]