2 changes: 1 addition & 1 deletion datafusion/common/src/column.rs
@@ -30,7 +30,7 @@ use std::fmt;
pub struct Column {
/// relation/table reference.
pub relation: Option<TableReference>,
/// field/column name.
/// Field/column name.
pub name: String,
/// Original source code location, if known
pub spans: Spans,
22 changes: 22 additions & 0 deletions datafusion/expr/src/expr.rs
@@ -1883,6 +1883,28 @@ impl Expr {
}
}

/// Returns true if this expression is trivial (cheap to evaluate).
///
/// Trivial expressions include column references, literals, and nested
/// field access via `get_field`.
///
/// # Example
/// ```
/// # use datafusion_expr::col;
/// let expr = col("foo");
/// assert!(expr.is_trivial());
/// ```
pub fn is_trivial(&self) -> bool {
match self {
Expr::Column(_) | Expr::Literal(_, _) => true,
Expr::ScalarFunction(func) => {
func.func.is_trivial()
&& func.args.first().is_some_and(|arg| arg.is_trivial())
}
_ => false,
}
}

/// Return all references to columns in this expression.
///
/// # Example
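To illustrate the new `Expr::is_trivial` API (a minimal sketch, not part of the diff, assuming only `col` and `lit` from `datafusion_expr`): column references and literals report as trivial, while anything that performs real computation does not.

```rust
use datafusion_expr::{col, lit};

fn main() {
    // Column references and literals are trivial.
    assert!(col("a").is_trivial());
    assert!(lit(42).is_trivial());

    // A binary expression implies real computation, so it is not trivial.
    assert!(!(col("a") + lit(1)).is_trivial());
}
```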
21 changes: 21 additions & 0 deletions datafusion/expr/src/udf.rs
@@ -122,6 +122,11 @@ impl ScalarUDF {
Self { inner: fun }
}

/// Returns true if this function is trivial (cheap to evaluate).
pub fn is_trivial(&self) -> bool {
self.inner.is_trivial()
}

/// Return the underlying [`ScalarUDFImpl`] trait object for this function
pub fn inner(&self) -> &Arc<dyn ScalarUDFImpl> {
&self.inner
@@ -846,6 +851,18 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
fn documentation(&self) -> Option<&Documentation> {
None
}

/// Returns true if this function is trivial (cheap to evaluate).
///
/// Trivial functions are lightweight accessor functions like `get_field`
/// (struct field access) that simply access nested data within a column
/// without significant computation.
///
/// This is used to identify expressions that are cheap to duplicate or
/// don't benefit from caching/partitioning optimizations.
fn is_trivial(&self) -> bool {
false
}
}

/// ScalarUDF that adds an alias to the underlying function. It is better to
Expand Down Expand Up @@ -964,6 +981,10 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
fn documentation(&self) -> Option<&Documentation> {
self.inner.documentation()
}

fn is_trivial(&self) -> bool {
self.inner.is_trivial()
}
}

#[cfg(test)]
4 changes: 4 additions & 0 deletions datafusion/functions/src/core/getfield.rs
@@ -499,6 +499,10 @@ impl ScalarUDFImpl for GetFieldFunc {
fn documentation(&self) -> Option<&Documentation> {
self.doc()
}

fn is_trivial(&self) -> bool {
true
}
}

#[cfg(test)]
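A quick check of how the pieces compose (a hedged sketch, not part of the diff, assuming the usual `datafusion_functions::core` constructors `get_field()` and `coalesce()`): `get_field` now reports itself as trivial through the `ScalarUDF` wrapper, while functions that keep the default trait implementation do not.

```rust
use datafusion_functions::core::{coalesce, get_field};

fn main() {
    // `GetFieldFunc` overrides `is_trivial`, so the `ScalarUDF` wrapper surfaces `true`.
    assert!(get_field().is_trivial());

    // Functions that keep the default implementation remain non-trivial.
    assert!(!coalesce().is_trivial());
}
```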
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/optimize_projections/mod.rs
@@ -593,7 +593,7 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project

// Check whether `expr` is trivial; i.e. it doesn't imply any computation.
fn is_expr_trivial(expr: &Expr) -> bool {
matches!(expr, Expr::Column(_) | Expr::Literal(_, _))
expr.is_trivial()
}

/// Rewrites a projection expression using the projection before it (i.e. its input)
14 changes: 14 additions & 0 deletions datafusion/physical-expr-common/src/physical_expr.rs
@@ -430,6 +430,20 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
fn is_volatile_node(&self) -> bool {
false
}

/// Returns true if this expression is trivial (cheap to evaluate).
///
/// Trivial expressions include:
/// - Column references
/// - Literal values
/// - Struct field access via `get_field`
/// - Nested combinations of field accessors (e.g., `col['a']['b']`)
///
/// This is used to identify expressions that are cheap to duplicate or
/// don't benefit from caching/partitioning optimizations.
fn is_trivial(&self) -> bool {
false
}
}

#[deprecated(
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/expressions/column.rs
@@ -146,6 +146,10 @@ impl PhysicalExpr for Column {
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.name)
}

fn is_trivial(&self) -> bool {
true
}
}

impl Column {
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/expressions/literal.rs
@@ -134,6 +134,10 @@ impl PhysicalExpr for Literal {
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(self, f)
}

fn is_trivial(&self) -> bool {
true
}
}

/// Create a literal expression
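On the physical side, the same idea can be exercised directly (a minimal sketch, not part of the diff, assuming the `col` and `lit` helpers in `datafusion_physical_expr::expressions`): the `Column` and `Literal` overrides above make bare column references and constants trivial, and `ScalarFunctionExpr` below only stays trivial when both the function and all of its arguments are.

```rust
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_physical_expr::expressions::{col, lit};
use datafusion_physical_expr::PhysicalExpr;

fn main() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

    // Plain column references and literal values report themselves as trivial.
    assert!(col("a", &schema)?.is_trivial());
    assert!(lit(1i32).is_trivial());

    Ok(())
}
```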
7 changes: 7 additions & 0 deletions datafusion/physical-expr/src/scalar_function.rs
@@ -366,6 +366,13 @@ impl PhysicalExpr for ScalarFunctionExpr {
fn is_volatile_node(&self) -> bool {
self.fun.signature().volatility == Volatility::Volatile
}

fn is_trivial(&self) -> bool {
if !self.fun.is_trivial() {
return false;
}
self.args.iter().all(|arg| arg.is_trivial())
}
}

#[cfg(test)]
38 changes: 17 additions & 21 deletions datafusion/physical-plan/src/projection.rs
@@ -20,7 +20,7 @@
//! of a projection on table `t1` where the expressions `a`, `b`, and `a+b` are the
//! projection expressions. `SELECT` without `FROM` will only evaluate expressions.

use super::expressions::{Column, Literal};
use super::expressions::Column;
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{
DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
@@ -253,18 +253,16 @@ impl ExecutionPlan for ProjectionExec {
}

fn benefits_from_input_partitioning(&self) -> Vec<bool> {
let all_simple_exprs =
self.projector
.projection()
.as_ref()
.iter()
.all(|proj_expr| {
proj_expr.expr.as_any().is::<Column>()
|| proj_expr.expr.as_any().is::<Literal>()
});
// If expressions are all either column_expr or Literal, then all computations in this projection are reorder or rename,
// and projection would not benefit from the repartition, benefits_from_input_partitioning will return false.
vec![!all_simple_exprs]
let all_trivial_exprs = self
.projector
.projection()
.as_ref()
.iter()
.all(|proj_expr| proj_expr.expr.is_trivial());
// If expressions are all trivial (columns, literals, or field accessors),
// then all computations in this projection are just reorders or renames,
// and the projection would not benefit from repartitioning.
vec![!all_trivial_exprs]
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
@@ -630,11 +628,10 @@ pub fn make_with_child(
.map(|e| Arc::new(e) as _)
}

/// Returns `true` if all the expressions in the argument are `Column`s.
pub fn all_columns(exprs: &[ProjectionExpr]) -> bool {
exprs
.iter()
.all(|proj_expr| proj_expr.expr.as_any().is::<Column>())
/// Returns `true` if all the expressions in the argument are trivial
/// (columns, literals, or field accessors).
pub fn all_trivial(exprs: &[ProjectionExpr]) -> bool {
exprs.iter().all(|proj_expr| proj_expr.expr.is_trivial())
}

/// Updates the given lexicographic ordering according to given projected
@@ -990,10 +987,9 @@ fn new_columns_for_join_on(
}

/// Checks if the given expression is trivial.
/// An expression is considered trivial if it is either a `Column` or a `Literal`.
/// An expression is considered trivial if it is a `Column`, `Literal`, or field accessor.
fn is_expr_trivial(expr: &Arc<dyn PhysicalExpr>) -> bool {
expr.as_any().downcast_ref::<Column>().is_some()
|| expr.as_any().downcast_ref::<Literal>().is_some()
expr.is_trivial()
}

#[cfg(test)]
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/repartition/mod.rs
@@ -34,7 +34,7 @@ use crate::coalesce::LimitedBatchCoalescer;
use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
use crate::hash_utils::create_hashes;
use crate::metrics::{BaselineMetrics, SpillMetrics};
use crate::projection::{ProjectionExec, all_columns, make_with_child, update_expr};
use crate::projection::{ProjectionExec, all_trivial, make_with_child, update_expr};
use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::spill::spill_manager::SpillManager;
use crate::spill::spill_pool::{self, SpillPoolWriter};
@@ -1054,7 +1054,7 @@ impl ExecutionPlan for RepartitionExec {

// If pushdown is not beneficial or applicable, break it.
if projection.benefits_from_input_partitioning()[0]
|| !all_columns(projection.expr())
|| !all_trivial(projection.expr())
{
return Ok(None);
}
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/unnest.slt
@@ -659,8 +659,8 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[__unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1],depth=2)@0 as UNNEST(UNNEST(UNNEST(recursive_unnest_table.column3)[c1])), column3@1 as column3]
02)--UnnestExec
03)----ProjectionExec: expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0, c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------ProjectionExec: expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0, c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3]
05)--------UnnestExec
06)----------ProjectionExec: expr=[column3@0 as __unnest_placeholder(recursive_unnest_table.column3), column3@0 as column3]
07)------------DataSourceExec: partitions=1, partition_sizes=[1]