diff --git a/Cargo.toml b/Cargo.toml
index 10fc88b7057c8..56263652fbb13 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -272,3 +272,16 @@ incremental = false
 inherits = "release"
 debug = true
 strip = false
+
+# [patch.crates-io]
+# parquet = { path = "../arrow-rs/parquet" }
+# arrow = { path = "../arrow-rs/arrow" }
+# arrow-array = { path = "../arrow-rs/arrow-array" }
+# arrow-buffer = { path = "../arrow-rs/arrow-buffer" }
+# arrow-cast = { path = "../arrow-rs/arrow-cast" }
+# arrow-data = { path = "../arrow-rs/arrow-data" }
+# arrow-ipc = { path = "../arrow-rs/arrow-ipc" }
+# arrow-ord = { path = "../arrow-rs/arrow-ord" }
+# arrow-schema = { path = "../arrow-rs/arrow-schema" }
+# arrow-select = { path = "../arrow-rs/arrow-select" }
+# arrow-string = { path = "../arrow-rs/arrow-string" }
diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs
index eb4cc9e9ad5a3..9cc4829819782 100644
--- a/datafusion/datasource-parquet/src/mod.rs
+++ b/datafusion/datasource-parquet/src/mod.rs
@@ -40,6 +40,7 @@ pub use metrics::ParquetFileMetrics;
 pub use page_filter::PagePruningAccessPlanFilter;
 pub use reader::*; // Expose so downstream crates can use it
 pub use row_filter::build_row_filter;
+#[expect(deprecated)]
 pub use row_filter::can_expr_be_pushed_down_with_schemas;
 pub use row_group_filter::RowGroupAccessPlanFilter;
 pub use writer::plan_to_parquet;
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index ba3b29be40d74..1831d19f0143d 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -60,22 +60,20 @@
 //! still be sorted by size.
 
 use std::cmp::Ordering;
-use std::collections::BTreeSet;
 use std::sync::Arc;
 
 use arrow::array::BooleanArray;
-use arrow::datatypes::{DataType, Schema, SchemaRef};
+use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
+use itertools::Itertools;
 use parquet::arrow::ProjectionMask;
 use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
 use parquet::file::metadata::ParquetMetaData;
 
 use datafusion_common::Result;
 use datafusion_common::cast::as_boolean_array;
-use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
-use datafusion_physical_expr::expressions::Column;
-use datafusion_physical_expr::utils::reassign_expr_columns;
+use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
 use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
 use datafusion_physical_plan::metrics;
 
@@ -212,11 +210,12 @@ impl FilterCandidateBuilder {
     /// * `Ok(None)` if the expression cannot be used as an ArrowFilter
     /// * `Err(e)` if an error occurs while building the candidate
     pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
-        let Some(required_column_indices) =
-            pushdown_columns(&self.expr, &self.file_schema)?
-        else {
-            return Ok(None);
-        };
+        let required_column_indices = collect_columns(&self.expr)
+            .into_iter()
+            .map(|c| c.index())
+            .sorted_unstable()
+            .dedup()
+            .collect_vec();
 
         let projected_schema =
             Arc::new(self.file_schema.project(&required_column_indices)?);
@@ -234,87 +233,6 @@ impl FilterCandidateBuilder {
     }
 }
 
