diff --git a/datafusion/spark/src/function/datetime/extract.rs b/datafusion/spark/src/function/datetime/extract.rs
new file mode 100644
index 0000000000000..eea16fcd4733c
--- /dev/null
+++ b/datafusion/spark/src/function/datetime/extract.rs
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+
+use arrow::array::ArrayRef;
+use arrow::compute::{DatePart, date_part};
+use arrow::datatypes::DataType;
+use datafusion_common::Result;
+use datafusion_common::utils::take_function_args;
+use datafusion_expr::{
+    Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature,
+    TypeSignatureClass, Volatility,
+};
+use datafusion_functions::utils::make_scalar_function;
+
+/// Creates a signature for datetime extraction functions that accept timestamp types.
+fn extract_signature() -> Signature {
+    Signature::coercible(
+        vec![Coercion::new_exact(TypeSignatureClass::Timestamp)],
+        Volatility::Immutable,
+    )
+}
+
+// -----------------------------------------------------------------------------
+// SparkHour
+// -----------------------------------------------------------------------------
+
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkHour {
+    signature: Signature,
+}
+
+impl Default for SparkHour {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkHour {
+    pub fn new() -> Self {
+        Self {
+            signature: extract_signature(),
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkHour {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "hour"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Int32)
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        make_scalar_function(spark_hour, vec![])(&args.args)
+    }
+}
+
+fn spark_hour(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let [ts_arg] = take_function_args("hour", args)?;
+    let result = date_part(ts_arg.as_ref(), DatePart::Hour)?;
+    Ok(result)
+}
+
+// -----------------------------------------------------------------------------
+// SparkMinute
+// -----------------------------------------------------------------------------
+
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkMinute {
+    signature: Signature,
+}
+
+impl Default for SparkMinute {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkMinute {
+    pub fn new() -> Self {
+        Self {
+            signature: extract_signature(),
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkMinute {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "minute"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Int32)
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        make_scalar_function(spark_minute, vec![])(&args.args)
+    }
+}
+
+fn spark_minute(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let [ts_arg] = take_function_args("minute", args)?;
+    let result = date_part(ts_arg.as_ref(), DatePart::Minute)?;
+    Ok(result)
+}
+
+// -----------------------------------------------------------------------------
+// SparkSecond
+// -----------------------------------------------------------------------------
+
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkSecond {
+    signature: Signature,
+}
+
+impl Default for SparkSecond {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkSecond {
+    pub fn new() -> Self {
+        Self {
+            signature: extract_signature(),
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkSecond {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "second"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Int32)
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        make_scalar_function(spark_second, vec![])(&args.args)
+    }
+}
+
+fn spark_second(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let [ts_arg] = take_function_args("second", args)?;
+    let result = date_part(ts_arg.as_ref(), DatePart::Second)?;
+    Ok(result)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::sync::Arc;
+    use arrow::array::{Array, Int32Array, TimestampMicrosecondArray};
+    use arrow::datatypes::TimeUnit;
+
+    #[test]
+    fn test_spark_hour() {
+        // Create a timestamp array: 2024-01-15 14:30:45 UTC (in microseconds)
+        // 14:30:45 -> hour = 14
+        let ts_micros = 1705329045_000_000i64; // 2024-01-15 14:30:45 UTC
+        let ts_array = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
+        let ts_array = Arc::new(ts_array) as ArrayRef;
+
+        let result = spark_hour(&[ts_array]).unwrap();
+        let result = result.as_any().downcast_ref::<Int32Array>().unwrap();
+
+        assert_eq!(result.value(0), 14);
+        assert!(result.is_null(1));
+    }
+
+    #[test]
+    fn test_spark_minute() {
+        // 14:30:45 -> minute = 30
+        let ts_micros = 1705329045_000_000i64;
+        let ts_array = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
+        let ts_array = Arc::new(ts_array) as ArrayRef;
+
+        let result = spark_minute(&[ts_array]).unwrap();
+        let result = result.as_any().downcast_ref::<Int32Array>().unwrap();
+
+        assert_eq!(result.value(0), 30);
+        assert!(result.is_null(1));
+    }
+
+    #[test]
+    fn test_spark_second() {
+        // 14:30:45 -> second = 45
+        let ts_micros = 1705329045_000_000i64;
+        let ts_array = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
+        let ts_array = Arc::new(ts_array) as ArrayRef;
+
+        let result = spark_second(&[ts_array]).unwrap();
+        let result = result.as_any().downcast_ref::<Int32Array>().unwrap();
+
+        assert_eq!(result.value(0), 45);
+        assert!(result.is_null(1));
+    }
+
+    #[test]
+    fn test_hour_return_type() {
+        let func = SparkHour::new();
+        let result = func
+            .return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)])
+            .unwrap();
+        assert_eq!(result, DataType::Int32);
+    }
+
+    #[test]
+    fn test_minute_return_type() {
+        let func = SparkMinute::new();
+        let result = func
+            .return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)])
+            .unwrap();
+        assert_eq!(result, DataType::Int32);
+    }
+
+    #[test]
+    fn test_second_return_type() {
+        let func = SparkSecond::new();
+        let result = func
+            .return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)])
+            .unwrap();
+        assert_eq!(result, DataType::Int32);
+    }
+}
diff --git a/datafusion/spark/src/function/datetime/mod.rs b/datafusion/spark/src/function/datetime/mod.rs
index a6adc99607665..849aa20895990 100644
--- a/datafusion/spark/src/function/datetime/mod.rs
+++ b/datafusion/spark/src/function/datetime/mod.rs
@@ -17,6 +17,7 @@
 
 pub mod date_add;
 pub mod date_sub;
