From f663c03dd5ba2ff441424143fd9df49299862000 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 27 Dec 2025 08:53:54 -0700 Subject: [PATCH 1/4] feat: Implement Spark functions hour, minute, second --- .../spark/src/function/datetime/extract.rs | 300 ++++++++++++++++++ datafusion/spark/src/function/datetime/mod.rs | 22 ++ .../test_files/spark/datetime/hour.slt | 23 +- .../test_files/spark/datetime/minute.slt | 23 +- .../test_files/spark/datetime/second.slt | 23 +- 5 files changed, 385 insertions(+), 6 deletions(-) create mode 100644 datafusion/spark/src/function/datetime/extract.rs diff --git a/datafusion/spark/src/function/datetime/extract.rs b/datafusion/spark/src/function/datetime/extract.rs new file mode 100644 index 0000000000000..d325f9d15e8b1 --- /dev/null +++ b/datafusion/spark/src/function/datetime/extract.rs @@ -0,0 +1,300 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::any::Any; +use std::sync::Arc; + +use arrow::array::ArrayRef; +use arrow::compute::{date_part, DatePart}; +use arrow::datatypes::DataType; +use datafusion_common::utils::take_function_args; +use datafusion_common::Result; +use datafusion_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, +}; +use datafusion_functions::utils::make_scalar_function; + +/// Creates a signature for datetime extraction functions that accept timestamp types. +fn extract_signature() -> Signature { + Signature::one_of( + vec![ + TypeSignature::Exact(vec![DataType::Timestamp( + arrow::datatypes::TimeUnit::Microsecond, + None, + )]), + TypeSignature::Exact(vec![DataType::Timestamp( + arrow::datatypes::TimeUnit::Microsecond, + Some(Arc::from("")), + )]), + TypeSignature::Exact(vec![DataType::Timestamp( + arrow::datatypes::TimeUnit::Millisecond, + None, + )]), + TypeSignature::Exact(vec![DataType::Timestamp( + arrow::datatypes::TimeUnit::Millisecond, + Some(Arc::from("")), + )]), + TypeSignature::Exact(vec![DataType::Timestamp( + arrow::datatypes::TimeUnit::Second, + None, + )]), + TypeSignature::Exact(vec![DataType::Timestamp( + arrow::datatypes::TimeUnit::Second, + Some(Arc::from("")), + )]), + TypeSignature::Exact(vec![DataType::Timestamp( + arrow::datatypes::TimeUnit::Nanosecond, + None, + )]), + TypeSignature::Exact(vec![DataType::Timestamp( + arrow::datatypes::TimeUnit::Nanosecond, + Some(Arc::from("")), + )]), + ], + Volatility::Immutable, + ) +} + +// ----------------------------------------------------------------------------- +// SparkHour +// ----------------------------------------------------------------------------- + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkHour { + signature: Signature, +} + +impl Default for SparkHour { + fn default() -> Self { + Self::new() + } +} + +impl SparkHour { + pub fn new() -> Self { + Self { + signature: extract_signature(), + } + } +} + +impl ScalarUDFImpl for SparkHour { + fn 
as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "hour" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { + Ok(DataType::Int32) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { + make_scalar_function(spark_hour, vec![])(&args.args) + } +} + +fn spark_hour(args: &[ArrayRef]) -> Result<ArrayRef> { + let [ts_arg] = take_function_args("hour", args)?; + let result = date_part(ts_arg.as_ref(), DatePart::Hour)?; + Ok(result) +} + +// ----------------------------------------------------------------------------- +// SparkMinute +// ----------------------------------------------------------------------------- + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkMinute { + signature: Signature, +} + +impl Default for SparkMinute { + fn default() -> Self { + Self::new() + } +} + +impl SparkMinute { + pub fn new() -> Self { + Self { + signature: extract_signature(), + } + } +} + +impl ScalarUDFImpl for SparkMinute { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "minute" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { + Ok(DataType::Int32) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { + make_scalar_function(spark_minute, vec![])(&args.args) + } +} + +fn spark_minute(args: &[ArrayRef]) -> Result<ArrayRef> { + let [ts_arg] = take_function_args("minute", args)?; + let result = date_part(ts_arg.as_ref(), DatePart::Minute)?; + Ok(result) +} + +// ----------------------------------------------------------------------------- +// SparkSecond +// ----------------------------------------------------------------------------- + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkSecond { + signature: Signature, +} + +impl Default for SparkSecond { + fn default() -> Self { + Self::new() + } +} + +impl SparkSecond { + pub fn new() -> Self { + Self 
{ + signature: extract_signature(), + } + } +} + +impl ScalarUDFImpl for SparkSecond { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "second" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { + Ok(DataType::Int32) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { + make_scalar_function(spark_second, vec![])(&args.args) + } +} + +fn spark_second(args: &[ArrayRef]) -> Result<ArrayRef> { + let [ts_arg] = take_function_args("second", args)?; + let result = date_part(ts_arg.as_ref(), DatePart::Second)?; + Ok(result) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Array, Int32Array, TimestampMicrosecondArray}; + use arrow::datatypes::TimeUnit; + + #[test] + fn test_spark_hour() { + // Create a timestamp array: 2024-01-15 14:30:45 UTC (in microseconds) + // 14:30:45 -> hour = 14 + let ts_micros = 1705329045_000_000i64; // 2024-01-15 14:30:45 UTC + let ts_array = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]); + let ts_array = Arc::new(ts_array) as ArrayRef; + + let result = spark_hour(&[ts_array]).unwrap(); + let result = result.as_any().downcast_ref::<Int32Array>().unwrap(); + + assert_eq!(result.value(0), 14); + assert!(result.is_null(1)); + } + + #[test] + fn test_spark_minute() { + // 14:30:45 -> minute = 30 + let ts_micros = 1705329045_000_000i64; + let ts_array = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]); + let ts_array = Arc::new(ts_array) as ArrayRef; + + let result = spark_minute(&[ts_array]).unwrap(); + let result = result.as_any().downcast_ref::<Int32Array>().unwrap(); + + assert_eq!(result.value(0), 30); + assert!(result.is_null(1)); + } + + #[test] + fn test_spark_second() { + // 14:30:45 -> second = 45 + let ts_micros = 1705329045_000_000i64; + let ts_array = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]); + let ts_array = Arc::new(ts_array) as ArrayRef; + + let result = 
spark_second(&[ts_array]).unwrap(); + let result = result.as_any().downcast_ref::<Int32Array>().unwrap(); + + assert_eq!(result.value(0), 45); + assert!(result.is_null(1)); + } + + #[test] + fn test_hour_return_type() { + let func = SparkHour::new(); + let result = func + .return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)]) + .unwrap(); + assert_eq!(result, DataType::Int32); + } + + #[test] + fn test_minute_return_type() { + let func = SparkMinute::new(); + let result = func + .return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)]) + .unwrap(); + assert_eq!(result, DataType::Int32); + } + + #[test] + fn test_second_return_type() { + let func = SparkSecond::new(); + let result = func + .return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)]) + .unwrap(); + assert_eq!(result, DataType::Int32); + } +} diff --git a/datafusion/spark/src/function/datetime/mod.rs b/datafusion/spark/src/function/datetime/mod.rs index a6adc99607665..e84dc4235d3f0 100644 --- a/datafusion/spark/src/function/datetime/mod.rs +++ b/datafusion/spark/src/function/datetime/mod.rs @@ -17,6 +17,7 @@ pub mod date_add; pub mod date_sub; +pub mod extract; pub mod last_day; pub mod make_dt_interval; pub mod make_interval; @@ -28,6 +29,9 @@ use std::sync::Arc; make_udf_function!(date_add::SparkDateAdd, date_add); make_udf_function!(date_sub::SparkDateSub, date_sub); +make_udf_function!(extract::SparkHour, hour); +make_udf_function!(extract::SparkMinute, minute); +make_udf_function!(extract::SparkSecond, second); make_udf_function!(last_day::SparkLastDay, last_day); make_udf_function!(make_dt_interval::SparkMakeDtInterval, make_dt_interval); make_udf_function!(make_interval::SparkMakeInterval, make_interval); @@ -46,6 +50,21 @@ pub mod expr_fn { "Returns the date that is days days before start. 
The function returns NULL if at least one of the input parameters is NULL.", arg1 arg2 )); + export_functions!(( + hour, + "Extracts the hour component of a timestamp.", + arg1 + )); + export_functions!(( + minute, + "Extracts the minute component of a timestamp.", + arg1 + )); + export_functions!(( + second, + "Extracts the second component of a timestamp.", + arg1 + )); export_functions!(( last_day, "Returns the last day of the month which the date belongs to.", @@ -74,6 +93,9 @@ pub fn functions() -> Vec> { vec![ date_add(), date_sub(), + hour(), + minute(), + second(), last_day(), make_dt_interval(), make_interval(), diff --git a/datafusion/sqllogictest/test_files/spark/datetime/hour.slt b/datafusion/sqllogictest/test_files/spark/datetime/hour.slt index e129b271658ab..b595a98b579c8 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/hour.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/hour.slt @@ -23,5 +23,24 @@ ## Original Query: SELECT hour('2009-07-30 12:58:59'); ## PySpark 3.5.5 Result: {'hour(2009-07-30 12:58:59)': 12, 'typeof(hour(2009-07-30 12:58:59))': 'int', 'typeof(2009-07-30 12:58:59)': 'string'} -#query -#SELECT hour('2009-07-30 12:58:59'::string); +query I +SELECT hour('2009-07-30 12:58:59'::timestamp); +---- +12 + +# Test with different hours +query I +SELECT hour('2009-07-30 00:00:00'::timestamp); +---- +0 + +query I +SELECT hour('2009-07-30 23:59:59'::timestamp); +---- +23 + +# Test with NULL +query I +SELECT hour(NULL::timestamp); +---- +NULL diff --git a/datafusion/sqllogictest/test_files/spark/datetime/minute.slt b/datafusion/sqllogictest/test_files/spark/datetime/minute.slt index dbe1e64be8377..8792c544736d0 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/minute.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/minute.slt @@ -23,5 +23,24 @@ ## Original Query: SELECT minute('2009-07-30 12:58:59'); ## PySpark 3.5.5 Result: {'minute(2009-07-30 12:58:59)': 58, 'typeof(minute(2009-07-30 12:58:59))': 
'int', 'typeof(2009-07-30 12:58:59)': 'string'} -#query -#SELECT minute('2009-07-30 12:58:59'::string); +query I +SELECT minute('2009-07-30 12:58:59'::timestamp); +---- +58 + +# Test with different minutes +query I +SELECT minute('2009-07-30 12:00:00'::timestamp); +---- +0 + +query I +SELECT minute('2009-07-30 12:59:59'::timestamp); +---- +59 + +# Test with NULL +query I +SELECT minute(NULL::timestamp); +---- +NULL diff --git a/datafusion/sqllogictest/test_files/spark/datetime/second.slt b/datafusion/sqllogictest/test_files/spark/datetime/second.slt index f69c9af4a62d9..7a99dd8967b02 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/second.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/second.slt @@ -23,5 +23,24 @@ ## Original Query: SELECT second('2009-07-30 12:58:59'); ## PySpark 3.5.5 Result: {'second(2009-07-30 12:58:59)': 59, 'typeof(second(2009-07-30 12:58:59))': 'int', 'typeof(2009-07-30 12:58:59)': 'string'} -#query -#SELECT second('2009-07-30 12:58:59'::string); +query I +SELECT second('2009-07-30 12:58:59'::timestamp); +---- +59 + +# Test with different seconds +query I +SELECT second('2009-07-30 12:58:00'::timestamp); +---- +0 + +query I +SELECT second('2009-07-30 12:58:30'::timestamp); +---- +30 + +# Test with NULL +query I +SELECT second(NULL::timestamp); +---- +NULL From cd70b3b43ba1a54a662c69f84f063f217aaa1b02 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 27 Dec 2025 09:00:54 -0700 Subject: [PATCH 2/4] format --- datafusion/spark/src/function/datetime/extract.rs | 7 ++++--- datafusion/spark/src/function/datetime/mod.rs | 6 +----- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/datafusion/spark/src/function/datetime/extract.rs b/datafusion/spark/src/function/datetime/extract.rs index d325f9d15e8b1..a4adaf0cc74f7 100644 --- a/datafusion/spark/src/function/datetime/extract.rs +++ b/datafusion/spark/src/function/datetime/extract.rs @@ -19,12 +19,13 @@ use std::any::Any; use std::sync::Arc; use 
arrow::array::ArrayRef; -use arrow::compute::{date_part, DatePart}; +use arrow::compute::{DatePart, date_part}; use arrow::datatypes::DataType; -use datafusion_common::utils::take_function_args; use datafusion_common::Result; +use datafusion_common::utils::take_function_args; use datafusion_expr::{ - ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, + Volatility, }; use datafusion_functions::utils::make_scalar_function; diff --git a/datafusion/spark/src/function/datetime/mod.rs b/datafusion/spark/src/function/datetime/mod.rs index e84dc4235d3f0..849aa20895990 100644 --- a/datafusion/spark/src/function/datetime/mod.rs +++ b/datafusion/spark/src/function/datetime/mod.rs @@ -50,11 +50,7 @@ pub mod expr_fn { "Returns the date that is days days before start. The function returns NULL if at least one of the input parameters is NULL.", arg1 arg2 )); - export_functions!(( - hour, - "Extracts the hour component of a timestamp.", - arg1 - )); + export_functions!((hour, "Extracts the hour component of a timestamp.", arg1)); export_functions!(( minute, "Extracts the minute component of a timestamp.", From 105978a49886a11332458d155f5714e390b5c553 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 28 Dec 2025 09:10:44 -0700 Subject: [PATCH 3/4] address feedback --- .../spark/src/function/datetime/extract.rs | 42 ++----------------- 1 file changed, 4 insertions(+), 38 deletions(-) diff --git a/datafusion/spark/src/function/datetime/extract.rs b/datafusion/spark/src/function/datetime/extract.rs index a4adaf0cc74f7..5d2c64527441a 100644 --- a/datafusion/spark/src/function/datetime/extract.rs +++ b/datafusion/spark/src/function/datetime/extract.rs @@ -16,7 +16,6 @@ // under the License. 
use std::any::Any; -use std::sync::Arc; use arrow::array::ArrayRef; use arrow::compute::{DatePart, date_part}; @@ -24,48 +23,15 @@ use arrow::datatypes::DataType; use datafusion_common::Result; use datafusion_common::utils::take_function_args; use datafusion_expr::{ - ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, - Volatility, + Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, + TypeSignatureClass, Volatility, }; use datafusion_functions::utils::make_scalar_function; /// Creates a signature for datetime extraction functions that accept timestamp types. fn extract_signature() -> Signature { - Signature::one_of( - vec![ - TypeSignature::Exact(vec![DataType::Timestamp( - arrow::datatypes::TimeUnit::Microsecond, - None, - )]), - TypeSignature::Exact(vec![DataType::Timestamp( - arrow::datatypes::TimeUnit::Microsecond, - Some(Arc::from("")), - )]), - TypeSignature::Exact(vec![DataType::Timestamp( - arrow::datatypes::TimeUnit::Millisecond, - None, - )]), - TypeSignature::Exact(vec![DataType::Timestamp( - arrow::datatypes::TimeUnit::Millisecond, - Some(Arc::from("")), - )]), - TypeSignature::Exact(vec![DataType::Timestamp( - arrow::datatypes::TimeUnit::Second, - None, - )]), - TypeSignature::Exact(vec![DataType::Timestamp( - arrow::datatypes::TimeUnit::Second, - Some(Arc::from("")), - )]), - TypeSignature::Exact(vec![DataType::Timestamp( - arrow::datatypes::TimeUnit::Nanosecond, - None, - )]), - TypeSignature::Exact(vec![DataType::Timestamp( - arrow::datatypes::TimeUnit::Nanosecond, - Some(Arc::from("")), - )]), - ], + Signature::coercible( + vec![Coercion::new_exact(TypeSignatureClass::Timestamp)], Volatility::Immutable, ) } From 8e44005f5b92eb4b5a83c66e52c8560b732d0b33 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 28 Dec 2025 09:12:56 -0700 Subject: [PATCH 4/4] fix --- datafusion/spark/src/function/datetime/extract.rs | 1 + 1 file changed, 1 insertion(+) diff --git 
a/datafusion/spark/src/function/datetime/extract.rs b/datafusion/spark/src/function/datetime/extract.rs index 5d2c64527441a..eea16fcd4733c 100644 --- a/datafusion/spark/src/function/datetime/extract.rs +++ b/datafusion/spark/src/function/datetime/extract.rs @@ -192,6 +192,7 @@ fn spark_second(args: &[ArrayRef]) -> Result { #[cfg(test)] mod tests { use super::*; + use std::sync::Arc; use arrow::array::{Array, Int32Array, TimestampMicrosecondArray}; use arrow::datatypes::TimeUnit;