Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions native/spark-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ harness = false
name = "padding"
harness = false

[[bench]]
name = "date_trunc"
harness = false

[[test]]
name = "test_udf_registration"
path = "tests/spark_expr_reg.rs"
55 changes: 55 additions & 0 deletions native/spark-expr/benches/date_trunc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, Date32Array};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion_comet_spark_expr::date_trunc_dyn;
use std::sync::Arc;

/// Benchmarks `date_trunc_dyn` on a shared `Date32` array, once per truncation format.
///
/// Each format is registered as `date_trunc_<format>` inside the `date_trunc`
/// benchmark group.
fn criterion_benchmark(c: &mut Criterion) {
    // The input is loop-invariant: build it and wrap it in an `Arc` once instead of
    // cloning the array and allocating a fresh `Arc` for every format.
    let array_ref: ArrayRef = Arc::new(create_date_array());

    let mut group = c.benchmark_group("date_trunc");

    // Benchmark each truncation format
    for format in ["YEAR", "QUARTER", "MONTH", "WEEK"] {
        group.bench_function(format!("date_trunc_{}", format.to_lowercase()), |b| {
            // `date_trunc_dyn` takes the format as an owned `String`, so one is
            // created on every iteration.
            b.iter(|| date_trunc_dyn(&array_ref, format.to_string()).unwrap());
        });
    }

    group.finish();
}

/// Builds the benchmark input: 10,000 `Date32` values with no nulls.
///
/// Values are days since the Unix epoch. They advance two days at a time and wrap
/// at 19,000 days (roughly 1970 through early 2022), so the data covers many
/// different years, quarters, months and weeks.
fn create_date_array() -> Date32Array {
    const NUM_DATES: i32 = 10_000;
    const DAY_RANGE: i32 = 19_000;

    let mut day_values: Vec<i32> = Vec::with_capacity(NUM_DATES as usize);
    for index in 0..NUM_DATES {
        day_values.push((index * 2) % DAY_RANGE);
    }
    Date32Array::from(day_values)
}

fn config() -> Criterion {
Criterion::default()
}

// Declare the `benches` group: it runs `criterion_benchmark` using the
// configuration returned by `config()`.
criterion_group! {
name = benches;
config = config();
targets = criterion_benchmark
}
// Entry point for the bench binary (the Cargo bench target sets `harness = false`,
// so Criterion supplies `main` instead of the default test harness).
criterion_main!(benches);
2 changes: 1 addition & 1 deletion native/spark-expr/src/kernels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
//! Kernels

pub mod strings;
pub(crate) mod temporal;
pub mod temporal;
197 changes: 102 additions & 95 deletions native/spark-expr/src/kernels/temporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

//! temporal kernels

use chrono::{DateTime, Datelike, Duration, NaiveDateTime, Timelike, Utc};
use chrono::{DateTime, Datelike, Duration, NaiveDate, Timelike, Utc};

use std::sync::Arc;