-/// Traverses a `PhysicalExpr` tree to determine if any column references would
-/// prevent the expression from being pushed down to the parquet decoder.
-///
-/// An expression cannot be pushed down if it references:
-/// - Non-primitive columns (like structs or lists)
-/// - Columns that don't exist in the file schema
-struct PushdownChecker<'schema> {
-    /// Does the expression require any non-primitive columns (like structs)?
-    non_primitive_columns: bool,
-    /// Does the expression reference any columns not present in the file schema?
-    projected_columns: bool,
-    /// Indices into the file schema of columns required to evaluate the expression.
-    required_columns: BTreeSet<usize>,
-    /// The Arrow schema of the parquet file.
-    file_schema: &'schema Schema,
-}
-
-impl<'schema> PushdownChecker<'schema> {
-    fn new(file_schema: &'schema Schema) -> Self {
-        Self {
-            non_primitive_columns: false,
-            projected_columns: false,
-            required_columns: BTreeSet::default(),
-            file_schema,
-        }
-    }
-
-    fn check_single_column(&mut self, column_name: &str) -> Option<TreeNodeRecursion> {
-        if let Ok(idx) = self.file_schema.index_of(column_name) {
-            self.required_columns.insert(idx);
-            if DataType::is_nested(self.file_schema.field(idx).data_type()) {
-                self.non_primitive_columns = true;
-                return Some(TreeNodeRecursion::Jump);
-            }
-        } else {
-            // Column does not exist in the file schema, so we can't push this down.
-            self.projected_columns = true;
-            return Some(TreeNodeRecursion::Jump);
-        }
-
-        None
-    }
-
-    #[inline]
-    fn prevents_pushdown(&self) -> bool {
-        self.non_primitive_columns || self.projected_columns
-    }
-}
-
-impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
-    type Node = Arc<dyn PhysicalExpr>;
-
-    fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
-        if let Some(column) = node.as_any().downcast_ref::<Column>()
-            && let Some(recursion) = self.check_single_column(column.name())
-        {
-            return Ok(recursion);
-        }
-
-        Ok(TreeNodeRecursion::Continue)
-    }
-}
-
-/// Checks if a given expression can be pushed down to the parquet decoder.
-///
-/// Returns `Some(column_indices)` if the expression can be pushed down,
-/// where `column_indices` are the indices into the file schema of all columns
-/// required to evaluate the expression.
-///
-/// Returns `None` if the expression cannot be pushed down (e.g., references
-/// non-primitive types or columns not in the file).
-fn pushdown_columns(
-    expr: &Arc<dyn PhysicalExpr>,
-    file_schema: &Schema,
-) -> Result<Option<Vec<usize>>> {
-    let mut checker = PushdownChecker::new(file_schema);
-    expr.visit(&mut checker)?;
-    Ok((!checker.prevents_pushdown())
-        .then_some(checker.required_columns.into_iter().collect()))
-}
-
 /// Checks if a predicate expression can be pushed down to the parquet decoder.
 ///
 /// Returns `true` if all columns referenced by the expression:
@@ -325,14 +243,15 @@ fn pushdown_columns(
 /// * `expr` - The filter expression to check
 /// * `file_schema` - The Arrow schema of the parquet file (or table schema when
 ///   the file schema is not yet available during planning)
+#[deprecated(
+    since = "52.0.0",
+    note = "Parquet accepts arbitrary expressions for pushdown now; this function will always return true"
+)]
 pub fn can_expr_be_pushed_down_with_schemas(
-    expr: &Arc<dyn PhysicalExpr>,
-    file_schema: &Schema,
+    _expr: &Arc<dyn PhysicalExpr>,
+    _file_schema: &Schema,
 ) -> bool {
-    match pushdown_columns(expr, file_schema) {
-        Ok(Some(_)) => true,
-        Ok(None) | Err(_) => false,
-    }
+    true
 }
 
 /// Calculate the total compressed size of all `Column`'s required for
@@ -462,6 +381,7 @@ pub fn build_row_filter(
 #[cfg(test)]
 mod test {
     use super::*;
+    use arrow::datatypes::DataType;
     use datafusion_common::ScalarValue;
 
     use arrow::datatypes::{Field, TimeUnit::Nanosecond};
@@ -588,83 +508,4 @@ mod test {
         let filtered = row_filter.evaluate(first_rb);
         assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true; 8])));
     }
