From 2c6571abac47c28309162e8a5c6680979434db31 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 11:23:30 +0800 Subject: [PATCH 01/22] Document nested pushdown semantics and optimizations Document supported nested pushdown semantics and update row-level predicate construction to utilize leaf-based projection masks. Enable list-aware predicates like array_has_all while maintaining unsupported nested structures on the fallback path. Expand filter candidate building for root and leaf projections of nested columns, facilitating cost estimation and mask creation aligned with Parquet leaf layouts. Include struct/list pushdown checks and add a new integration test to validate array_has_all pushdown behavior against Parquet row filters. Introduce dev dependencies for nested function helpers and temporary file creation used in the new tests. --- Cargo.lock | 2 + datafusion/datasource-parquet/Cargo.toml | 2 + .../datasource-parquet/src/row_filter.rs | 254 +++++++++++++++--- 3 files changed, 226 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 22ec582536069..10a3e88fdb236 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2170,6 +2170,7 @@ dependencies = [ "datafusion-execution", "datafusion-expr", "datafusion-functions-aggregate-common", + "datafusion-functions-nested", "datafusion-physical-expr", "datafusion-physical-expr-adapter", "datafusion-physical-expr-common", @@ -2182,6 +2183,7 @@ dependencies = [ "object_store", "parking_lot", "parquet", + "tempfile", "tokio", ] diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml index a5f6f56ac6f33..4d958613aaac9 100644 --- a/datafusion/datasource-parquet/Cargo.toml +++ b/datafusion/datasource-parquet/Cargo.toml @@ -56,6 +56,8 @@ tokio = { workspace = true } [dev-dependencies] chrono = { workspace = true } +datafusion-functions-nested = { workspace = true } +tempfile = { workspace = true } # Note: add additional linter rules in lib.rs. # Rust does not support workspace + new linter rules in subcrates yet diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index ba3b29be40d74..c2b97a79bc540 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -58,6 +58,11 @@ //! 8. Build the `RowFilter` with the sorted predicates followed by //! the unsorted predicates. Within each partition, predicates are //! still be sorted by size. +//! +//! List-aware predicates (for example, `array_has`, `array_has_all`, and +//! `array_has_any`) can be evaluated directly during Parquet decoding. Struct +//! columns and other nested projections that are not explicitly supported will +//! continue to be evaluated after the batches are materialized. 
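// Illustrative sketch (not part of this patch): why the row filter switches
// from root-based to leaf-based projection masks. For a flat column the root
// index and the leaf index coincide, but a nested column such as
// `letters: List<Utf8>` maps one Arrow root to one or more Parquet leaves, so
// the mask must be built from leaf indices. This helper mirrors the patch's
// `leaf_indices_for_roots`, using only public `parquet` crate APIs; the name
// `mask_for_root` is hypothetical.
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;

fn mask_for_root(metadata: &ParquetMetaData, root: usize) -> ProjectionMask {
    let descr = metadata.file_metadata().schema_descr();
    // Keep every Parquet leaf column whose root column is `root`.
    let leaves = (0..descr.num_columns())
        .filter(|leaf| descr.get_column_root_idx(*leaf) == root);
    ProjectionMask::leaves(descr, leaves)
}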
 use std::cmp::Ordering;
 use std::collections::BTreeSet;
@@ -70,11 +75,13 @@ use arrow::record_batch::RecordBatch;
 use parquet::arrow::ProjectionMask;
 use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
 use parquet::file::metadata::ParquetMetaData;
+use parquet::schema::types::SchemaDescriptor;
 
 use datafusion_common::Result;
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
-use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::ScalarFunctionExpr;
+use datafusion_physical_expr::expressions::{Column, IsNotNullExpr, IsNullExpr};
 use datafusion_physical_expr::utils::reassign_expr_columns;
 use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
 
@@ -91,12 +98,14 @@ use super::ParquetFileMetrics;
 ///
 /// An expression can be evaluated as a `DatafusionArrowPredicate` if it:
 /// * Does not reference any projected columns
-/// * Does not reference columns with non-primitive types (e.g. structs / lists)
+/// * References either primitive columns or list columns used by
+///   supported predicates (such as `array_has_all` or NULL checks). Struct
+///   columns are still evaluated after decoding.
 #[derive(Debug)]
 pub(crate) struct DatafusionArrowPredicate {
     /// the filter expression
     physical_expr: Arc<dyn PhysicalExpr>,
-    /// Path to the columns in the parquet schema required to evaluate the
+    /// Path to the leaf columns in the parquet schema required to evaluate the
     /// expression
     projection_mask: ProjectionMask,
     /// how many rows were filtered out by this predicate
@@ -121,9 +130,9 @@ impl DatafusionArrowPredicate {
 
         Ok(Self {
             physical_expr,
-            projection_mask: ProjectionMask::roots(
+            projection_mask: ProjectionMask::leaves(
                 metadata.file_metadata().schema_descr(),
-                candidate.projection,
+                candidate.projection.leaf_indices.iter().copied(),
             ),
             rows_pruned,
             rows_matched,
@@ -177,12 +186,22 @@ pub(crate) struct FilterCandidate {
     /// Can this filter use an index (e.g. a page index) to prune rows?
     can_use_index: bool,
     /// Column indices into the parquet file schema required to evaluate this filter.
-    projection: Vec<usize>,
+    projection: ProjectionColumns,
     /// The Arrow schema containing only the columns required by this filter,
     /// projected from the file's Arrow schema.
     filter_schema: SchemaRef,
 }
 
+/// Tracks the projection of an expression in both root and leaf coordinates.
+#[derive(Debug, Clone)]
+struct ProjectionColumns {
+    /// Root column indices in the Arrow schema.
+    #[allow(dead_code)]
+    root_indices: Vec<usize>,
+    /// Leaf column indices in the Parquet schema descriptor.
+    leaf_indices: Vec<usize>,
+}
+
 /// Helper to build a `FilterCandidate`.
 ///
 /// This will do several things:
@@ -212,23 +231,32 @@ impl FilterCandidateBuilder {
     /// * `Ok(None)` if the expression cannot be used as an ArrowFilter
     /// * `Err(e)` if an error occurs while building the candidate
     pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
-        let Some(required_column_indices) =
-            pushdown_columns(&self.expr, &self.file_schema)?
+        let Some(required_columns) = pushdown_columns(&self.expr, &self.file_schema)?
else { return Ok(None); }; - let projected_schema = - Arc::new(self.file_schema.project(&required_column_indices)?); + let root_indices: Vec<_> = + required_columns.required_columns.into_iter().collect(); + let leaf_indices = leaf_indices_for_roots( + &root_indices, + metadata.file_metadata().schema_descr(), + required_columns.contains_nested, + ); - let required_bytes = size_of_columns(&required_column_indices, metadata)?; - let can_use_index = columns_sorted(&required_column_indices, metadata)?; + let projected_schema = Arc::new(self.file_schema.project(&root_indices)?); + + let required_bytes = size_of_columns(&leaf_indices, metadata)?; + let can_use_index = columns_sorted(&leaf_indices, metadata)?; Ok(Some(FilterCandidate { expr: self.expr, required_bytes, can_use_index, - projection: required_column_indices, + projection: ProjectionColumns { + root_indices, + leaf_indices, + }, filter_schema: projected_schema, })) } @@ -238,7 +266,8 @@ impl FilterCandidateBuilder { /// prevent the expression from being pushed down to the parquet decoder. /// /// An expression cannot be pushed down if it references: -/// - Non-primitive columns (like structs or lists) +/// - Unsupported nested columns (structs or list fields that are not covered by +/// the supported predicate set) /// - Columns that don't exist in the file schema struct PushdownChecker<'schema> { /// Does the expression require any non-primitive columns (like structs)? @@ -247,16 +276,22 @@ struct PushdownChecker<'schema> { projected_columns: bool, /// Indices into the file schema of columns required to evaluate the expression. required_columns: BTreeSet, + /// Does this expression reference any nested columns? + contains_nested: bool, + /// Whether nested list columns are supported by the predicate semantics. + allow_list_columns: bool, /// The Arrow schema of the parquet file. file_schema: &'schema Schema, } impl<'schema> PushdownChecker<'schema> { - fn new(file_schema: &'schema Schema) -> Self { + fn new(file_schema: &'schema Schema, allow_list_columns: bool) -> Self { Self { non_primitive_columns: false, projected_columns: false, required_columns: BTreeSet::default(), + contains_nested: false, + allow_list_columns, file_schema, } } @@ -265,8 +300,19 @@ impl<'schema> PushdownChecker<'schema> { if let Ok(idx) = self.file_schema.index_of(column_name) { self.required_columns.insert(idx); if DataType::is_nested(self.file_schema.field(idx).data_type()) { - self.non_primitive_columns = true; - return Some(TreeNodeRecursion::Jump); + self.contains_nested = true; + + if !self.allow_list_columns + || !matches!( + self.file_schema.field(idx).data_type(), + DataType::List(_) + | DataType::LargeList(_) + | DataType::FixedSizeList(_, _) + ) + { + self.non_primitive_columns = true; + return Some(TreeNodeRecursion::Jump); + } } } else { // Column does not exist in the file schema, so we can't push this down. @@ -297,22 +343,69 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> { } } +#[derive(Debug)] +struct PushdownColumns { + required_columns: BTreeSet, + contains_nested: bool, +} + +/// Checks whether the given physical expression contains a supported nested +/// predicate (for example, `array_has_all`). 
+fn supports_list_predicates(expr: &Arc<dyn PhysicalExpr>) -> bool {
+    if expr.as_any().downcast_ref::<IsNullExpr>().is_some()
+        || expr.as_any().downcast_ref::<IsNotNullExpr>().is_some()
+    {
+        return true;
+    }
+
+    if let Some(fun) = expr.as_any().downcast_ref::<ScalarFunctionExpr>() {
+        if matches!(fun.name(), "array_has" | "array_has_all" | "array_has_any") {
+            return true;
+        }
+    }
+
+    expr.children()
+        .iter()
+        .any(|child| supports_list_predicates(child))
+}
+
 /// Checks if a given expression can be pushed down to the parquet decoder.
 ///
-/// Returns `Some(column_indices)` if the expression can be pushed down,
-/// where `column_indices` are the indices into the file schema of all columns
+/// Returns `Some(PushdownColumns)` if the expression can be pushed down,
+/// where the struct contains the indices into the file schema of all columns
 /// required to evaluate the expression.
 ///
 /// Returns `None` if the expression cannot be pushed down (e.g., references
-/// non-primitive types or columns not in the file).
+/// unsupported nested types or columns not in the file).
 fn pushdown_columns(
     expr: &Arc<dyn PhysicalExpr>,
     file_schema: &Schema,
-) -> Result<Option<Vec<usize>>> {
-    let mut checker = PushdownChecker::new(file_schema);
+) -> Result<Option<PushdownColumns>> {
+    let allow_list_columns = supports_list_predicates(expr);
+    let mut checker = PushdownChecker::new(file_schema, allow_list_columns);
     expr.visit(&mut checker)?;
-    Ok((!checker.prevents_pushdown())
-        .then_some(checker.required_columns.into_iter().collect()))
+    Ok((!checker.prevents_pushdown()).then_some(PushdownColumns {
+        required_columns: checker.required_columns,
+        contains_nested: checker.contains_nested,
+    }))
+}
+
+fn leaf_indices_for_roots(
+    root_indices: &[usize],
+    schema_descr: &SchemaDescriptor,
+    contains_nested: bool,
+) -> Vec<usize> {
+    if !contains_nested {
+        return root_indices.to_vec();
+    }
+
+    let root_set: BTreeSet<_> = root_indices.iter().copied().collect();
+
+    (0..schema_descr.num_columns())
+        .filter(|leaf_idx| {
+            root_set.contains(&schema_descr.get_column_root_idx(*leaf_idx))
+        })
+        .collect()
 }
 
 /// Checks if a predicate expression can be pushed down to the parquet decoder.
@@ -335,7 +428,7 @@ pub fn can_expr_be_pushed_down_with_schemas(
     }
 }
 
-/// Calculate the total compressed size of all `Column`'s required for
+/// Calculate the total compressed size of all leaf columns required for
 /// predicate `Expr`.
/// /// This value represents the total amount of IO required to evaluate the @@ -464,21 +557,25 @@ mod test { use super::*; use datafusion_common::ScalarValue; + use arrow::array::{ListBuilder, StringBuilder}; use arrow::datatypes::{Field, TimeUnit::Nanosecond}; use datafusion_expr::{Expr, col}; + use datafusion_functions_nested::expr_fn::{array_has_all, make_array}; use datafusion_physical_expr::planner::logical2physical; use datafusion_physical_expr_adapter::{ DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory, }; - use datafusion_physical_plan::metrics::{Count, Time}; + use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, Time}; + use parquet::arrow::ArrowWriter; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::arrow::parquet_to_arrow_schema; use parquet::file::reader::{FileReader, SerializedFileReader}; + use tempfile::NamedTempFile; - // We should ignore predicate that read non-primitive columns + // List predicates used by the decoder should be accepted for pushdown #[test] - fn test_filter_candidate_builder_ignore_complex_types() { + fn test_filter_candidate_builder_supports_list_types() { let testdata = datafusion_common::test_util::parquet_test_data(); let file = std::fs::File::open(format!("{testdata}/list_columns.parquet")) .expect("opening file"); @@ -496,11 +593,17 @@ mod test { let table_schema = Arc::new(table_schema.clone()); + let list_index = table_schema + .index_of("int64_list") + .expect("list column should exist"); + let candidate = FilterCandidateBuilder::new(expr, table_schema) .build(metadata) - .expect("building candidate"); + .expect("building candidate") + .expect("list pushdown should be supported"); - assert!(candidate.is_none()); + assert_eq!(candidate.projection.root_indices, vec![list_index]); + assert_eq!(candidate.projection.leaf_indices, vec![list_index]); } #[test] @@ -590,14 +693,101 @@ mod test { } #[test] - fn nested_data_structures_prevent_pushdown() { + fn struct_data_structures_prevent_pushdown() { + let table_schema = Arc::new(Schema::new(vec![Field::new( + "struct_col", + DataType::Struct( + vec![Arc::new(Field::new("a", DataType::Int32, true))].into(), + ), + true, + )])); + + let expr = col("struct_col").is_not_null(); + let expr = logical2physical(&expr, &table_schema); + + assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); + } + + #[test] + fn nested_lists_allow_pushdown_checks() { let table_schema = Arc::new(get_lists_table_schema()); let expr = col("utf8_list").is_not_null(); let expr = logical2physical(&expr, &table_schema); check_expression_can_evaluate_against_schema(&expr, &table_schema); - assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); + assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); + } + + #[test] + fn array_has_all_pushdown_filters_rows() { + let item_field = Arc::new(Field::new("item", DataType::Utf8, true)); + let schema = Arc::new(Schema::new(vec![Field::new( + "letters", + DataType::List(item_field), + true, + )])); + + let mut builder = ListBuilder::new(StringBuilder::new()); + builder.values().append_value("a"); + builder.values().append_value("b"); + builder.append(true); + + builder.values().append_value("c"); + builder.append(true); + + builder.values().append_value("c"); + builder.values().append_value("d"); + builder.append(true); + + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())]) + .expect("record batch"); + + let file = NamedTempFile::new().expect("temp 
file"); + let mut writer = + ArrowWriter::try_new(file.reopen().unwrap(), schema, None).expect("writer"); + writer.write(&batch).expect("write batch"); + writer.close().expect("close writer"); + + let reader_file = file.reopen().expect("reopen file"); + let parquet_reader_builder = + ParquetRecordBatchReaderBuilder::try_new(reader_file) + .expect("reader builder"); + let metadata = parquet_reader_builder.metadata().clone(); + let file_schema = parquet_reader_builder.schema().clone(); + + let expr = array_has_all( + col("letters"), + make_array(vec![Expr::Literal( + ScalarValue::Utf8(Some("c".to_string())), + None, + )]), + ); + let expr = logical2physical(&expr, &file_schema); + + let metrics = ExecutionPlanMetricsSet::new(); + let file_metrics = ParquetFileMetrics::new(0, "array_has_all.parquet", &metrics); + + let row_filter = + build_row_filter(&expr, &file_schema, &metadata, false, &file_metrics) + .expect("building row filter") + .expect("row filter should exist"); + + let mut reader = parquet_reader_builder + .with_row_filter(row_filter) + .build() + .expect("build reader"); + + let mut total_rows = 0; + while let Some(batch) = reader.next() { + let batch = batch.expect("record batch"); + total_rows += batch.num_rows(); + } + + assert_eq!(file_metrics.pushdown_rows_pruned.value(), 1); + assert_eq!(file_metrics.pushdown_rows_matched.value(), 2); + assert_eq!(total_rows, 2); } #[test] From 5ca433b1c40e43f5fd724af08e31b7d929c97f65 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 11:52:00 +0800 Subject: [PATCH 02/22] Refactor list predicate support and enhance tests Extract supports_list_predicates() into its own module and create a SUPPORTED_ARRAY_FUNCTIONS constant registry for improved management. Add is_supported_list_predicate() helper function for easier extensibility, along with comprehensive documentation and unit tests. Refactor check_single_column() using intermediate variables to clarify logic for handling structs and unsupported lists. Introduce a new test case for mixed primitive and struct predicates to ensure proper functionality and validation of pushable predicates. 
--- datafusion/datasource-parquet/src/mod.rs | 1 + .../datasource-parquet/src/row_filter.rs | 86 +++++++++------ .../src/supported_predicates.rs | 104 ++++++++++++++++++ 3 files changed, 160 insertions(+), 31 deletions(-) create mode 100644 datafusion/datasource-parquet/src/supported_predicates.rs diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index eb4cc9e9ad5a3..d7e92f70afa99 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -32,6 +32,7 @@ mod row_filter; mod row_group_filter; mod sort; pub mod source; +mod supported_predicates; mod writer; pub use access_plan::{ParquetAccessPlan, RowGroupAccess}; diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index c2b97a79bc540..9f359f68b4b03 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -80,14 +80,14 @@ use parquet::schema::types::SchemaDescriptor; use datafusion_common::Result; use datafusion_common::cast::as_boolean_array; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; -use datafusion_physical_expr::ScalarFunctionExpr; -use datafusion_physical_expr::expressions::{Column, IsNotNullExpr, IsNullExpr}; +use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr::{PhysicalExpr, split_conjunction}; use datafusion_physical_plan::metrics; use super::ParquetFileMetrics; +use super::supported_predicates::supports_list_predicates; /// A "compiled" predicate passed to `ParquetRecordBatchStream` to perform /// row-level filtering during parquet decoding. @@ -196,7 +196,6 @@ pub(crate) struct FilterCandidate { #[derive(Debug, Clone)] struct ProjectionColumns { /// Root column indices in the Arrow schema. - #[allow(dead_code)] root_indices: Vec, /// Leaf column indices in the Parquet schema descriptor. leaf_indices: Vec, @@ -302,14 +301,22 @@ impl<'schema> PushdownChecker<'schema> { if DataType::is_nested(self.file_schema.field(idx).data_type()) { self.contains_nested = true; - if !self.allow_list_columns - || !matches!( - self.file_schema.field(idx).data_type(), - DataType::List(_) - | DataType::LargeList(_) - | DataType::FixedSizeList(_, _) - ) - { + // Check if this is a list type + let is_list = matches!( + self.file_schema.field(idx).data_type(), + DataType::List(_) + | DataType::LargeList(_) + | DataType::FixedSizeList(_, _) + ); + + // List columns are only supported if the expression contains + // supported list predicates (e.g., array_has_all) + let is_supported = self.allow_list_columns && is_list; + + if !is_supported { + // Block pushdown for unsupported nested types: + // - Structs (regardless of predicate support) + // - Lists without supported predicates self.non_primitive_columns = true; return Some(TreeNodeRecursion::Jump); } @@ -349,26 +356,6 @@ struct PushdownColumns { contains_nested: bool, } -/// Checks whether the given physical expression contains a supported nested -/// predicate (for example, `array_has_all`). 
-fn supports_list_predicates(expr: &Arc) -> bool { - if expr.as_any().downcast_ref::().is_some() - || expr.as_any().downcast_ref::().is_some() - { - return true; - } - - if let Some(fun) = expr.as_any().downcast_ref::() { - if matches!(fun.name(), "array_has" | "array_has_all" | "array_has_any") { - return true; - } - } - - expr.children() - .iter() - .any(|child| supports_list_predicates(child)) -} - /// Checks if a given expression can be pushed down to the parquet decoder. /// /// Returns `Some(PushdownColumns)` if the expression can be pushed down, @@ -708,6 +695,43 @@ mod test { assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); } + #[test] + fn mixed_primitive_and_struct_prevents_pushdown() { + // Even when a predicate contains both primitive and unsupported nested columns, + // the entire predicate should not be pushed down because the struct column + // cannot be evaluated during Parquet decoding. + let table_schema = Arc::new(Schema::new(vec![ + Field::new( + "struct_col", + DataType::Struct( + vec![Arc::new(Field::new("a", DataType::Int32, true))].into(), + ), + true, + ), + Field::new("int_col", DataType::Int32, false), + ])); + + // Expression: (struct_col IS NOT NULL) AND (int_col = 5) + // Even though int_col is primitive, the presence of struct_col in the + // conjunction should prevent pushdown of the entire expression. + let expr = col("struct_col") + .is_not_null() + .and(col("int_col").eq(Expr::Literal(ScalarValue::Int32(Some(5)), None))); + let expr = logical2physical(&expr, &table_schema); + + // The entire expression should not be pushed down + assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema)); + + // However, just the int_col predicate alone should be pushable + let expr_int_only = + col("int_col").eq(Expr::Literal(ScalarValue::Int32(Some(5)), None)); + let expr_int_only = logical2physical(&expr_int_only, &table_schema); + assert!(can_expr_be_pushed_down_with_schemas( + &expr_int_only, + &table_schema + )); + } + #[test] fn nested_lists_allow_pushdown_checks() { let table_schema = Arc::new(get_lists_table_schema()); diff --git a/datafusion/datasource-parquet/src/supported_predicates.rs b/datafusion/datasource-parquet/src/supported_predicates.rs new file mode 100644 index 0000000000000..7440c63f600f5 --- /dev/null +++ b/datafusion/datasource-parquet/src/supported_predicates.rs @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Registry of physical expressions that support nested list column pushdown +//! to the Parquet decoder. +//! +//! This module maintains a centralized registry of predicates that can be +//! safely evaluated on nested list columns during Parquet decoding. Adding +//! 
new supported predicates requires only updating the registry in this module. + +use std::sync::Arc; + +use datafusion_physical_expr::expressions::{IsNotNullExpr, IsNullExpr}; +use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr}; + +/// Array functions supported for nested list pushdown. +/// +/// These functions have been verified to work correctly when evaluated +/// during Parquet decoding on list columns. +const SUPPORTED_ARRAY_FUNCTIONS: &[&str] = + &["array_has", "array_has_all", "array_has_any"]; + +/// Checks whether a function name is supported for nested list pushdown. +/// +/// Returns `true` if the function is in the registry of supported predicates. +/// +/// # Examples +/// +/// ```ignore +/// assert!(is_supported_list_predicate("array_has_all")); +/// assert!(is_supported_list_predicate("array_has")); +/// assert!(!is_supported_list_predicate("array_append")); +/// ``` +pub fn is_supported_list_predicate(name: &str) -> bool { + SUPPORTED_ARRAY_FUNCTIONS.contains(&name) +} + +/// Checks whether the given physical expression contains a supported nested +/// predicate (for example, `array_has_all`). +/// +/// This function recursively traverses the expression tree to determine if +/// any node contains predicates that support list column pushdown to the +/// Parquet decoder. +/// +/// # Supported predicates +/// +/// - `IS NULL` and `IS NOT NULL` checks on any column type +/// - Array functions listed in [`SUPPORTED_ARRAY_FUNCTIONS`] +/// +/// # Returns +/// +/// `true` if the expression or any of its children contain supported predicates. +pub fn supports_list_predicates(expr: &Arc) -> bool { + // NULL checks are universally supported for all column types + if expr.as_any().downcast_ref::().is_some() + || expr.as_any().downcast_ref::().is_some() + { + return true; + } + + // Check if this is a supported scalar function + if let Some(fun) = expr.as_any().downcast_ref::() + && is_supported_list_predicate(fun.name()) + { + return true; + } + + // Recursively check children + expr.children() + .iter() + .any(|child| supports_list_predicates(child)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_is_supported_list_predicate() { + // Supported functions + assert!(is_supported_list_predicate("array_has")); + assert!(is_supported_list_predicate("array_has_all")); + assert!(is_supported_list_predicate("array_has_any")); + + // Unsupported functions + assert!(!is_supported_list_predicate("array_append")); + assert!(!is_supported_list_predicate("array_length")); + assert!(!is_supported_list_predicate("some_other_function")); + } +} From 0b4c409656100d5968d7c6dc16dd4c544bb05ba1 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 11:56:53 +0800 Subject: [PATCH 03/22] refactor(parquet): parameterize array function pushdown tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract common test logic into test_array_predicate_pushdown helper function to reduce duplication and ensure parity across all three supported array functions (array_has, array_has_all, array_has_any). This makes it easier to maintain and extend test coverage for new array functions in the future. 
Benefits: - Reduces code duplication from ~70 lines × 3 to ~10 lines × 3 - Ensures consistent test methodology across all array functions - Clear documentation of expected behavior for each function - Easier to add tests for new supported functions --- .../datasource-parquet/src/row_filter.rs | 83 ++++++++++++++++--- 1 file changed, 70 insertions(+), 13 deletions(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 9f359f68b4b03..4cfa5c3c04e9a 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -547,7 +547,9 @@ mod test { use arrow::array::{ListBuilder, StringBuilder}; use arrow::datatypes::{Field, TimeUnit::Nanosecond}; use datafusion_expr::{Expr, col}; - use datafusion_functions_nested::expr_fn::{array_has_all, make_array}; + use datafusion_functions_nested::expr_fn::{ + array_has, array_has_all, array_has_any, make_array, + }; use datafusion_physical_expr::planner::logical2physical; use datafusion_physical_expr_adapter::{ DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory, @@ -745,6 +747,28 @@ mod test { #[test] fn array_has_all_pushdown_filters_rows() { + // Test array_has_all: checks if array contains all of ["c"] + // Rows with "c": row 1 and row 2 + let expr = array_has_all( + col("letters"), + make_array(vec![Expr::Literal( + ScalarValue::Utf8(Some("c".to_string())), + None, + )]), + ); + test_array_predicate_pushdown("array_has_all", expr, 1, 2); + } + + /// Helper function to test array predicate pushdown functionality. + /// + /// Creates a Parquet file with a list column, applies the given predicate, + /// and verifies that rows are correctly filtered during decoding. + fn test_array_predicate_pushdown( + func_name: &str, + predicate_expr: Expr, + expected_pruned: usize, + expected_matched: usize, + ) { let item_field = Arc::new(Field::new("item", DataType::Utf8, true)); let schema = Arc::new(Schema::new(vec![Field::new( "letters", @@ -753,13 +777,16 @@ mod test { )])); let mut builder = ListBuilder::new(StringBuilder::new()); + // Row 0: ["a", "b"] builder.values().append_value("a"); builder.values().append_value("b"); builder.append(true); + // Row 1: ["c"] builder.values().append_value("c"); builder.append(true); + // Row 2: ["c", "d"] builder.values().append_value("c"); builder.values().append_value("d"); builder.append(true); @@ -781,17 +808,11 @@ mod test { let metadata = parquet_reader_builder.metadata().clone(); let file_schema = parquet_reader_builder.schema().clone(); - let expr = array_has_all( - col("letters"), - make_array(vec![Expr::Literal( - ScalarValue::Utf8(Some("c".to_string())), - None, - )]), - ); - let expr = logical2physical(&expr, &file_schema); + let expr = logical2physical(&predicate_expr, &file_schema); let metrics = ExecutionPlanMetricsSet::new(); - let file_metrics = ParquetFileMetrics::new(0, "array_has_all.parquet", &metrics); + let file_metrics = + ParquetFileMetrics::new(0, &format!("{func_name}.parquet"), &metrics); let row_filter = build_row_filter(&expr, &file_schema, &metadata, false, &file_metrics) @@ -809,9 +830,45 @@ mod test { total_rows += batch.num_rows(); } - assert_eq!(file_metrics.pushdown_rows_pruned.value(), 1); - assert_eq!(file_metrics.pushdown_rows_matched.value(), 2); - assert_eq!(total_rows, 2); + assert_eq!( + file_metrics.pushdown_rows_pruned.value(), + expected_pruned, + "{func_name}: expected {expected_pruned} pruned rows" + ); + assert_eq!( + 
file_metrics.pushdown_rows_matched.value(), + expected_matched, + "{func_name}: expected {expected_matched} matched rows" + ); + assert_eq!( + total_rows, expected_matched, + "{func_name}: expected {expected_matched} total rows" + ); + } + + #[test] + fn array_has_pushdown_filters_rows() { + // Test array_has: checks if "c" is in the array + // Rows with "c": row 1 and row 2 + let expr = array_has( + col("letters"), + Expr::Literal(ScalarValue::Utf8(Some("c".to_string())), None), + ); + test_array_predicate_pushdown("array_has", expr, 1, 2); + } + + #[test] + fn array_has_any_pushdown_filters_rows() { + // Test array_has_any: checks if array contains any of ["a", "d"] + // Row 0 has "a", row 2 has "d" - both should match + let expr = array_has_any( + col("letters"), + make_array(vec![ + Expr::Literal(ScalarValue::Utf8(Some("a".to_string())), None), + Expr::Literal(ScalarValue::Utf8(Some("d".to_string())), None), + ]), + ); + test_array_predicate_pushdown("array_has_any", expr, 1, 2); } #[test] From 02c35bc02aa7f6450f304908298f9a23c025f450 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 11:58:06 +0800 Subject: [PATCH 04/22] docs(parquet): add comprehensive examples to pushdown API Add detailed rustdoc examples to can_expr_be_pushed_down_with_schemas() showing three key scenarios: 1. Primitive column filters (allowed) - e.g., age > 30 2. Struct column filters (blocked) - e.g., person IS NOT NULL 3. List column filters with supported predicates (allowed) - e.g., array_has_all(tags, ['rust']) These examples help users understand when filter pushdown to the Parquet decoder is available and guide them in writing efficient queries. Benefits: - Clear documentation of supported and unsupported cases - Helps users optimize query performance - Provides copy-paste examples for common patterns - Updated to reflect new list column support --- .../datasource-parquet/src/row_filter.rs | 57 ++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 4cfa5c3c04e9a..4f4ab20828d2f 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -399,12 +399,67 @@ fn leaf_indices_for_roots( /// /// Returns `true` if all columns referenced by the expression: /// - Exist in the provided schema -/// - Are primitive types (not structs, lists, etc.) 
+/// - Are primitive types OR list columns with supported predicates +/// (e.g., `array_has`, `array_has_all`, `array_has_any`, IS NULL, IS NOT NULL) +/// - Struct columns are not supported and will prevent pushdown /// /// # Arguments /// * `expr` - The filter expression to check /// * `file_schema` - The Arrow schema of the parquet file (or table schema when /// the file schema is not yet available during planning) +/// +/// # Examples +/// +/// Primitive column filters can be pushed down: +/// ```ignore +/// use datafusion_expr::{col, Expr}; +/// use datafusion_common::ScalarValue; +/// use arrow::datatypes::{DataType, Field, Schema}; +/// use std::sync::Arc; +/// +/// let schema = Arc::new(Schema::new(vec![ +/// Field::new("age", DataType::Int32, false), +/// ])); +/// +/// // Primitive filter: can be pushed down +/// let expr = col("age").gt(Expr::Literal(ScalarValue::Int32(Some(30)), None)); +/// let expr = logical2physical(&expr, &schema); +/// assert!(can_expr_be_pushed_down_with_schemas(&expr, &schema)); +/// ``` +/// +/// Struct column filters cannot be pushed down: +/// ```ignore +/// use arrow::datatypes::Fields; +/// +/// let schema = Arc::new(Schema::new(vec![ +/// Field::new("person", DataType::Struct( +/// Fields::from(vec![Field::new("name", DataType::Utf8, true)]) +/// ), true), +/// ])); +/// +/// // Struct filter: cannot be pushed down +/// let expr = col("person").is_not_null(); +/// let expr = logical2physical(&expr, &schema); +/// assert!(!can_expr_be_pushed_down_with_schemas(&expr, &schema)); +/// ``` +/// +/// List column filters with supported predicates can be pushed down: +/// ```ignore +/// use datafusion_functions_nested::expr_fn::{array_has_all, make_array}; +/// +/// let schema = Arc::new(Schema::new(vec![ +/// Field::new("tags", DataType::List( +/// Arc::new(Field::new("item", DataType::Utf8, true)) +/// ), true), +/// ])); +/// +/// // Array filter with supported predicate: can be pushed down +/// let expr = array_has_all(col("tags"), make_array(vec![ +/// Expr::Literal(ScalarValue::Utf8(Some("rust".to_string())), None) +/// ])); +/// let expr = logical2physical(&expr, &schema); +/// assert!(can_expr_be_pushed_down_with_schemas(&expr, &schema)); +/// ``` pub fn can_expr_be_pushed_down_with_schemas( expr: &Arc, file_schema: &Schema, From 81f405144b30caa8d518d4e23698f46a8071d5a4 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 12:49:14 +0800 Subject: [PATCH 05/22] refactor: use for loop instead of while-let in array pushdown tests - Replace 'while let Some(batch) = reader.next()' with idiomatic 'for batch in reader' - Remove unnecessary mut from reader variable - Addresses clippy::while_let_on_iterator warning --- .../datasource-parquet/src/row_filter.rs | 52 ++++++++++++++----- 1 file changed, 39 insertions(+), 13 deletions(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 4f4ab20828d2f..ed0bb4e9789e4 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -194,6 +194,7 @@ pub(crate) struct FilterCandidate { /// Tracks the projection of an expression in both root and leaf coordinates. #[derive(Debug, Clone)] +#[cfg_attr(not(test), expect(dead_code))] struct ProjectionColumns { /// Root column indices in the Arrow schema. 
     root_indices: Vec<usize>,
@@ -240,7 +241,7 @@ impl FilterCandidateBuilder {
         let leaf_indices = leaf_indices_for_roots(
             &root_indices,
             metadata.file_metadata().schema_descr(),
-            required_columns.contains_nested,
+            required_columns.nested,
         );
 
         let projected_schema = Arc::new(self.file_schema.project(&root_indices)?);
@@ -275,8 +276,8 @@ struct PushdownChecker<'schema> {
     projected_columns: bool,
     /// Indices into the file schema of columns required to evaluate the expression.
     required_columns: BTreeSet<usize>,
-    /// Does this expression reference any nested columns?
-    contains_nested: bool,
+    /// Tracks the nested column behavior found during traversal.
+    nested_behavior: NestedBehavior,
     /// Whether nested list columns are supported by the predicate semantics.
     allow_list_columns: bool,
     /// The Arrow schema of the parquet file.
@@ -289,7 +290,7 @@ impl<'schema> PushdownChecker<'schema> {
             non_primitive_columns: false,
             projected_columns: false,
             required_columns: BTreeSet::default(),
-            contains_nested: false,
+            nested_behavior: NestedBehavior::PrimitiveOnly,
             allow_list_columns,
             file_schema,
         }
@@ -299,8 +300,6 @@ impl<'schema> PushdownChecker<'schema> {
         if let Ok(idx) = self.file_schema.index_of(column_name) {
             self.required_columns.insert(idx);
             if DataType::is_nested(self.file_schema.field(idx).data_type()) {
-                self.contains_nested = true;
-
                 // Check if this is a list type
                 let is_list = matches!(
                     self.file_schema.field(idx).data_type(),
@@ -313,10 +312,16 @@
                 // List columns are only supported if the expression contains
                 // supported list predicates (e.g., array_has_all)
                 let is_supported = self.allow_list_columns && is_list;
 
-                if !is_supported {
+                if is_supported {
+                    // Update to ListsSupported if we haven't found unsupported types yet
+                    if self.nested_behavior == NestedBehavior::PrimitiveOnly {
+                        self.nested_behavior = NestedBehavior::ListsSupported;
+                    }
+                } else {
                     // Block pushdown for unsupported nested types:
                     // - Structs (regardless of predicate support)
                     // - Lists without supported predicates
+                    self.nested_behavior = NestedBehavior::Unsupported;
                     self.non_primitive_columns = true;
                     return Some(TreeNodeRecursion::Jump);
                 }
@@ -350,10 +355,29 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
     }
 }
 
+/// Describes the nested column behavior for filter pushdown.
+///
+/// This enum makes explicit the different states a predicate can be in
+/// with respect to nested column handling during Parquet decoding.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum NestedBehavior {
+    /// Expression references only primitive (non-nested) columns.
+    /// These can always be pushed down to the Parquet decoder.
+    PrimitiveOnly,
+    /// Expression references list columns with supported predicates
+    /// (e.g., array_has, array_has_all, IS NULL).
+    /// These can be pushed down to the Parquet decoder.
+    ListsSupported,
+    /// Expression references unsupported nested types (e.g., structs)
+    /// or list columns without supported predicates.
+    /// These cannot be pushed down and must be evaluated after decoding.
+    Unsupported,
+}
+
 #[derive(Debug)]
 struct PushdownColumns {
     required_columns: BTreeSet<usize>,
-    contains_nested: bool,
+    nested: NestedBehavior,
 }
 
 /// Checks if a given expression can be pushed down to the parquet decoder.
@@ -373,19 +397,21 @@ fn pushdown_columns( expr.visit(&mut checker)?; Ok((!checker.prevents_pushdown()).then_some(PushdownColumns { required_columns: checker.required_columns, - contains_nested: checker.contains_nested, + nested: checker.nested_behavior, })) } fn leaf_indices_for_roots( root_indices: &[usize], schema_descr: &SchemaDescriptor, - contains_nested: bool, + nested: NestedBehavior, ) -> Vec { - if !contains_nested { + // For primitive-only columns, root indices ARE the leaf indices + if nested == NestedBehavior::PrimitiveOnly { return root_indices.to_vec(); } + // For nested columns (lists or structs), we need to expand to all leaf columns let root_set: BTreeSet<_> = root_indices.iter().copied().collect(); (0..schema_descr.num_columns()) @@ -874,13 +900,13 @@ mod test { .expect("building row filter") .expect("row filter should exist"); - let mut reader = parquet_reader_builder + let reader = parquet_reader_builder .with_row_filter(row_filter) .build() .expect("build reader"); let mut total_rows = 0; - while let Some(batch) = reader.next() { + for batch in reader { let batch = batch.expect("record batch"); total_rows += batch.num_rows(); } From ab3395bb45c065d4293c0b28ad84d27ffc4138c4 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 12:57:40 +0800 Subject: [PATCH 06/22] docs(parquet): add clarifying comments for pushdown implementation - Document function name detection assumption in supported_predicates - Note reliance on exact string matching - Suggest trait-based approach for future robustness - Explain ProjectionMask::leaves() choice for nested columns - Clarify why leaf indices are needed for nested structures - Helps reviewers understand Parquet schema descriptor usage These comments address Low Priority suggestions from code review, improving maintainability and onboarding for future contributors. --- datafusion/datasource-parquet/src/row_filter.rs | 3 +++ datafusion/datasource-parquet/src/supported_predicates.rs | 3 +++ 2 files changed, 6 insertions(+) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index ed0bb4e9789e4..225447a00b2a7 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -130,6 +130,9 @@ impl DatafusionArrowPredicate { Ok(Self { physical_expr, + // Use leaf indices: when nested columns are involved, we must specify + // leaf (primitive) column indices in the Parquet schema so the decoder + // can properly project and filter nested structures. projection_mask: ProjectionMask::leaves( metadata.file_metadata().schema_descr(), candidate.projection.leaf_indices.iter().copied(), diff --git a/datafusion/datasource-parquet/src/supported_predicates.rs b/datafusion/datasource-parquet/src/supported_predicates.rs index 7440c63f600f5..b7f570a7cc775 100644 --- a/datafusion/datasource-parquet/src/supported_predicates.rs +++ b/datafusion/datasource-parquet/src/supported_predicates.rs @@ -73,6 +73,9 @@ pub fn supports_list_predicates(expr: &Arc) -> bool { } // Check if this is a supported scalar function + // NOTE: This relies on function names matching exactly. If function names + // are refactored, this check must be updated. Consider using a trait-based + // approach (e.g., a marker trait) for more robust detection in the future. 
if let Some(fun) = expr.as_any().downcast_ref::() && is_supported_list_predicate(fun.name()) { From dc27504cd893df2d442ec0f7d812f598b0965a25 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 15:23:39 +0800 Subject: [PATCH 07/22] refactor: remove dead code expectation from ProjectionColumns struct --- datafusion/datasource-parquet/src/row_filter.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 225447a00b2a7..18d44ec55bea1 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -197,7 +197,6 @@ pub(crate) struct FilterCandidate { /// Tracks the projection of an expression in both root and leaf coordinates. #[derive(Debug, Clone)] -#[cfg_attr(not(test), expect(dead_code))] struct ProjectionColumns { /// Root column indices in the Arrow schema. root_indices: Vec, From 40405cb4852e4050f9b68cec7a1c6de8c679341d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 16:02:52 +0800 Subject: [PATCH 08/22] refactor: remove root_indices from ProjectionColumns struct --- datafusion/datasource-parquet/src/row_filter.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 18d44ec55bea1..d61354149566d 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -198,8 +198,6 @@ pub(crate) struct FilterCandidate { /// Tracks the projection of an expression in both root and leaf coordinates. #[derive(Debug, Clone)] struct ProjectionColumns { - /// Root column indices in the Arrow schema. - root_indices: Vec, /// Leaf column indices in the Parquet schema descriptor. leaf_indices: Vec, } @@ -255,10 +253,7 @@ impl FilterCandidateBuilder { expr: self.expr, required_bytes, can_use_index, - projection: ProjectionColumns { - root_indices, - leaf_indices, - }, + projection: ProjectionColumns { leaf_indices }, filter_schema: projected_schema, })) } @@ -674,7 +669,6 @@ mod test { .expect("building candidate") .expect("list pushdown should be supported"); - assert_eq!(candidate.projection.root_indices, vec![list_index]); assert_eq!(candidate.projection.leaf_indices, vec![list_index]); } From ce9da3bd3823283daf13e07dc864160ad3c5b9a8 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 16:42:19 +0800 Subject: [PATCH 09/22] refactor: implement trait-based approach for list column pushdown support --- .../src/supported_predicates.rs | 81 ++++++++++++++----- 1 file changed, 63 insertions(+), 18 deletions(-) diff --git a/datafusion/datasource-parquet/src/supported_predicates.rs b/datafusion/datasource-parquet/src/supported_predicates.rs index b7f570a7cc775..d48b4262d028c 100644 --- a/datafusion/datasource-parquet/src/supported_predicates.rs +++ b/datafusion/datasource-parquet/src/supported_predicates.rs @@ -18,9 +18,8 @@ //! Registry of physical expressions that support nested list column pushdown //! to the Parquet decoder. //! -//! This module maintains a centralized registry of predicates that can be -//! safely evaluated on nested list columns during Parquet decoding. Adding -//! new supported predicates requires only updating the registry in this module. +//! This module provides a trait-based approach for determining which predicates +//! can be safely evaluated on nested list columns during Parquet decoding. 
 use std::sync::Arc;
 
 use datafusion_physical_expr::expressions::{IsNotNullExpr, IsNullExpr};
 use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr};
 
 /// Array functions supported for nested list pushdown.
 ///
 /// These functions have been verified to work correctly when evaluated
 /// during Parquet decoding on list columns.
 const SUPPORTED_ARRAY_FUNCTIONS: &[&str] =
     &["array_has", "array_has_all", "array_has_any"];
 
+/// Trait for physical expressions that support list column pushdown during
+/// Parquet decoding.
+///
+/// Implement this trait on physical expressions that can be safely evaluated
+/// on nested list columns during Parquet decoding. This provides a robust,
+/// type-safe mechanism for identifying supported predicates.
+///
+/// # Examples
+///
+/// ```ignore
+/// use datafusion_physical_expr::PhysicalExpr;
+/// use datafusion_datasource_parquet::SupportsListPushdown;
+///
+/// let expr: Arc<dyn PhysicalExpr> = ...;
+/// if expr.supports_list_pushdown() {
+///     // Can safely push down to Parquet decoder
+/// }
+/// ```
+pub trait SupportsListPushdown {
+    /// Returns `true` if this expression supports list column pushdown.
+    fn supports_list_pushdown(&self) -> bool;
+}
+
+/// Default implementation for all physical expressions.
+///
+/// Checks if the expression is a supported type (IS NULL, IS NOT NULL) or
+/// a scalar function in the registry of supported array functions.
+impl SupportsListPushdown for dyn PhysicalExpr {
+    fn supports_list_pushdown(&self) -> bool {
+        // NULL checks are universally supported for all column types
+        if self.as_any().downcast_ref::<IsNullExpr>().is_some()
+            || self.as_any().downcast_ref::<IsNotNullExpr>().is_some()
+        {
+            return true;
+        }
+
+        // Check if this is a supported scalar function
+        if let Some(fun) = self.as_any().downcast_ref::<ScalarFunctionExpr>() {
+            return is_supported_list_predicate(fun.name());
+        }
+
+        false
+    }
+}
+
 /// Checks whether a function name is supported for nested list pushdown.
 ///
 /// Returns `true` if the function is in the registry of supported predicates.
@@ -45,7 +89,7 @@ const SUPPORTED_ARRAY_FUNCTIONS: &[&str] =
 /// assert!(is_supported_list_predicate("array_has"));
 /// assert!(!is_supported_list_predicate("array_append"));
 /// ```
-pub fn is_supported_list_predicate(name: &str) -> bool {
+fn is_supported_list_predicate(name: &str) -> bool {
     SUPPORTED_ARRAY_FUNCTIONS.contains(&name)
 }
 
@@ -65,20 +109,7 @@ pub fn is_supported_list_predicate(name: &str) -> bool {
 ///
 /// `true` if the expression or any of its children contain supported predicates.
 pub fn supports_list_predicates(expr: &Arc<dyn PhysicalExpr>) -> bool {
-    // NULL checks are universally supported for all column types
-    if expr.as_any().downcast_ref::<IsNullExpr>().is_some()
-        || expr.as_any().downcast_ref::<IsNotNullExpr>().is_some()
-    {
-        return true;
-    }
-
-    // Check if this is a supported scalar function
-    // NOTE: This relies on function names matching exactly. If function names
-    // are refactored, this check must be updated. Consider using a trait-based
-    // approach (e.g., a marker trait) for more robust detection in the future.
- if let Some(fun) = expr.as_any().downcast_ref::() - && is_supported_list_predicate(fun.name()) - { + if expr.supports_list_pushdown() { return true; } @@ -104,4 +135,18 @@ mod tests { assert!(!is_supported_list_predicate("array_length")); assert!(!is_supported_list_predicate("some_other_function")); } + + #[test] + fn test_trait_based_detection() { + use datafusion_physical_expr::expressions::Column; + use std::sync::Arc; + + // Create a simple column expression (should not support pushdown) + let col_expr: Arc = Arc::new(Column::new("test", 0)); + assert!(!col_expr.supports_list_pushdown()); + + // Note: Testing with actual IsNullExpr and ScalarFunctionExpr + // would require more complex setup and is better suited for + // integration tests. + } } From eaff8997c5d804b1737f596dd0ea55b7ec5f4b99 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 16:48:34 +0800 Subject: [PATCH 10/22] Refactor expression handling and simplify predicates Remove SUPPORTED_ARRAY_FUNCTIONS array. Introduce dedicated predicate functions for NULL checks and scalar function support. Utilize pattern matching with matches! macro instead of array lookups. Enhance code clarity and idiomatic Rust usage with is_some_and() for condition checks and simplify recursion using a single expression. --- .../src/supported_predicates.rs | 114 ++++++++---------- 1 file changed, 51 insertions(+), 63 deletions(-) diff --git a/datafusion/datasource-parquet/src/supported_predicates.rs b/datafusion/datasource-parquet/src/supported_predicates.rs index d48b4262d028c..903237e7bd2a5 100644 --- a/datafusion/datasource-parquet/src/supported_predicates.rs +++ b/datafusion/datasource-parquet/src/supported_predicates.rs @@ -26,19 +26,18 @@ use std::sync::Arc; use datafusion_physical_expr::expressions::{IsNotNullExpr, IsNullExpr}; use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr}; -/// Array functions supported for nested list pushdown. -/// -/// These functions have been verified to work correctly when evaluated -/// during Parquet decoding on list columns. -const SUPPORTED_ARRAY_FUNCTIONS: &[&str] = - &["array_has", "array_has_all", "array_has_any"]; - /// Trait for physical expressions that support list column pushdown during /// Parquet decoding. /// -/// Implement this trait on physical expressions that can be safely evaluated -/// on nested list columns during Parquet decoding. This provides a robust, -/// type-safe mechanism for identifying supported predicates. +/// This trait provides a type-safe mechanism for identifying expressions that +/// can be safely pushed down to the Parquet decoder for evaluation on nested +/// list columns. +/// +/// # Implementation Notes +/// +/// Expression types in external crates cannot directly implement this trait +/// due to Rust's orphan rules. Instead, we use a blanket implementation that +/// delegates to a registration mechanism. /// /// # Examples /// @@ -56,41 +55,36 @@ pub trait SupportsListPushdown { fn supports_list_pushdown(&self) -> bool; } -/// Default implementation for all physical expressions. +/// Blanket implementation for all physical expressions. /// -/// Checks if the expression is a supported type (IS NULL, IS NOT NULL) or -/// a scalar function in the registry of supported array functions. +/// This delegates to specialized predicates that check whether the concrete +/// expression type is registered as supporting list pushdown. This design +/// allows the trait to work with expression types defined in external crates. 
 impl SupportsListPushdown for dyn PhysicalExpr {
     fn supports_list_pushdown(&self) -> bool {
-        // NULL checks are universally supported for all column types
-        if self.as_any().downcast_ref::<IsNullExpr>().is_some()
-            || self.as_any().downcast_ref::<IsNotNullExpr>().is_some()
-        {
-            return true;
-        }
-
-        // Check if this is a supported scalar function
-        if let Some(fun) = self.as_any().downcast_ref::<ScalarFunctionExpr>() {
-            return is_supported_list_predicate(fun.name());
-        }
-
-        false
+        is_null_check(self) || is_supported_scalar_function(self)
     }
 }
 
-/// Checks whether a function name is supported for nested list pushdown.
-///
-/// Returns `true` if the function is in the registry of supported predicates.
+/// Checks if an expression is a NULL or NOT NULL check.
 ///
-/// # Examples
+/// These checks are universally supported for all column types.
+fn is_null_check(expr: &dyn PhysicalExpr) -> bool {
+    expr.as_any().downcast_ref::<IsNullExpr>().is_some()
+        || expr.as_any().downcast_ref::<IsNotNullExpr>().is_some()
+}
+
+/// Checks if an expression is a scalar function registered for list pushdown.
 ///
-/// ```ignore
-/// assert!(is_supported_list_predicate("array_has_all"));
-/// assert!(is_supported_list_predicate("array_has"));
-/// assert!(!is_supported_list_predicate("array_append"));
-/// ```
-fn is_supported_list_predicate(name: &str) -> bool {
-    SUPPORTED_ARRAY_FUNCTIONS.contains(&name)
+/// Returns `true` if the expression is a `ScalarFunctionExpr` whose function
+/// is in the registry of supported operations.
+fn is_supported_scalar_function(expr: &dyn PhysicalExpr) -> bool {
+    expr.as_any()
+        .downcast_ref::<ScalarFunctionExpr>()
+        .is_some_and(|fun| {
+            // Registry of verified array functions
+            matches!(fun.name(), "array_has" | "array_has_all" | "array_has_any")
+        })
 }
 
 /// Checks whether the given physical expression contains a supported nested
@@ -103,20 +97,17 @@ fn is_supported_list_predicate(name: &str) -> bool {
 /// # Supported predicates
 ///
 /// - `IS NULL` and `IS NOT NULL` checks on any column type
-/// - Array functions listed in [`SUPPORTED_ARRAY_FUNCTIONS`]
+/// - Array functions: `array_has`, `array_has_all`, `array_has_any`
 ///
 /// # Returns
 ///
 /// `true` if the expression or any of its children contain supported predicates.
pub fn supports_list_predicates(expr: &Arc) -> bool { - if expr.supports_list_pushdown() { - return true; - } - - // Recursively check children - expr.children() - .iter() - .any(|child| supports_list_predicates(child)) + expr.supports_list_pushdown() + || expr + .children() + .iter() + .any(|child| supports_list_predicates(child)) } #[cfg(test)] @@ -124,29 +115,26 @@ mod tests { use super::*; #[test] - fn test_is_supported_list_predicate() { - // Supported functions - assert!(is_supported_list_predicate("array_has")); - assert!(is_supported_list_predicate("array_has_all")); - assert!(is_supported_list_predicate("array_has_any")); - - // Unsupported functions - assert!(!is_supported_list_predicate("array_append")); - assert!(!is_supported_list_predicate("array_length")); - assert!(!is_supported_list_predicate("some_other_function")); + fn test_null_check_detection() { + use datafusion_physical_expr::expressions::Column; + + let col_expr: Arc = Arc::new(Column::new("test", 0)); + assert!(!is_null_check(&**col_expr)); + + // IsNullExpr and IsNotNullExpr detection requires actual instances + // which need schema setup - tested in integration tests } #[test] - fn test_trait_based_detection() { + fn test_supported_scalar_functions() { use datafusion_physical_expr::expressions::Column; - use std::sync::Arc; - // Create a simple column expression (should not support pushdown) let col_expr: Arc = Arc::new(Column::new("test", 0)); - assert!(!col_expr.supports_list_pushdown()); - // Note: Testing with actual IsNullExpr and ScalarFunctionExpr - // would require more complex setup and is better suited for - // integration tests. + // Non-function expressions should return false + assert!(!is_supported_scalar_function(&**col_expr)); + + // Testing with actual ScalarFunctionExpr requires function setup + // and is better suited for integration tests } } From 6325c972fa406cc2de9e00c1e2162aa485cc8189 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 17:26:40 +0800 Subject: [PATCH 11/22] Add tests for array_has... 
From 6325c972fa406cc2de9e00c1e2162aa485cc8189 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Mon, 29 Dec 2025 17:26:40 +0800
Subject: [PATCH 11/22] Add tests for array_has...

---
 .../src/supported_predicates.rs               | 167 ++++++++++++++++--
 1 file changed, 157 insertions(+), 10 deletions(-)

diff --git a/datafusion/datasource-parquet/src/supported_predicates.rs b/datafusion/datasource-parquet/src/supported_predicates.rs
index 903237e7bd2a5..b91424f94a4d5 100644
--- a/datafusion/datasource-parquet/src/supported_predicates.rs
+++ b/datafusion/datasource-parquet/src/supported_predicates.rs
@@ -113,28 +113,175 @@ pub fn supports_list_predicates(expr: &Arc<dyn PhysicalExpr>) -> bool {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use arrow::datatypes::{DataType, Field};
+    use datafusion_common::ScalarValue;
+    use datafusion_expr::{Expr, col};
+    use datafusion_functions_nested::expr_fn::{
+        array_has, array_has_all, array_has_any, make_array,
+    };
+    use datafusion_physical_expr::expressions::{Column, IsNullExpr};
+    use datafusion_physical_expr::planner::logical2physical;
 
     #[test]
     fn test_null_check_detection() {
-        use datafusion_physical_expr::expressions::Column;
-
         let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("test", 0));
-        assert!(!is_null_check(&**col_expr));
+        assert!(!is_null_check(col_expr.as_ref()));
 
-        // IsNullExpr and IsNotNullExpr detection requires actual instances
-        // which need schema setup - tested in integration tests
+        // Test IS NULL expression
+        let is_null_expr: Arc<dyn PhysicalExpr> =
+            Arc::new(IsNullExpr::new(Arc::new(Column::new("test", 0))));
+        assert!(is_null_check(is_null_expr.as_ref()));
+        assert!(is_null_expr.supports_list_pushdown());
     }
 
     #[test]
     fn test_supported_scalar_functions() {
-        use datafusion_physical_expr::expressions::Column;
-
         let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("test", 0));
 
         // Non-function expressions should return false
-        assert!(!is_supported_scalar_function(&**col_expr));
+        assert!(!is_supported_scalar_function(col_expr.as_ref()));
+    }
+
+    /// Creates a test schema with a list column for building array function expressions.
+ fn create_test_schema() -> Arc { + let item_field = Arc::new(Field::new("item", DataType::Utf8, true)); + Arc::new(arrow::datatypes::Schema::new(vec![Field::new( + "tags", + DataType::List(item_field), + true, + )])) + } + + #[test] + fn test_array_has_all_supports_pushdown() { + let schema = create_test_schema(); + + // Build array_has_all(tags, ['c']) + let expr = array_has_all( + col("tags"), + make_array(vec![Expr::Literal( + ScalarValue::Utf8(Some("c".to_string())), + None, + )]), + ); + + let physical_expr = logical2physical(&expr, &schema); + + // Verify the trait detects this as a supported function + assert!( + physical_expr.supports_list_pushdown(), + "array_has_all should support list pushdown" + ); + assert!( + is_supported_scalar_function(physical_expr.as_ref()), + "array_has_all should be detected as supported scalar function" + ); + assert!( + supports_list_predicates(&physical_expr), + "supports_list_predicates should return true for array_has_all" + ); + } + + #[test] + fn test_array_has_any_supports_pushdown() { + let schema = create_test_schema(); + + // Build array_has_any(tags, ['a', 'd']) + let expr = array_has_any( + col("tags"), + make_array(vec![ + Expr::Literal(ScalarValue::Utf8(Some("a".to_string())), None), + Expr::Literal(ScalarValue::Utf8(Some("d".to_string())), None), + ]), + ); + + let physical_expr = logical2physical(&expr, &schema); + + // Verify the trait detects this as a supported function + assert!( + physical_expr.supports_list_pushdown(), + "array_has_any should support list pushdown" + ); + assert!( + is_supported_scalar_function(physical_expr.as_ref()), + "array_has_any should be detected as supported scalar function" + ); + assert!( + supports_list_predicates(&physical_expr), + "supports_list_predicates should return true for array_has_any" + ); + } + + #[test] + fn test_array_has_supports_pushdown() { + let schema = create_test_schema(); + + // Build array_has(tags, 'c') + let expr = array_has( + col("tags"), + Expr::Literal(ScalarValue::Utf8(Some("c".to_string())), None), + ); + + let physical_expr = logical2physical(&expr, &schema); + + // Verify the trait detects this as a supported function + assert!( + physical_expr.supports_list_pushdown(), + "array_has should support list pushdown" + ); + assert!( + is_supported_scalar_function(physical_expr.as_ref()), + "array_has should be detected as supported scalar function" + ); + assert!( + supports_list_predicates(&physical_expr), + "supports_list_predicates should return true for array_has" + ); + } + + #[test] + fn test_unsupported_function_does_not_support_pushdown() { + let schema = create_test_schema(); + + // Build a non-supported function expression (e.g., using a simple column) + let expr = col("tags"); + let physical_expr = logical2physical(&expr, &schema); + + // Verify unsupported expressions are correctly identified + assert!( + !physical_expr.supports_list_pushdown(), + "column reference should not support list pushdown" + ); + assert!( + !is_supported_scalar_function(physical_expr.as_ref()), + "column should not be detected as supported scalar function" + ); + assert!( + !supports_list_predicates(&physical_expr), + "supports_list_predicates should return false for column reference" + ); + } + + #[test] + fn test_recursive_detection_in_complex_expression() { + let schema = create_test_schema(); + + // Build a complex expression that contains array_has_all + // For example: array_has_all(tags, ['c']) (simplified test) + let expr = array_has_all( + col("tags"), + 
make_array(vec![Expr::Literal( + ScalarValue::Utf8(Some("c".to_string())), + None, + )]), + ); + + let physical_expr = logical2physical(&expr, &schema); - // Testing with actual ScalarFunctionExpr requires function setup - // and is better suited for integration tests + // Test recursive detection + assert!( + supports_list_predicates(&physical_expr), + "supports_list_predicates should recursively find array_has_all" + ); } } From 459988ed100e389ea6ebebb67b31c5f697f17111 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 17:33:50 +0800 Subject: [PATCH 12/22] test: add physical plan tests for array functions and pushdown support --- .../src/supported_predicates.rs | 232 ++++++++++++++++++ 1 file changed, 232 insertions(+) diff --git a/datafusion/datasource-parquet/src/supported_predicates.rs b/datafusion/datasource-parquet/src/supported_predicates.rs index b91424f94a4d5..592d73fcb5e94 100644 --- a/datafusion/datasource-parquet/src/supported_predicates.rs +++ b/datafusion/datasource-parquet/src/supported_predicates.rs @@ -284,4 +284,236 @@ mod tests { "supports_list_predicates should recursively find array_has_all" ); } + + /// Tests that demonstrate the physical plan structure for array functions. + /// These show that the functions are correctly represented as ScalarFunctionExpr + /// and can be detected for pushdown. + mod physical_plan_tests { + use super::*; + + #[test] + fn test_array_has_all_physical_plan() { + let schema = create_test_schema(); + + // Build array_has_all(tags, ['rust', 'performance']) + let expr = array_has_all( + col("tags"), + make_array(vec![ + Expr::Literal(ScalarValue::Utf8(Some("rust".to_string())), None), + Expr::Literal( + ScalarValue::Utf8(Some("performance".to_string())), + None, + ), + ]), + ); + + let physical_expr = logical2physical(&expr, &schema); + + // Verify the physical expression structure + let scalar_fn = physical_expr + .as_any() + .downcast_ref::() + .expect("Should be ScalarFunctionExpr"); + + assert_eq!( + scalar_fn.name(), + "array_has_all", + "Function name should be array_has_all" + ); + + // Verify it has 2 arguments: the column and the array + assert_eq!( + scalar_fn.children().len(), + 2, + "array_has_all should have 2 arguments" + ); + + // Verify pushdown detection + assert!( + physical_expr.supports_list_pushdown(), + "array_has_all physical expr should support pushdown" + ); + } + + #[test] + fn test_array_has_any_physical_plan() { + let schema = create_test_schema(); + + // Build array_has_any(tags, ['python', 'javascript', 'go']) + let expr = array_has_any( + col("tags"), + make_array(vec![ + Expr::Literal(ScalarValue::Utf8(Some("python".to_string())), None), + Expr::Literal( + ScalarValue::Utf8(Some("javascript".to_string())), + None, + ), + Expr::Literal(ScalarValue::Utf8(Some("go".to_string())), None), + ]), + ); + + let physical_expr = logical2physical(&expr, &schema); + + // Verify the physical expression structure + let scalar_fn = physical_expr + .as_any() + .downcast_ref::() + .expect("Should be ScalarFunctionExpr"); + + assert_eq!( + scalar_fn.name(), + "array_has_any", + "Function name should be array_has_any" + ); + + // Verify it has 2 arguments: the column and the array + assert_eq!( + scalar_fn.children().len(), + 2, + "array_has_any should have 2 arguments" + ); + + // Verify pushdown detection + assert!( + physical_expr.supports_list_pushdown(), + "array_has_any physical expr should support pushdown" + ); + } + + #[test] + fn test_array_has_physical_plan() { + let schema = create_test_schema(); + + 
// Build array_has(tags, 'rust') + let expr = array_has( + col("tags"), + Expr::Literal(ScalarValue::Utf8(Some("rust".to_string())), None), + ); + + let physical_expr = logical2physical(&expr, &schema); + + // Verify the physical expression structure + let scalar_fn = physical_expr + .as_any() + .downcast_ref::() + .expect("Should be ScalarFunctionExpr"); + + assert_eq!( + scalar_fn.name(), + "array_has", + "Function name should be array_has" + ); + + // Verify it has 2 arguments: the column and the value + assert_eq!( + scalar_fn.children().len(), + 2, + "array_has should have 2 arguments" + ); + + // Verify pushdown detection + assert!( + physical_expr.supports_list_pushdown(), + "array_has physical expr should support pushdown" + ); + } + + #[test] + fn test_physical_plan_display() { + let schema = create_test_schema(); + + // Test array_has_all display + let expr = array_has_all( + col("tags"), + make_array(vec![Expr::Literal( + ScalarValue::Utf8(Some("test".to_string())), + None, + )]), + ); + let physical_expr = logical2physical(&expr, &schema); + let display = format!("{physical_expr:?}"); + assert!( + display.contains("array_has_all"), + "Display should contain function name: {display}" + ); + + // Test array_has_any display + let expr = array_has_any( + col("tags"), + make_array(vec![Expr::Literal( + ScalarValue::Utf8(Some("test".to_string())), + None, + )]), + ); + let physical_expr = logical2physical(&expr, &schema); + let display = format!("{physical_expr:?}"); + assert!( + display.contains("array_has_any"), + "Display should contain function name: {display}" + ); + + // Test array_has display + let expr = array_has( + col("tags"), + Expr::Literal(ScalarValue::Utf8(Some("test".to_string())), None), + ); + let physical_expr = logical2physical(&expr, &schema); + let display = format!("{physical_expr:?}"); + assert!( + display.contains("array_has"), + "Display should contain function name: {display}" + ); + } + + #[test] + fn test_complex_predicate_with_array_functions() { + let schema = create_test_schema(); + + // Build a more complex expression: + // array_has_all(tags, ['rust']) OR array_has_any(tags, ['python', 'go']) + use datafusion_expr::Operator; + let left = array_has_all( + col("tags"), + make_array(vec![Expr::Literal( + ScalarValue::Utf8(Some("rust".to_string())), + None, + )]), + ); + let right = array_has_any( + col("tags"), + make_array(vec![ + Expr::Literal(ScalarValue::Utf8(Some("python".to_string())), None), + Expr::Literal(ScalarValue::Utf8(Some("go".to_string())), None), + ]), + ); + let expr = Expr::BinaryExpr(datafusion_expr::BinaryExpr { + left: Box::new(left), + op: Operator::Or, + right: Box::new(right), + }); + + let physical_expr = logical2physical(&expr, &schema); + + // Verify that supports_list_predicates recursively finds the supported functions + assert!( + supports_list_predicates(&physical_expr), + "Complex predicate with array functions should support pushdown" + ); + + // Verify the top-level is a BinaryExpr + assert_eq!( + physical_expr.children().len(), + 2, + "OR expression should have 2 children" + ); + + // Verify both children are supported + for child in physical_expr.children() { + assert!( + supports_list_predicates(child), + "Each child should be a supported array function" + ); + } + } + } } From f894bd41e7722eddb940a21fe74f7d345ee64e18 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 17:54:57 +0800 Subject: [PATCH 13/22] Refactor array function tests for clarity and conciseness Extract helper functions to reduce 
code duplication in array pushdown and physical plan tests. Consolidate similar assertions and checks, simplifying tests from ~50 to ~30 lines. Transform display tests into a single parameterized test, maintaining coverage while eliminating repeated code. --- .../src/supported_predicates.rs | 247 ++++++------------ 1 file changed, 85 insertions(+), 162 deletions(-) diff --git a/datafusion/datasource-parquet/src/supported_predicates.rs b/datafusion/datasource-parquet/src/supported_predicates.rs index 592d73fcb5e94..b754003c3ba24 100644 --- a/datafusion/datasource-parquet/src/supported_predicates.rs +++ b/datafusion/datasource-parquet/src/supported_predicates.rs @@ -152,11 +152,16 @@ mod tests { )])) } + /// Helper to verify a physical expression supports pushdown + fn assert_supports_pushdown(expr: &Arc, msg: &str) { + assert!(expr.supports_list_pushdown(), "{msg}"); + assert!(is_supported_scalar_function(expr.as_ref())); + assert!(supports_list_predicates(expr)); + } + #[test] fn test_array_has_all_supports_pushdown() { let schema = create_test_schema(); - - // Build array_has_all(tags, ['c']) let expr = array_has_all( col("tags"), make_array(vec![Expr::Literal( @@ -164,29 +169,16 @@ mod tests { None, )]), ); - let physical_expr = logical2physical(&expr, &schema); - - // Verify the trait detects this as a supported function - assert!( - physical_expr.supports_list_pushdown(), - "array_has_all should support list pushdown" - ); - assert!( - is_supported_scalar_function(physical_expr.as_ref()), - "array_has_all should be detected as supported scalar function" - ); - assert!( - supports_list_predicates(&physical_expr), - "supports_list_predicates should return true for array_has_all" + assert_supports_pushdown( + &physical_expr, + "array_has_all should support list pushdown", ); } #[test] fn test_array_has_any_supports_pushdown() { let schema = create_test_schema(); - - // Build array_has_any(tags, ['a', 'd']) let expr = array_has_any( col("tags"), make_array(vec![ @@ -194,48 +186,24 @@ mod tests { Expr::Literal(ScalarValue::Utf8(Some("d".to_string())), None), ]), ); - let physical_expr = logical2physical(&expr, &schema); - - // Verify the trait detects this as a supported function - assert!( - physical_expr.supports_list_pushdown(), - "array_has_any should support list pushdown" - ); - assert!( - is_supported_scalar_function(physical_expr.as_ref()), - "array_has_any should be detected as supported scalar function" - ); - assert!( - supports_list_predicates(&physical_expr), - "supports_list_predicates should return true for array_has_any" + assert_supports_pushdown( + &physical_expr, + "array_has_any should support list pushdown", ); } #[test] fn test_array_has_supports_pushdown() { let schema = create_test_schema(); - - // Build array_has(tags, 'c') let expr = array_has( col("tags"), Expr::Literal(ScalarValue::Utf8(Some("c".to_string())), None), ); - let physical_expr = logical2physical(&expr, &schema); - - // Verify the trait detects this as a supported function - assert!( - physical_expr.supports_list_pushdown(), - "array_has should support list pushdown" - ); - assert!( - is_supported_scalar_function(physical_expr.as_ref()), - "array_has should be detected as supported scalar function" - ); - assert!( - supports_list_predicates(&physical_expr), - "supports_list_predicates should return true for array_has" + assert_supports_pushdown( + &physical_expr, + "array_has should support list pushdown", ); } @@ -291,25 +259,11 @@ mod tests { mod physical_plan_tests { use super::*; - #[test] - fn 
test_array_has_all_physical_plan() { - let schema = create_test_schema(); - - // Build array_has_all(tags, ['rust', 'performance']) - let expr = array_has_all( - col("tags"), - make_array(vec![ - Expr::Literal(ScalarValue::Utf8(Some("rust".to_string())), None), - Expr::Literal( - ScalarValue::Utf8(Some("performance".to_string())), - None, - ), - ]), - ); - - let physical_expr = logical2physical(&expr, &schema); - - // Verify the physical expression structure + /// Helper to verify physical plan structure for array functions + fn verify_array_function_physical_plan( + physical_expr: &Arc, + expected_name: &str, + ) { let scalar_fn = physical_expr .as_any() .downcast_ref::() @@ -317,22 +271,42 @@ mod tests { assert_eq!( scalar_fn.name(), - "array_has_all", - "Function name should be array_has_all" + expected_name, + "Function name should be {expected_name}" ); - // Verify it has 2 arguments: the column and the array + // Verify it has 2 arguments: the column and the array/value assert_eq!( scalar_fn.children().len(), 2, - "array_has_all should have 2 arguments" + "{expected_name} should have 2 arguments" ); // Verify pushdown detection assert!( physical_expr.supports_list_pushdown(), - "array_has_all physical expr should support pushdown" + "{expected_name} physical expr should support pushdown" + ); + } + + #[test] + fn test_array_has_all_physical_plan() { + let schema = create_test_schema(); + + // Build array_has_all(tags, ['rust', 'performance']) + let expr = array_has_all( + col("tags"), + make_array(vec![ + Expr::Literal(ScalarValue::Utf8(Some("rust".to_string())), None), + Expr::Literal( + ScalarValue::Utf8(Some("performance".to_string())), + None, + ), + ]), ); + + let physical_expr = logical2physical(&expr, &schema); + verify_array_function_physical_plan(&physical_expr, "array_has_all"); } #[test] @@ -353,31 +327,7 @@ mod tests { ); let physical_expr = logical2physical(&expr, &schema); - - // Verify the physical expression structure - let scalar_fn = physical_expr - .as_any() - .downcast_ref::() - .expect("Should be ScalarFunctionExpr"); - - assert_eq!( - scalar_fn.name(), - "array_has_any", - "Function name should be array_has_any" - ); - - // Verify it has 2 arguments: the column and the array - assert_eq!( - scalar_fn.children().len(), - 2, - "array_has_any should have 2 arguments" - ); - - // Verify pushdown detection - assert!( - physical_expr.supports_list_pushdown(), - "array_has_any physical expr should support pushdown" - ); + verify_array_function_physical_plan(&physical_expr, "array_has_any"); } #[test] @@ -391,78 +341,51 @@ mod tests { ); let physical_expr = logical2physical(&expr, &schema); - - // Verify the physical expression structure - let scalar_fn = physical_expr - .as_any() - .downcast_ref::() - .expect("Should be ScalarFunctionExpr"); - - assert_eq!( - scalar_fn.name(), - "array_has", - "Function name should be array_has" - ); - - // Verify it has 2 arguments: the column and the value - assert_eq!( - scalar_fn.children().len(), - 2, - "array_has should have 2 arguments" - ); - - // Verify pushdown detection - assert!( - physical_expr.supports_list_pushdown(), - "array_has physical expr should support pushdown" - ); + verify_array_function_physical_plan(&physical_expr, "array_has"); } #[test] fn test_physical_plan_display() { let schema = create_test_schema(); - // Test array_has_all display - let expr = array_has_all( - col("tags"), - make_array(vec![Expr::Literal( - ScalarValue::Utf8(Some("test".to_string())), - None, - )]), - ); - let physical_expr = 
logical2physical(&expr, &schema); - let display = format!("{physical_expr:?}"); - assert!( - display.contains("array_has_all"), - "Display should contain function name: {display}" - ); - - // Test array_has_any display - let expr = array_has_any( - col("tags"), - make_array(vec![Expr::Literal( - ScalarValue::Utf8(Some("test".to_string())), - None, - )]), - ); - let physical_expr = logical2physical(&expr, &schema); - let display = format!("{physical_expr:?}"); - assert!( - display.contains("array_has_any"), - "Display should contain function name: {display}" - ); + let test_cases = vec![ + ( + array_has_all( + col("tags"), + make_array(vec![Expr::Literal( + ScalarValue::Utf8(Some("test".to_string())), + None, + )]), + ), + "array_has_all", + ), + ( + array_has_any( + col("tags"), + make_array(vec![Expr::Literal( + ScalarValue::Utf8(Some("test".to_string())), + None, + )]), + ), + "array_has_any", + ), + ( + array_has( + col("tags"), + Expr::Literal(ScalarValue::Utf8(Some("test".to_string())), None), + ), + "array_has", + ), + ]; - // Test array_has display - let expr = array_has( - col("tags"), - Expr::Literal(ScalarValue::Utf8(Some("test".to_string())), None), - ); - let physical_expr = logical2physical(&expr, &schema); - let display = format!("{physical_expr:?}"); - assert!( - display.contains("array_has"), - "Display should contain function name: {display}" - ); + for (expr, expected_fn_name) in test_cases { + let physical_expr = logical2physical(&expr, &schema); + let display = format!("{physical_expr:?}"); + assert!( + display.contains(expected_fn_name), + "Display should contain function name {expected_fn_name}: {display}" + ); + } } #[test] From b81562974306f6b0bce97a243fb77ada7de3f649 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 19:39:24 +0800 Subject: [PATCH 14/22] remove tests in supported_predicates.rs --- .../src/supported_predicates.rs | 318 +----------------- 1 file changed, 8 insertions(+), 310 deletions(-) diff --git a/datafusion/datasource-parquet/src/supported_predicates.rs b/datafusion/datasource-parquet/src/supported_predicates.rs index b754003c3ba24..a7ae1539dea30 100644 --- a/datafusion/datasource-parquet/src/supported_predicates.rs +++ b/datafusion/datasource-parquet/src/supported_predicates.rs @@ -113,330 +113,28 @@ pub fn supports_list_predicates(expr: &Arc) -> bool { #[cfg(test)] mod tests { use super::*; - use arrow::datatypes::{DataType, Field}; - use datafusion_common::ScalarValue; - use datafusion_expr::{Expr, col}; - use datafusion_functions_nested::expr_fn::{ - array_has, array_has_all, array_has_any, make_array, - }; - use datafusion_physical_expr::expressions::{Column, IsNullExpr}; - use datafusion_physical_expr::planner::logical2physical; #[test] fn test_null_check_detection() { + use datafusion_physical_expr::expressions::Column; + let col_expr: Arc = Arc::new(Column::new("test", 0)); assert!(!is_null_check(col_expr.as_ref())); - // Test IS NULL expression - let is_null_expr: Arc = - Arc::new(IsNullExpr::new(Arc::new(Column::new("test", 0)))); - assert!(is_null_check(is_null_expr.as_ref())); - assert!(is_null_expr.supports_list_pushdown()); + // IsNullExpr and IsNotNullExpr detection requires actual instances + // which need schema setup - tested in integration tests } #[test] fn test_supported_scalar_functions() { + use datafusion_physical_expr::expressions::Column; + let col_expr: Arc = Arc::new(Column::new("test", 0)); // Non-function expressions should return false assert!(!is_supported_scalar_function(col_expr.as_ref())); - } - 
- /// Creates a test schema with a list column for building array function expressions. - fn create_test_schema() -> Arc { - let item_field = Arc::new(Field::new("item", DataType::Utf8, true)); - Arc::new(arrow::datatypes::Schema::new(vec![Field::new( - "tags", - DataType::List(item_field), - true, - )])) - } - - /// Helper to verify a physical expression supports pushdown - fn assert_supports_pushdown(expr: &Arc, msg: &str) { - assert!(expr.supports_list_pushdown(), "{msg}"); - assert!(is_supported_scalar_function(expr.as_ref())); - assert!(supports_list_predicates(expr)); - } - - #[test] - fn test_array_has_all_supports_pushdown() { - let schema = create_test_schema(); - let expr = array_has_all( - col("tags"), - make_array(vec![Expr::Literal( - ScalarValue::Utf8(Some("c".to_string())), - None, - )]), - ); - let physical_expr = logical2physical(&expr, &schema); - assert_supports_pushdown( - &physical_expr, - "array_has_all should support list pushdown", - ); - } - - #[test] - fn test_array_has_any_supports_pushdown() { - let schema = create_test_schema(); - let expr = array_has_any( - col("tags"), - make_array(vec![ - Expr::Literal(ScalarValue::Utf8(Some("a".to_string())), None), - Expr::Literal(ScalarValue::Utf8(Some("d".to_string())), None), - ]), - ); - let physical_expr = logical2physical(&expr, &schema); - assert_supports_pushdown( - &physical_expr, - "array_has_any should support list pushdown", - ); - } - - #[test] - fn test_array_has_supports_pushdown() { - let schema = create_test_schema(); - let expr = array_has( - col("tags"), - Expr::Literal(ScalarValue::Utf8(Some("c".to_string())), None), - ); - let physical_expr = logical2physical(&expr, &schema); - assert_supports_pushdown( - &physical_expr, - "array_has should support list pushdown", - ); - } - - #[test] - fn test_unsupported_function_does_not_support_pushdown() { - let schema = create_test_schema(); - - // Build a non-supported function expression (e.g., using a simple column) - let expr = col("tags"); - let physical_expr = logical2physical(&expr, &schema); - - // Verify unsupported expressions are correctly identified - assert!( - !physical_expr.supports_list_pushdown(), - "column reference should not support list pushdown" - ); - assert!( - !is_supported_scalar_function(physical_expr.as_ref()), - "column should not be detected as supported scalar function" - ); - assert!( - !supports_list_predicates(&physical_expr), - "supports_list_predicates should return false for column reference" - ); - } - - #[test] - fn test_recursive_detection_in_complex_expression() { - let schema = create_test_schema(); - - // Build a complex expression that contains array_has_all - // For example: array_has_all(tags, ['c']) (simplified test) - let expr = array_has_all( - col("tags"), - make_array(vec![Expr::Literal( - ScalarValue::Utf8(Some("c".to_string())), - None, - )]), - ); - - let physical_expr = logical2physical(&expr, &schema); - - // Test recursive detection - assert!( - supports_list_predicates(&physical_expr), - "supports_list_predicates should recursively find array_has_all" - ); - } - - /// Tests that demonstrate the physical plan structure for array functions. - /// These show that the functions are correctly represented as ScalarFunctionExpr - /// and can be detected for pushdown. 
- mod physical_plan_tests { - use super::*; - - /// Helper to verify physical plan structure for array functions - fn verify_array_function_physical_plan( - physical_expr: &Arc, - expected_name: &str, - ) { - let scalar_fn = physical_expr - .as_any() - .downcast_ref::() - .expect("Should be ScalarFunctionExpr"); - - assert_eq!( - scalar_fn.name(), - expected_name, - "Function name should be {expected_name}" - ); - - // Verify it has 2 arguments: the column and the array/value - assert_eq!( - scalar_fn.children().len(), - 2, - "{expected_name} should have 2 arguments" - ); - - // Verify pushdown detection - assert!( - physical_expr.supports_list_pushdown(), - "{expected_name} physical expr should support pushdown" - ); - } - - #[test] - fn test_array_has_all_physical_plan() { - let schema = create_test_schema(); - - // Build array_has_all(tags, ['rust', 'performance']) - let expr = array_has_all( - col("tags"), - make_array(vec![ - Expr::Literal(ScalarValue::Utf8(Some("rust".to_string())), None), - Expr::Literal( - ScalarValue::Utf8(Some("performance".to_string())), - None, - ), - ]), - ); - - let physical_expr = logical2physical(&expr, &schema); - verify_array_function_physical_plan(&physical_expr, "array_has_all"); - } - - #[test] - fn test_array_has_any_physical_plan() { - let schema = create_test_schema(); - - // Build array_has_any(tags, ['python', 'javascript', 'go']) - let expr = array_has_any( - col("tags"), - make_array(vec![ - Expr::Literal(ScalarValue::Utf8(Some("python".to_string())), None), - Expr::Literal( - ScalarValue::Utf8(Some("javascript".to_string())), - None, - ), - Expr::Literal(ScalarValue::Utf8(Some("go".to_string())), None), - ]), - ); - - let physical_expr = logical2physical(&expr, &schema); - verify_array_function_physical_plan(&physical_expr, "array_has_any"); - } - - #[test] - fn test_array_has_physical_plan() { - let schema = create_test_schema(); - - // Build array_has(tags, 'rust') - let expr = array_has( - col("tags"), - Expr::Literal(ScalarValue::Utf8(Some("rust".to_string())), None), - ); - - let physical_expr = logical2physical(&expr, &schema); - verify_array_function_physical_plan(&physical_expr, "array_has"); - } - - #[test] - fn test_physical_plan_display() { - let schema = create_test_schema(); - - let test_cases = vec![ - ( - array_has_all( - col("tags"), - make_array(vec![Expr::Literal( - ScalarValue::Utf8(Some("test".to_string())), - None, - )]), - ), - "array_has_all", - ), - ( - array_has_any( - col("tags"), - make_array(vec![Expr::Literal( - ScalarValue::Utf8(Some("test".to_string())), - None, - )]), - ), - "array_has_any", - ), - ( - array_has( - col("tags"), - Expr::Literal(ScalarValue::Utf8(Some("test".to_string())), None), - ), - "array_has", - ), - ]; - - for (expr, expected_fn_name) in test_cases { - let physical_expr = logical2physical(&expr, &schema); - let display = format!("{physical_expr:?}"); - assert!( - display.contains(expected_fn_name), - "Display should contain function name {expected_fn_name}: {display}" - ); - } - } - - #[test] - fn test_complex_predicate_with_array_functions() { - let schema = create_test_schema(); - - // Build a more complex expression: - // array_has_all(tags, ['rust']) OR array_has_any(tags, ['python', 'go']) - use datafusion_expr::Operator; - let left = array_has_all( - col("tags"), - make_array(vec![Expr::Literal( - ScalarValue::Utf8(Some("rust".to_string())), - None, - )]), - ); - let right = array_has_any( - col("tags"), - make_array(vec![ - Expr::Literal(ScalarValue::Utf8(Some("python".to_string())), 
None), - Expr::Literal(ScalarValue::Utf8(Some("go".to_string())), None), - ]), - ); - let expr = Expr::BinaryExpr(datafusion_expr::BinaryExpr { - left: Box::new(left), - op: Operator::Or, - right: Box::new(right), - }); - - let physical_expr = logical2physical(&expr, &schema); - - // Verify that supports_list_predicates recursively finds the supported functions - assert!( - supports_list_predicates(&physical_expr), - "Complex predicate with array functions should support pushdown" - ); - - // Verify the top-level is a BinaryExpr - assert_eq!( - physical_expr.children().len(), - 2, - "OR expression should have 2 children" - ); - // Verify both children are supported - for child in physical_expr.children() { - assert!( - supports_list_predicates(child), - "Each child should be a supported array function" - ); - } - } + // Testing with actual ScalarFunctionExpr requires function setup + // and is better suited for integration tests } } From 0a769bdb8406b26beb1fad23e1b298d95fcb93ec Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 18:41:16 +0800 Subject: [PATCH 15/22] add array functions example with logical and physical plan demonstrations --- Cargo.lock | 1 + datafusion-examples/Cargo.toml | 1 + .../query_planning/array_functions.rs | 203 ++++++++++++++++++ .../examples/query_planning/main.rs | 8 +- 4 files changed, 212 insertions(+), 1 deletion(-) create mode 100644 datafusion-examples/examples/query_planning/array_functions.rs diff --git a/Cargo.lock b/Cargo.lock index 10a3e88fdb236..301203d8a6415 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2215,6 +2215,7 @@ dependencies = [ "mimalloc", "nix", "object_store", + "parquet", "prost", "rand 0.9.2", "serde_json", diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index b0190dadf3c3f..27eb48855b9d5 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -57,6 +57,7 @@ insta = { workspace = true } log = { workspace = true } mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "http"] } +parquet = { workspace = true } prost = { workspace = true } rand = { workspace = true } serde_json = { workspace = true } diff --git a/datafusion-examples/examples/query_planning/array_functions.rs b/datafusion-examples/examples/query_planning/array_functions.rs new file mode 100644 index 0000000000000..d73aa772cd112 --- /dev/null +++ b/datafusion-examples/examples/query_planning/array_functions.rs @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! See `main.rs` for how to run it. 
+ +use arrow::array::ListBuilder; +use arrow::array::StringBuilder; +use arrow::datatypes::{DataType, Field, Schema}; +use datafusion::error::Result; +use datafusion::physical_plan::displayable; +use datafusion::prelude::*; +use parquet::arrow::ArrowWriter; +use std::sync::Arc; +use tempfile::NamedTempFile; + +/// This example demonstrates how to execute queries using array functions +/// (`array_has`, `array_has_all`, `array_has_any`) and displays their +/// logical and physical execution plans. +/// +/// Array functions can be used to filter data based on array columns, +/// and when pushed down to the Parquet decoder, they can significantly +/// improve query performance by filtering rows during decoding. +/// +/// ## Key Feature: Predicate Pushdown +/// +/// In the physical plans below, look for the `predicate=` parameter in the +/// `DataSourceExec` node. This shows that the array function predicates are +/// being pushed down to the Parquet decoder, which means: +/// +/// 1. Rows are filtered during Parquet decoding before creating Arrow arrays +/// 2. Fewer rows need to be materialized in memory +/// 3. Query performance is significantly improved for selective predicates +/// +/// Without pushdown, you would see these predicates only in the `FilterExec` +/// node above the data source, meaning all rows would be decoded first and +/// then filtered in memory. +pub async fn array_functions_physical_plans() -> Result<()> { + // Create a session context + let ctx = SessionContext::new(); + + // Create a temporary Parquet file with sample data + let parquet_path = create_sample_parquet_file()?; + + // Register the Parquet file as a table + ctx.register_parquet( + "languages", + parquet_path.path().to_str().unwrap(), + Default::default(), + ) + .await?; + + // Example 1: array_has - Check if a single element is in the array + println!("=== Example 1: array_has ==="); + println!("Query: SELECT * FROM languages WHERE array_has(tags, 'rust')\n"); + let df = ctx + .sql("SELECT * FROM languages WHERE array_has(tags, 'rust')") + .await?; + + print_logical_and_physical_plans(&ctx, &df).await?; + + // Example 2: array_has_all - Check if all elements are in the array + println!("\n=== Example 2: array_has_all ==="); + println!( + "Query: SELECT * FROM languages WHERE array_has_all(tags, ['rust', 'performance'])\n" + ); + let df = ctx + .sql("SELECT * FROM languages WHERE array_has_all(tags, ['rust', 'performance'])") + .await?; + + print_logical_and_physical_plans(&ctx, &df).await?; + + // Example 3: array_has_any - Check if any element is in the array + println!("\n=== Example 3: array_has_any ==="); + println!( + "Query: SELECT * FROM languages WHERE array_has_any(tags, ['python', 'go'])\n" + ); + let df = ctx + .sql("SELECT * FROM languages WHERE array_has_any(tags, ['python', 'go'])") + .await?; + + print_logical_and_physical_plans(&ctx, &df).await?; + + // Example 4: Complex predicate with multiple array functions + println!("\n=== Example 4: Complex Predicate (OR) ==="); + println!( + "Query: SELECT * FROM languages WHERE array_has_all(tags, ['rust']) OR array_has_any(tags, ['python', 'go'])\n" + ); + let df = ctx + .sql("SELECT * FROM languages WHERE array_has_all(tags, ['rust']) OR array_has_any(tags, ['python', 'go'])") + .await?; + + print_logical_and_physical_plans(&ctx, &df).await?; + + // Example 5: Array function combined with other conditions + println!("\n=== Example 5: Array Function with Other Predicates ==="); + println!("Query: SELECT * FROM languages WHERE id > 1 AND 
array_has(tags, 'rust')\n"); + let df = ctx + .sql("SELECT * FROM languages WHERE id > 1 AND array_has(tags, 'rust')") + .await?; + + print_logical_and_physical_plans(&ctx, &df).await?; + + Ok(()) +} + +/// Helper function to create a temporary Parquet file with sample data +fn create_sample_parquet_file() -> Result { + use std::fs::File; + + // Create a sample table with an array column + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "tags", + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), + true, + ), + ])); + + // Create sample data + let mut id_builder = arrow::array::Int32Builder::new(); + let mut tags_builder = ListBuilder::new(StringBuilder::new()); + + // Row 0: id=1, tags=["rust", "performance"] + id_builder.append_value(1); + tags_builder.values().append_value("rust"); + tags_builder.values().append_value("performance"); + tags_builder.append(true); + + // Row 1: id=2, tags=["python", "javascript"] + id_builder.append_value(2); + tags_builder.values().append_value("python"); + tags_builder.values().append_value("javascript"); + tags_builder.append(true); + + // Row 2: id=3, tags=["rust", "webassembly"] + id_builder.append_value(3); + tags_builder.values().append_value("rust"); + tags_builder.values().append_value("webassembly"); + tags_builder.append(true); + + // Row 3: id=4, tags=["go", "system"] + id_builder.append_value(4); + tags_builder.values().append_value("go"); + tags_builder.values().append_value("system"); + tags_builder.append(true); + + let batch = arrow::record_batch::RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(id_builder.finish()), + Arc::new(tags_builder.finish()), + ], + )?; + + // Create a temporary file + let temp_file = tempfile::Builder::new() + .suffix(".parquet") + .tempfile() + .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?; + + // Write the batch to a Parquet file + let file = File::create(temp_file.path()) + .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?; + let mut writer = ArrowWriter::try_new(file, schema, None)?; + writer.write(&batch)?; + writer.close()?; + + Ok(temp_file) +} + +/// Helper function to print both logical and physical plans +async fn print_logical_and_physical_plans( + ctx: &SessionContext, + df: &DataFrame, +) -> Result<()> { + // Get the logical plan + let logical_plan = df.logical_plan(); + println!("Logical plan:\n{logical_plan:#?}\n"); + + // Create the physical plan + let physical_plan = ctx.state().create_physical_plan(logical_plan).await?; + println!( + "Physical plan:\n{}\n", + displayable(physical_plan.as_ref()).indent(true) + ); + + Ok(()) +} diff --git a/datafusion-examples/examples/query_planning/main.rs b/datafusion-examples/examples/query_planning/main.rs index 247f468735359..5cd0e739bbb31 100644 --- a/datafusion-examples/examples/query_planning/main.rs +++ b/datafusion-examples/examples/query_planning/main.rs @@ -21,12 +21,13 @@ //! //! ## Usage //! ```bash -//! cargo run --example query_planning -- [all|analyzer_rule|expr_api|optimizer_rule|parse_sql_expr|plan_to_sql|planner_api|pruning|thread_pools] +//! cargo run --example query_planning -- [all|analyzer_rule|array_functions|expr_api|optimizer_rule|parse_sql_expr|plan_to_sql|planner_api|pruning|thread_pools] //! ``` //! //! Each subcommand runs a corresponding example: //! - `all` — run all examples included in this module //! 
- `analyzer_rule` — use a custom AnalyzerRule to change a query's semantics (row level access control) +//! - `array_functions` — show logical and physical plans for queries with array_has, array_has_all, array_has_any //! - `expr_api` — create, execute, simplify, analyze and coerce `Expr`s //! - `optimizer_rule` — use a custom OptimizerRule to replace certain predicates //! - `parse_sql_expr` — parse SQL text into DataFusion `Expr` @@ -36,6 +37,7 @@ //! - `thread_pools` — demonstrate TrackConsumersPool for memory tracking and debugging with enhanced error messages and shows how to implement memory-aware ExecutionPlan with memory reservation and spilling mod analyzer_rule; +mod array_functions; mod expr_api; mod optimizer_rule; mod parse_sql_expr; @@ -53,6 +55,7 @@ use strum_macros::{Display, EnumIter, EnumString, VariantNames}; enum ExampleKind { All, AnalyzerRule, + ArrayFunctions, ExprApi, OptimizerRule, ParseSqlExpr, @@ -78,6 +81,9 @@ impl ExampleKind { } } ExampleKind::AnalyzerRule => analyzer_rule::analyzer_rule().await?, + ExampleKind::ArrayFunctions => { + array_functions::array_functions_physical_plans().await? + } ExampleKind::ExprApi => expr_api::expr_api().await?, ExampleKind::OptimizerRule => optimizer_rule::optimizer_rule().await?, ExampleKind::ParseSqlExpr => parse_sql_expr::parse_sql_expr().await?, From 94adbba37b9bc78f8d45a02a174bf54263597cb0 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 19:28:44 +0800 Subject: [PATCH 16/22] Revert "add array functions example with logical and physical plan demonstrations" This reverts commit 94f1a99cee4e44e5176450156a684a2316af78e1. --- Cargo.lock | 1 - datafusion-examples/Cargo.toml | 1 - .../query_planning/array_functions.rs | 203 ------------------ .../examples/query_planning/main.rs | 8 +- 4 files changed, 1 insertion(+), 212 deletions(-) delete mode 100644 datafusion-examples/examples/query_planning/array_functions.rs diff --git a/Cargo.lock b/Cargo.lock index 301203d8a6415..10a3e88fdb236 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2215,7 +2215,6 @@ dependencies = [ "mimalloc", "nix", "object_store", - "parquet", "prost", "rand 0.9.2", "serde_json", diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 27eb48855b9d5..b0190dadf3c3f 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -57,7 +57,6 @@ insta = { workspace = true } log = { workspace = true } mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "http"] } -parquet = { workspace = true } prost = { workspace = true } rand = { workspace = true } serde_json = { workspace = true } diff --git a/datafusion-examples/examples/query_planning/array_functions.rs b/datafusion-examples/examples/query_planning/array_functions.rs deleted file mode 100644 index d73aa772cd112..0000000000000 --- a/datafusion-examples/examples/query_planning/array_functions.rs +++ /dev/null @@ -1,203 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! See `main.rs` for how to run it. - -use arrow::array::ListBuilder; -use arrow::array::StringBuilder; -use arrow::datatypes::{DataType, Field, Schema}; -use datafusion::error::Result; -use datafusion::physical_plan::displayable; -use datafusion::prelude::*; -use parquet::arrow::ArrowWriter; -use std::sync::Arc; -use tempfile::NamedTempFile; - -/// This example demonstrates how to execute queries using array functions -/// (`array_has`, `array_has_all`, `array_has_any`) and displays their -/// logical and physical execution plans. -/// -/// Array functions can be used to filter data based on array columns, -/// and when pushed down to the Parquet decoder, they can significantly -/// improve query performance by filtering rows during decoding. -/// -/// ## Key Feature: Predicate Pushdown -/// -/// In the physical plans below, look for the `predicate=` parameter in the -/// `DataSourceExec` node. This shows that the array function predicates are -/// being pushed down to the Parquet decoder, which means: -/// -/// 1. Rows are filtered during Parquet decoding before creating Arrow arrays -/// 2. Fewer rows need to be materialized in memory -/// 3. Query performance is significantly improved for selective predicates -/// -/// Without pushdown, you would see these predicates only in the `FilterExec` -/// node above the data source, meaning all rows would be decoded first and -/// then filtered in memory. 
-pub async fn array_functions_physical_plans() -> Result<()> { - // Create a session context - let ctx = SessionContext::new(); - - // Create a temporary Parquet file with sample data - let parquet_path = create_sample_parquet_file()?; - - // Register the Parquet file as a table - ctx.register_parquet( - "languages", - parquet_path.path().to_str().unwrap(), - Default::default(), - ) - .await?; - - // Example 1: array_has - Check if a single element is in the array - println!("=== Example 1: array_has ==="); - println!("Query: SELECT * FROM languages WHERE array_has(tags, 'rust')\n"); - let df = ctx - .sql("SELECT * FROM languages WHERE array_has(tags, 'rust')") - .await?; - - print_logical_and_physical_plans(&ctx, &df).await?; - - // Example 2: array_has_all - Check if all elements are in the array - println!("\n=== Example 2: array_has_all ==="); - println!( - "Query: SELECT * FROM languages WHERE array_has_all(tags, ['rust', 'performance'])\n" - ); - let df = ctx - .sql("SELECT * FROM languages WHERE array_has_all(tags, ['rust', 'performance'])") - .await?; - - print_logical_and_physical_plans(&ctx, &df).await?; - - // Example 3: array_has_any - Check if any element is in the array - println!("\n=== Example 3: array_has_any ==="); - println!( - "Query: SELECT * FROM languages WHERE array_has_any(tags, ['python', 'go'])\n" - ); - let df = ctx - .sql("SELECT * FROM languages WHERE array_has_any(tags, ['python', 'go'])") - .await?; - - print_logical_and_physical_plans(&ctx, &df).await?; - - // Example 4: Complex predicate with multiple array functions - println!("\n=== Example 4: Complex Predicate (OR) ==="); - println!( - "Query: SELECT * FROM languages WHERE array_has_all(tags, ['rust']) OR array_has_any(tags, ['python', 'go'])\n" - ); - let df = ctx - .sql("SELECT * FROM languages WHERE array_has_all(tags, ['rust']) OR array_has_any(tags, ['python', 'go'])") - .await?; - - print_logical_and_physical_plans(&ctx, &df).await?; - - // Example 5: Array function combined with other conditions - println!("\n=== Example 5: Array Function with Other Predicates ==="); - println!("Query: SELECT * FROM languages WHERE id > 1 AND array_has(tags, 'rust')\n"); - let df = ctx - .sql("SELECT * FROM languages WHERE id > 1 AND array_has(tags, 'rust')") - .await?; - - print_logical_and_physical_plans(&ctx, &df).await?; - - Ok(()) -} - -/// Helper function to create a temporary Parquet file with sample data -fn create_sample_parquet_file() -> Result { - use std::fs::File; - - // Create a sample table with an array column - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new( - "tags", - DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), - true, - ), - ])); - - // Create sample data - let mut id_builder = arrow::array::Int32Builder::new(); - let mut tags_builder = ListBuilder::new(StringBuilder::new()); - - // Row 0: id=1, tags=["rust", "performance"] - id_builder.append_value(1); - tags_builder.values().append_value("rust"); - tags_builder.values().append_value("performance"); - tags_builder.append(true); - - // Row 1: id=2, tags=["python", "javascript"] - id_builder.append_value(2); - tags_builder.values().append_value("python"); - tags_builder.values().append_value("javascript"); - tags_builder.append(true); - - // Row 2: id=3, tags=["rust", "webassembly"] - id_builder.append_value(3); - tags_builder.values().append_value("rust"); - tags_builder.values().append_value("webassembly"); - tags_builder.append(true); - - // Row 3: id=4, tags=["go", 
"system"] - id_builder.append_value(4); - tags_builder.values().append_value("go"); - tags_builder.values().append_value("system"); - tags_builder.append(true); - - let batch = arrow::record_batch::RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(id_builder.finish()), - Arc::new(tags_builder.finish()), - ], - )?; - - // Create a temporary file - let temp_file = tempfile::Builder::new() - .suffix(".parquet") - .tempfile() - .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?; - - // Write the batch to a Parquet file - let file = File::create(temp_file.path()) - .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?; - let mut writer = ArrowWriter::try_new(file, schema, None)?; - writer.write(&batch)?; - writer.close()?; - - Ok(temp_file) -} - -/// Helper function to print both logical and physical plans -async fn print_logical_and_physical_plans( - ctx: &SessionContext, - df: &DataFrame, -) -> Result<()> { - // Get the logical plan - let logical_plan = df.logical_plan(); - println!("Logical plan:\n{logical_plan:#?}\n"); - - // Create the physical plan - let physical_plan = ctx.state().create_physical_plan(logical_plan).await?; - println!( - "Physical plan:\n{}\n", - displayable(physical_plan.as_ref()).indent(true) - ); - - Ok(()) -} diff --git a/datafusion-examples/examples/query_planning/main.rs b/datafusion-examples/examples/query_planning/main.rs index 5cd0e739bbb31..247f468735359 100644 --- a/datafusion-examples/examples/query_planning/main.rs +++ b/datafusion-examples/examples/query_planning/main.rs @@ -21,13 +21,12 @@ //! //! ## Usage //! ```bash -//! cargo run --example query_planning -- [all|analyzer_rule|array_functions|expr_api|optimizer_rule|parse_sql_expr|plan_to_sql|planner_api|pruning|thread_pools] +//! cargo run --example query_planning -- [all|analyzer_rule|expr_api|optimizer_rule|parse_sql_expr|plan_to_sql|planner_api|pruning|thread_pools] //! ``` //! //! Each subcommand runs a corresponding example: //! - `all` — run all examples included in this module //! - `analyzer_rule` — use a custom AnalyzerRule to change a query's semantics (row level access control) -//! - `array_functions` — show logical and physical plans for queries with array_has, array_has_all, array_has_any //! - `expr_api` — create, execute, simplify, analyze and coerce `Expr`s //! - `optimizer_rule` — use a custom OptimizerRule to replace certain predicates //! - `parse_sql_expr` — parse SQL text into DataFusion `Expr` @@ -37,7 +36,6 @@ //! - `thread_pools` — demonstrate TrackConsumersPool for memory tracking and debugging with enhanced error messages and shows how to implement memory-aware ExecutionPlan with memory reservation and spilling mod analyzer_rule; -mod array_functions; mod expr_api; mod optimizer_rule; mod parse_sql_expr; @@ -55,7 +53,6 @@ use strum_macros::{Display, EnumIter, EnumString, VariantNames}; enum ExampleKind { All, AnalyzerRule, - ArrayFunctions, ExprApi, OptimizerRule, ParseSqlExpr, @@ -81,9 +78,6 @@ impl ExampleKind { } } ExampleKind::AnalyzerRule => analyzer_rule::analyzer_rule().await?, - ExampleKind::ArrayFunctions => { - array_functions::array_functions_physical_plans().await? 
-        }
         ExampleKind::ExprApi => expr_api::expr_api().await?,
         ExampleKind::OptimizerRule => optimizer_rule::optimizer_rule().await?,
         ExampleKind::ParseSqlExpr => parse_sql_expr::parse_sql_expr().await?,

From 2007e47a2f0e01458801c1e09377baed2bb51f86 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Mon, 29 Dec 2025 19:37:32 +0800
Subject: [PATCH 17/22] test: add array function predicate pushdown tests for Parquet files

---
 datafusion/core/tests/parquet/filter_pushdown.rs | 250 ++++++++++++++++++
 1 file changed, 250 insertions(+)

diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs
index e3a191ee9ade2..b16ac02afb199 100644
--- a/datafusion/core/tests/parquet/filter_pushdown.rs
+++ b/datafusion/core/tests/parquet/filter_pushdown.rs
@@ -731,3 +731,253 @@ impl PredicateCacheTest {
         Ok(())
     }
 }
+
+/// Tests for array function predicate pushdown to Parquet files.
+/// These tests verify that array_has, array_has_all, and array_has_any predicates
+/// are correctly pushed down to the DataSourceExec node in the physical plan.
+mod array_function_pushdown_tests {
+    use super::*;
+    use arrow::array::{ListBuilder, StringBuilder};
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::physical_plan::displayable;
+    use datafusion_common::Result;
+    use parquet::arrow::ArrowWriter;
+    use std::fs::File;
+    use std::sync::Arc;
+    use tempfile::NamedTempFile;
+
+    /// Create a temporary Parquet file with array column data for testing
+    fn create_test_parquet_file() -> Result<NamedTempFile> {
+        let item_field = Arc::new(Field::new("item", DataType::Utf8, true));
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("tags", DataType::List(item_field), true),
+        ]));
+
+        // Create sample data
+        let mut id_builder = arrow::array::Int32Builder::new();
+        let mut tags_builder = ListBuilder::new(StringBuilder::new());
+
+        // Row 0: id=1, tags=["rust", "performance"]
+        id_builder.append_value(1);
+        tags_builder.values().append_value("rust");
+        tags_builder.values().append_value("performance");
+        tags_builder.append(true);
+
+        // Row 1: id=2, tags=["python", "javascript"]
+        id_builder.append_value(2);
+        tags_builder.values().append_value("python");
+        tags_builder.values().append_value("javascript");
+        tags_builder.append(true);
+
+        // Row 2: id=3, tags=["rust", "webassembly"]
+        id_builder.append_value(3);
+        tags_builder.values().append_value("rust");
+        tags_builder.values().append_value("webassembly");
+        tags_builder.append(true);
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(id_builder.finish()),
+                Arc::new(tags_builder.finish()),
+            ],
+        )?;
+
+        // Create temporary Parquet file
+        let temp_file = tempfile::Builder::new().suffix(".parquet").tempfile()?;
+
+        let file = File::create(temp_file.path())?;
+        let mut writer = ArrowWriter::try_new(file, schema, None)?;
+        writer.write(&batch)?;
+        writer.close()?;
+
+        Ok(temp_file)
+    }
+
+    /// Helper to verify that a predicate appears in the DataSourceExec node,
+    /// indicating successful pushdown to the Parquet decoder.
+ fn assert_predicate_pushed_down( + physical_plan_display: &str, + expected_predicate: &str, + ) { + // Check for DataSourceExec with the predicate + assert!( + physical_plan_display.contains("DataSourceExec"), + "Physical plan should contain DataSourceExec:\n{physical_plan_display}" + ); + assert!( + physical_plan_display.contains(&format!("predicate={expected_predicate}")), + "DataSourceExec should contain predicate '{expected_predicate}':\n{physical_plan_display}" + ); + } + + #[tokio::test] + async fn test_array_has_pushdown_in_physical_plan() { + let ctx = SessionContext::new(); + let parquet_file = create_test_parquet_file().unwrap(); + + ctx.register_parquet( + "test_table", + parquet_file.path().to_str().unwrap(), + Default::default(), + ) + .await + .unwrap(); + + let df = ctx + .sql("SELECT * FROM test_table WHERE array_has(tags, 'rust')") + .await + .unwrap(); + + let physical_plan = ctx + .state() + .create_physical_plan(df.logical_plan()) + .await + .unwrap(); + + let plan_display = + format!("{}", displayable(physical_plan.as_ref()).indent(true)); + + // Verify pushdown appears in physical plan + assert_predicate_pushed_down(&plan_display, "array_has(tags@1, rust)"); + } + + #[tokio::test] + async fn test_array_has_all_pushdown_in_physical_plan() { + let ctx = SessionContext::new(); + let parquet_file = create_test_parquet_file().unwrap(); + + ctx.register_parquet( + "test_table", + parquet_file.path().to_str().unwrap(), + Default::default(), + ) + .await + .unwrap(); + + let df = ctx + .sql( + "SELECT * FROM test_table WHERE array_has_all(tags, ['rust', 'performance'])", + ) + .await + .unwrap(); + + let physical_plan = ctx + .state() + .create_physical_plan(df.logical_plan()) + .await + .unwrap(); + + let plan_display = + format!("{}", displayable(physical_plan.as_ref()).indent(true)); + + // Verify pushdown appears in physical plan + assert_predicate_pushed_down( + &plan_display, + "array_has_all(tags@1, [rust, performance])", + ); + } + + #[tokio::test] + async fn test_array_has_any_pushdown_in_physical_plan() { + let ctx = SessionContext::new(); + let parquet_file = create_test_parquet_file().unwrap(); + + ctx.register_parquet( + "test_table", + parquet_file.path().to_str().unwrap(), + Default::default(), + ) + .await + .unwrap(); + + let df = ctx + .sql("SELECT * FROM test_table WHERE array_has_any(tags, ['python', 'go'])") + .await + .unwrap(); + + let physical_plan = ctx + .state() + .create_physical_plan(df.logical_plan()) + .await + .unwrap(); + + let plan_display = + format!("{}", displayable(physical_plan.as_ref()).indent(true)); + + // Verify pushdown appears in physical plan + assert_predicate_pushed_down( + &plan_display, + "array_has_any(tags@1, [python, go])", + ); + } + + #[tokio::test] + async fn test_complex_predicate_pushdown_in_physical_plan() { + let ctx = SessionContext::new(); + let parquet_file = create_test_parquet_file().unwrap(); + + ctx.register_parquet( + "test_table", + parquet_file.path().to_str().unwrap(), + Default::default(), + ) + .await + .unwrap(); + + let df = ctx + .sql("SELECT * FROM test_table WHERE array_has_all(tags, ['rust']) OR array_has_any(tags, ['python', 'go'])") + .await + .unwrap(); + + let physical_plan = ctx + .state() + .create_physical_plan(df.logical_plan()) + .await + .unwrap(); + + let plan_display = + format!("{}", displayable(physical_plan.as_ref()).indent(true)); + + // Verify the complex predicate is pushed down + assert_predicate_pushed_down( + &plan_display, + "array_has_all(tags@1, [rust]) OR 
array_has_any(tags@1, [python, go])",
+        );
+    }
+
+    #[tokio::test]
+    async fn test_array_function_with_other_predicates_pushdown() {
+        let ctx = SessionContext::new();
+        let parquet_file = create_test_parquet_file().unwrap();
+
+        ctx.register_parquet(
+            "test_table",
+            parquet_file.path().to_str().unwrap(),
+            Default::default(),
+        )
+        .await
+        .unwrap();
+
+        let df = ctx
+            .sql("SELECT * FROM test_table WHERE id > 1 AND array_has(tags, 'rust')")
+            .await
+            .unwrap();
+
+        let physical_plan = ctx
+            .state()
+            .create_physical_plan(df.logical_plan())
+            .await
+            .unwrap();
+
+        let plan_display =
+            format!("{}", displayable(physical_plan.as_ref()).indent(true));
+
+        // Verify both predicates are pushed down
+        assert_predicate_pushed_down(
+            &plan_display,
+            "id@0 > 1 AND array_has(tags@1, rust)",
+        );
+    }
+}

From e0f9cd3914a068da660a9470032f03efc0931139 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Mon, 29 Dec 2025 20:14:09 +0800
Subject: [PATCH 18/22] refactor: rename NestedBehavior to NestedColumnSupport for clarity

---
 .../datasource-parquet/src/row_filter.rs | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index d61354149566d..dbc3fec85e51c 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -274,7 +274,7 @@ struct PushdownChecker<'schema> {
     /// Indices into the file schema of columns required to evaluate the expression.
     required_columns: BTreeSet<usize>,
     /// Tracks the nested column behavior found during traversal.
-    nested_behavior: NestedBehavior,
+    nested_behavior: NestedColumnSupport,
     /// Whether nested list columns are supported by the predicate semantics.
     allow_list_columns: bool,
     /// The Arrow schema of the parquet file.
@@ -287,7 +287,7 @@ impl<'schema> PushdownChecker<'schema> {
             non_primitive_columns: false,
             projected_columns: false,
             required_columns: BTreeSet::default(),
-            nested_behavior: NestedBehavior::PrimitiveOnly,
+            nested_behavior: NestedColumnSupport::PrimitiveOnly,
             allow_list_columns,
             file_schema,
         }
@@ -311,14 +311,14 @@ impl<'schema> PushdownChecker<'schema> {
 
                 if is_supported {
                     // Update to ListsSupported if we haven't found unsupported types yet
-                    if self.nested_behavior == NestedBehavior::PrimitiveOnly {
-                        self.nested_behavior = NestedBehavior::ListsSupported;
+                    if self.nested_behavior == NestedColumnSupport::PrimitiveOnly {
+                        self.nested_behavior = NestedColumnSupport::ListsSupported;
                     }
                 } else {
                     // Block pushdown for unsupported nested types:
                     // - Structs (regardless of predicate support)
                     // - Lists without supported predicates
-                    self.nested_behavior = NestedBehavior::Unsupported;
+                    self.nested_behavior = NestedColumnSupport::Unsupported;
                     self.non_primitive_columns = true;
                     return Some(TreeNodeRecursion::Jump);
                 }
@@ -357,7 +357,7 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
 /// This enum makes explicit the different states a predicate can be in
 /// with respect to nested column handling during Parquet decoding.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
-enum NestedBehavior {
+enum NestedColumnSupport {
     /// Expression references only primitive (non-nested) columns.
     /// These can always be pushed down to the Parquet decoder.
    PrimitiveOnly,
@@ -374,7 +374,7 @@ enum NestedColumnSupport {
 #[derive(Debug)]
 struct PushdownColumns {
     required_columns: BTreeSet<usize>,
-    nested: NestedBehavior,
+    nested: NestedColumnSupport,
 }
 
 /// Checks if a given expression can be pushed down to the parquet decoder.
@@ -401,10 +401,10 @@ fn pushdown_columns(
 fn leaf_indices_for_roots(
     root_indices: &[usize],
     schema_descr: &SchemaDescriptor,
-    nested: NestedBehavior,
+    nested: NestedColumnSupport,
 ) -> Vec<usize> {
     // For primitive-only columns, root indices ARE the leaf indices
-    if nested == NestedBehavior::PrimitiveOnly {
+    if nested == NestedColumnSupport::PrimitiveOnly {
         return root_indices.to_vec();
     }
 
From df94e30b95bd271abb7e4beba9e9ce85bbe3f8e9 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Mon, 29 Dec 2025 20:17:52 +0800
Subject: [PATCH 19/22] Refactor nested type handling and simplify checks

Extract handle_nested_type() to encapsulate logic for determining if a
nested type prevents pushdown. Introduce is_nested_type_supported() to
isolate type checking for List/LargeList/FixedSizeList and predicate
support. Simplify check_single_column() by reducing nesting depth and
delegating nested type logic to helper methods.
---
 .../datasource-parquet/src/row_filter.rs | 79 +++++++++++--------
 1 file changed, 47 insertions(+), 32 deletions(-)

diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index dbc3fec85e51c..733143acb9e42 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -294,42 +294,57 @@ impl<'schema> PushdownChecker<'schema> {
     }
 
     fn check_single_column(&mut self, column_name: &str) -> Option<TreeNodeRecursion> {
-        if let Ok(idx) = self.file_schema.index_of(column_name) {
-            self.required_columns.insert(idx);
-            if DataType::is_nested(self.file_schema.field(idx).data_type()) {
-                // Check if this is a list type
-                let is_list = matches!(
-                    self.file_schema.field(idx).data_type(),
-                    DataType::List(_)
-                        | DataType::LargeList(_)
-                        | DataType::FixedSizeList(_, _)
-                );
-
-                // List columns are only supported if the expression contains
-                // supported list predicates (e.g., array_has_all)
-                let is_supported = self.allow_list_columns && is_list;
-
-                if is_supported {
-                    // Update to ListsSupported if we haven't found unsupported types yet
-                    if self.nested_behavior == NestedColumnSupport::PrimitiveOnly {
-                        self.nested_behavior = NestedColumnSupport::ListsSupported;
-                    }
-                } else {
-                    // Block pushdown for unsupported nested types:
-                    // - Structs (regardless of predicate support)
-                    // - Lists without supported predicates
-                    self.nested_behavior = NestedColumnSupport::Unsupported;
-                    self.non_primitive_columns = true;
-                    return Some(TreeNodeRecursion::Jump);
-                }
-            }
+        let idx = match self.file_schema.index_of(column_name) {
+            Ok(idx) => idx,
+            Err(_) => {
+                // Column does not exist in the file schema, so we can't push this down.
+                self.projected_columns = true;
+                return Some(TreeNodeRecursion::Jump);
+            }
+        };
+
+        self.required_columns.insert(idx);
+        let data_type = self.file_schema.field(idx).data_type();
+
+        if DataType::is_nested(data_type) {
+            self.handle_nested_type(data_type)
+        } else {
+            None
+        }
+    }
+
+    /// Determines whether a nested data type can be pushed down to Parquet decoding.
+    ///
+    /// Returns `Some(TreeNodeRecursion::Jump)` if the nested type prevents pushdown,
+    /// `None` if the type is supported and pushdown can continue.
+    fn handle_nested_type(&mut self, data_type: &DataType) -> Option<TreeNodeRecursion> {
+        if self.is_nested_type_supported(data_type) {
+            // Update to ListsSupported if we haven't encountered unsupported types yet
+            if self.nested_behavior == NestedColumnSupport::PrimitiveOnly {
+                self.nested_behavior = NestedColumnSupport::ListsSupported;
+            }
+            None
         } else {
-            // Column does not exist in the file schema, so we can't push this down.
-            self.projected_columns = true;
-            return Some(TreeNodeRecursion::Jump);
+            // Block pushdown for unsupported nested types:
+            // - Structs (regardless of predicate support)
+            // - Lists without supported predicates
+            self.nested_behavior = NestedColumnSupport::Unsupported;
+            self.non_primitive_columns = true;
+            Some(TreeNodeRecursion::Jump)
         }
+    }
-
-        None
+    /// Checks if a nested data type is supported for list column pushdown.
+    ///
+    /// List columns are only supported if:
+    /// 1. The data type is a list variant (List, LargeList, or FixedSizeList)
+    /// 2. The expression contains supported list predicates (e.g., array_has_all)
+    fn is_nested_type_supported(&self, data_type: &DataType) -> bool {
+        let is_list = matches!(
+            data_type,
+            DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _)
+        );
+        self.allow_list_columns && is_list
     }
 
     #[inline]

From 13d685bbc92a195f73854e4b9ab1e52be7c5034a Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Mon, 29 Dec 2025 20:19:13 +0800
Subject: [PATCH 20/22] refactor: rename ProjectionColumns to LeafProjection for clarity

---
 datafusion/datasource-parquet/src/row_filter.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index 733143acb9e42..d41008a9b47c3 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -189,7 +189,7 @@ pub(crate) struct FilterCandidate {
     /// Can this filter use an index (e.g. a page index) to prune rows?
     can_use_index: bool,
     /// Column indices into the parquet file schema required to evaluate this filter.
-    projection: ProjectionColumns,
+    projection: LeafProjection,
     /// The Arrow schema containing only the columns required by this filter,
     /// projected from the file's Arrow schema.
     filter_schema: SchemaRef,
 }
 
 /// Tracks the projection of an expression in both root and leaf coordinates.
 #[derive(Debug, Clone)]
-struct ProjectionColumns {
+struct LeafProjection {
     /// Leaf column indices in the Parquet schema descriptor.
     leaf_indices: Vec<usize>,
 }
@@ -253,7 +253,7 @@ impl FilterCandidateBuilder {
             expr: self.expr,
             required_bytes,
             can_use_index,
-            projection: ProjectionColumns { leaf_indices },
+            projection: LeafProjection { leaf_indices },
             filter_schema: projected_schema,
         }))
     }

From 904c0d9cd5d4dc89ce3aa5f7521d5f365d9750a0 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Mon, 29 Dec 2025 20:34:03 +0800
Subject: [PATCH 21/22] refactor: remove redundant array function predicate pushdown tests

---
 .../core/tests/parquet/filter_pushdown.rs | 250 ------------------
 1 file changed, 250 deletions(-)

diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs
index b16ac02afb199..e3a191ee9ade2 100644
--- a/datafusion/core/tests/parquet/filter_pushdown.rs
+++ b/datafusion/core/tests/parquet/filter_pushdown.rs
@@ -731,253 +731,3 @@ impl PredicateCacheTest {
         Ok(())
     }
 }
-
-/// Tests for array function predicate pushdown to Parquet files.
-/// These tests verify that array_has, array_has_all, and array_has_any predicates
-/// are correctly pushed down to the DataSourceExec node in the physical plan.
-mod array_function_pushdown_tests {
-    use super::*;
-    use arrow::array::{ListBuilder, StringBuilder};
-    use arrow::datatypes::{DataType, Field, Schema};
-    use datafusion::physical_plan::displayable;
-    use datafusion_common::Result;
-    use parquet::arrow::ArrowWriter;
-    use std::fs::File;
-    use std::sync::Arc;
-    use tempfile::NamedTempFile;
-
-    /// Create a temporary Parquet file with array column data for testing
-    fn create_test_parquet_file() -> Result<NamedTempFile> {
-        let item_field = Arc::new(Field::new("item", DataType::Utf8, true));
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("id", DataType::Int32, false),
-            Field::new("tags", DataType::List(item_field), true),
-        ]));
-
-        // Create sample data
-        let mut id_builder = arrow::array::Int32Builder::new();
-        let mut tags_builder = ListBuilder::new(StringBuilder::new());
-
-        // Row 0: id=1, tags=["rust", "performance"]
-        id_builder.append_value(1);
-        tags_builder.values().append_value("rust");
-        tags_builder.values().append_value("performance");
-        tags_builder.append(true);
-
-        // Row 1: id=2, tags=["python", "javascript"]
-        id_builder.append_value(2);
-        tags_builder.values().append_value("python");
-        tags_builder.values().append_value("javascript");
-        tags_builder.append(true);
-
-        // Row 2: id=3, tags=["rust", "webassembly"]
-        id_builder.append_value(3);
-        tags_builder.values().append_value("rust");
-        tags_builder.values().append_value("webassembly");
-        tags_builder.append(true);
-
-        let batch = RecordBatch::try_new(
-            schema.clone(),
-            vec![
-                Arc::new(id_builder.finish()),
-                Arc::new(tags_builder.finish()),
-            ],
-        )?;
-
-        // Create temporary Parquet file
-        let temp_file = tempfile::Builder::new().suffix(".parquet").tempfile()?;
-
-        let file = File::create(temp_file.path())?;
-        let mut writer = ArrowWriter::try_new(file, schema, None)?;
-        writer.write(&batch)?;
-        writer.close()?;
-
-        Ok(temp_file)
-    }
-
-    /// Helper to verify that a predicate appears in the DataSourceExec node,
-    /// indicating successful pushdown to the Parquet decoder.
- fn assert_predicate_pushed_down( - physical_plan_display: &str, - expected_predicate: &str, - ) { - // Check for DataSourceExec with the predicate - assert!( - physical_plan_display.contains("DataSourceExec"), - "Physical plan should contain DataSourceExec:\n{physical_plan_display}" - ); - assert!( - physical_plan_display.contains(&format!("predicate={expected_predicate}")), - "DataSourceExec should contain predicate '{expected_predicate}':\n{physical_plan_display}" - ); - } - - #[tokio::test] - async fn test_array_has_pushdown_in_physical_plan() { - let ctx = SessionContext::new(); - let parquet_file = create_test_parquet_file().unwrap(); - - ctx.register_parquet( - "test_table", - parquet_file.path().to_str().unwrap(), - Default::default(), - ) - .await - .unwrap(); - - let df = ctx - .sql("SELECT * FROM test_table WHERE array_has(tags, 'rust')") - .await - .unwrap(); - - let physical_plan = ctx - .state() - .create_physical_plan(df.logical_plan()) - .await - .unwrap(); - - let plan_display = - format!("{}", displayable(physical_plan.as_ref()).indent(true)); - - // Verify pushdown appears in physical plan - assert_predicate_pushed_down(&plan_display, "array_has(tags@1, rust)"); - } - - #[tokio::test] - async fn test_array_has_all_pushdown_in_physical_plan() { - let ctx = SessionContext::new(); - let parquet_file = create_test_parquet_file().unwrap(); - - ctx.register_parquet( - "test_table", - parquet_file.path().to_str().unwrap(), - Default::default(), - ) - .await - .unwrap(); - - let df = ctx - .sql( - "SELECT * FROM test_table WHERE array_has_all(tags, ['rust', 'performance'])", - ) - .await - .unwrap(); - - let physical_plan = ctx - .state() - .create_physical_plan(df.logical_plan()) - .await - .unwrap(); - - let plan_display = - format!("{}", displayable(physical_plan.as_ref()).indent(true)); - - // Verify pushdown appears in physical plan - assert_predicate_pushed_down( - &plan_display, - "array_has_all(tags@1, [rust, performance])", - ); - } - - #[tokio::test] - async fn test_array_has_any_pushdown_in_physical_plan() { - let ctx = SessionContext::new(); - let parquet_file = create_test_parquet_file().unwrap(); - - ctx.register_parquet( - "test_table", - parquet_file.path().to_str().unwrap(), - Default::default(), - ) - .await - .unwrap(); - - let df = ctx - .sql("SELECT * FROM test_table WHERE array_has_any(tags, ['python', 'go'])") - .await - .unwrap(); - - let physical_plan = ctx - .state() - .create_physical_plan(df.logical_plan()) - .await - .unwrap(); - - let plan_display = - format!("{}", displayable(physical_plan.as_ref()).indent(true)); - - // Verify pushdown appears in physical plan - assert_predicate_pushed_down( - &plan_display, - "array_has_any(tags@1, [python, go])", - ); - } - - #[tokio::test] - async fn test_complex_predicate_pushdown_in_physical_plan() { - let ctx = SessionContext::new(); - let parquet_file = create_test_parquet_file().unwrap(); - - ctx.register_parquet( - "test_table", - parquet_file.path().to_str().unwrap(), - Default::default(), - ) - .await - .unwrap(); - - let df = ctx - .sql("SELECT * FROM test_table WHERE array_has_all(tags, ['rust']) OR array_has_any(tags, ['python', 'go'])") - .await - .unwrap(); - - let physical_plan = ctx - .state() - .create_physical_plan(df.logical_plan()) - .await - .unwrap(); - - let plan_display = - format!("{}", displayable(physical_plan.as_ref()).indent(true)); - - // Verify the complex predicate is pushed down - assert_predicate_pushed_down( - &plan_display, - "array_has_all(tags@1, [rust]) OR 
array_has_any(tags@1, [python, go])", - ); - } - - #[tokio::test] - async fn test_array_function_with_other_predicates_pushdown() { - let ctx = SessionContext::new(); - let parquet_file = create_test_parquet_file().unwrap(); - - ctx.register_parquet( - "test_table", - parquet_file.path().to_str().unwrap(), - Default::default(), - ) - .await - .unwrap(); - - let df = ctx - .sql("SELECT * FROM test_table WHERE id > 1 AND array_has(tags, 'rust')") - .await - .unwrap(); - - let physical_plan = ctx - .state() - .create_physical_plan(df.logical_plan()) - .await - .unwrap(); - - let plan_display = - format!("{}", displayable(physical_plan.as_ref()).indent(true)); - - // Verify both predicates are pushed down - assert_predicate_pushed_down( - &plan_display, - "id@0 > 1 AND array_has(tags@1, rust)", - ); - } -} From a5bab42c5727108b7ead2ddc238bc12e73780baa Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 29 Dec 2025 20:34:15 +0800 Subject: [PATCH 22/22] test: add array function predicate pushdown slt tests for DataSourceExec --- .../test_files/parquet_filter_pushdown.slt | 111 ++++++++++++++++++ 1 file changed, 111 insertions(+) diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 8bb79d576990e..aa94e2e2f2c04 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -563,3 +563,114 @@ ORDER BY start_timestamp, trace_id LIMIT 1; ---- 2024-10-01T00:00:00 + +### +# Array function predicate pushdown tests +# These tests verify that array_has, array_has_all, and array_has_any predicates +# are correctly pushed down to the DataSourceExec node +### + +# Create test data with array columns +statement ok +COPY ( + SELECT 1 as id, ['rust', 'performance'] as tags + UNION ALL + SELECT 2 as id, ['python', 'javascript'] as tags + UNION ALL + SELECT 3 as id, ['rust', 'webassembly'] as tags +) +TO 'test_files/scratch/parquet_filter_pushdown/array_data/data.parquet'; + +statement ok +CREATE EXTERNAL TABLE array_test STORED AS PARQUET LOCATION 'test_files/scratch/parquet_filter_pushdown/array_data/'; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = true; + +# Test array_has predicate pushdown +query I? +SELECT id, tags FROM array_test WHERE array_has(tags, 'rust') ORDER BY id; +---- +1 [rust, performance] +3 [rust, webassembly] + +query TT +EXPLAIN SELECT id, tags FROM array_test WHERE array_has(tags, 'rust') ORDER BY id; +---- +logical_plan +01)Sort: array_test.id ASC NULLS LAST +02)--Filter: array_has(array_test.tags, Utf8("rust")) +03)----TableScan: array_test projection=[id, tags], partial_filters=[array_has(array_test.tags, Utf8("rust"))] +physical_plan +01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=array_has(tags@1, rust) + +# Test array_has_all predicate pushdown +query I? 
+SELECT id, tags FROM array_test WHERE array_has_all(tags, ['rust', 'performance']) ORDER BY id; +---- +1 [rust, performance] + +query TT +EXPLAIN SELECT id, tags FROM array_test WHERE array_has_all(tags, ['rust', 'performance']) ORDER BY id; +---- +logical_plan +01)Sort: array_test.id ASC NULLS LAST +02)--Filter: array_has_all(array_test.tags, List([rust, performance])) +03)----TableScan: array_test projection=[id, tags], partial_filters=[array_has_all(array_test.tags, List([rust, performance]))] +physical_plan +01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=array_has_all(tags@1, [rust, performance]) + +# Test array_has_any predicate pushdown +query I? +SELECT id, tags FROM array_test WHERE array_has_any(tags, ['python', 'go']) ORDER BY id; +---- +2 [python, javascript] + +query TT +EXPLAIN SELECT id, tags FROM array_test WHERE array_has_any(tags, ['python', 'go']) ORDER BY id; +---- +logical_plan +01)Sort: array_test.id ASC NULLS LAST +02)--Filter: array_has_any(array_test.tags, List([python, go])) +03)----TableScan: array_test projection=[id, tags], partial_filters=[array_has_any(array_test.tags, List([python, go]))] +physical_plan +01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=array_has_any(tags@1, [python, go]) + +# Test complex predicate with OR +query I? +SELECT id, tags FROM array_test WHERE array_has_all(tags, ['rust']) OR array_has_any(tags, ['python', 'go']) ORDER BY id; +---- +1 [rust, performance] +2 [python, javascript] +3 [rust, webassembly] + +query TT +EXPLAIN SELECT id, tags FROM array_test WHERE array_has_all(tags, ['rust']) OR array_has_any(tags, ['python', 'go']) ORDER BY id; +---- +logical_plan +01)Sort: array_test.id ASC NULLS LAST +02)--Filter: array_has_all(array_test.tags, List([rust])) OR array_has_any(array_test.tags, List([python, go])) +03)----TableScan: array_test projection=[id, tags], partial_filters=[array_has_all(array_test.tags, List([rust])) OR array_has_any(array_test.tags, List([python, go]))] +physical_plan +01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=array_has_all(tags@1, [rust]) OR array_has_any(tags@1, [python, go]) + +# Test array function with other predicates +query I? 
+SELECT id, tags FROM array_test WHERE id > 1 AND array_has(tags, 'rust') ORDER BY id; +---- +3 [rust, webassembly] + +query TT +EXPLAIN SELECT id, tags FROM array_test WHERE id > 1 AND array_has(tags, 'rust') ORDER BY id; +---- +logical_plan +01)Sort: array_test.id ASC NULLS LAST +02)--Filter: array_test.id > Int64(1) AND array_has(array_test.tags, Utf8("rust")) +03)----TableScan: array_test projection=[id, tags], partial_filters=[array_test.id > Int64(1), array_has(array_test.tags, Utf8("rust"))] +physical_plan +01)SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/array_data/data.parquet]]}, projection=[id, tags], file_type=parquet, predicate=id@0 > 1 AND array_has(tags@1, rust), pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[]
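
The series above leans on the distinction between root column indices (positions in the Arrow schema) and leaf column indices (positions among the Parquet file's primitive columns), which is why the row filter now records `leaf_indices` and builds its mask with `ProjectionMask::leaves` instead of `ProjectionMask::roots`. The following standalone sketch, which is not part of the patches, illustrates that mapping for the two-column schema used in the tests; it assumes the `parquet` crate's `ArrowSchemaConverter` and `ProjectionMask` APIs.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use parquet::arrow::{ArrowSchemaConverter, ProjectionMask};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Same shape as the `array_test` table: one primitive column, one list.
    let item = Arc::new(Field::new("item", DataType::Utf8, true));
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("tags", DataType::List(item), true),
    ]);

    // The Parquet schema descriptor is laid out in *leaf* order: one entry
    // per primitive column in the file.
    let descr = ArrowSchemaConverter::new().convert(&schema)?;
    for i in 0..descr.num_columns() {
        // Prints "id" for leaf 0 and "tags.list.item" for leaf 1.
        println!("leaf {i}: {}", descr.column(i).path().string());
    }

    // Root index 1 (`tags`) and leaf index 1 (`tags.list.item`) coincide
    // here, but a struct root fans out into one leaf per field, which is
    // why the row filter tracks leaf indices explicitly.
    let _by_roots = ProjectionMask::roots(&descr, [1]);
    let _by_leaves = ProjectionMask::leaves(&descr, [1]);
    Ok(())
}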
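To make "evaluated during Parquet decoding" concrete: the reader applies an `ArrowPredicate` to leaf-projected batches while rows are decoded, before full output batches are materialized. The sketch below (again not part of the patches) builds the equivalent of the `id > 1` row filter with the `parquet` crate's public row-filter API; the hand-written closure stands in for the pushed-down physical expression.

use std::fs::File;
use std::sync::Arc;

use arrow::array::{Array, BooleanArray, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::{
    ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
};
use parquet::arrow::{ArrowWriter, ProjectionMask};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Write a one-column file so the mask below has a single leaf.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    let path = std::env::temp_dir().join("row_filter_sketch.parquet");
    let mut writer = ArrowWriter::try_new(File::create(&path)?, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Re-open with a row filter equivalent to `id > 1`. The predicate sees
    // only the leaf columns named in its ProjectionMask and runs during
    // decoding, so filtered rows are never fully materialized.
    let builder = ParquetRecordBatchReaderBuilder::try_new(File::open(&path)?)?;
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let predicate = ArrowPredicateFn::new(mask, |batch: RecordBatch| {
        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("leaf 0 is Int32");
        Ok(BooleanArray::from_iter(ids.iter().map(|v| v.map(|v| v > 1))))
    });
    let reader = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()?;
    for batch in reader {
        // Two of the three rows (id = 2 and id = 3) survive the filter.
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}

The row_filter changes above construct their predicates the same way, except the mask comes from the candidate's `leaf_indices` and the closure is replaced by the pushed-down physical expression.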