use arrow::array::{
downcast_dictionary_array, downcast_temporal_array,
temporal_conversions::*,
timezone::Tz,
types::{ArrowDictionaryKeyType, ArrowTemporalType, Date32Type, TimestampMicrosecondType},
types::{ArrowDictionaryKeyType, ArrowTemporalType, TimestampMicrosecondType},
ArrowNumericType,
};
use arrow::{
Expand All @@ -46,47 +46,57 @@ macro_rules! return_compute_error_with {
// and the beginning of the Unix Epoch (1970-01-01)
const DAYS_TO_UNIX_EPOCH: i32 = 719_163;

// Copied from arrow_arith/temporal.rs with modification to the output datatype
// Transforms a array of NaiveDate to an array of Date32 after applying an operation
fn as_datetime_with_op<A: ArrayAccessor<Item = T::Native>, T: ArrowTemporalType, F>(
iter: ArrayIter<A>,
mut builder: PrimitiveBuilder<Date32Type>,
op: F,
) -> Date32Array
where
F: Fn(NaiveDateTime) -> i32,
i64: From<T::Native>,
{
iter.into_iter().for_each(|value| {
if let Some(value) = value {
match as_datetime::<T>(i64::from(value)) {
Some(dt) => builder.append_value(op(dt)),
None => builder.append_null(),
}
} else {
builder.append_null();
}
});
// Optimized date truncation functions that work directly with days since epoch
// These avoid the overhead of converting to/from NaiveDateTime

builder.finish()
/// Convert days since Unix epoch to NaiveDate
///
/// Returns `None` when the shifted day count does not fit in an `i32` or when
/// chrono cannot represent the resulting date.
#[inline]
fn days_to_date(days: i32) -> Option<NaiveDate> {
    // A Date32 value can be any i32, so the shift to a "days from CE" count must be
    // checked: a plain `+` would panic in debug builds (and wrap in release builds)
    // for inputs close to i32::MAX.
    let days_from_ce = days.checked_add(DAYS_TO_UNIX_EPOCH)?;
    NaiveDate::from_num_days_from_ce_opt(days_from_ce)
}

/// Truncate date to first day of year - optimized version
/// Uses ordinal (day of year) to avoid creating a new date
#[inline]
fn as_datetime_with_op_single<F>(
value: Option<i32>,
builder: &mut PrimitiveBuilder<Date32Type>,
op: F,
) where
F: Fn(NaiveDateTime) -> i32,
{
if let Some(value) = value {
match as_datetime::<Date32Type>(i64::from(value)) {
Some(dt) => builder.append_value(op(dt)),
None => builder.append_null(),
}
} else {
builder.append_null();
}
fn trunc_days_to_year(days: i32) -> Option<i32> {
    // `ordinal0()` is the zero-based day of the year, i.e. how many days lie between
    // January 1st and this date; stepping back by that amount lands on January 1st.
    // Yields `None` when `days_to_date` cannot convert the input.
    days_to_date(days).map(|date| days - date.ordinal0() as i32)
}

/// Truncate a day count to the first day of its quarter (Jan 1, Apr 1, Jul 1 or Oct 1).
///
/// The first day of the quarter is looked up only to read its day-of-year; the
/// result is obtained by subtracting the ordinal difference from `days`.
/// Returns `None` when either date cannot be constructed.
#[inline]
fn trunc_days_to_quarter(days: i32) -> Option<i32> {
    let date = days_to_date(days)?;

    // `month0()` is 0-11; rounding it down to a multiple of 3 and adding 1 gives the
    // first month of the quarter: 1, 4, 7, or 10.
    let quarter_first_month = date.month0() / 3 * 3 + 1;
    let quarter_start = NaiveDate::from_ymd_opt(date.year(), quarter_first_month, 1)?;

    // Both dates are in the same year, so the ordinal difference is the number of
    // days elapsed since the quarter began.
    let days_into_quarter = date.ordinal() as i32 - quarter_start.ordinal() as i32;
    Some(days - days_into_quarter)
}

/// Truncate a day count to the first day of its month.
///
/// Subtracts the zero-based day of the month from `days`. Returns `None` when
/// `days_to_date` cannot convert the input.
#[inline]
fn trunc_days_to_month(days: i32) -> Option<i32> {
    // `day0()` is the day of the month counted from zero, i.e. the distance back to the 1st.
    days_to_date(days).map(|date| days - date.day0() as i32)
}

/// Truncate a day count to the Monday that starts its week.
///
/// Returns `None` when `days_to_date` cannot convert the input.
#[inline]
fn trunc_days_to_week(days: i32) -> Option<i32> {
    days_to_date(days).map(|date| {
        // 0 for Monday, 1 for Tuesday, ... 6 for Sunday
        let offset_from_monday = date.weekday().num_days_from_monday() as i32;
        days - offset_from_monday
    })
}

// Based on arrow_arith/temporal.rs:extract_component_from_datetime_array
Expand Down Expand Up @@ -143,11 +153,6 @@ where
Ok(())
}

#[inline]
fn as_days_from_unix_epoch(dt: Option<NaiveDateTime>) -> i32 {
dt.unwrap().num_days_from_ce() - DAYS_TO_UNIX_EPOCH
}

// Apply the Tz to the Naive Date Time, convert to UTC, and return as microseconds in Unix epoch
#[inline]
fn as_micros_from_unix_epoch_utc(dt: Option<DateTime<Tz>>) -> i64 {
Expand Down Expand Up @@ -251,7 +256,7 @@ fn trunc_date_to_microsec<T: Timelike>(dt: T) -> Option<T> {
/// array is an array of Date32 values. The array may be a dictionary array.
///
/// format is a scalar string specifying the format to apply to the timestamp value.
pub(crate) fn date_trunc_dyn(array: &dyn Array, format: String) -> Result<ArrayRef, SparkError> {
pub fn date_trunc_dyn(array: &dyn Array, format: String) -> Result<ArrayRef, SparkError> {
match array.data_type().clone() {
DataType::Dictionary(_, _) => {
downcast_dictionary_array!(
Expand Down Expand Up @@ -282,41 +287,49 @@ where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
let builder = Date32Builder::with_capacity(array.len());
let iter = ArrayIter::new(array);
match array.data_type() {
DataType::Date32 => match format.to_uppercase().as_str() {
"YEAR" | "YYYY" | "YY" => Ok(as_datetime_with_op::<&PrimitiveArray<T>, T, _>(
iter,
builder,
|dt| as_days_from_unix_epoch(trunc_date_to_year(dt)),
)),
"QUARTER" => Ok(as_datetime_with_op::<&PrimitiveArray<T>, T, _>(
iter,
builder,
|dt| as_days_from_unix_epoch(trunc_date_to_quarter(dt)),
)),
"MONTH" | "MON" | "MM" => Ok(as_datetime_with_op::<&PrimitiveArray<T>, T, _>(
iter,
builder,
|dt| as_days_from_unix_epoch(trunc_date_to_month(dt)),
)),
"WEEK" => Ok(as_datetime_with_op::<&PrimitiveArray<T>, T, _>(
iter,
builder,
|dt| as_days_from_unix_epoch(trunc_date_to_week(dt)),
)),
_ => Err(SparkError::Internal(format!(
"Unsupported format: {format:?} for function 'date_trunc'"
))),
},
DataType::Date32 => {
// Use optimized path for Date32 that works directly with days
date_trunc_date32(
array
.as_any()
.downcast_ref::<Date32Array>()
.expect("Date32 type mismatch"),
format,
)
}
dt => return_compute_error_with!(
"Unsupported input type '{:?}' for function 'date_trunc'",
dt
),
}
}

/// Truncates every value of a `Date32` array to the start of the period named by `format`.
///
/// The work is done on the raw "days since epoch" values. Null inputs stay null,
/// and an input for which the selected truncation function returns `None` also
/// becomes null.
///
/// # Errors
///
/// Returns `SparkError::Internal` when the upper-cased `format` is not one of
/// `YEAR`/`YYYY`/`YY`, `QUARTER`, `MONTH`/`MON`/`MM`, or `WEEK`.
fn date_trunc_date32(array: &Date32Array, format: String) -> Result<Date32Array, SparkError> {
    // Resolve the format once, before touching the data, so the per-element work is
    // a single call through a function pointer.
    let truncate: fn(i32) -> Option<i32> = match format.to_uppercase().as_str() {
        "WEEK" => trunc_days_to_week,
        "MONTH" | "MON" | "MM" => trunc_days_to_month,
        "QUARTER" => trunc_days_to_quarter,
        "YEAR" | "YYYY" | "YY" => trunc_days_to_year,
        _ => {
            return Err(SparkError::Internal(format!(
                "Unsupported format: {format:?} for function 'date_trunc'"
            )));
        }
    };

    Ok(array
        .iter()
        .map(|maybe_days| match maybe_days {
            Some(days) => truncate(days),
            None => None,
        })
        .collect())
}

///
/// Implements the spark [TRUNC](https://spark.apache.org/docs/latest/api/sql/index.html#trunc)
/// function where the specified format may be an array
Expand Down Expand Up @@ -410,29 +423,23 @@ macro_rules! date_trunc_array_fmt_helper {
match $datatype {
DataType::Date32 => {
for (index, val) in iter.enumerate() {
let op_result = match $formats.value(index).to_uppercase().as_str() {
"YEAR" | "YYYY" | "YY" => {
Ok(as_datetime_with_op_single(val, &mut builder, |dt| {
as_days_from_unix_epoch(trunc_date_to_year(dt))
}))
}
"QUARTER" => Ok(as_datetime_with_op_single(val, &mut builder, |dt| {
as_days_from_unix_epoch(trunc_date_to_quarter(dt))
})),
"MONTH" | "MON" | "MM" => {
Ok(as_datetime_with_op_single(val, &mut builder, |dt| {
as_days_from_unix_epoch(trunc_date_to_month(dt))
}))
}
"WEEK" => Ok(as_datetime_with_op_single(val, &mut builder, |dt| {
as_days_from_unix_epoch(trunc_date_to_week(dt))
})),
_ => Err(SparkError::Internal(format!(
"Unsupported format: {:?} for function 'date_trunc'",
$formats.value(index)
))),
};
op_result?
let trunc_fn: fn(i32) -> Option<i32> =
match $formats.value(index).to_uppercase().as_str() {
"YEAR" | "YYYY" | "YY" => trunc_days_to_year,
"QUARTER" => trunc_days_to_quarter,
"MONTH" | "MON" | "MM" => trunc_days_to_month,
"WEEK" => trunc_days_to_week,
_ => {
return Err(SparkError::Internal(format!(
"Unsupported format: {:?} for function 'date_trunc'",
$formats.value(index)
)))
}
};
match val.and_then(trunc_fn) {
Some(days) => builder.append_value(days),
None => builder.append_null(),
}
}
Ok(builder.finish())
}
Expand Down
3 changes: 2 additions & 1 deletion native/spark-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@

mod error;

mod kernels;
pub mod kernels;
pub use kernels::temporal::date_trunc_dyn;
mod static_invoke;
pub use static_invoke::*;

Expand Down
Loading