diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml
index ea89c43204..b056e1b29a 100644
--- a/native/spark-expr/Cargo.toml
+++ b/native/spark-expr/Cargo.toml
@@ -80,6 +80,10 @@ harness = false
 name = "padding"
 harness = false
 
+[[bench]]
+name = "normalize_nan"
+harness = false
+
 [[test]]
 name = "test_udf_registration"
 path = "tests/spark_expr_reg.rs"
diff --git a/native/spark-expr/benches/normalize_nan.rs b/native/spark-expr/benches/normalize_nan.rs
new file mode 100644
index 0000000000..17413e7f07
--- /dev/null
+++ b/native/spark-expr/benches/normalize_nan.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks for NormalizeNaNAndZero expression
+
+use arrow::array::Float64Array;
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::record_batch::RecordBatch;
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use datafusion::physical_expr::expressions::Column;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion_comet_spark_expr::NormalizeNaNAndZero;
+use std::hint::black_box;
+use std::sync::Arc;
+
+const BATCH_SIZE: usize = 8192;
+
+/// Build a column-reference physical expression for the given name and index.
+fn make_col(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
+    Arc::new(Column::new(name, index))
+}
+
+/// Create a batch with float64 column containing various values including NaN and -0.0.
+/// Percentages are approximate: every (100/pct)-th row gets the special value, with
+/// offsets 0/1/2 chosen so null, NaN and -0.0 rows do not collide with each other.
+fn create_float_batch(nan_pct: usize, neg_zero_pct: usize, null_pct: usize) -> RecordBatch {
+    let mut values: Vec<Option<f64>> = Vec::with_capacity(BATCH_SIZE);
+
+    for i in 0..BATCH_SIZE {
+        if null_pct > 0 && i % (100 / null_pct.max(1)) == 0 {
+            values.push(None);
+        } else if nan_pct > 0 && i % (100 / nan_pct.max(1)) == 1 {
+            values.push(Some(f64::NAN));
+        } else if neg_zero_pct > 0 && i % (100 / neg_zero_pct.max(1)) == 2 {
+            values.push(Some(-0.0));
+        } else {
+            values.push(Some(i as f64 * 1.5));
+        }
+    }
+
+    let array = Float64Array::from(values);
+    let schema = Schema::new(vec![Field::new("c1", DataType::Float64, true)]);
+
+    RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
+}
+
+fn bench_normalize_nan_and_zero(c: &mut Criterion) {
+    let mut group = c.benchmark_group("normalize_nan_and_zero");
+
+    // Test with different percentages of special values
+    // (name, nan_pct, neg_zero_pct, null_pct)
+    let test_cases = [
+        ("no_special", 0, 0, 0),
+        ("10pct_nan", 10, 0, 0),
+        ("10pct_neg_zero", 0, 10, 0),
+        ("10pct_null", 0, 0, 10),
+        ("mixed_10pct", 5, 5, 5),
+        ("50pct_nan", 50, 0, 0),
+    ];
+
+    for (name, nan_pct, neg_zero_pct, null_pct) in test_cases {
+        let batch = create_float_batch(nan_pct, neg_zero_pct, null_pct);
+
+        let normalize_expr = Arc::new(NormalizeNaNAndZero::new(
+            DataType::Float64,
+            make_col("c1", 0),
+        ));
+
+        group.bench_with_input(BenchmarkId::new("float64", name), &batch, |b, batch| {
+            b.iter(|| black_box(normalize_expr.evaluate(black_box(batch)).unwrap()));
+        });
+    }
+
+    group.finish();
+}
+
+criterion_group!(benches, bench_normalize_nan_and_zero);
+criterion_main!(benches);
diff --git a/native/spark-expr/src/math_funcs/internal/normalize_nan.rs b/native/spark-expr/src/math_funcs/internal/normalize_nan.rs
index 0bd556ed73..4094bd7621 100644
--- a/native/spark-expr/src/math_funcs/internal/normalize_nan.rs
+++ b/native/spark-expr/src/math_funcs/internal/normalize_nan.rs
@@ -15,10 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::compute::unary;
 use arrow::datatypes::{DataType, Schema};
 use arrow::{
-    array::{as_primitive_array, ArrayAccessor, ArrayIter, Float32Array, Float64Array},
-    datatypes::{ArrowNativeType, Float32Type, Float64Type},
+    array::{as_primitive_array, Float32Array, Float64Array},
+    datatypes::{Float32Type, Float64Type},
     record_batch::RecordBatch,
 };
 use datafusion::logical_expr::ColumnarValue;
