Description
The current physical layer projection pushdown rule is:

fn try_swapping_with_projection(
    &self,
    projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>>

Operators like FilterExec then treat the pushdown as all or nothing:

// If the projection does not narrow the schema, we should not try to push it down:
if projection.expr().len() < projection.input().schema().fields().len() {
    // Each column in the predicate expression must exist after the projection.
    if let Some(new_predicate) =
        update_expr(self.predicate(), projection.expr(), false)?
    {
        ...
    }
}

This poses a problem for #19387 for cases like complex_function(struct_col['field']).
I propose we resolve this via a logical optimizer rule that pushes these projections down below most operators in the logical plan.
For example, given the data:

copy (select {email: 'a@example.com', address: '123 Main St, NYC'} as user) to 'test.parquet';

the query:

select lower(user['email'])
from 'test.parquet' t
where user['address'] ilike '%nyc%';

will produce a plan of the form:
Projection: lower(get_field(t.user, Utf8("email")))
  Filter: get_field(t.user, Utf8("address")) ILIKE Utf8("%nyc%")
    SubqueryAlias: t
      TableScan: test.parquet
Ideally we want a plan of the form:
Projection: lower(t.__extracted_expr_0@0) AS "lower(get_field(t.user, 'email'))"
  Filter: t.__extracted_expr_1@1 ILIKE Utf8("%nyc%")
    SubqueryAlias: t
      Projection: get_field(user, Utf8("email")) AS "__extracted_expr_0", get_field(user, Utf8("address")) AS "__extracted_expr_1"
        TableScan: test.parquet
The point is that the expression user['address'] gets pushed down past filters, etc., all the way to the scan (or right above it; the physical layer already pushes projections into the actual scan, but only when there are no filters, repartitions, etc. in the way). For a data source like Parquet that natively supports structs (and variant), this allows it to optimize I/O by reading only the necessary fields.
To achieve this I propose we write a logical optimizer rule (a rough skeleton is sketched after this list) that can:
- Identify expressions we want to extract and push down
- Deduplicate the expressions being manipulated
- Handle remapping of qualifiers through table aliases
- Handle complexity of joins
- Drop column references that aren't needed anymore while preserving column references that are still needed
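A rough, non-authoritative sketch of how such a rule might be wired up, assuming the OptimizerRule::rewrite API from datafusion_optimizer; the rule name and the numbered comments are illustrative placeholders, not an implementation:

use datafusion_common::Result;
use datafusion_common::tree_node::Transformed;
use datafusion_expr::LogicalPlan;
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};

// Hypothetical rule: extract trivial sub-expressions (e.g. get_field chains) from
// projections/filters and push them into a new Projection just above the scan.
#[derive(Debug, Default)]
pub struct ExtractAndPushDownTrivialExprs;

impl OptimizerRule for ExtractAndPushDownTrivialExprs {
    fn name(&self) -> &str {
        "extract_and_push_down_trivial_exprs"
    }

    fn supports_rewrite(&self) -> bool {
        true
    }

    fn rewrite(
        &self,
        plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>> {
        // For Projection / Filter (and eventually Join) nodes this rule would:
        //   1. identify trivial subtrees (Expr::is_trivial) worth extracting,
        //   2. deduplicate them and assign stable aliases (__extracted_expr_N),
        //   3. rewrite the node to reference those aliases instead,
        //   4. recurse downwards, remapping qualifiers through SubqueryAlias and
        //      splitting the extracted set per join side,
        //   5. emit a Projection with the extracted exprs right above the scan,
        //      keeping only the plain column references that are still needed.
        // Placeholder body: return the plan unchanged.
        Ok(Transformed::no(plan))
    }
}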
Identifying expressions
I think we should use something like Expr::is_trivial from #19538. We can traverse the tree of expressions for each projection looking for subtrees where every node matches Expr::is_trivial. In practice I think it will look like this:

let condition = |expr: &Expr| expr.is_trivial();
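A minimal sketch of that traversal, assuming Expr::is_trivial lands as proposed in #19538 and using the TreeNode API implemented by Expr; both helper names below are made up:

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_expr::Expr;

// True if `condition` holds for every node in the expression tree.
fn all_nodes_match(expr: &Expr, condition: &impl Fn(&Expr) -> bool) -> bool {
    let mut all = true;
    // `Expr` implements `TreeNode`, so `apply` walks the tree pre-order.
    expr.apply(|e| {
        if condition(e) {
            Ok(TreeNodeRecursion::Continue)
        } else {
            all = false;
            Ok(TreeNodeRecursion::Stop)
        }
    })
    .expect("closure is infallible");
    all
}

// Collect the maximal subtrees of `expr` whose nodes all satisfy `condition`,
// e.g. `|e: &Expr| e.is_trivial()` once #19538 lands.
fn collect_maximal_matches(expr: &Expr, condition: &impl Fn(&Expr) -> bool, out: &mut Vec<Expr>) {
    if all_nodes_match(expr, condition) {
        out.push(expr.clone());
    } else {
        // Keep descending: a child may still be an extractable subtree.
        expr.apply_children(|child| {
            collect_maximal_matches(child, condition, out);
            Ok(TreeNodeRecursion::Continue)
        })
        .expect("closure is infallible");
    }
}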
Pushing down through joins
For each expression we look for sub-trees that match Expr::is_trivial and only reference columns from one side of the join. So the condition for these nodes becomes:

let condition = move |expr: &Expr| expr.is_trivial() && whitelist_columns(expr, join_side_columns);

where join_side_columns holds the columns from this side of the join. We push down into each side independently. We also need to deal with qualifiers and remapping them.
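A sketch of that check, with a hypothetical whitelist_columns helper and a HashSet<Column> standing in for the per-side column set; it assumes Expr::column_refs:

use std::collections::HashSet;

use datafusion_common::Column;
use datafusion_expr::Expr;

// Hypothetical helper: true if every column referenced by `expr` belongs to
// the given side of the join, so the extracted expression can be pushed into
// that side's input.
fn whitelist_columns(expr: &Expr, join_side_columns: &HashSet<Column>) -> bool {
    expr.column_refs()
        .into_iter()
        .all(|col| join_side_columns.contains(col))
}

// Usage, once is_trivial exists:
// let condition = move |e: &Expr| e.is_trivial() && whitelist_columns(e, &join_side_columns);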
Table aliases
To push past a table alias we need to keep track of qualifiers and remap them. Something like the PhysicalColumnRewriter from https://github.com/apache/datafusion/pull/19404/changes#diff-237330fb473b7698f4abf2545c203d12e59b52e6febc3244da8a4dee5091626f may be useful.
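At the logical level the remapping might be as simple as rewriting column qualifiers while the extracted expression is pushed below the alias; a hedged sketch with a made-up remap_qualifier helper:

use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{Result, TableReference};
use datafusion_expr::Expr;

// Hypothetical helper: when pushing an extracted expression below
// `SubqueryAlias: t`, rewrite columns qualified with the alias (e.g. t.user)
// to the qualifier used by the alias's input (e.g. the original table name).
fn remap_qualifier(expr: Expr, from: &TableReference, to: &TableReference) -> Result<Expr> {
    expr.transform(|e| {
        Ok(match e {
            Expr::Column(mut col) if col.relation.as_ref() == Some(from) => {
                col.relation = Some(to.clone());
                Transformed::yes(Expr::Column(col))
            }
            other => Transformed::no(other),
        })
    })
    .map(|t| t.data)
}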
Deduplicate expressions
If we have something like select lower(col['a']), upper(col['a']) we don't want to end up pushing down two different expressions into the scan and doing the I/O twice (if we read the entire column we'd do the I/O once and then take references). So this would have to produce something like:

Projection: lower(__extracted_expr_0) as 'lower(col['a'])', upper(__extracted_expr_0) as 'upper(col['a'])'
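One possible way to deduplicate is to key a map by the extracted expression itself (Expr implements Hash/Eq); a hedged sketch with a made-up dedup_extracted helper:

use std::collections::HashMap;

use datafusion_expr::Expr;

// Hypothetical helper: assign one alias per distinct extracted expression so
// that lower(col['a']) and upper(col['a']) end up sharing a single pushed-down
// get_field(col, 'a') instead of reading the field twice.
fn dedup_extracted(extracted: Vec<Expr>) -> HashMap<Expr, String> {
    let mut aliases: HashMap<Expr, String> = HashMap::new();
    for expr in extracted {
        let next = aliases.len();
        aliases
            .entry(expr)
            .or_insert_with(|| format!("__extracted_expr_{next}"));
    }
    aliases
}

The parent expressions are then rewritten to reference the alias columns, while the final output aliases keep the original display names as in the example above.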
Drop unnecessary column references
Consider a plan like:
Projection: col['a'], other_col
  Filter: col['a'] > 5
    Projection: col, other_col
      TableScan: test.parquet
If we transform into:
Projection: __extracted_0 as col['a'], other_col
  Filter: __extracted_0 > 5
    Projection: col['a'] as __extracted_0
      TableScan: test.parquet
We'd get an error: other_col is not projected.
So we need to preserve it. But naively:
Projection: __extracted_0 as col['a'], other_col
  Filter: __extracted_0 > 5
    Projection: col['a'] as __extracted_0, col, other_col
      TableScan: test.parquet
Then we still scan all of col... So we need to be intelligent about which column references to keep.
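One way to be intelligent about it is to recompute the pass-through columns from the rewritten parent expressions instead of copying the old projection's column list; a hedged sketch (assumes Expr::column_refs, helper name made up):

use std::collections::HashSet;

use datafusion_common::Column;
use datafusion_expr::Expr;

// Hypothetical helper: the plain columns the new bottom projection must still
// expose. After the parents have been rewritten to reference __extracted_0,
// `col` no longer appears in them, so it is not re-projected and the scan can
// skip it; `other_col` still appears and is kept.
fn passthrough_columns(
    rewritten_parent_exprs: &[Expr],
    extracted_aliases: &HashSet<String>,
) -> HashSet<Column> {
    rewritten_parent_exprs
        .iter()
        .flat_map(|e| e.column_refs().into_iter().cloned())
        .filter(|col| !extracted_aliases.contains(&col.name))
        .collect()
}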
Future opportunities
I think this approach could be applied to CAST operations as well: oftentimes file formats have different physical storage from our in-memory representation (e.g. Parquet only has Int32 and Int64 physical integer types, with narrower widths expressed as logical-type metadata, and CSV is obviously untyped). I'm guessing it's almost always best to push casts into scans so that we can avoid reading data -> casting it to the table schema -> casting it again to the final type.
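As a purely hypothetical illustration of the plan shape (reusing the same extraction mechanism), select c::BIGINT from a CSV-backed table t whose column c is declared Int32 could go from:

Projection: CAST(t.c AS Int64)
  TableScan: t projection=[c]

to something like:

Projection: t.__extracted_expr_0 AS "CAST(t.c AS Int64)"
  Projection: CAST(c AS Int64) AS __extracted_expr_0
    TableScan: t

so the scan could produce Int64 directly instead of materializing Int32 first and casting again above it.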
Alternatives considered: DuckDB
Here's a summary of a guided exploration of DuckDB's approach with Claude:
Overview
DuckDB implements projection pushdown for nested struct fields using a metadata-driven approach where expressions remain unchanged but the underlying data is pre-filtered at the scan level using hierarchical ColumnIndex structures.
Core Mechanism
- ColumnIndex Data Structure
DuckDB uses a hierarchical ColumnIndex structure to represent nested projections:
struct ColumnIndex {
idx_t index; // Primary column index
ColumnIndexType index_type; // FULL_READ or PUSHDOWN_EXTRACT
LogicalType type; // Type at this level
vector<ColumnIndex> child_indexes; // Children for nested types (STRUCT/LIST)
};
Source: src/include/duckdb/common/column_index.hpp
- Optimizer Phase: RemoveUnusedColumns
The optimizer detects struct_extract() and array_extract() function calls and converts them into ColumnIndex paths:
// Query: SELECT struct_col['field_a'] FROM table
// Creates: ColumnIndex {
// index: struct_col,
// type: PUSHDOWN_EXTRACT,
// child_indexes: [{ index: field_a, type: FULL_READ }]
// }
Key code: src/optimizer/remove_unused_columns.cpp:713-753 (HandleStructExtractRecursive)
Important limitation: Only works for STRUCT types, not LIST types. List element extraction is blocked by a type check at line 730:
if (child->return_type.id() != LogicalTypeId::STRUCT) {
return false; // Stops pushdown for LIST types
}
- Scan Phase: Parquet Reader
The ColumnIndex is passed through TableFunctionInitInput.column_indexes to the Parquet reader:
CreateReaderRecursive (extension/parquet/parquet_reader.cpp:405-457):
// Creates child readers ONLY for projected fields
vector<unique_ptr<ColumnReader>> children;
children.resize(schema.children.size()); // Sized for all fields
if (!indexes.empty()) {
for (idx_t i = 0; i < indexes.size(); i++) {
auto child_index = indexes[i].GetPrimaryIndex();
children[child_index] = CreateReaderRecursive(
context,
indexes[i].GetChildIndexes(), // Recursive projection
schema.children[child_index]
);
}
// All other positions remain nullptr
}
StructColumnReader::Read() (extension/parquet/reader/struct_column_reader.cpp:44-52):
for (idx_t i = 0; i < child_readers.size(); i++) {
auto &child = child_readers[i];
if (!child) {
// Non-projected field: set to CONSTANT NULL
target_vector.SetVectorType(VectorType::CONSTANT_VECTOR);
ConstantVector::SetNull(target_vector, true);
continue;
}
// Read projected field from Parquet
child->Read(num_values, define_out, repeat_out, target_vector);
}
Result: Only projected leaf columns are read from Parquet. Unprojected fields become NULL vectors.
- Expression Evaluation: No Rewriting
Expressions above the scan are not rewritten. The original struct_extract(col, 'field') expressions remain in the plan.
Key insight: The scan outputs a struct column with the same column binding, but only projected fields are populated. When struct_extract() executes, it simply references the already-extracted child vector:
// struct_extract.cpp:12-25
static void StructExtractFunction(DataChunk &args, ...) {
auto &vec = args.data[0]; // Already-projected struct
auto &children = StructVector::GetEntries(vec);
auto &struct_child = children[info.index];
result.Reference(*struct_child); // Just a reference - no extraction!
}
Query Plan Example
SELECT col['a'][1]['b'] FROM 'struct.parquet';
Plan output:
PROJECTION
│ array_extract(array_extract(array_extract(col, 'a'), 1), 'b')
│
└─ TABLE_SCAN
Projections: col.a ← ColumnIndex metadata
Output: col ← Same binding, projected data
Why the apparent mismatch?
- Projections: col.a indicates only field 'a' is read from Parquet
- Expression still shows array_extract(col, 'a') because expressions aren't rewritten
- The scan outputs a col struct that only contains field 'a' (others are NULL)
- array_extract() becomes a cheap reference operation on pre-projected data
- Pushdown stops at col.a because array indexing [1] fails the STRUCT type check
Key Design Decisions
- STRUCT-Only Pushdown
- List/array element access does NOT get pushed down
- col['struct_field'] ✅ pushes down
- col['array_field'][0] ❌ stops at array field
- col['array_field'][0]['nested_struct'] ❌ reads entire array
- Unprojected Required Fields
- Parquet schema's REQUIRED vs OPTIONAL fields are tracked but not validated
- Unprojected required fields are silently set to NULL
- No error or warning is raised
- Design choice: Performance over strict schema validation
- Column Binding Stability
- Original column bindings are preserved throughout the plan
- The ColumnIndex acts as metadata alongside the binding
- Operators above the scan reference the same column, but the physical data is projected
- Expression Tree Unchanged
- Expressions remain as struct_extract() / array_extract() calls
- No rewriting or transformation of expression trees
- Extraction operations become cheap reference operations at runtime
Advantages
- Minimal plan changes: Expressions and bindings remain stable
- True I/O optimization: Only projected leaf columns read from Parquet
- Transparent to execution: Operators above scan work on smaller data structures
- Simple metadata propagation: ColumnIndex passed through initialization chain
Disadvantages
- Confusing query plans: Projections: col.a but expression shows array_extract(col, 'a')
- STRUCT-only limitation: No pushdown through LIST/ARRAY types
- Implicit semantics: Not obvious from plan that extraction is optimized
- Schema validation loss: Required fields can become NULL without warning
- Tightly coupled: Assumes all data sources can handle partial struct projection
My commentary on this: I think this approach would be a lot more code churn for us (e.g. introducing something like ColumnIndex), would not work for the variant use case, and is generally too tightly coupled to structs / Parquet for DataFusion.