-
-    #[test]
-    fn nested_data_structures_prevent_pushdown() {
-        let table_schema = Arc::new(get_lists_table_schema());
-
-        let expr = col("utf8_list").is_not_null();
-        let expr = logical2physical(&expr, &table_schema);
-        check_expression_can_evaluate_against_schema(&expr, &table_schema);
-
-        assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
-    }
-
-    #[test]
-    fn projected_columns_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
-
-        let expr =
-            Arc::new(Column::new("nonexistent_column", 0)) as Arc<dyn PhysicalExpr>;
-
-        assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
-    }
-
-    #[test]
-    fn basic_expr_doesnt_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
-
-        let expr = col("string_col").is_null();
-        let expr = logical2physical(&expr, &table_schema);
-
-        assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
-    }
-
-    #[test]
-    fn complex_expr_doesnt_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
-
-        let expr = col("string_col")
-            .is_not_null()
-            .or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)), None)));
-        let expr = logical2physical(&expr, &table_schema);
-
-        assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
-    }
-
-    fn get_basic_table_schema() -> Schema {
-        let testdata = datafusion_common::test_util::parquet_test_data();
-        let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
-            .expect("opening file");
-
-        let reader = SerializedFileReader::new(file).expect("creating reader");
-
-        let metadata = reader.metadata();
-
-        parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
-            .expect("parsing schema")
-    }
-
-    fn get_lists_table_schema() -> Schema {
-        let testdata = datafusion_common::test_util::parquet_test_data();
-        let file = std::fs::File::open(format!("{testdata}/list_columns.parquet"))
-            .expect("opening file");
-
-        let reader = SerializedFileReader::new(file).expect("creating reader");
-
-        let metadata = reader.metadata();
-
-        parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
-            .expect("parsing schema")
-    }
-
-    /// Sanity check that the given expression could be evaluated against the given schema without any errors.
-    /// This will fail if the expression references columns that are not in the schema or if the types of the columns are incompatible, etc.
-    fn check_expression_can_evaluate_against_schema(
-        expr: &Arc<dyn PhysicalExpr>,
-        table_schema: &Arc<Schema>,
-    ) -> bool {
-        let batch = RecordBatch::new_empty(Arc::clone(table_schema));
-        expr.evaluate(&batch).is_ok()
-    }
 }
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 2e0919b1447de..018e4b2f26a8e 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -25,7 +25,6 @@ use crate::DefaultParquetFileReaderFactory;
 use crate::ParquetFileReaderFactory;
 use crate::opener::ParquetOpener;
 use crate::opener::build_pruning_predicates;
-use crate::row_filter::can_expr_be_pushed_down_with_schemas;
 use datafusion_common::config::ConfigOptions;
 #[cfg(feature = "parquet_encryption")]
 use datafusion_common::config::EncryptionFactoryOptions;
@@ -670,7 +669,6 @@ impl FileSource for ParquetSource {
         filters: Vec<Arc<dyn PhysicalExpr>>,
         config: &ConfigOptions,
     ) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
-        let table_schema = self.table_schema.table_schema();
         // Determine if based on configs we should push filters down.
         // If either the table / scan itself or the config has pushdown enabled,
         // we will push down the filters.
@@ -685,13 +683,7 @@ impl FileSource for ParquetSource {
         let mut source = self.clone();
         let filters: Vec<PushedDownPredicate> = filters
             .into_iter()
-            .map(|filter| {
-                if can_expr_be_pushed_down_with_schemas(&filter, table_schema) {
-                    PushedDownPredicate::supported(filter)
-                } else {
-                    PushedDownPredicate::unsupported(filter)
-                }
-            })
+            .map(PushedDownPredicate::supported)
             .collect();
         if filters
             .iter()
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index 8bb79d576990e..632dd8399eff4 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -563,3 +563,46 @@ ORDER BY start_timestamp, trace_id
 LIMIT 1;
 ----
 2024-10-01T00:00:00
+
+# Tests for pushdown of filters involving struct columns
+statement ok
+COPY (
+  SELECT *
+  FROM VALUES ({field: [{nested: 1}]}), ({field: [{nested: 2}]}) AS t(struct_col)
+)
+TO 'test_files/scratch/parquet_filter_pushdown/data/struct.parquet';
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+CREATE EXTERNAL TABLE t1 STORED AS PARQUET LOCATION 'test_files/scratch/parquet_filter_pushdown/data/struct.parquet';
+
+query TT
+explain
+select struct_col['field'][1]['nested']
+from t1
+where struct_col['field'][1]['nested'] > 1;
+----
+logical_plan
+01)Projection: get_field(array_element(get_field(t1.struct_col, Utf8("field")), Int64(1)), Utf8("nested"))
+02)--Filter: get_field(array_element(get_field(t1.struct_col, Utf8("field")), Int64(1)), Utf8("nested")) > Int64(1)
+03)----TableScan: t1 projection=[struct_col], partial_filters=[get_field(array_element(get_field(t1.struct_col, Utf8("field")), Int64(1)), Utf8("nested")) > Int64(1)]
+physical_plan
+01)ProjectionExec: expr=[get_field(array_element(get_field(struct_col@0, field), 1), nested) as t1.struct_col[field][Int64(1)][nested]]
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/data/struct.parquet]]}, projection=[struct_col], file_type=parquet, predicate=get_field(array_element(get_field(struct_col@0, field), 1), nested) > 1
+
+query ?
+select struct_col
+from t1
+where struct_col['field'][1]['nested'] > 1;
+----
+{field: [{nested: 2}]}
+
+query I
+select struct_col['field'][1]['nested']
+from t1
+where struct_col['field'][1]['nested'] > 1;
+----
+2