Proposal: redesign projection pushdown to be more granular #19550

@adriangb

Description

The current physical layer projection pushdown rule is:

fn try_swapping_with_projection(
    &self,
    projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {

Operators like FilterExec then treat the pushdown as all or nothing:

// If the projection does not narrow the schema, we should not try to push it down:
if projection.expr().len() < projection.input().schema().fields().len() {
    // Each column in the predicate expression must exist after the projection.
    if let Some(new_predicate) =
        update_expr(self.predicate(), projection.expr(), false)?
    {
        ...
    }
}

This poses a problem for #19387 for cases like complex_function(struct_col['field']).
I propose we resolve this via a logical optimizer rule that pushes these projections down below most other operators in the plan.

For example, given the data:

copy (select {email: 'a@example.com', address: '123 Main St, NYC'} as user) to 'test.parquet';
select lower(user['email'])
from 'test.parquet' t
where user['address'] ilike '%nyc%';

This query will produce a plan of the form:

Projection: lower(get_field(t.user, Utf8("email")))
  Filter: get_field(t.user, Utf8("address")) ILIKE Utf8("%nyc%")
    SubqueryAlias: t
      TableScan: test.parquet

Ideally we want a plan of the form:

Projection: lower(t.__extracted_expr_1@1) AS "lower(get_field(t.user, 'email'))"
  Filter: t.__extracted_expr_0@0 ILIKE Utf8("%nyc%")
    SubqueryAlias: t
      Projection: get_field(user, Utf8("address")) AS "__extracted_expr_0", get_field(user, Utf8("email")) AS "__extracted_expr_1"
        TableScan: test.parquet

The point is that the expression user['address'] gets pushed down past filters and the like, all the way to the scan (or right above it: the physical layer can already push such a projection into the actual scan today, but only if there are no filters, repartitions, etc. in the way). For a data source like Parquet that natively supports structs (and variant), this lets the scan optimize I/O by reading only the necessary fields.

To achieve this I propose we write a logical optimizer rule that can:

  • Identify expressions we want to extract and push down
  • Deduplicate the expressions being manipulated
  • Handle remapping of qualifiers through table aliases
  • Handle complexity of joins
  • Drop column references that are no longer needed while preserving the ones that still are

Identifying expressions

I think we should use something like Expr::is_trivial from #19538. We can traverse the tree of expressions for each projection looking for subtrees where every node matches Expr::is_trivial. In practice I think it will look like this: let condition = |expr: &Expr| expr.is_trivial().
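
To make the traversal concrete, here is a minimal, self-contained sketch of collecting the maximal subtrees where every node satisfies an is_trivial-style condition. The Expr type and helper names below are simplified stand-ins, not DataFusion's real Expr or any existing API:

#[derive(Clone, Debug)]
enum Expr {
    Column(String),
    Literal(String),
    GetField { base: Box<Expr>, field: String },
    ScalarFn { name: String, args: Vec<Expr> },
}

impl Expr {
    // Stand-in for the proposed `Expr::is_trivial` from #19538: columns,
    // literals, and field accesses on trivial inputs are cheap to evaluate
    // early; arbitrary scalar functions are not.
    fn is_trivial(&self) -> bool {
        match self {
            Expr::Column(_) | Expr::Literal(_) => true,
            Expr::GetField { base, .. } => base.is_trivial(),
            Expr::ScalarFn { .. } => false,
        }
    }

    fn children(&self) -> Vec<&Expr> {
        match self {
            Expr::Column(_) | Expr::Literal(_) => vec![],
            Expr::GetField { base, .. } => vec![base.as_ref()],
            Expr::ScalarFn { args, .. } => args.iter().collect(),
        }
    }
}

// Collect maximal trivial subtrees: if a node is trivial, take the whole
// subtree and stop descending; otherwise recurse into its children.
// Bare columns and literals are skipped since extracting them gains nothing.
fn collect_extractable<'a>(expr: &'a Expr, out: &mut Vec<&'a Expr>) {
    if expr.is_trivial() {
        if !matches!(expr, Expr::Column(_) | Expr::Literal(_)) {
            out.push(expr);
        }
        return;
    }
    for child in expr.children() {
        collect_extractable(child, out);
    }
}

fn main() {
    // lower(user['email'])
    let expr = Expr::ScalarFn {
        name: "lower".into(),
        args: vec![Expr::GetField {
            base: Box::new(Expr::Column("user".into())),
            field: "email".into(),
        }],
    };
    let mut found = Vec::new();
    collect_extractable(&expr, &mut found);
    assert_eq!(found.len(), 1); // the get_field(user, 'email') subtree
}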

Pushing down through joins

For each expression we look for sub-trees that match Expr::is_trivial and only reference columns from one side of the join. So the condition for these nodes becomes let condition = move |expr: &Expr| expr.is_trivial() && whitelist_columns(expr, &join_side_columns), where join_side_columns holds the columns available from that side of the join. We push down into each side independently. We also need to deal with qualifiers and remapping them.
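
Continuing the toy sketch above (whitelist_columns and pushable_into_side are illustrative names, not existing DataFusion functions), the per-side condition could look like:

use std::collections::HashSet;

// Collect the column names referenced by a (toy) expression.
fn columns_of(expr: &Expr, out: &mut HashSet<String>) {
    if let Expr::Column(name) = expr {
        out.insert(name.clone());
    }
    for child in expr.children() {
        columns_of(child, out);
    }
}

// A subtree may only be pushed into one join input if it is trivial and
// references nothing outside that input's columns.
fn pushable_into_side(expr: &Expr, join_side_columns: &HashSet<String>) -> bool {
    let mut referenced = HashSet::new();
    columns_of(expr, &mut referenced);
    expr.is_trivial() && referenced.is_subset(join_side_columns)
}

// One closure per join side, applied independently:
// let condition = move |expr: &Expr| pushable_into_side(expr, &left_columns);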

Table aliases

To push past a table alias we need to keep track of qualifiers and remap them. Something like PhysicalColumnRewriter from https://github.com/apache/datafusion/pull/19404/changes#diff-237330fb473b7698f4abf2545c203d12e59b52e6febc3244da8a4dee5091626f may be useful.
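
As a rough, self-contained illustration of the remapping itself (the types and names below are hypothetical; in practice something like the PhysicalColumnRewriter linked above would do this work), pushing below SubqueryAlias: t means swapping the alias qualifier for the underlying relation's qualifier:

use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct QualifiedColumn {
    qualifier: Option<String>,
    name: String,
}

// Rewrite `t.user` to the underlying relation's qualifier (or to none) when
// an extracted expression moves below `SubqueryAlias: t`.
fn remap_qualifier(
    col: &mut QualifiedColumn,
    alias_to_inner: &HashMap<String, Option<String>>,
) {
    if let Some(q) = &col.qualifier {
        if let Some(inner) = alias_to_inner.get(q) {
            col.qualifier = inner.clone();
        }
    }
}

fn main() {
    let alias_to_inner = HashMap::from([("t".to_string(), None)]);
    let mut col = QualifiedColumn { qualifier: Some("t".into()), name: "user".into() };
    remap_qualifier(&mut col, &alias_to_inner);
    assert_eq!(col, QualifiedColumn { qualifier: None, name: "user".into() });
}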

Deduplicate expressions

If we have something like select lower(col['a']), upper(col['a']) we don't want to end up pushing down two different copies of the extraction into the scan and doing the I/O twice (today, reading the entire column does the I/O once and the extractions just take references). So this would have to produce something like Projection: lower(__extracted_expr_0) as 'lower(col['a'])', upper(__extracted_expr_0) as 'upper(col['a'])'.
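
A minimal sketch of the bookkeeping (the Extractor type and canonical-string keying are illustrative assumptions, not existing code):

use std::collections::HashMap;

// Map a canonical representation of an extracted expression to the alias it
// was pushed down under, so repeated uses share one pushed-down column.
struct Extractor {
    extracted: HashMap<String, String>,
}

impl Extractor {
    fn new() -> Self {
        Self { extracted: HashMap::new() }
    }

    // Return the alias for `canonical`, generating a new `__extracted_expr_N`
    // (and thus a new pushed-down projection entry) only on first sight.
    fn alias_for(&mut self, canonical: &str) -> String {
        let next_id = self.extracted.len();
        self.extracted
            .entry(canonical.to_string())
            .or_insert_with(|| format!("__extracted_expr_{next_id}"))
            .clone()
    }
}

fn main() {
    let mut ex = Extractor::new();
    // Both lower(col['a']) and upper(col['a']) reuse the same extraction.
    assert_eq!(ex.alias_for("get_field(col, 'a')"), "__extracted_expr_0");
    assert_eq!(ex.alias_for("get_field(col, 'a')"), "__extracted_expr_0");
    assert_eq!(ex.alias_for("get_field(col, 'b')"), "__extracted_expr_1");
}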

Drop unnecessary column references

Consider a plan like:

Projection: col['a'], other_col
  Filter: col['a'] > 5
    Projection: col, other_col
      TableScan: test.parquet

If we transform into:

Projection: __extracted_0 as col['a'], other_col
  Filter: __extracted_0 > 5
    Projection: col['a'] as __extracted_0
      TableScan: test.parquet

We'd get an error: other_col is not projected.

So we need to preserve it. But naively:

Projection: __extracted_0 as col['a'], other_col
  Filter: __extracted_0 > 5
    Projection: col['a'] as __extracted_0, col, other_col
      TableScan: test.parquet

Then we still scan all of col... So we need to be intelligent about which inputs to keep.
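
For illustration, the shape we'd want keeps other_col (it is still referenced above) but drops the bare col reference once nothing needs the full struct anymore:

Projection: __extracted_0 as col['a'], other_col
  Filter: __extracted_0 > 5
    Projection: col['a'] as __extracted_0, other_col
      TableScan: test.parquet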

Future opportunities

I think this approach could be applied to CAST operations as well: oftentimes file formats have different physical storage than our in-memory representation (e.g. Parquet's only physical integer types are Int32 and Int64, with narrower integer types recorded as metadata, and CSV is obviously untyped). I'm guessing it's almost always best to push casts into scans so that we can avoid reading data, casting it to the table schema, and then casting it again to the final type.

Alternatives considered: DuckDB

Here's a summary of a guided exploration of DuckDB's approach with Claude:

Overview

DuckDB implements projection pushdown for nested struct fields using a metadata-driven approach where expressions remain unchanged but the underlying data is pre-filtered at the scan level using hierarchical ColumnIndex structures.

Core Mechanism

  1. ColumnIndex Data Structure

DuckDB uses a hierarchical ColumnIndex structure to represent nested projections:

struct ColumnIndex {
    idx_t index;                       // Primary column index
    ColumnIndexType index_type;        // FULL_READ or PUSHDOWN_EXTRACT
    LogicalType type;                  // Type at this level
    vector<ColumnIndex> child_indexes; // Children for nested types (STRUCT/LIST)
};

Source: src/include/duckdb/common/column_index.hpp

  2. Optimizer Phase: RemoveUnusedColumns

The optimizer detects struct_extract() and array_extract() function calls and converts them into ColumnIndex paths:

// Query: SELECT struct_col['field_a'] FROM table
// Creates: ColumnIndex {
//   index: struct_col,
//   type: PUSHDOWN_EXTRACT,
//   child_indexes: [{ index: field_a, type: FULL_READ }]
// }

Key code: src/optimizer/remove_unused_columns.cpp:713-753 (HandleStructExtractRecursive)

Important limitation: Only works for STRUCT types, not LIST types. List element extraction is blocked by a type check at line 730:
if (child->return_type.id() != LogicalTypeId::STRUCT) {
    return false; // Stops pushdown for LIST types
}

  3. Scan Phase: Parquet Reader

The ColumnIndex is passed through TableFunctionInitInput.column_indexes to the Parquet reader:

CreateReaderRecursive (extension/parquet/parquet_reader.cpp:405-457):
// Creates child readers ONLY for projected fields
vector<unique_ptr<ColumnReader>> children;
children.resize(schema.children.size()); // Sized for all fields

if (!indexes.empty()) {
    for (idx_t i = 0; i < indexes.size(); i++) {
        auto child_index = indexes[i].GetPrimaryIndex();
        children[child_index] = CreateReaderRecursive(
            context,
            indexes[i].GetChildIndexes(), // Recursive projection
            schema.children[child_index]
        );
    }
    // All other positions remain nullptr
}

StructColumnReader::Read() (extension/parquet/reader/struct_column_reader.cpp:44-52):
for (idx_t i = 0; i < child_readers.size(); i++) {
    auto &child = child_readers[i];
    if (!child) {
        // Non-projected field: set to CONSTANT NULL
        target_vector.SetVectorType(VectorType::CONSTANT_VECTOR);
        ConstantVector::SetNull(target_vector, true);
        continue;
    }
    // Read projected field from Parquet
    child->Read(num_values, define_out, repeat_out, target_vector);
}

Result: Only projected leaf columns are read from Parquet. Unprojected fields become NULL vectors.

  4. Expression Evaluation: No Rewriting

Expressions above the scan are not rewritten. The original struct_extract(col, 'field') expressions remain in the plan.

Key insight: The scan outputs a struct column with the same column binding, but only projected fields are populated. When struct_extract() executes, it simply references the already-extracted child vector:

// struct_extract.cpp:12-25
static void StructExtractFunction(DataChunk &args, ...) {
    auto &vec = args.data[0]; // Already-projected struct
    auto &children = StructVector::GetEntries(vec);
    auto &struct_child = children[info.index];
    result.Reference(*struct_child); // Just a reference - no extraction!
}

Query Plan Example

SELECT col['a'][1]['b'] FROM 'struct.parquet';

Plan output:
PROJECTION
│ array_extract(array_extract(array_extract(col, 'a'), 1), 'b')
│
└─ TABLE_SCAN
     Projections: col.a  ← ColumnIndex metadata
     Output: col         ← Same binding, projected data

Why the apparent mismatch?

  • Projections: col.a indicates only field 'a' is read from Parquet
  • Expression still shows array_extract(col, 'a') because expressions aren't rewritten
  • The scan outputs a col struct that only contains field 'a' (others are NULL)
  • array_extract() becomes a cheap reference operation on pre-projected data
  • Pushdown stops at col.a because array indexing [1] fails the STRUCT type check

Key Design Decisions

  1. STRUCT-Only Pushdown
     • List/array element access does NOT get pushed down
     • col['struct_field'] ✅ pushes down
     • col['array_field'][0] ❌ stops at array field
     • col['array_field'][0]['nested_struct'] ❌ reads entire array
  2. Unprojected Required Fields
     • Parquet schema's REQUIRED vs OPTIONAL fields are tracked but not validated
     • Unprojected required fields are silently set to NULL
     • No error or warning is raised
     • Design choice: Performance over strict schema validation
  3. Column Binding Stability
     • Original column bindings are preserved throughout the plan
     • The ColumnIndex acts as metadata alongside the binding
     • Operators above the scan reference the same column, but the physical data is projected
  4. Expression Tree Unchanged
     • Expressions remain as struct_extract() / array_extract() calls
     • No rewriting or transformation of expression trees
     • Extraction operations become cheap reference operations at runtime

Advantages

  1. Minimal plan changes: Expressions and bindings remain stable
  2. True I/O optimization: Only projected leaf columns read from Parquet
  3. Transparent to execution: Operators above scan work on smaller data structures
  4. Simple metadata propagation: ColumnIndex passed through initialization chain

Disadvantages

  1. Confusing query plans: Projections: col.a but expression shows array_extract(col, 'a')
  2. STRUCT-only limitation: No pushdown through LIST/ARRAY types
  3. Implicit semantics: Not obvious from plan that extraction is optimized
  4. Schema validation loss: Required fields can become NULL without warning
  5. Tightly coupled: Assumes all data sources can handle partial struct projection

My commentary on this: I think this approach would mean a lot more code churn for us (e.g. introducing something like ColumnIndex), would not work for the variant use case, and is generally too hardcoded around structs / Parquet for DataFusion.
