From ffa7a73f833c1bc408bbaeecb0f1c0ce9bead5f7 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 29 Dec 2025 00:42:08 -0600 Subject: [PATCH 1/2] initial implementation of projection pushdown for nexted expressions --- datafusion/common/src/column.rs | 2 +- datafusion/expr/src/expr.rs | 22 +++++++++++ datafusion/expr/src/udf.rs | 17 +++++++++ datafusion/functions/src/core/getfield.rs | 4 ++ .../optimizer/src/optimize_projections/mod.rs | 2 +- .../physical-expr-common/src/physical_expr.rs | 14 +++++++ .../physical-expr/src/expressions/column.rs | 4 ++ .../physical-expr/src/expressions/literal.rs | 4 ++ .../physical-expr/src/scalar_function.rs | 7 ++++ datafusion/physical-plan/src/projection.rs | 38 +++++++++---------- .../physical-plan/src/repartition/mod.rs | 4 +- 11 files changed, 93 insertions(+), 25 deletions(-) diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs index c7f0b5a4f4881..f97276e3c3761 100644 --- a/datafusion/common/src/column.rs +++ b/datafusion/common/src/column.rs @@ -30,7 +30,7 @@ use std::fmt; pub struct Column { /// relation/table reference. pub relation: Option, - /// field/column name. + /// Field/column name. pub name: String, /// Original source code location, if known pub spans: Spans, diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index c7d825ce1d52f..d15edd5ae4527 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -1883,6 +1883,28 @@ impl Expr { } } + /// Returns true if this expression is trivial (cheap to evaluate). + /// + /// Trivial expressions include column references, literals, and nested + /// field access via `get_field`. + /// + /// # Example + /// ``` + /// # use datafusion_expr::col; + /// let expr = col("foo"); + /// assert!(expr.is_trivial()); + /// ``` + pub fn is_trivial(&self) -> bool { + match self { + Expr::Column(_) | Expr::Literal(_, _) => true, + Expr::ScalarFunction(func) => { + func.func.is_trivial() + && func.args.first().is_some_and(|arg| arg.is_trivial()) + } + _ => false, + } + } + /// Return all references to columns in this expression. /// /// # Example diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 26d7fc99cb17c..b04c779896de5 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -122,6 +122,11 @@ impl ScalarUDF { Self { inner: fun } } + /// Returns true if this function is trivial (cheap to evaluate). + pub fn is_trivial(&self) -> bool { + self.inner.is_trivial() + } + /// Return the underlying [`ScalarUDFImpl`] trait object for this function pub fn inner(&self) -> &Arc { &self.inner @@ -846,6 +851,18 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync { fn documentation(&self) -> Option<&Documentation> { None } + + /// Returns true if this function is trivial (cheap to evaluate). + /// + /// Trivial functions are lightweight accessor functions like `get_field` + /// (struct field access) that simply access nested data within a column + /// without significant computation. + /// + /// This is used to identify expressions that are cheap to duplicate or + /// don't benefit from caching/partitioning optimizations. + fn is_trivial(&self) -> bool { + false + } } /// ScalarUDF that adds an alias to the underlying function. It is better to diff --git a/datafusion/functions/src/core/getfield.rs b/datafusion/functions/src/core/getfield.rs index 3e961e4da4e75..7c0df516ed599 100644 --- a/datafusion/functions/src/core/getfield.rs +++ b/datafusion/functions/src/core/getfield.rs @@ -499,6 +499,10 @@ impl ScalarUDFImpl for GetFieldFunc { fn documentation(&self) -> Option<&Documentation> { self.doc() } + + fn is_trivial(&self) -> bool { + true + } } #[cfg(test)] diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 548eadffa242e..fca56c66d4cf0 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -593,7 +593,7 @@ fn merge_consecutive_projections(proj: Projection) -> Result bool { - matches!(expr, Expr::Column(_) | Expr::Literal(_, _)) + expr.is_trivial() } /// Rewrites a projection expression using the projection before it (i.e. its input) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 2358a21940912..5dc12693ea250 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -430,6 +430,20 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { fn is_volatile_node(&self) -> bool { false } + + /// Returns true if this expression is trivial (cheap to evaluate). + /// + /// Trivial expressions include: + /// - Column references + /// - Literal values + /// - Struct field access via `get_field` + /// - Nested combinations of field accessors (e.g., `col['a']['b']`) + /// + /// This is used to identify expressions that are cheap to duplicate or + /// don't benefit from caching/partitioning optimizations. + fn is_trivial(&self) -> bool { + false + } } #[deprecated( diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs index 8c7e8c319fff4..86dcc6fa87752 100644 --- a/datafusion/physical-expr/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -146,6 +146,10 @@ impl PhysicalExpr for Column { fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.name) } + + fn is_trivial(&self) -> bool { + true + } } impl Column { diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 1f3fefc60b7ad..6aaa3b0c77575 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -134,6 +134,10 @@ impl PhysicalExpr for Literal { fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { std::fmt::Display::fmt(self, f) } + + fn is_trivial(&self) -> bool { + true + } } /// Create a literal expression diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index e6a6db75bebd7..8959bf57aa246 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -366,6 +366,13 @@ impl PhysicalExpr for ScalarFunctionExpr { fn is_volatile_node(&self) -> bool { self.fun.signature().volatility == Volatility::Volatile } + + fn is_trivial(&self) -> bool { + if !self.fun.is_trivial() { + return false; + } + self.args.iter().all(|arg| arg.is_trivial()) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index a56e9272f119e..d476a4da7a179 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -20,7 +20,7 @@ //! of a projection on table `t1` where the expressions `a`, `b`, and `a+b` are the //! projection expressions. `SELECT` without `FROM` will only evaluate expressions. -use super::expressions::{Column, Literal}; +use super::expressions::Column; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{ DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, @@ -253,18 +253,16 @@ impl ExecutionPlan for ProjectionExec { } fn benefits_from_input_partitioning(&self) -> Vec { - let all_simple_exprs = - self.projector - .projection() - .as_ref() - .iter() - .all(|proj_expr| { - proj_expr.expr.as_any().is::() - || proj_expr.expr.as_any().is::() - }); - // If expressions are all either column_expr or Literal, then all computations in this projection are reorder or rename, - // and projection would not benefit from the repartition, benefits_from_input_partitioning will return false. - vec![!all_simple_exprs] + let all_trivial_exprs = self + .projector + .projection() + .as_ref() + .iter() + .all(|proj_expr| proj_expr.expr.is_trivial()); + // If expressions are all trivial (columns, literals, or field accessors), + // then all computations in this projection are reorder or rename, + // and projection would not benefit from the repartition. + vec![!all_trivial_exprs] } fn children(&self) -> Vec<&Arc> { @@ -630,11 +628,10 @@ pub fn make_with_child( .map(|e| Arc::new(e) as _) } -/// Returns `true` if all the expressions in the argument are `Column`s. -pub fn all_columns(exprs: &[ProjectionExpr]) -> bool { - exprs - .iter() - .all(|proj_expr| proj_expr.expr.as_any().is::()) +/// Returns `true` if all the expressions in the argument are trivial +/// (columns, literals, or field accessors). +pub fn all_trivial(exprs: &[ProjectionExpr]) -> bool { + exprs.iter().all(|proj_expr| proj_expr.expr.is_trivial()) } /// Updates the given lexicographic ordering according to given projected @@ -990,10 +987,9 @@ fn new_columns_for_join_on( } /// Checks if the given expression is trivial. -/// An expression is considered trivial if it is either a `Column` or a `Literal`. +/// An expression is considered trivial if it is a `Column`, `Literal`, or field accessor. fn is_expr_trivial(expr: &Arc) -> bool { - expr.as_any().downcast_ref::().is_some() - || expr.as_any().downcast_ref::().is_some() + expr.is_trivial() } #[cfg(test)] diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 1efdaaabc7d6a..f9ad1c175af8d 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -34,7 +34,7 @@ use crate::coalesce::LimitedBatchCoalescer; use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; use crate::hash_utils::create_hashes; use crate::metrics::{BaselineMetrics, SpillMetrics}; -use crate::projection::{ProjectionExec, all_columns, make_with_child, update_expr}; +use crate::projection::{ProjectionExec, all_trivial, make_with_child, update_expr}; use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::spill::spill_manager::SpillManager; use crate::spill::spill_pool::{self, SpillPoolWriter}; @@ -1054,7 +1054,7 @@ impl ExecutionPlan for RepartitionExec { // If pushdown is not beneficial or applicable, break it. if projection.benefits_from_input_partitioning()[0] - || !all_columns(projection.expr()) + || !all_trivial(projection.expr()) { return Ok(None); } From 85890eef3ee7ccc939b2c7beaaac42631a45239c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 29 Dec 2025 10:07:29 -0600 Subject: [PATCH 2/2] fix lints, test --- datafusion/expr/src/udf.rs | 4 ++++ datafusion/sqllogictest/test_files/unnest.slt | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index b04c779896de5..9643a821b7bbb 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -981,6 +981,10 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl { fn documentation(&self) -> Option<&Documentation> { self.inner.documentation() } + + fn is_trivial(&self) -> bool { + self.inner.is_trivial() + } } #[cfg(test)] diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index 352056adbf813..8cb10909096de 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -659,8 +659,8 @@ logical_plan physical_plan 01)ProjectionExec: expr=[__unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1],depth=2)@0 as UNNEST(UNNEST(UNNEST(recursive_unnest_table.column3)[c1])), column3@1 as column3] 02)--UnnestExec -03)----ProjectionExec: expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0, c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3] -04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------ProjectionExec: expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0, c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3] 05)--------UnnestExec 06)----------ProjectionExec: expr=[column3@0 as __unnest_placeholder(recursive_unnest_table.column3), column3@0 as column3] 07)------------DataSourceExec: partitions=1, partition_sizes=[1]