+pub mod extract;
 pub mod last_day;
 pub mod make_dt_interval;
 pub mod make_interval;
@@ -28,6 +29,9 @@ use std::sync::Arc;
 
 make_udf_function!(date_add::SparkDateAdd, date_add);
 make_udf_function!(date_sub::SparkDateSub, date_sub);
+make_udf_function!(extract::SparkHour, hour);
+make_udf_function!(extract::SparkMinute, minute);
+make_udf_function!(extract::SparkSecond, second);
 make_udf_function!(last_day::SparkLastDay, last_day);
 make_udf_function!(make_dt_interval::SparkMakeDtInterval, make_dt_interval);
 make_udf_function!(make_interval::SparkMakeInterval, make_interval);
@@ -46,6 +50,17 @@ pub mod expr_fn {
         "Returns the date that is days days before start. The function returns NULL if at least one of the input parameters is NULL.",
         arg1 arg2
     ));
+    export_functions!((hour, "Extracts the hour component of a timestamp.", arg1));
+    export_functions!((
+        minute,
+        "Extracts the minute component of a timestamp.",
+        arg1
+    ));
+    export_functions!((
+        second,
+        "Extracts the second component of a timestamp.",
+        arg1
+    ));
     export_functions!((
         last_day,
         "Returns the last day of the month which the date belongs to.",
@@ -74,6 +89,9 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
     vec![
         date_add(),
         date_sub(),
+        hour(),
+        minute(),
+        second(),
         last_day(),
         make_dt_interval(),
         make_interval(),
diff --git a/datafusion/sqllogictest/test_files/spark/datetime/hour.slt b/datafusion/sqllogictest/test_files/spark/datetime/hour.slt
index e129b271658ab..b595a98b579c8 100644
--- a/datafusion/sqllogictest/test_files/spark/datetime/hour.slt
+++ b/datafusion/sqllogictest/test_files/spark/datetime/hour.slt
@@ -23,5 +23,24 @@
 ## Original Query: SELECT hour('2009-07-30 12:58:59');
 ## PySpark 3.5.5 Result: {'hour(2009-07-30 12:58:59)': 12, 'typeof(hour(2009-07-30 12:58:59))': 'int', 'typeof(2009-07-30 12:58:59)': 'string'}
 
-#query
-#SELECT hour('2009-07-30 12:58:59'::string);
+query I
+SELECT hour('2009-07-30 12:58:59'::timestamp);
+----
+12
+
+# Test with different hours
+query I
+SELECT hour('2009-07-30 00:00:00'::timestamp);
+----
+0
+
+query I
+SELECT hour('2009-07-30 23:59:59'::timestamp);
+----
+23
+
+# Test with NULL
+query I
+SELECT hour(NULL::timestamp);
+----
+NULL
diff --git a/datafusion/sqllogictest/test_files/spark/datetime/minute.slt b/datafusion/sqllogictest/test_files/spark/datetime/minute.slt
index dbe1e64be8377..8792c544736d0 100644
--- a/datafusion/sqllogictest/test_files/spark/datetime/minute.slt
+++ b/datafusion/sqllogictest/test_files/spark/datetime/minute.slt
@@ -23,5 +23,24 @@
 ## Original Query: SELECT minute('2009-07-30 12:58:59');
 ## PySpark 3.5.5 Result: {'minute(2009-07-30 12:58:59)': 58, 'typeof(minute(2009-07-30 12:58:59))': 'int', 'typeof(2009-07-30 12:58:59)': 'string'}
 
-#query
-#SELECT minute('2009-07-30 12:58:59'::string);
+query I
+SELECT minute('2009-07-30 12:58:59'::timestamp);
+----
+58
+
+# Test with different minutes
+query I
+SELECT minute('2009-07-30 12:00:00'::timestamp);
+----
+0
+
+query I
+SELECT minute('2009-07-30 12:59:59'::timestamp);
+----
+59
+
+# Test with NULL
+query I
+SELECT minute(NULL::timestamp);
+----
+NULL
diff --git a/datafusion/sqllogictest/test_files/spark/datetime/second.slt b/datafusion/sqllogictest/test_files/spark/datetime/second.slt
index f69c9af4a62d9..7a99dd8967b02 100644
--- a/datafusion/sqllogictest/test_files/spark/datetime/second.slt
+++ b/datafusion/sqllogictest/test_files/spark/datetime/second.slt
@@ -23,5 +23,24 @@
 ## Original Query: SELECT second('2009-07-30 12:58:59');
 ## PySpark 3.5.5 Result: {'second(2009-07-30 12:58:59)': 59, 'typeof(second(2009-07-30 12:58:59))': 'int', 'typeof(2009-07-30 12:58:59)': 'string'}
 
-#query
-#SELECT second('2009-07-30 12:58:59'::string);
+query I
+SELECT second('2009-07-30 12:58:59'::timestamp);
+----
+59
+
+# Test with different seconds
+query I
+SELECT second('2009-07-30 12:58:00'::timestamp);
+----
+0
+
+query I
+SELECT second('2009-07-30 12:58:30'::timestamp);
+----
+30
+
+# Test with NULL
+query I
+SELECT second(NULL::timestamp);
+----
+NULL