Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use datafusion::error::Result;
use datafusion::execution::context::{
FunctionFactory, RegisterFunction, SessionContext, SessionState,
};
use datafusion::logical_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion::logical_expr::simplify::{ExprSimplifyResult, SimplifyContext};
use datafusion::logical_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion::logical_expr::{
ColumnarValue, CreateFunction, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl,
Expand Down Expand Up @@ -145,7 +145,7 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
fn simplify(
&self,
args: Vec<Expr>,
_info: &dyn SimplifyInfo,
_info: &SimplifyContext,
) -> Result<ExprSimplifyResult> {
let replacement = Self::replacement(&self.expr, &args)?;

Expand Down
13 changes: 9 additions & 4 deletions datafusion-examples/examples/query_planning/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,9 @@ fn simplify_demo() -> Result<()> {
// the ExecutionProps carries information needed to simplify
// expressions, such as the current time (to evaluate `now()`
// correctly)
let props = ExecutionProps::new();
let context = SimplifyContext::new(&props).with_schema(schema);
let context = SimplifyContext::default()
.with_schema(schema)
.with_current_time();
let simplifier = ExprSimplifier::new(context);

// And then call the simplify_expr function:
Expand All @@ -191,7 +192,9 @@ fn simplify_demo() -> Result<()> {

// here are some other examples of what DataFusion is capable of
let schema = Schema::new(vec![make_field("i", DataType::Int64)]).to_dfschema_ref()?;
let context = SimplifyContext::new(&props).with_schema(schema.clone());
let context = SimplifyContext::default()
.with_schema(Arc::clone(&schema))
.with_current_time();
let simplifier = ExprSimplifier::new(context);

// basic arithmetic simplification
Expand Down Expand Up @@ -551,7 +554,9 @@ fn type_coercion_demo() -> Result<()> {
assert!(physical_expr.evaluate(&batch).is_ok());

// 2. Type coercion with `ExprSimplifier::coerce`.
let context = SimplifyContext::new(&props).with_schema(Arc::new(df_schema.clone()));
let context = SimplifyContext::default()
.with_schema(Arc::new(df_schema.clone()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably avoid this clone via https://docs.rs/datafusion/latest/datafusion/common/struct.DFSchema.html#method.inner

So, something like:

Suggested change
.with_schema(Arc::new(df_schema.clone()))
.with_schema(Arc::clone(df_schema.inner()))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we need a `DFSchema` here, not an (Arrow) `Schema`.

.with_current_time();
let simplifier = ExprSimplifier::new(context);
let coerced_expr = simplifier.coerce(expr.clone(), &df_schema)?;
let physical_expr = datafusion::physical_expr::create_physical_expr(
Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/examples/udf/advanced_udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use datafusion::logical_expr::{
Accumulator, AggregateUDF, AggregateUDFImpl, EmitTo, GroupsAccumulator, Signature,
expr::AggregateFunction,
function::{AccumulatorArgs, AggregateFunctionSimplification, StateFieldsArgs},
simplify::SimplifyInfo,
simplify::SimplifyContext,
};
use datafusion::prelude::*;

Expand Down Expand Up @@ -421,7 +421,7 @@ impl AggregateUDFImpl for SimplifiedGeoMeanUdaf {

/// Optionally replaces a UDAF with another expression during query optimization.
fn simplify(&self) -> Option<AggregateFunctionSimplification> {
let simplify = |aggregate_function: AggregateFunction, _: &dyn SimplifyInfo| {
let simplify = |aggregate_function: AggregateFunction, _: &SimplifyContext| {
// Replaces the UDAF with `GeoMeanUdaf` as a placeholder example to demonstrate the `simplify` method.
// In real-world scenarios, you might create UDFs from built-in expressions.
Ok(Expr::AggregateFunction(AggregateFunction::new_udf(
Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/examples/udf/advanced_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use datafusion::logical_expr::expr::{WindowFunction, WindowFunctionParams};
use datafusion::logical_expr::function::{
PartitionEvaluatorArgs, WindowFunctionSimplification, WindowUDFFieldArgs,
};
use datafusion::logical_expr::simplify::SimplifyInfo;
use datafusion::logical_expr::simplify::SimplifyContext;
use datafusion::logical_expr::{
Expr, LimitEffect, PartitionEvaluator, Signature, WindowFrame,
WindowFunctionDefinition, WindowUDF, WindowUDFImpl,
Expand Down Expand Up @@ -198,7 +198,7 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
/// This function will simplify `SimplifySmoothItUdf` to `AggregateUDF` for `Avg`.
/// The default implementation will not be called (left as `todo!()`).
fn simplify(&self) -> Option<WindowFunctionSimplification> {
let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo| {
let simplify = |window_function: WindowFunction, _: &SimplifyContext| {
Ok(Expr::from(WindowFunction {
fun: WindowFunctionDefinition::AggregateUDF(avg_udaf()),
params: WindowFunctionParams {
Expand Down
4 changes: 1 addition & 3 deletions datafusion-examples/examples/udf/simple_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use datafusion::common::{ScalarValue, plan_err};
use datafusion::datasource::TableProvider;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::simplify::SimplifyContext;
use datafusion::logical_expr::{Expr, TableType};
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
Expand Down Expand Up @@ -142,8 +141,7 @@ impl TableFunctionImpl for LocalCsvTableFunc {
.get(1)
.map(|expr| {
// try to simplify the expression, so 1+2 becomes 3, for example
let execution_props = ExecutionProps::new();
let info = SimplifyContext::new(&execution_props);
let info = SimplifyContext::default();
let expr = ExprSimplifier::new(info).simplify(expr.clone())?;

if let Expr::Literal(ScalarValue::Int64(Some(limit)), _) = expr {
Expand Down
6 changes: 6 additions & 0 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,12 @@ impl TryFrom<SchemaRef> for DFSchema {
}
}

impl From<DFSchema> for SchemaRef {
fn from(dfschema: DFSchema) -> Self {
Arc::clone(&dfschema.inner)
}
}

// Hashing refers to a subset of fields considered in PartialEq.
impl Hash for DFSchema {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
Expand Down
28 changes: 20 additions & 8 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ use datafusion_expr::{
logical_plan::{DdlStatement, Statement},
planner::ExprPlanner,
};
use datafusion_optimizer::Analyzer;
use datafusion_optimizer::analyzer::type_coercion::TypeCoercion;
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
use datafusion_optimizer::{Analyzer, OptimizerContext};
use datafusion_optimizer::{AnalyzerRule, OptimizerRule};
use datafusion_session::SessionStore;

Expand Down Expand Up @@ -749,12 +749,19 @@ impl SessionContext {
);
}
}
// Store the unoptimized plan into the session state. Although storing the
// optimized plan or the physical plan would be more efficient, doing so is
// not currently feasible. This is because `now()` would be optimized to a
// constant value, causing each EXECUTE to yield the same result, which is
// incorrect behavior.
self.state.write().store_prepared(name, fields, input)?;
// Optimize the plan without evaluating expressions like now()
let optimizer_context = OptimizerContext::new_with_config_options(
Arc::clone(self.state().config().options()),
)
.without_query_execution_start_time();
let plan = self.state().optimizer().optimize(
Arc::unwrap_or_clone(input),
&optimizer_context,
|_1, _2| {},
)?;
self.state
.write()
.store_prepared(name, fields, Arc::new(plan))?;
self.return_empty_dataframe()
}
LogicalPlan::Statement(Statement::Execute(execute)) => {
Expand Down Expand Up @@ -1394,7 +1401,12 @@ impl SessionContext {
})?;

let state = self.state.read();
let context = SimplifyContext::new(state.execution_props());
let context = SimplifyContext::default()
.with_schema(Arc::clone(prepared.plan.schema()))
.with_config_options(Arc::clone(state.config_options()))
.with_query_execution_start_time(
state.execution_props().query_execution_start_time,
);
let simplifier = ExprSimplifier::new(context);

// Only allow literals as parameters for now.
Expand Down
57 changes: 17 additions & 40 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,8 @@ use datafusion_expr::planner::ExprPlanner;
#[cfg(feature = "sql")]
use datafusion_expr::planner::{RelationPlanner, TypePlanner};
use datafusion_expr::registry::{FunctionRegistry, SerializerRegistry};
use datafusion_expr::simplify::SimplifyInfo;
use datafusion_expr::{
AggregateUDF, Explain, Expr, ExprSchemable, LogicalPlan, ScalarUDF, WindowUDF,
};
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{AggregateUDF, Explain, Expr, LogicalPlan, ScalarUDF, WindowUDF};
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
use datafusion_optimizer::{
Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule,
Expand Down Expand Up @@ -744,13 +742,18 @@ impl SessionState {
expr: Expr,
df_schema: &DFSchema,
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
let simplifier =
ExprSimplifier::new(SessionSimplifyProvider::new(self, df_schema));
let config_options = self.config_options();
let simplify_context = SimplifyContext::default()
.with_schema(Arc::new(df_schema.clone()))
.with_config_options(Arc::clone(config_options))
.with_query_execution_start_time(
self.execution_props().query_execution_start_time,
);
let simplifier = ExprSimplifier::new(simplify_context);
// apply type coercion here to ensure types match
let mut expr = simplifier.coerce(expr, df_schema)?;

// rewrite Exprs to functions if necessary
let config_options = self.config_options();
for rewrite in self.analyzer.function_rewrites() {
expr = expr
.transform_up(|expr| rewrite.rewrite(expr, df_schema, config_options))?
Expand Down Expand Up @@ -1834,9 +1837,12 @@ impl ContextProvider for SessionContextProvider<'_> {
.get(name)
.cloned()
.ok_or_else(|| plan_datafusion_err!("table function '{name}' not found"))?;
let dummy_schema = DFSchema::empty();
let simplifier =
ExprSimplifier::new(SessionSimplifyProvider::new(self.state, &dummy_schema));
let simplify_context = SimplifyContext::default()
.with_config_options(Arc::clone(self.state.config_options()))
.with_query_execution_start_time(
self.state.execution_props().query_execution_start_time,
);
let simplifier = ExprSimplifier::new(simplify_context);
let args = args
.into_iter()
.map(|arg| simplifier.simplify(arg))
Expand Down Expand Up @@ -2063,7 +2069,7 @@ impl datafusion_execution::TaskContextProvider for SessionState {
}

impl OptimizerConfig for SessionState {
fn query_execution_start_time(&self) -> DateTime<Utc> {
fn query_execution_start_time(&self) -> Option<DateTime<Utc>> {
self.execution_props.query_execution_start_time
}

Expand Down Expand Up @@ -2115,35 +2121,6 @@ impl QueryPlanner for DefaultQueryPlanner {
}
}

/// Borrowed pairing of a [`SessionState`] and a [`DFSchema`], used as the
/// `SimplifyInfo` source when simplifying expressions for a session.
struct SessionSimplifyProvider<'a> {
/// Session whose execution properties are exposed to the simplifier.
state: &'a SessionState,
/// Schema against which expression types and nullability are resolved.
df_schema: &'a DFSchema,
}

impl<'a> SessionSimplifyProvider<'a> {
fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self {
Self { state, df_schema }
}
}

/// Answers the simplifier's type and nullability queries against the borrowed
/// schema and hands out the session's execution properties.
impl SimplifyInfo for SessionSimplifyProvider<'_> {
    /// Returns `true` when `expr` resolves to `DataType::Boolean` under the schema.
    fn is_boolean_type(&self, expr: &Expr) -> datafusion_common::Result<bool> {
        let data_type = self.get_data_type(expr)?;
        Ok(matches!(data_type, DataType::Boolean))
    }

    /// Reports whether `expr` may produce nulls under the schema.
    fn nullable(&self, expr: &Expr) -> datafusion_common::Result<bool> {
        let schema = self.df_schema;
        expr.nullable(schema)
    }

    /// Exposes the execution properties held by the borrowed session state.
    fn execution_props(&self) -> &ExecutionProps {
        let session = self.state;
        session.execution_props()
    }

    /// Resolves the data type of `expr` under the schema.
    fn get_data_type(&self, expr: &Expr) -> datafusion_common::Result<DataType> {
        let schema = self.df_schema;
        expr.get_type(schema)
    }
}

#[derive(Debug)]
pub(crate) struct PreparedPlan {
/// Data types of the parameters
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/test_util/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ impl TestParquetFile {
let df_schema = Arc::clone(&self.schema).to_dfschema_ref()?;

// run coercion on the filters to coerce types etc.
let props = ExecutionProps::new();
let context = SimplifyContext::new(&props).with_schema(Arc::clone(&df_schema));
let context = SimplifyContext::default().with_schema(Arc::clone(&df_schema));
if let Some(filter) = maybe_filter {
let simplifier = ExprSimplifier::new(context);
let filter = simplifier.coerce(filter, &df_schema).unwrap();
Expand Down
5 changes: 1 addition & 4 deletions datafusion/core/tests/expr_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use arrow::util::pretty::{pretty_format_batches, pretty_format_columns};
use datafusion::prelude::*;
use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::ExprFunctionExt;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::NullTreatment;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_functions::core::expr_ext::FieldAccessor;
Expand Down Expand Up @@ -422,9 +421,7 @@ fn create_simplified_expr_test(expr: Expr, expected_expr: &str) {
let df_schema = DFSchema::try_from(batch.schema()).unwrap();

// Simplify the expression first
let props = ExecutionProps::new();
let simplify_context =
SimplifyContext::new(&props).with_schema(df_schema.clone().into());
let simplify_context = SimplifyContext::default().with_schema(Arc::new(df_schema));
let simplifier = ExprSimplifier::new(simplify_context).with_max_cycles(10);
let simplified = simplifier.simplify(expr).unwrap();
create_expr_test(simplified, expected_expr);
Expand Down
Loading