@@ -78,14 +79,16 @@ impl PhysicalExpr for NormalizeNaNAndZero {
 
         match &self.data_type {
             DataType::Float32 => {
-                let v = eval_typed(as_primitive_array::<Float32Type>(&array));
-                let new_array = Float32Array::from(v);
-                Ok(ColumnarValue::Array(Arc::new(new_array)))
+                let input = as_primitive_array::<Float32Type>(&array);
+                // `unary` maps over the values buffer and reuses the input's null buffer, avoiding the intermediate Vec<Option<f32>>
+                let result: Float32Array = unary(input, normalize_float);
+                Ok(ColumnarValue::Array(Arc::new(result)))
             }
             DataType::Float64 => {
-                let v = eval_typed(as_primitive_array::<Float64Type>(&array));
-                let new_array = Float64Array::from(v);
-                Ok(ColumnarValue::Array(Arc::new(new_array)))
+                let input = as_primitive_array::<Float64Type>(&array);
+                // `unary` maps over the values buffer and reuses the input's null buffer, avoiding the intermediate Vec<Option<f64>>
+                let result: Float64Array = unary(input, normalize_float);
+                Ok(ColumnarValue::Array(Arc::new(result)))
             }
             dt => panic!("Unexpected data type {dt:?}"),
         }
@@ -106,20 +109,17 @@ impl PhysicalExpr for NormalizeNaNAndZero {
     }
 }
 
-fn eval_typed<V: FloatDouble, T: ArrayAccessor<Item = V>>(input: T) -> Vec<Option<V>> {
-    let iter = ArrayIter::new(input);
-    iter.map(|o| {
-        o.map(|v| {
-            if v.is_nan() {
-                v.nan()
-            } else if v.is_neg_zero() {
-                v.zero()
-            } else {
-                v
-            }
-        })
-    })
-    .collect()
+/// Normalize a floating point value by converting all NaN representations to a canonical NaN
+/// and negative zero to positive zero. This is used for Spark's comparison semantics.
+#[inline]
+fn normalize_float<T: num::Float>(v: T) -> T {
+    if v.is_nan() {
+        T::nan()
+    } else if v == T::neg_zero() {
+        T::zero()
+    } else {
+        v
+    }
 }
 
 impl Display for NormalizeNaNAndZero {
@@ -127,39 +127,3 @@ impl Display for NormalizeNaNAndZero {
         write!(f, "FloatNormalize [child: {}]", self.child)
     }
 }
-
-trait FloatDouble: ArrowNativeType {
-    fn is_nan(&self) -> bool;
-    fn nan(&self) -> Self;
-    fn is_neg_zero(&self) -> bool;
-    fn zero(&self) -> Self;
-}
-
-impl FloatDouble for f32 {
-    fn is_nan(&self) -> bool {
-        f32::is_nan(*self)
-    }
-    fn nan(&self) -> Self {
-        f32::NAN
-    }
-    fn is_neg_zero(&self) -> bool {
-        *self == -0.0
-    }
-    fn zero(&self) -> Self {
-        0.0
-    }
-}
-impl FloatDouble for f64 {
-    fn is_nan(&self) -> bool {
-        f64::is_nan(*self)
-    }
-    fn nan(&self) -> Self {
-        f64::NAN
-    }
-    fn is_neg_zero(&self) -> bool {
-        *self == -0.0
-    }
-    fn zero(&self) -> Self {
-        0.0
-    }
-}