From cddee7b39fa949ca77113291b39087fa679ebe6b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 27 Dec 2025 09:21:26 -0700 Subject: [PATCH] feat: Improve performance of date truncate --- native/spark-expr/Cargo.toml | 4 + native/spark-expr/benches/date_trunc.rs | 55 ++++++ native/spark-expr/src/kernels/mod.rs | 2 +- native/spark-expr/src/kernels/temporal.rs | 197 +++++++++++----------- native/spark-expr/src/lib.rs | 3 +- 5 files changed, 164 insertions(+), 97 deletions(-) create mode 100644 native/spark-expr/benches/date_trunc.rs diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml index ea89c43204..5174c05107 100644 --- a/native/spark-expr/Cargo.toml +++ b/native/spark-expr/Cargo.toml @@ -80,6 +80,10 @@ harness = false name = "padding" harness = false +[[bench]] +name = "date_trunc" +harness = false + [[test]] name = "test_udf_registration" path = "tests/spark_expr_reg.rs" diff --git a/native/spark-expr/benches/date_trunc.rs b/native/spark-expr/benches/date_trunc.rs new file mode 100644 index 0000000000..a3dd0900c6 --- /dev/null +++ b/native/spark-expr/benches/date_trunc.rs @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, Date32Array}; +use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion_comet_spark_expr::date_trunc_dyn; +use std::sync::Arc; + +fn criterion_benchmark(c: &mut Criterion) { + let date_array = create_date_array(); + + let mut group = c.benchmark_group("date_trunc"); + + // Benchmark each truncation format + for format in ["YEAR", "QUARTER", "MONTH", "WEEK"] { + let array_ref: ArrayRef = Arc::new(date_array.clone()); + group.bench_function(format!("date_trunc_{}", format.to_lowercase()), |b| { + b.iter(|| date_trunc_dyn(&array_ref, format.to_string()).unwrap()); + }); + } + + group.finish(); +} + +fn create_date_array() -> Date32Array { + // Create 10000 dates spanning several years (more realistic workload) + // Days since Unix epoch: range from 0 (1970-01-01) to ~19000 (2022) + let dates: Vec = (0..10000).map(|i| (i * 2) % 19000).collect(); + Date32Array::from(dates) +} + +fn config() -> Criterion { + Criterion::default() +} + +criterion_group! { + name = benches; + config = config(); + targets = criterion_benchmark +} +criterion_main!(benches); diff --git a/native/spark-expr/src/kernels/mod.rs b/native/spark-expr/src/kernels/mod.rs index 3669ff13ad..0092f016c9 100644 --- a/native/spark-expr/src/kernels/mod.rs +++ b/native/spark-expr/src/kernels/mod.rs @@ -18,4 +18,4 @@ //! Kernels pub mod strings; -pub(crate) mod temporal; +pub mod temporal; diff --git a/native/spark-expr/src/kernels/temporal.rs b/native/spark-expr/src/kernels/temporal.rs index 09e2c905c7..2668e5095a 100644 --- a/native/spark-expr/src/kernels/temporal.rs +++ b/native/spark-expr/src/kernels/temporal.rs @@ -17,7 +17,7 @@ //! temporal kernels -use chrono::{DateTime, Datelike, Duration, NaiveDateTime, Timelike, Utc}; +use chrono::{DateTime, Datelike, Duration, NaiveDate, Timelike, Utc}; use std::sync::Arc; @@ -25,7 +25,7 @@ use arrow::array::{ downcast_dictionary_array, downcast_temporal_array, temporal_conversions::*, timezone::Tz, - types::{ArrowDictionaryKeyType, ArrowTemporalType, Date32Type, TimestampMicrosecondType}, + types::{ArrowDictionaryKeyType, ArrowTemporalType, TimestampMicrosecondType}, ArrowNumericType, }; use arrow::{ @@ -46,47 +46,57 @@ macro_rules! return_compute_error_with { // and the beginning of the Unix Epoch (1970-01-01) const DAYS_TO_UNIX_EPOCH: i32 = 719_163; -// Copied from arrow_arith/temporal.rs with modification to the output datatype -// Transforms a array of NaiveDate to an array of Date32 after applying an operation -fn as_datetime_with_op, T: ArrowTemporalType, F>( - iter: ArrayIter, - mut builder: PrimitiveBuilder, - op: F, -) -> Date32Array -where - F: Fn(NaiveDateTime) -> i32, - i64: From, -{ - iter.into_iter().for_each(|value| { - if let Some(value) = value { - match as_datetime::(i64::from(value)) { - Some(dt) => builder.append_value(op(dt)), - None => builder.append_null(), - } - } else { - builder.append_null(); - } - }); +// Optimized date truncation functions that work directly with days since epoch +// These avoid the overhead of converting to/from NaiveDateTime - builder.finish() +/// Convert days since Unix epoch to NaiveDate +#[inline] +fn days_to_date(days: i32) -> Option { + NaiveDate::from_num_days_from_ce_opt(days + DAYS_TO_UNIX_EPOCH) } +/// Truncate date to first day of year - optimized version +/// Uses ordinal (day of year) to avoid creating a new date #[inline] -fn as_datetime_with_op_single( - value: Option, - builder: &mut PrimitiveBuilder, - op: F, -) where - F: Fn(NaiveDateTime) -> i32, -{ - if let Some(value) = value { - match as_datetime::(i64::from(value)) { - Some(dt) => builder.append_value(op(dt)), - None => builder.append_null(), - } - } else { - builder.append_null(); - } +fn trunc_days_to_year(days: i32) -> Option { + let date = days_to_date(days)?; + let day_of_year_offset = date.ordinal() as i32 - 1; + Some(days - day_of_year_offset) +} + +/// Truncate date to first day of quarter - optimized version +/// Computes offset from first day of quarter without creating a new date +#[inline] +fn trunc_days_to_quarter(days: i32) -> Option { + let date = days_to_date(days)?; + let month = date.month(); // 1-12 + let quarter = (month - 1) / 3; // 0-3 + let first_month_of_quarter = quarter * 3 + 1; // 1, 4, 7, or 10 + + // Find day of year for first day of quarter + let first_day_of_quarter = NaiveDate::from_ymd_opt(date.year(), first_month_of_quarter, 1)?; + let quarter_start_ordinal = first_day_of_quarter.ordinal() as i32; + let current_ordinal = date.ordinal() as i32; + + Some(days - (current_ordinal - quarter_start_ordinal)) +} + +/// Truncate date to first day of month - optimized version +/// Instead of creating a new date, just subtract day offset +#[inline] +fn trunc_days_to_month(days: i32) -> Option { + let date = days_to_date(days)?; + let day_offset = date.day() as i32 - 1; + Some(days - day_offset) +} + +/// Truncate date to first day of week (Monday) - optimized version +#[inline] +fn trunc_days_to_week(days: i32) -> Option { + let date = days_to_date(days)?; + // weekday().num_days_from_monday() gives 0 for Monday, 1 for Tuesday, etc. + let days_since_monday = date.weekday().num_days_from_monday() as i32; + Some(days - days_since_monday) } // Based on arrow_arith/temporal.rs:extract_component_from_datetime_array @@ -143,11 +153,6 @@ where Ok(()) } -#[inline] -fn as_days_from_unix_epoch(dt: Option) -> i32 { - dt.unwrap().num_days_from_ce() - DAYS_TO_UNIX_EPOCH -} - // Apply the Tz to the Naive Date Time,,convert to UTC, and return as microseconds in Unix epoch #[inline] fn as_micros_from_unix_epoch_utc(dt: Option>) -> i64 { @@ -251,7 +256,7 @@ fn trunc_date_to_microsec(dt: T) -> Option { /// array is an array of Date32 values. The array may be a dictionary array. /// /// format is a scalar string specifying the format to apply to the timestamp value. -pub(crate) fn date_trunc_dyn(array: &dyn Array, format: String) -> Result { +pub fn date_trunc_dyn(array: &dyn Array, format: String) -> Result { match array.data_type().clone() { DataType::Dictionary(_, _) => { downcast_dictionary_array!( @@ -282,34 +287,17 @@ where T: ArrowTemporalType + ArrowNumericType, i64: From, { - let builder = Date32Builder::with_capacity(array.len()); - let iter = ArrayIter::new(array); match array.data_type() { - DataType::Date32 => match format.to_uppercase().as_str() { - "YEAR" | "YYYY" | "YY" => Ok(as_datetime_with_op::<&PrimitiveArray, T, _>( - iter, - builder, - |dt| as_days_from_unix_epoch(trunc_date_to_year(dt)), - )), - "QUARTER" => Ok(as_datetime_with_op::<&PrimitiveArray, T, _>( - iter, - builder, - |dt| as_days_from_unix_epoch(trunc_date_to_quarter(dt)), - )), - "MONTH" | "MON" | "MM" => Ok(as_datetime_with_op::<&PrimitiveArray, T, _>( - iter, - builder, - |dt| as_days_from_unix_epoch(trunc_date_to_month(dt)), - )), - "WEEK" => Ok(as_datetime_with_op::<&PrimitiveArray, T, _>( - iter, - builder, - |dt| as_days_from_unix_epoch(trunc_date_to_week(dt)), - )), - _ => Err(SparkError::Internal(format!( - "Unsupported format: {format:?} for function 'date_trunc'" - ))), - }, + DataType::Date32 => { + // Use optimized path for Date32 that works directly with days + date_trunc_date32( + array + .as_any() + .downcast_ref::() + .expect("Date32 type mismatch"), + format, + ) + } dt => return_compute_error_with!( "Unsupported input type '{:?}' for function 'date_trunc'", dt @@ -317,6 +305,31 @@ where } } +/// Optimized date truncation for Date32 arrays +/// Works directly with days since epoch instead of converting to/from NaiveDateTime +fn date_trunc_date32(array: &Date32Array, format: String) -> Result { + // Select the truncation function based on format + let trunc_fn: fn(i32) -> Option = match format.to_uppercase().as_str() { + "YEAR" | "YYYY" | "YY" => trunc_days_to_year, + "QUARTER" => trunc_days_to_quarter, + "MONTH" | "MON" | "MM" => trunc_days_to_month, + "WEEK" => trunc_days_to_week, + _ => { + return Err(SparkError::Internal(format!( + "Unsupported format: {format:?} for function 'date_trunc'" + ))) + } + }; + + // Apply truncation to each element + let result: Date32Array = array + .iter() + .map(|opt_days| opt_days.and_then(trunc_fn)) + .collect(); + + Ok(result) +} + /// /// Implements the spark [TRUNC](https://spark.apache.org/docs/latest/api/sql/index.html#trunc) /// function where the specified format may be an array @@ -410,29 +423,23 @@ macro_rules! date_trunc_array_fmt_helper { match $datatype { DataType::Date32 => { for (index, val) in iter.enumerate() { - let op_result = match $formats.value(index).to_uppercase().as_str() { - "YEAR" | "YYYY" | "YY" => { - Ok(as_datetime_with_op_single(val, &mut builder, |dt| { - as_days_from_unix_epoch(trunc_date_to_year(dt)) - })) - } - "QUARTER" => Ok(as_datetime_with_op_single(val, &mut builder, |dt| { - as_days_from_unix_epoch(trunc_date_to_quarter(dt)) - })), - "MONTH" | "MON" | "MM" => { - Ok(as_datetime_with_op_single(val, &mut builder, |dt| { - as_days_from_unix_epoch(trunc_date_to_month(dt)) - })) - } - "WEEK" => Ok(as_datetime_with_op_single(val, &mut builder, |dt| { - as_days_from_unix_epoch(trunc_date_to_week(dt)) - })), - _ => Err(SparkError::Internal(format!( - "Unsupported format: {:?} for function 'date_trunc'", - $formats.value(index) - ))), - }; - op_result? + let trunc_fn: fn(i32) -> Option = + match $formats.value(index).to_uppercase().as_str() { + "YEAR" | "YYYY" | "YY" => trunc_days_to_year, + "QUARTER" => trunc_days_to_quarter, + "MONTH" | "MON" | "MM" => trunc_days_to_month, + "WEEK" => trunc_days_to_week, + _ => { + return Err(SparkError::Internal(format!( + "Unsupported format: {:?} for function 'date_trunc'", + $formats.value(index) + ))) + } + }; + match val.and_then(trunc_fn) { + Some(days) => builder.append_value(days), + None => builder.append_null(), + } } Ok(builder.finish()) } diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index 96e727ae55..f26fd911d8 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -21,7 +21,8 @@ mod error; -mod kernels; +pub mod kernels; +pub use kernels::temporal::date_trunc_dyn; mod static_invoke; pub use static_invoke::*;