13 changes: 13 additions & 0 deletions Cargo.toml
@@ -272,3 +272,16 @@ incremental = false
inherits = "release"
debug = true
strip = false

# [patch.crates-io]
# parquet = { path = "../arrow-rs/parquet" }
# arrow = { path = "../arrow-rs/arrow" }
# arrow-array = { path = "../arrow-rs/arrow-array" }
# arrow-buffer = { path = "../arrow-rs/arrow-buffer" }
# arrow-cast = { path = "../arrow-rs/arrow-cast" }
# arrow-data = { path = "../arrow-rs/arrow-data" }
# arrow-ipc = { path = "../arrow-rs/arrow-ipc" }
# arrow-ord = { path = "../arrow-rs/arrow-ord" }
# arrow-schema = { path = "../arrow-rs/arrow-schema" }
# arrow-select = { path = "../arrow-rs/arrow-select" }
# arrow-string = { path = "../arrow-rs/arrow-string" }
1 change: 1 addition & 0 deletions datafusion/datasource-parquet/src/mod.rs
@@ -40,6 +40,7 @@ pub use metrics::ParquetFileMetrics;
pub use page_filter::PagePruningAccessPlanFilter;
pub use reader::*; // Expose so downstream crates can use it
pub use row_filter::build_row_filter;
#[expect(deprecated)]
pub use row_filter::can_expr_be_pushed_down_with_schemas;
pub use row_group_filter::RowGroupAccessPlanFilter;
pub use writer::plan_to_parquet;
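The `#[expect(deprecated)]` above is needed because re-exporting a deprecated item fires the `deprecated` lint at the `use` site itself. A minimal standalone sketch of the mechanic, with a hypothetical stand-in module and function:

```rust
// `#[expect(...)]` (stable since Rust 1.81) suppresses the lint like
// `#[allow(...)]`, but additionally warns if the lint ever stops firing,
// acting as a built-in reminder to drop the attribute once the deprecated
// item is finally removed.
mod row_filter {
    #[deprecated(since = "52.0.0", note = "always returns true now")]
    pub fn can_expr_be_pushed_down() -> bool {
        true
    }
}

// Without the attribute, this re-export itself triggers the warning.
#[expect(deprecated)]
pub use row_filter::can_expr_be_pushed_down;

fn main() {
    // Callers still get the deprecation warning unless they opt out too.
    #[expect(deprecated)]
    let pushable = can_expr_be_pushed_down();
    assert!(pushable);
}
```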
193 changes: 17 additions & 176 deletions datafusion/datasource-parquet/src/row_filter.rs
@@ -60,22 +60,20 @@
//! still be sorted by size.

use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::sync::Arc;

use arrow::array::BooleanArray;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use itertools::Itertools;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::file::metadata::ParquetMetaData;

use datafusion_common::Result;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
use datafusion_physical_expr::{PhysicalExpr, split_conjunction};

use datafusion_physical_plan::metrics;
@@ -212,11 +210,12 @@ impl FilterCandidateBuilder {
/// * `Ok(None)` if the expression cannot be used as an ArrowFilter
/// * `Err(e)` if an error occurs while building the candidate
pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
let Some(required_column_indices) =
pushdown_columns(&self.expr, &self.file_schema)?
else {
return Ok(None);
};
let required_column_indices = collect_columns(&self.expr)
.into_iter()
.map(|c| c.index())
.sorted_unstable()
.dedup()
.collect_vec();

let projected_schema =
Arc::new(self.file_schema.project(&required_column_indices)?);
@@ -234,87 +233,6 @@
}
}
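The rewritten `build` collects the required columns with `collect_columns` and normalizes them into a sorted, duplicate-free index list via itertools. A self-contained sketch of that idiom, with a plain tuple slice standing in for the unordered set of column references:

```rust
// Sketch of the index-collection idiom in `build` above: map column
// references to their schema indices, then sort and dedup so the projection
// is an ordered list in which each required column appears exactly once
// (duplicates arise when a predicate references the same column twice).
use itertools::Itertools;

fn required_indices(columns: &[(&str, usize)]) -> Vec<usize> {
    columns
        .iter()
        .map(|(_name, index)| *index)
        .sorted_unstable() // unstable sort is fine: indices are plain usize
        .dedup() // sorting made duplicates adjacent, so dedup removes them all
        .collect_vec()
}

fn main() {
    // `a > 1 AND b < 2 AND a < 10` references column `a` (index 0) twice.
    let cols = [("a", 0), ("b", 1), ("a", 0)];
    assert_eq!(required_indices(&cols), vec![0, 1]);
}
```

`Schema::project` then consumes this index list directly, and the sort/dedup guarantees each required field lands exactly once in the projected schema.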

/// Traverses a `PhysicalExpr` tree to determine if any column references would
/// prevent the expression from being pushed down to the parquet decoder.
///
/// An expression cannot be pushed down if it references:
/// - Non-primitive columns (like structs or lists)
/// - Columns that don't exist in the file schema
struct PushdownChecker<'schema> {
/// Does the expression require any non-primitive columns (like structs)?
non_primitive_columns: bool,
/// Does the expression reference any columns not present in the file schema?
projected_columns: bool,
/// Indices into the file schema of columns required to evaluate the expression.
required_columns: BTreeSet<usize>,
/// The Arrow schema of the parquet file.
file_schema: &'schema Schema,
}

impl<'schema> PushdownChecker<'schema> {
fn new(file_schema: &'schema Schema) -> Self {
Self {
non_primitive_columns: false,
projected_columns: false,
required_columns: BTreeSet::default(),
file_schema,
}
}

fn check_single_column(&mut self, column_name: &str) -> Option<TreeNodeRecursion> {
if let Ok(idx) = self.file_schema.index_of(column_name) {
self.required_columns.insert(idx);
if DataType::is_nested(self.file_schema.field(idx).data_type()) {
self.non_primitive_columns = true;
return Some(TreeNodeRecursion::Jump);
}
} else {
// Column does not exist in the file schema, so we can't push this down.
self.projected_columns = true;
return Some(TreeNodeRecursion::Jump);
}

None
}

#[inline]
fn prevents_pushdown(&self) -> bool {
self.non_primitive_columns || self.projected_columns
}
}

impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
type Node = Arc<dyn PhysicalExpr>;

fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
if let Some(column) = node.as_any().downcast_ref::<Column>()
&& let Some(recursion) = self.check_single_column(column.name())
{
return Ok(recursion);
}

Ok(TreeNodeRecursion::Continue)
}
}

/// Checks if a given expression can be pushed down to the parquet decoder.
///
/// Returns `Some(column_indices)` if the expression can be pushed down,
/// where `column_indices` are the indices into the file schema of all columns
/// required to evaluate the expression.
///
/// Returns `None` if the expression cannot be pushed down (e.g., references
/// non-primitive types or columns not in the file).
fn pushdown_columns(
expr: &Arc<dyn PhysicalExpr>,
file_schema: &Schema,
) -> Result<Option<Vec<usize>>> {
let mut checker = PushdownChecker::new(file_schema);
expr.visit(&mut checker)?;
Ok((!checker.prevents_pushdown())
.then_some(checker.required_columns.into_iter().collect()))
}

/// Checks if a predicate expression can be pushed down to the parquet decoder.
///
/// Returns `true` if all columns referenced by the expression:
@@ -325,14 +243,15 @@ fn pushdown_columns(
/// * `expr` - The filter expression to check
/// * `file_schema` - The Arrow schema of the parquet file (or table schema when
/// the file schema is not yet available during planning)
#[deprecated(
since = "52.0.0",
note = "Parquet accepts arbitrary expressions for pushdown now; this function will always return true"
)]
pub fn can_expr_be_pushed_down_with_schemas(
expr: &Arc<dyn PhysicalExpr>,
file_schema: &Schema,
_expr: &Arc<dyn PhysicalExpr>,
_file_schema: &Schema,
) -> bool {
match pushdown_columns(expr, file_schema) {
Ok(Some(_)) => true,
Ok(None) | Err(_) => false,
}
true
}
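With the body reduced to a constant `true`, the attribute carries all the information: rustc repeats the `since`/`note` text at every call site, steering downstream code toward deleting the check altogether (as the source.rs change below does). A sketch of what callers see, with the signature trimmed for brevity:

```rust
// rustc warns at the call site below:
//   warning: use of deprecated function `can_expr_be_pushed_down_with_schemas`:
//   Parquet accepts arbitrary expressions for pushdown now; this function
//   will always return true
#[deprecated(
    since = "52.0.0",
    note = "Parquet accepts arbitrary expressions for pushdown now; this function will always return true"
)]
pub fn can_expr_be_pushed_down_with_schemas() -> bool {
    true
}

fn main() {
    // The gate is now a tautology and can simply be removed.
    if can_expr_be_pushed_down_with_schemas() {
        println!("pushed down");
    }
}
```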

/// Calculate the total compressed size of all `Column`s required for
@@ -462,6 +381,7 @@ pub fn build_row_filter(
#[cfg(test)]
mod test {
use super::*;
use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;

use arrow::datatypes::{Field, TimeUnit::Nanosecond};
@@ -588,83 +508,4 @@ mod test {
let filtered = row_filter.evaluate(first_rb);
assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true; 8])));
}

#[test]
fn nested_data_structures_prevent_pushdown() {
let table_schema = Arc::new(get_lists_table_schema());

let expr = col("utf8_list").is_not_null();
let expr = logical2physical(&expr, &table_schema);
check_expression_can_evaluate_against_schema(&expr, &table_schema);

assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
}

#[test]
fn projected_columns_prevent_pushdown() {
let table_schema = get_basic_table_schema();

let expr =
Arc::new(Column::new("nonexistent_column", 0)) as Arc<dyn PhysicalExpr>;

assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
}

#[test]
fn basic_expr_doesnt_prevent_pushdown() {
let table_schema = get_basic_table_schema();

let expr = col("string_col").is_null();
let expr = logical2physical(&expr, &table_schema);

assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
}

#[test]
fn complex_expr_doesnt_prevent_pushdown() {
let table_schema = get_basic_table_schema();

let expr = col("string_col")
.is_not_null()
.or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)), None)));
let expr = logical2physical(&expr, &table_schema);

assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
}

fn get_basic_table_schema() -> Schema {
let testdata = datafusion_common::test_util::parquet_test_data();
let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
.expect("opening file");

let reader = SerializedFileReader::new(file).expect("creating reader");

let metadata = reader.metadata();

parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
.expect("parsing schema")
}

fn get_lists_table_schema() -> Schema {
let testdata = datafusion_common::test_util::parquet_test_data();
let file = std::fs::File::open(format!("{testdata}/list_columns.parquet"))
.expect("opening file");

let reader = SerializedFileReader::new(file).expect("creating reader");

let metadata = reader.metadata();

parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
.expect("parsing schema")
}

/// Sanity check that the given expression could be evaluated against the given schema without any errors.
/// This will fail if the expression references columns that are not in the schema or if the types of the columns are incompatible, etc.
fn check_expression_can_evaluate_against_schema(
expr: &Arc<dyn PhysicalExpr>,
table_schema: &Arc<Schema>,
) -> bool {
let batch = RecordBatch::new_empty(Arc::clone(table_schema));
expr.evaluate(&batch).is_ok()
}
}
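For orientation, the `RowFilter` assembled by this module is eventually handed to the parquet crate's reader, which evaluates each `ArrowPredicate` against partially decoded batches and skips non-matching rows before the remaining columns are materialized. A minimal sketch against the synchronous reader API; the file path and column index are placeholders:

```rust
use std::fs::File;

use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{
    ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?; // placeholder path
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Decode only leaf column 0 while evaluating the predicate; rows that
    // fail it are skipped before the other columns are decoded.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let predicate = ArrowPredicateFn::new(mask, |batch| {
        arrow::compute::is_not_null(batch.column(0))
    });

    let reader = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()?;
    for batch in reader {
        println!("{} rows passed the filter", batch?.num_rows());
    }
    Ok(())
}
```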
10 changes: 1 addition & 9 deletions datafusion/datasource-parquet/src/source.rs
@@ -25,7 +25,6 @@ use crate::DefaultParquetFileReaderFactory
use crate::ParquetFileReaderFactory;
use crate::opener::ParquetOpener;
use crate::opener::build_pruning_predicates;
use crate::row_filter::can_expr_be_pushed_down_with_schemas;
use datafusion_common::config::ConfigOptions;
#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
@@ -670,7 +669,6 @@ impl FileSource for ParquetSource {
filters: Vec<Arc<dyn PhysicalExpr>>,
config: &ConfigOptions,
) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
let table_schema = self.table_schema.table_schema();
// Determine if based on configs we should push filters down.
// If either the table / scan itself or the config has pushdown enabled,
// we will push down the filters.
@@ -685,13 +683,7 @@
let mut source = self.clone();
let filters: Vec<PushedDownPredicate> = filters
.into_iter()
.map(|filter| {
if can_expr_be_pushed_down_with_schemas(&filter, table_schema) {
PushedDownPredicate::supported(filter)
} else {
PushedDownPredicate::unsupported(filter)
}
})
.map(PushedDownPredicate::supported)
.collect();
if filters
.iter()
43 changes: 43 additions & 0 deletions datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -563,3 +563,46 @@ ORDER BY start_timestamp, trace_id
LIMIT 1;
----
2024-10-01T00:00:00

# Tests for pushdown of filters involving struct columns
statement ok
COPY (
SELECT *
FROM VALUES ({field: [{nested: 1}]}), ({field: [{nested: 2}]}) AS t(struct_col)
)
TO 'test_files/scratch/parquet_filter_pushdown/data/struct.parquet';

statement ok
DROP TABLE t1;

statement ok
CREATE EXTERNAL TABLE t1 STORED AS PARQUET LOCATION 'test_files/scratch/parquet_filter_pushdown/data/struct.parquet';

query TT
explain
select struct_col['field'][1]['nested']
from t1
where struct_col['field'][1]['nested'] > 1;
----
logical_plan
01)Projection: get_field(array_element(get_field(t1.struct_col, Utf8("field")), Int64(1)), Utf8("nested"))
02)--Filter: get_field(array_element(get_field(t1.struct_col, Utf8("field")), Int64(1)), Utf8("nested")) > Int64(1)
03)----TableScan: t1 projection=[struct_col], partial_filters=[get_field(array_element(get_field(t1.struct_col, Utf8("field")), Int64(1)), Utf8("nested")) > Int64(1)]
physical_plan
01)ProjectionExec: expr=[get_field(array_element(get_field(struct_col@0, field), 1), nested) as t1.struct_col[field][Int64(1)][nested]]
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/data/struct.parquet]]}, projection=[struct_col], file_type=parquet, predicate=get_field(array_element(get_field(struct_col@0, field), 1), nested) > 1

query ?
select struct_col
from t1
where struct_col['field'][1]['nested'] > 1;
----
{field: [{nested: 2}]}

query I
select struct_col['field'][1]['nested']
from t1
where struct_col['field'][1]['nested'] > 1;
----
2