From 768b3e90f261c7aea58bdb98dc698b90deeeae34 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 14 Dec 2025 16:24:01 +0400 Subject: [PATCH 1/9] impl map_from_entries --- native/core/src/execution/jni_api.rs | 2 + .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +++++++++++- .../comet/CometMapExpressionSuite.scala | 45 +++++++++++++++++++ 4 files changed, 77 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a24d993059..4f53cea3e6 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,6 +46,7 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; +use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -337,6 +338,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 54df2f1688..a99cf3824b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,7 +125,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays) + classOf[MapFromArrays] -> CometMapFromArrays, + classOf[MapFromEntries] -> CometMapFromEntries) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 2e217f6af0..498aa3594c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,9 +19,12 @@ package org.apache.comet.serde +import scala.annotation.tailrec + import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, MapType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -89,3 +92,27 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } + +object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { + val keyUnsupportedReason = "Using BinaryType as 
Map keys is not allowed in map_from_entries" + val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" + + private def containsBinary(dataType: DataType): Boolean = { + dataType match { + case BinaryType => true + case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) + case ArrayType(elementType, _) => containsBinary(elementType) + case _ => false + } + } + + override def getSupportLevel(expr: MapFromEntries): SupportLevel = { + if (containsBinary(expr.dataType.keyType)) { + return Incompatible(Some(keyUnsupportedReason)) + } + if (containsBinary(expr.dataType.valueType)) { + return Incompatible(Some(valueUnsupportedReason)) + } + Compatible(None) + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 88c13391a6..01b9744ed6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,7 +25,9 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.BinaryType +import org.apache.comet.serde.CometMapFromEntries import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -125,4 +127,47 @@ class CometMapExpressionSuite extends CometTestBase { } } + test("map_from_entries") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + val filename = path.toString + val random = new Random(42) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + val schemaGenOptions = + SchemaGenOptions( + generateArray = true, + generateStruct = true, + primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) + val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + schemaGenOptions, + dataGenOptions) + } + val df = spark.read.parquet(filename) + df.createOrReplaceTempView("t1") + for (field <- df.schema.fieldNames) { + checkSparkAnswerAndOperator( + spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1")) + } + } + } + + test("map_from_entries - fallback for binary type") { + val table = "t2" + withTable(table) { + sql( + s"create table $table using parquet as select cast(array() as array) as c1 from range(10)") + checkSparkAnswerAndFallbackReason( + sql(s"select map_from_entries(array(struct(c1, 0))) from $table"), + CometMapFromEntries.keyUnsupportedReason) + checkSparkAnswerAndFallbackReason( + sql(s"select map_from_entries(array(struct(0, c1))) from $table"), + CometMapFromEntries.valueUnsupportedReason) + } + } + } From c68c3428676b5d991e7ba9e13464bf2ce1ec84e8 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 16 Dec 2025 16:10:43 +0400 Subject: [PATCH 2/9] Revert "impl map_from_entries" This reverts commit 768b3e90f261c7aea58bdb98dc698b90deeeae34. 
--- native/core/src/execution/jni_api.rs | 2 - .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +----------- .../comet/CometMapExpressionSuite.scala | 45 ------------------- 4 files changed, 2 insertions(+), 77 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 4f53cea3e6..a24d993059 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,7 +46,6 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; -use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -338,7 +337,6 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); - session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index a99cf3824b..54df2f1688 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,8 +125,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays, - classOf[MapFromEntries] -> CometMapFromEntries) + classOf[MapFromArrays] -> CometMapFromArrays) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 498aa3594c..2e217f6af0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,12 +19,9 @@ package org.apache.comet.serde -import scala.annotation.tailrec - import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.spark.sql.types.{ArrayType, MapType} -import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -92,27 +89,3 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } - -object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { - val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in map_from_entries" - val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" - - private def 
containsBinary(dataType: DataType): Boolean = { - dataType match { - case BinaryType => true - case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) - case ArrayType(elementType, _) => containsBinary(elementType) - case _ => false - } - } - - override def getSupportLevel(expr: MapFromEntries): SupportLevel = { - if (containsBinary(expr.dataType.keyType)) { - return Incompatible(Some(keyUnsupportedReason)) - } - if (containsBinary(expr.dataType.valueType)) { - return Incompatible(Some(valueUnsupportedReason)) - } - Compatible(None) - } -} diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 01b9744ed6..88c13391a6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,9 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.BinaryType -import org.apache.comet.serde.CometMapFromEntries import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -127,47 +125,4 @@ class CometMapExpressionSuite extends CometTestBase { } } - test("map_from_entries") { - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - val filename = path.toString - val random = new Random(42) - withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val schemaGenOptions = - SchemaGenOptions( - generateArray = true, - generateStruct = true, - primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) - val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) - ParquetGenerator.makeParquetFile( - random, - spark, - filename, - 100, - schemaGenOptions, - dataGenOptions) - } - val df = spark.read.parquet(filename) - df.createOrReplaceTempView("t1") - for (field <- df.schema.fieldNames) { - checkSparkAnswerAndOperator( - spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1")) - } - } - } - - test("map_from_entries - fallback for binary type") { - val table = "t2" - withTable(table) { - sql( - s"create table $table using parquet as select cast(array() as array) as c1 from range(10)") - checkSparkAnswerAndFallbackReason( - sql(s"select map_from_entries(array(struct(c1, 0))) from $table"), - CometMapFromEntries.keyUnsupportedReason) - checkSparkAnswerAndFallbackReason( - sql(s"select map_from_entries(array(struct(0, c1))) from $table"), - CometMapFromEntries.valueUnsupportedReason) - } - } - } From 5d153e9394642d56a32ce512afb565976f038294 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sat, 27 Dec 2025 20:31:24 +0400 Subject: [PATCH 3/9] work --- native/core/src/execution/planner.rs | 3 + native/proto/src/proto/expr.proto | 11 + native/spark-expr/src/csv_funcs/mod.rs | 18 ++ native/spark-expr/src/csv_funcs/to_csv.rs | 245 ++++++++++++++++++ native/spark-expr/src/lib.rs | 1 + .../org/apache/comet/serde/structs.scala | 12 +- 6 files changed, 289 insertions(+), 1 deletion(-) create mode 100644 native/spark-expr/src/csv_funcs/mod.rs create mode 100644 native/spark-expr/src/csv_funcs/to_csv.rs diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 8e8191dd0e..62c61d8229 100644 --- a/native/core/src/execution/planner.rs 
+++ b/native/core/src/execution/planner.rs @@ -644,6 +644,9 @@ impl PhysicalPlanner { ExprStruct::MonotonicallyIncreasingId(_) => Ok(Arc::new( MonotonicallyIncreasingId::from_partition_id(self.partition), )), + ExprStruct::ToCsv(expr) => { + + } expr => Err(GeneralError(format!("Not implemented: {expr:?}"))), } } diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 1c453b6336..8cd88175fc 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -86,6 +86,7 @@ message Expr { EmptyExpr spark_partition_id = 63; EmptyExpr monotonically_increasing_id = 64; FromJson from_json = 89; + ToCsv to_csv = 90; } } @@ -275,6 +276,16 @@ message FromJson { string timezone = 3; } +message ToCsv { + Expr child = 1; + CsvWriteOptions write_options = 2; +} + +message CsvWriteOptions { + string delimiter = 1; + string timezone = 2; +} + enum BinaryOutputStyle { UTF8 = 0; BASIC = 1; diff --git a/native/spark-expr/src/csv_funcs/mod.rs b/native/spark-expr/src/csv_funcs/mod.rs new file mode 100644 index 0000000000..acb57b5c56 --- /dev/null +++ b/native/spark-expr/src/csv_funcs/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod to_csv; diff --git a/native/spark-expr/src/csv_funcs/to_csv.rs b/native/spark-expr/src/csv_funcs/to_csv.rs new file mode 100644 index 0000000000..edb4adf741 --- /dev/null +++ b/native/spark-expr/src/csv_funcs/to_csv.rs @@ -0,0 +1,245 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow::array::{RecordBatch, StringBuilder, StructArray}; +use arrow::array::{Array, ArrayRef, StringArray}; +use arrow::datatypes::{DataType, Schema}; +use datafusion::common::Result; +use datafusion::logical_expr::ColumnarValue; +use datafusion::physical_expr::PhysicalExpr; +use std::any::Any; +use std::fmt::{Display, Formatter}; +use std::hash::Hash; +use std::sync::Arc; + +/// to_csv spark function +#[derive(Debug, Eq)] +pub struct ToCsv { + expr: Arc, + delimiter: char, + quote: char, + escape: char, + null_value: String, +} + +impl Hash for ToCsv { + fn hash(&self, state: &mut H) { + self.expr.hash(state); + self.delimiter.hash(state); + self.quote.hash(state); + self.escape.hash(state); + self.null_value.hash(state); + } +} + +impl PartialEq for ToCsv { + fn eq(&self, other: &Self) -> bool { + self.expr.eq(&other.expr) + && self.delimiter.eq(&other.delimiter) + && self.quote.eq(&other.quote) + && self.escape.eq(&other.escape) + && self.null_value.eq(&other.null_value) + } +} + +impl ToCsv { + pub fn new( + expr: Arc, delimiter: char, + quote: char, + escape: char, + null_value: String + ) -> Self { + Self { expr, delimiter, quote, escape, null_value } + } +} + +impl Display for ToCsv { + fn fmt(&self, _: &mut Formatter<'_>) -> std::fmt::Result { + unimplemented!() + } +} + +impl PhysicalExpr for ToCsv { + fn as_any(&self) -> &dyn Any { + self + } + + fn data_type(&self, _: &Schema) -> Result { + Ok(DataType::Utf8) + } + + fn evaluate(&self, batch: &RecordBatch) -> Result { + let input_value = self.expr.evaluate(batch)?; + + let array = match input_value { + ColumnarValue::Array(arr) => arr, + ColumnarValue::Scalar(scalar) => scalar.to_array()?, + }; + + let mut builder = StringBuilder::with_capacity(array.len(), 1024); + + for row_idx in 0..array.len() { + if array.is_null(row_idx) { + builder.append_null(); + } else { + let csv_string = struct_to_csv(&array, row_idx)?; + builder.append_value(&csv_string); + } + } + + Ok(Arc::new(builder.finish())) + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.expr] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + todo!() + } + + fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result { + unimplemented!() + } +} + +fn struct_to_csv(array: &StructArray, row_idx: usize) -> Result { + use arrow::array::StructArray; + + let struct_array = array + .as_any() + .downcast_ref::() + .unwrap(); + + let mut csv_string = String::new(); + let num_columns = struct_array.num_columns(); + + for (col_idx, column) in struct_array.columns().iter().enumerate() { + if col_idx > 0 { + csv_string.push(self.options.delimiter); + } + + if column.is_null(row_idx) { + csv_string.push_str(&self.options.null_value); + } else { + let value = self.format_value(column, row_idx)?; + let escaped = self.escape_value(&value); + csv_string.push_str(&escaped); + } + } + + Ok(csv_string) +} + +fn format_value(array: &ArrayRef, row_idx: usize) -> Result { + use arrow::array::*; + + match array.data_type() { + DataType::Null => Ok(self.options.null_value.clone()), + DataType::Boolean => { + let arr = array.as_any().downcast_ref::().unwrap(); + Ok(arr.value(row_idx).to_string()) + } + DataType::Int8 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Ok(arr.value(row_idx).to_string()) + } + DataType::Int16 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Ok(arr.value(row_idx).to_string()) + } + DataType::Int32 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Ok(arr.value(row_idx).to_string()) + } + 
DataType::Int64 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Ok(arr.value(row_idx).to_string()) + } + DataType::UInt8 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Ok(arr.value(row_idx).to_string()) + } + DataType::UInt16 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Ok(arr.value(row_idx).to_string()) + } + DataType::UInt32 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Ok(arr.value(row_idx).to_string()) + } + DataType::UInt64 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Ok(arr.value(row_idx).to_string()) + } + DataType::Float32 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Ok(arr.value(row_idx).to_string()) + } + DataType::Float64 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Ok(arr.value(row_idx).to_string()) + } + DataType::Utf8 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Ok(arr.value(row_idx).to_string()) + } + DataType::LargeUtf8 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Ok(arr.value(row_idx).to_string()) + } + DataType::Binary => { + let arr = array.as_any().downcast_ref::().unwrap(); + let bytes = arr.value(row_idx); + Ok(format!("{:?}", bytes)) + } + DataType::Date32 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Ok(arr.value(row_idx).to_string()) + } + DataType::Date64 => { + let arr = array.as_any().downcast_ref::().unwrap(); + Ok(arr.value(row_idx).to_string()) + } + DataType::Timestamp(unit, tz) => { + let arr = array.as_any().downcast_ref::().unwrap(); + Ok(arr.value(row_idx).to_string()) + } + DataType::Decimal128(precision, scale) => { + let arr = array.as_any().downcast_ref::().unwrap(); + let value = arr.value(row_idx); + let divisor = 10_i128.pow(*scale as u32); + let integer_part = value / divisor; + let fractional_part = (value % divisor).abs(); + Ok(format!("{}.{:0width$}", integer_part, fractional_part, width = *scale as usize)) + } + DataType::List(_) | DataType::LargeList(_) => { + // Для массивов рекурсивно форматируем + Ok(format!("[...]")) // Упрощенная версия + } + DataType::Struct(_) => { + // Вложенные структуры - рекурсивный вызов + self.struct_to_csv(array, row_idx) + } + _ => Err(DataFusionError::NotImplemented( + format!("to_csv not implemented for type: {:?}", array.data_type()) + )), + } +} diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index 96e727ae55..deb730b059 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -57,6 +57,7 @@ mod conditional_funcs; mod conversion_funcs; mod math_funcs; mod nondetermenistic_funcs; +mod csv_funcs; pub use array_funcs::*; pub use bitwise_funcs::*; diff --git a/spark/src/main/scala/org/apache/comet/serde/structs.scala b/spark/src/main/scala/org/apache/comet/serde/structs.scala index 55e031d346..99d8afe08c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/structs.scala +++ b/spark/src/main/scala/org/apache/comet/serde/structs.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import scala.jdk.CollectionConverters._ -import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, GetArrayStructFields, GetStructField, JsonToStructs, StructsToJson} +import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, GetArrayStructFields, GetStructField, JsonToStructs, StructsToCsv, StructsToJson} import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType} import org.apache.comet.CometSparkSessionExtensions.withInfo @@ -231,3 +231,13 @@ object 
CometJsonToStructs extends CometExpressionSerde[JsonToStructs] { } } } + +object CometStructsToCsv extends CometExpressionSerde[StructsToCsv] { + + override def convert( + expr: StructsToCsv, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + None + } +} From f0f03d427774dfc5019be12f4600845d65d10239 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 28 Dec 2025 17:37:55 +0400 Subject: [PATCH 4/9] WIP --- .github/workflows/pr_build_linux.yml | 1 + native/core/src/execution/planner.rs | 12 +- native/proto/src/proto/expr.proto | 10 +- native/spark-expr/src/csv_funcs/mod.rs | 2 + native/spark-expr/src/csv_funcs/to_csv.rs | 240 ++++++++---------- native/spark-expr/src/lib.rs | 3 +- .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../org/apache/comet/serde/structs.scala | 25 +- .../comet/CometCsvExpressionSuite.scala | 58 +++++ 9 files changed, 211 insertions(+), 143 deletions(-) create mode 100644 spark/src/test/scala/org/apache/comet/CometCsvExpressionSuite.scala diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index e3b0e40566..06a834c738 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -161,6 +161,7 @@ jobs: org.apache.comet.CometStringExpressionSuite org.apache.comet.CometBitwiseExpressionSuite org.apache.comet.CometMapExpressionSuite + org.apache.comet.CometCsvExpressionSuite org.apache.comet.CometJsonExpressionSuite org.apache.comet.expressions.conditional.CometIfSuite org.apache.comet.expressions.conditional.CometCoalesceSuite diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 62c61d8229..34b4dc38cc 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -71,7 +71,7 @@ use datafusion::{ use datafusion_comet_spark_expr::{ create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle, BloomFilterAgg, BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond, - SumInteger, + SumInteger, ToCsv, }; use iceberg::expr::Bind; @@ -645,7 +645,15 @@ impl PhysicalPlanner { MonotonicallyIncreasingId::from_partition_id(self.partition), )), ExprStruct::ToCsv(expr) => { - + let csv_struct_expr = + self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; + Ok(Arc::new(ToCsv::new( + csv_struct_expr, + &expr.delimiter, + &expr.quote, + &expr.escape, + &expr.null_value, + ))) } expr => Err(GeneralError(format!("Not implemented: {expr:?}"))), } diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 8cd88175fc..1f9f37739e 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -278,12 +278,10 @@ message FromJson { message ToCsv { Expr child = 1; - CsvWriteOptions write_options = 2; -} - -message CsvWriteOptions { - string delimiter = 1; - string timezone = 2; + string delimiter = 2; + string quote = 3; + string escape = 4; + string null_value = 5; } enum BinaryOutputStyle { diff --git a/native/spark-expr/src/csv_funcs/mod.rs b/native/spark-expr/src/csv_funcs/mod.rs index acb57b5c56..1f76ce7c2f 100644 --- a/native/spark-expr/src/csv_funcs/mod.rs +++ b/native/spark-expr/src/csv_funcs/mod.rs @@ -16,3 +16,5 @@ // under the License. 
mod to_csv; + +pub use to_csv::ToCsv; diff --git a/native/spark-expr/src/csv_funcs/to_csv.rs b/native/spark-expr/src/csv_funcs/to_csv.rs index edb4adf741..3402a1210f 100644 --- a/native/spark-expr/src/csv_funcs/to_csv.rs +++ b/native/spark-expr/src/csv_funcs/to_csv.rs @@ -15,10 +15,13 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::{RecordBatch, StringBuilder, StructArray}; -use arrow::array::{Array, ArrayRef, StringArray}; +use arrow::array::{ + Array, ArrayRef, BooleanArray, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, + StringArray, StringBuilder, +}; +use arrow::array::{RecordBatch, StructArray}; use arrow::datatypes::{DataType, Schema}; -use datafusion::common::Result; +use datafusion::common::{exec_err, Result}; use datafusion::logical_expr::ColumnarValue; use datafusion::physical_expr::PhysicalExpr; use std::any::Any; @@ -30,9 +33,9 @@ use std::sync::Arc; #[derive(Debug, Eq)] pub struct ToCsv { expr: Arc, - delimiter: char, - quote: char, - escape: char, + delimiter: String, + quote: String, + escape: String, null_value: String, } @@ -58,18 +61,29 @@ impl PartialEq for ToCsv { impl ToCsv { pub fn new( - expr: Arc, delimiter: char, - quote: char, - escape: char, - null_value: String + expr: Arc, + delimiter: &str, + quote: &str, + escape: &str, + null_value: &str, ) -> Self { - Self { expr, delimiter, quote, escape, null_value } + Self { + expr, + delimiter: delimiter.to_owned(), + quote: quote.to_owned(), + escape: escape.to_owned(), + null_value: null_value.to_owned(), + } } } impl Display for ToCsv { - fn fmt(&self, _: &mut Formatter<'_>) -> std::fmt::Result { - unimplemented!() + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "to_csv({}, delimiter={}, quote={}, escape={}, null_value={})", + self.expr, self.delimiter, self.quote, self.escape, self.null_value + ) } } @@ -83,25 +97,16 @@ impl PhysicalExpr for ToCsv { } fn evaluate(&self, batch: &RecordBatch) -> Result { - let input_value = self.expr.evaluate(batch)?; + let input_value = self.expr.evaluate(batch)?.into_array(batch.num_rows())?; - let array = match input_value { - ColumnarValue::Array(arr) => arr, - ColumnarValue::Scalar(scalar) => scalar.to_array()?, - }; + let struct_array = input_value + .as_any() + .downcast_ref::() + .expect("A StructType is expected"); - let mut builder = StringBuilder::with_capacity(array.len(), 1024); - - for row_idx in 0..array.len() { - if array.is_null(row_idx) { - builder.append_null(); - } else { - let csv_string = struct_to_csv(&array, row_idx)?; - builder.append_value(&csv_string); - } - } + let result = struct_to_csv(struct_array, &self.delimiter, &self.null_value)?; - Ok(Arc::new(builder.finish())) + Ok(ColumnarValue::Array(result)) } fn children(&self) -> Vec<&Arc> { @@ -112,7 +117,13 @@ impl PhysicalExpr for ToCsv { self: Arc, children: Vec>, ) -> Result> { - todo!() + Ok(Arc::new(Self::new( + Arc::clone(&children[0]), + &self.delimiter, + &self.quote, + &self.escape, + &self.null_value, + ))) } fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result { @@ -120,126 +131,93 @@ impl PhysicalExpr for ToCsv { } } -fn struct_to_csv(array: &StructArray, row_idx: usize) -> Result { - use arrow::array::StructArray; - - let struct_array = array - .as_any() - .downcast_ref::() - .unwrap(); - - let mut csv_string = String::new(); - let num_columns = struct_array.num_columns(); - - for (col_idx, column) in struct_array.columns().iter().enumerate() { - if col_idx > 0 { - 
csv_string.push(self.options.delimiter); - } - - if column.is_null(row_idx) { - csv_string.push_str(&self.options.null_value); +fn struct_to_csv(array: &StructArray, delimiter: &str, null_value: &str) -> Result { + let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 16); + let mut csv_string = String::with_capacity(array.len() * 16); + for row_idx in 0..array.len() { + if array.is_null(row_idx) { + builder.append_null(); } else { - let value = self.format_value(column, row_idx)?; - let escaped = self.escape_value(&value); - csv_string.push_str(&escaped); + csv_string.clear(); + for (col_idx, column) in array.columns().iter().enumerate() { + if col_idx > 0 { + csv_string.push_str(delimiter); + } + if column.is_null(row_idx) { + csv_string.push_str(null_value); + } else { + let value = convert_to_string(column, row_idx)?; + csv_string.push_str(&value); + } + } } + builder.append_value(&csv_string); } - - Ok(csv_string) + Ok(Arc::new(builder.finish())) } -fn format_value(array: &ArrayRef, row_idx: usize) -> Result { - use arrow::array::*; - +fn convert_to_string(array: &ArrayRef, row_idx: usize) -> Result { match array.data_type() { - DataType::Null => Ok(self.options.null_value.clone()), DataType::Boolean => { - let arr = array.as_any().downcast_ref::().unwrap(); - Ok(arr.value(row_idx).to_string()) + let array = array.as_any().downcast_ref::().unwrap(); + Ok(array.value(row_idx).to_string()) } DataType::Int8 => { - let arr = array.as_any().downcast_ref::().unwrap(); - Ok(arr.value(row_idx).to_string()) + let array = array.as_any().downcast_ref::().unwrap(); + Ok(array.value(row_idx).to_string()) } DataType::Int16 => { - let arr = array.as_any().downcast_ref::().unwrap(); - Ok(arr.value(row_idx).to_string()) + let array = array.as_any().downcast_ref::().unwrap(); + Ok(array.value(row_idx).to_string()) } DataType::Int32 => { - let arr = array.as_any().downcast_ref::().unwrap(); - Ok(arr.value(row_idx).to_string()) + let array = array.as_any().downcast_ref::().unwrap(); + Ok(array.value(row_idx).to_string()) } DataType::Int64 => { - let arr = array.as_any().downcast_ref::().unwrap(); - Ok(arr.value(row_idx).to_string()) - } - DataType::UInt8 => { - let arr = array.as_any().downcast_ref::().unwrap(); - Ok(arr.value(row_idx).to_string()) - } - DataType::UInt16 => { - let arr = array.as_any().downcast_ref::().unwrap(); - Ok(arr.value(row_idx).to_string()) - } - DataType::UInt32 => { - let arr = array.as_any().downcast_ref::().unwrap(); - Ok(arr.value(row_idx).to_string()) - } - DataType::UInt64 => { - let arr = array.as_any().downcast_ref::().unwrap(); - Ok(arr.value(row_idx).to_string()) - } - DataType::Float32 => { - let arr = array.as_any().downcast_ref::().unwrap(); - Ok(arr.value(row_idx).to_string()) - } - DataType::Float64 => { - let arr = array.as_any().downcast_ref::().unwrap(); - Ok(arr.value(row_idx).to_string()) + let array = array.as_any().downcast_ref::().unwrap(); + Ok(array.value(row_idx).to_string()) } DataType::Utf8 => { - let arr = array.as_any().downcast_ref::().unwrap(); - Ok(arr.value(row_idx).to_string()) + let array = array.as_any().downcast_ref::().unwrap(); + Ok(array.value(row_idx).to_string()) } DataType::LargeUtf8 => { - let arr = array.as_any().downcast_ref::().unwrap(); - Ok(arr.value(row_idx).to_string()) - } - DataType::Binary => { - let arr = array.as_any().downcast_ref::().unwrap(); - let bytes = arr.value(row_idx); - Ok(format!("{:?}", bytes)) - } - DataType::Date32 => { - let arr = array.as_any().downcast_ref::().unwrap(); - 
Ok(arr.value(row_idx).to_string()) - } - DataType::Date64 => { - let arr = array.as_any().downcast_ref::().unwrap(); - Ok(arr.value(row_idx).to_string()) + let array = array.as_any().downcast_ref::().unwrap(); + Ok(array.value(row_idx).to_string()) } - DataType::Timestamp(unit, tz) => { - let arr = array.as_any().downcast_ref::().unwrap(); - Ok(arr.value(row_idx).to_string()) - } - DataType::Decimal128(precision, scale) => { - let arr = array.as_any().downcast_ref::().unwrap(); - let value = arr.value(row_idx); - let divisor = 10_i128.pow(*scale as u32); - let integer_part = value / divisor; - let fractional_part = (value % divisor).abs(); - Ok(format!("{}.{:0width$}", integer_part, fractional_part, width = *scale as usize)) - } - DataType::List(_) | DataType::LargeList(_) => { - // Для массивов рекурсивно форматируем - Ok(format!("[...]")) // Упрощенная версия - } - DataType::Struct(_) => { - // Вложенные структуры - рекурсивный вызов - self.struct_to_csv(array, row_idx) - } - _ => Err(DataFusionError::NotImplemented( - format!("to_csv not implemented for type: {:?}", array.data_type()) - )), + _ => exec_err!("to_csv not implemented for type: {:?}", array.data_type()), + } +} + +#[cfg(test)] +mod tests { + use crate::csv_funcs::to_csv::struct_to_csv; + use arrow::array::{as_string_array, ArrayRef, Int32Array, StringArray, StructArray}; + use arrow::datatypes::{DataType, Field}; + use datafusion::common::Result; + use std::sync::Arc; + + #[test] + fn test_to_csv_basic() -> Result<()> { + let struct_array = StructArray::from(vec![ + ( + Arc::new(Field::new("a", DataType::Int32, false)), + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + ), + ( + Arc::new(Field::new("b", DataType::Utf8, true)), + Arc::new(StringArray::from(vec![Some("foo"), None, Some("baz")])) as ArrayRef, + ), + ]); + + let expected = &StringArray::from(vec!["1,foo", "2,", "3,baz"]); + + let result = struct_to_csv(&Arc::new(struct_array), ",", "")?; + let result = as_string_array(&result); + + assert_eq!(result, expected); + + Ok(()) } } diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index deb730b059..bdfa614c2b 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -55,9 +55,9 @@ pub use bloom_filter::{BloomFilterAgg, BloomFilterMightContain}; mod conditional_funcs; mod conversion_funcs; +mod csv_funcs; mod math_funcs; mod nondetermenistic_funcs; -mod csv_funcs; pub use array_funcs::*; pub use bitwise_funcs::*; @@ -69,6 +69,7 @@ pub use comet_scalar_funcs::{ create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, register_all_comet_functions, }; +pub use csv_funcs::*; pub use datetime_funcs::{SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, TimestampTruncExpr}; pub use error::{SparkError, SparkResult}; pub use hash_funcs::*; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index e50b1d80e6..47c96d10cf 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -133,7 +133,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[GetArrayStructFields] -> CometGetArrayStructFields, classOf[GetStructField] -> CometGetStructField, classOf[JsonToStructs] -> CometJsonToStructs, - classOf[StructsToJson] -> CometStructsToJson) + classOf[StructsToJson] -> CometStructsToJson, + classOf[StructsToCsv] -> CometStructsToCsv) private val hashExpressions: 
Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[Md5] -> CometScalarFunction("md5"), diff --git a/spark/src/main/scala/org/apache/comet/serde/structs.scala b/spark/src/main/scala/org/apache/comet/serde/structs.scala index 99d8afe08c..23812d310d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/structs.scala +++ b/spark/src/main/scala/org/apache/comet/serde/structs.scala @@ -22,7 +22,7 @@ package org.apache.comet.serde import scala.jdk.CollectionConverters._ import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, GetArrayStructFields, GetStructField, JsonToStructs, StructsToCsv, StructsToJson} -import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType} +import org.apache.spark.sql.types._ import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, serializeDataType} @@ -234,10 +234,31 @@ object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] { object CometStructsToCsv extends CometExpressionSerde[StructsToCsv] { + override def getSupportLevel(expr: StructsToCsv): SupportLevel = { + val isSupportedSchema = expr.inputSchema.fields + .forall(sf => QueryPlanSerde.supportedDataType(sf.dataType, allowComplex = false)) + if (!isSupportedSchema) { + return Unsupported(Some(s"Unsupported data type: ${expr.inputSchema}")) + } + Incompatible() + } + override def convert( expr: StructsToCsv, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { - None + for { + childProto <- exprToProtoInternal(expr.child, inputs, binding) + } yield { + val toCsv = ExprOuterClass.ToCsv + .newBuilder() + .setChild(childProto) + .setDelimiter(expr.options.getOrElse("delimiter", ",")) + .setQuote(expr.options.getOrElse("quote", "\"")) + .setEscape(expr.options.getOrElse("escape", "\\")) + .setEscape(expr.options.getOrElse("nullValue", "")) + .build() + ExprOuterClass.Expr.newBuilder().setToCsv(toCsv).build() + } } } diff --git a/spark/src/test/scala/org/apache/comet/CometCsvExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometCsvExpressionSuite.scala new file mode 100644 index 0000000000..2178f0d8d4 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/CometCsvExpressionSuite.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet + +import scala.util.Random + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.catalyst.expressions.StructsToCsv +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.functions._ + +import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} + +class CometCsvExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { + + test("to_csv") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + val filename = path.toString + val random = new Random(42) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false), + DataGenOptions(allowNull = true, generateNegativeZero = true)) + } + withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[StructsToCsv]) -> "true") { + val df = spark.read + .parquet(filename) + .select(to_csv(struct(col("c0"), col("c1"), col("c2")))) + df.show(false) + checkSparkAnswerAndOperator(df) + + } + } + } +} From 4b02dd654d37ed26cc88d42d4105a34ffe49f502 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 28 Dec 2025 17:40:46 +0400 Subject: [PATCH 5/9] WIP --- .github/workflows/pr_build_macos.yml | 1 + .../test/scala/org/apache/comet/CometCsvExpressionSuite.scala | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index da06bff87b..7be1bbecb1 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -125,6 +125,7 @@ jobs: org.apache.comet.CometBitwiseExpressionSuite org.apache.comet.CometMapExpressionSuite org.apache.comet.CometJsonExpressionSuite + org.apache.comet.CometCsvExpressionSuite org.apache.comet.expressions.conditional.CometIfSuite org.apache.comet.expressions.conditional.CometCoalesceSuite org.apache.comet.expressions.conditional.CometCaseWhenSuite diff --git a/spark/src/test/scala/org/apache/comet/CometCsvExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometCsvExpressionSuite.scala index 2178f0d8d4..ab71e0765b 100644 --- a/spark/src/test/scala/org/apache/comet/CometCsvExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCsvExpressionSuite.scala @@ -49,7 +49,6 @@ class CometCsvExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper val df = spark.read .parquet(filename) .select(to_csv(struct(col("c0"), col("c1"), col("c2")))) - df.show(false) checkSparkAnswerAndOperator(df) } From 0f98a3c971bc20e8eba99139b361e4062c7511f2 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Mon, 29 Dec 2025 21:12:59 +0400 Subject: [PATCH 6/9] Add benchmark test --- docs/source/user-guide/latest/configs.md | 1 + .../CometCsvExpressionBenchmark.scala | 86 +++++++++++++++++++ 2 files changed, 87 insertions(+) create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometCsvExpressionBenchmark.scala diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 1a273ad033..3fe09bddd6 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -324,6 +324,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.expression.StringTrimBoth.enabled` | Enable Comet acceleration for `StringTrimBoth` | true | | 
`spark.comet.expression.StringTrimLeft.enabled` | Enable Comet acceleration for `StringTrimLeft` | true | | `spark.comet.expression.StringTrimRight.enabled` | Enable Comet acceleration for `StringTrimRight` | true | +Add| `spark.comet.expression.StructsToCsv.enabled` | Enable Comet acceleration for `StructsToCsv` | true | | `spark.comet.expression.StructsToJson.enabled` | Enable Comet acceleration for `StructsToJson` | true | | `spark.comet.expression.Substring.enabled` | Enable Comet acceleration for `Substring` | true | | `spark.comet.expression.Subtract.enabled` | Enable Comet acceleration for `Subtract` | true | diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCsvExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCsvExpressionBenchmark.scala new file mode 100644 index 0000000000..4a9b387ab0 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCsvExpressionBenchmark.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +import org.apache.spark.sql.benchmark.CometJsonExpressionBenchmark.{prepareTable, runExpressionBenchmark, withTempPath, withTempTable} +import org.apache.spark.sql.benchmark.CometStringExpressionBenchmark.{spark, tbl} +import org.apache.spark.sql.catalyst.expressions.{CsvToStructs, JsonToStructs} + +import org.apache.comet.CometConf + +/** + * Configuration for a CSV expression benchmark. + * + * @param name + * Name for the benchmark + * @param query + * SQL query to benchmark + * @param extraCometConfigs + * Additional Comet configurations for the scan+exec case + */ +case class CsvExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Benchmark to measure performance of Comet CSV expressions. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make + * benchmark-org.apache.spark.sql.benchmark.CometCsvExpressionBenchmark` Results will be written + * to "spark/benchmarks/CometCsvExpressionBenchmark-**results.txt". + */ +// spotless:on +object CometCsvExpressionBenchmark extends CometBenchmarkBase { + + /** + * Generic method to run a CSV expression benchmark with the given configuration. 
+ */ + def runCsvExprBenchmark(config: CsvExprConfig, values: Int): Unit = { + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql( + s"SELECT CAST(value AS STRING) AS c1, CAST(value AS INT) AS c2, CAST(value AS LONG) AS c3 FROM $tbl")) + + val extraConfigs = Map( + CometConf.getExprAllowIncompatConfigKey( + classOf[CsvToStructs]) -> "true") ++ config.extraCometConfigs + + runExpressionBenchmark(config.name, values, config.query, extraConfigs) + } + } + } + + // Configuration for all CSV expression benchmarks + private val csvExpressions = List( + CsvExprConfig("to_csv", "SELECT to_csv(struct(c1, c2, c3)) FROM parquetV1Table")) + + override def runCometBenchmark(args: Array[String]): Unit = { + val values = 1024 * 1024 + + csvExpressions.foreach { config => + runBenchmarkWithTable(config.name, values) { value => + runCsvExprBenchmark(config, value) + } + } + } +} From d7a6036b8d846764a8847a1002bb7565bfb2b811 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 30 Dec 2025 15:14:26 +0400 Subject: [PATCH 7/9] WIP --- docs/source/user-guide/latest/configs.md | 2 +- native/spark-expr/src/csv_funcs/to_csv.rs | 46 +++++++++++-------- .../comet/CometCsvExpressionSuite.scala | 5 +- .../org/apache/spark/sql/CometTestBase.scala | 7 ++- .../CometCsvExpressionBenchmark.scala | 5 +- 5 files changed, 38 insertions(+), 27 deletions(-) diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 3fe09bddd6..bd062ec587 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -324,7 +324,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.expression.StringTrimBoth.enabled` | Enable Comet acceleration for `StringTrimBoth` | true | | `spark.comet.expression.StringTrimLeft.enabled` | Enable Comet acceleration for `StringTrimLeft` | true | | `spark.comet.expression.StringTrimRight.enabled` | Enable Comet acceleration for `StringTrimRight` | true | -Add| `spark.comet.expression.StructsToCsv.enabled` | Enable Comet acceleration for `StructsToCsv` | true | +| `spark.comet.expression.StructsToCsv.enabled` | Enable Comet acceleration for `StructsToCsv` | true | | `spark.comet.expression.StructsToJson.enabled` | Enable Comet acceleration for `StructsToJson` | true | | `spark.comet.expression.Substring.enabled` | Enable Comet acceleration for `Substring` | true | | `spark.comet.expression.Subtract.enabled` | Enable Comet acceleration for `Subtract` | true | diff --git a/native/spark-expr/src/csv_funcs/to_csv.rs b/native/spark-expr/src/csv_funcs/to_csv.rs index 3402a1210f..a0b9e9ce33 100644 --- a/native/spark-expr/src/csv_funcs/to_csv.rs +++ b/native/spark-expr/src/csv_funcs/to_csv.rs @@ -16,11 +16,11 @@ // under the License. 
use arrow::array::{ - Array, ArrayRef, BooleanArray, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, - StringArray, StringBuilder, + as_boolean_array, as_largestring_array, as_string_array, Array, ArrayRef, StringBuilder, }; use arrow::array::{RecordBatch, StructArray}; use arrow::datatypes::{DataType, Schema}; +use datafusion::common::cast::{as_int16_array, as_int32_array, as_int64_array, as_int8_array}; use datafusion::common::{exec_err, Result}; use datafusion::logical_expr::ColumnarValue; use datafusion::physical_expr::PhysicalExpr; @@ -96,6 +96,10 @@ impl PhysicalExpr for ToCsv { Ok(DataType::Utf8) } + fn nullable(&self, input_schema: &Schema) -> Result { + self.expr.nullable(input_schema) + } + fn evaluate(&self, batch: &RecordBatch) -> Result { let input_value = self.expr.evaluate(batch)?.into_array(batch.num_rows())?; @@ -134,6 +138,7 @@ impl PhysicalExpr for ToCsv { fn struct_to_csv(array: &StructArray, delimiter: &str, null_value: &str) -> Result { let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 16); let mut csv_string = String::with_capacity(array.len() * 16); + for row_idx in 0..array.len() { if array.is_null(row_idx) { builder.append_null(); @@ -146,8 +151,7 @@ fn struct_to_csv(array: &StructArray, delimiter: &str, null_value: &str) -> Resu if column.is_null(row_idx) { csv_string.push_str(null_value); } else { - let value = convert_to_string(column, row_idx)?; - csv_string.push_str(&value); + convert_to_string(column, &mut csv_string, row_idx)?; } } } @@ -156,38 +160,40 @@ fn struct_to_csv(array: &StructArray, delimiter: &str, null_value: &str) -> Resu Ok(Arc::new(builder.finish())) } -fn convert_to_string(array: &ArrayRef, row_idx: usize) -> Result { +#[inline] +fn convert_to_string(array: &ArrayRef, csv_string: &mut String, row_idx: usize) -> Result<()> { match array.data_type() { DataType::Boolean => { - let array = array.as_any().downcast_ref::().unwrap(); - Ok(array.value(row_idx).to_string()) + let array = as_boolean_array(array); + csv_string.push_str(&array.value(row_idx).to_string()) } DataType::Int8 => { - let array = array.as_any().downcast_ref::().unwrap(); - Ok(array.value(row_idx).to_string()) + let array = as_int8_array(array)?; + csv_string.push_str(&array.value(row_idx).to_string()) } DataType::Int16 => { - let array = array.as_any().downcast_ref::().unwrap(); - Ok(array.value(row_idx).to_string()) + let array = as_int16_array(array)?; + csv_string.push_str(&array.value(row_idx).to_string()) } DataType::Int32 => { - let array = array.as_any().downcast_ref::().unwrap(); - Ok(array.value(row_idx).to_string()) + let array = as_int32_array(array)?; + csv_string.push_str(&array.value(row_idx).to_string()) } DataType::Int64 => { - let array = array.as_any().downcast_ref::().unwrap(); - Ok(array.value(row_idx).to_string()) + let array = as_int64_array(array)?; + csv_string.push_str(&array.value(row_idx).to_string()) } DataType::Utf8 => { - let array = array.as_any().downcast_ref::().unwrap(); - Ok(array.value(row_idx).to_string()) + let array = as_string_array(array); + csv_string.push_str(&array.value(row_idx).to_string()) } DataType::LargeUtf8 => { - let array = array.as_any().downcast_ref::().unwrap(); - Ok(array.value(row_idx).to_string()) + let array = as_largestring_array(array); + csv_string.push_str(&array.value(row_idx).to_string()) } - _ => exec_err!("to_csv not implemented for type: {:?}", array.data_type()), + _ => return exec_err!("to_csv not implemented for type: {:?}", array.data_type()), } + Ok(()) } 
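// Editorial sketch (hedged, not part of this patch): Spark's CSV writer quotes a
// field when it contains the delimiter, the quote character, or a line break, and
// escapes embedded quote characters with the configured escape character. The
// `quote` and `escape` fields carried by `ToCsv` are not yet consulted in
// `convert_to_string`; a hypothetical helper for that behavior could look like:
//
// fn quote_if_needed(value: &str, delimiter: &str, quote: &str, escape: &str) -> String {
//     if value.contains(delimiter) || value.contains(quote) || value.contains('\n') {
//         // Escape embedded quote characters, then wrap the whole value in quotes.
//         let escaped = value.replace(quote, &format!("{escape}{quote}"));
//         format!("{quote}{escaped}{quote}")
//     } else {
//         value.to_string()
//     }
// }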
#[cfg(test)] diff --git a/spark/src/test/scala/org/apache/comet/CometCsvExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometCsvExpressionSuite.scala index ab71e0765b..421bdc9625 100644 --- a/spark/src/test/scala/org/apache/comet/CometCsvExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCsvExpressionSuite.scala @@ -49,8 +49,9 @@ class CometCsvExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper val df = spark.read .parquet(filename) .select(to_csv(struct(col("c0"), col("c1"), col("c2")))) - checkSparkAnswerAndOperator(df) - + df.explain(true) + df.printSchema() + checkSparkAnswer(df) } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index 8011e5e70d..2e5916b1cf 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -22,6 +22,7 @@ package org.apache.spark.sql import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag import scala.util.{Success, Try} @@ -43,7 +44,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.internal._ import org.apache.spark.sql.test._ -import org.apache.spark.sql.types.{DecimalType, StructType} +import org.apache.spark.sql.types.{DecimalType, StringType, StructType} import org.apache.comet._ import org.apache.comet.shims.ShimCometSparkSessionExtensions @@ -119,6 +120,10 @@ abstract class CometTestBase if (withTol.isDefined) { checkAnswerWithTolerance(dfComet, expected, withTol.get) } else { + val df = + spark.createDataFrame(expected.toList.asJava, new StructType().add("value", StringType)) + df.show(false) + df.printSchema() checkAnswer(dfComet, expected) } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCsvExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCsvExpressionBenchmark.scala index 4a9b387ab0..08d27f3888 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCsvExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCsvExpressionBenchmark.scala @@ -43,9 +43,8 @@ case class CsvExprConfig( // spotless:off /** * Benchmark to measure performance of Comet CSV expressions. To run this benchmark: - * `SPARK_GENERATE_BENCHMARK_FILES=1 make - * benchmark-org.apache.spark.sql.benchmark.CometCsvExpressionBenchmark` Results will be written - * to "spark/benchmarks/CometCsvExpressionBenchmark-**results.txt". + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCsvExpressionBenchmark` + * Results will be written to "spark/benchmarks/CometCsvExpressionBenchmark-**results.txt". 
  */
 // spotless:on
 object CometCsvExpressionBenchmark extends CometBenchmarkBase {

From 902eb3a244263d1dea57d0e1ba404e0d60543ade Mon Sep 17 00:00:00 2001
From: Kazantsev Maksim
Date: Wed, 7 Jan 2026 17:29:51 +0400
Subject: [PATCH 8/9] Add benchmark

---
 native/core/src/execution/planner.rs       |  9 +-
 native/proto/src/proto/expr.proto          | 12 ++-
 native/spark-expr/Cargo.toml               |  4 +
 native/spark-expr/benches/to_csv.rs        | 94 +++++++++++++++++++
 native/spark-expr/src/csv_funcs/mod.rs     |  2 +-
 native/spark-expr/src/csv_funcs/to_csv.rs  | 11 +--
 .../org/apache/comet/serde/structs.scala   | 18 +++-
 .../CometCsvExpressionBenchmark.scala      |  9 +-
 8 files changed, 132 insertions(+), 27 deletions(-)
 create mode 100644 native/spark-expr/benches/to_csv.rs

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index ad194c841e..06decf332a 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -647,12 +647,13 @@ impl PhysicalPlanner {
             ExprStruct::ToCsv(expr) => {
                 let csv_struct_expr =
                     self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                let options = expr.options.clone().unwrap();
                 Ok(Arc::new(ToCsv::new(
                     csv_struct_expr,
-                    &expr.delimiter,
-                    &expr.quote,
-                    &expr.escape,
-                    &expr.null_value,
+                    &options.delimiter,
+                    &options.quote,
+                    &options.escape,
+                    &options.null_value,
                 )))
             }
             expr => Err(GeneralError(format!("Not implemented: {expr:?}"))),
diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto
index f0ef07af02..1b6eb6b2ba 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -278,10 +278,14 @@ message FromJson {
 
 message ToCsv {
   Expr child = 1;
-  string delimiter = 2;
-  string quote = 3;
-  string escape = 4;
-  string null_value = 5;
+  CsvWriteOptions options = 2;
+}
+
+message CsvWriteOptions {
+  string delimiter = 1;
+  string quote = 2;
+  string escape = 3;
+  string null_value = 4;
 }
 
 enum BinaryOutputStyle {
diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml
index 94653d8864..fd0a211b29 100644
--- a/native/spark-expr/Cargo.toml
+++ b/native/spark-expr/Cargo.toml
@@ -88,6 +88,10 @@ harness = false
 name = "normalize_nan"
 harness = false
 
+[[bench]]
+name = "to_csv"
+harness = false
+
 [[test]]
 name = "test_udf_registration"
 path = "tests/spark_expr_reg.rs"
diff --git a/native/spark-expr/benches/to_csv.rs b/native/spark-expr/benches/to_csv.rs
new file mode 100644
index 0000000000..55cd9af7cb
--- /dev/null
+++ b/native/spark-expr/benches/to_csv.rs
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+    BooleanBuilder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, StringBuilder,
+    StructArray, StructBuilder,
+};
+use arrow::datatypes::{DataType, Field};
+use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion_comet_spark_expr::struct_to_csv;
+use std::hint::black_box;
+
+fn create_struct_array(array_size: usize) -> StructArray {
+    let fields = vec![
+        Field::new("f1", DataType::Boolean, true),
+        Field::new("f2", DataType::Int8, true),
+        Field::new("f3", DataType::Int16, true),
+        Field::new("f4", DataType::Int32, true),
+        Field::new("f5", DataType::Int64, true),
+        Field::new("f6", DataType::Utf8, true),
+    ];
+    let mut struct_builder = StructBuilder::from_fields(fields, array_size);
+    for i in 0..array_size {
+        struct_builder
+            .field_builder::<BooleanBuilder>(0)
+            .unwrap()
+            .append_option(if i % 10 == 0 { None } else { Some(i % 2 == 0) });
+
+        struct_builder
+            .field_builder::<Int8Builder>(1)
+            .unwrap()
+            .append_option(if i % 10 == 0 {
+                None
+            } else {
+                Some((i % 128) as i8)
+            });
+
+        struct_builder
+            .field_builder::<Int16Builder>(2)
+            .unwrap()
+            .append_option(if i % 10 == 0 { None } else { Some(i as i16) });
+
+        struct_builder
+            .field_builder::<Int32Builder>(3)
+            .unwrap()
+            .append_option(if i % 10 == 0 { None } else { Some(i as i32) });
+
+        struct_builder
+            .field_builder::<Int64Builder>(4)
+            .unwrap()
+            .append_option(if i % 10 == 0 { None } else { Some(i as i64) });
+
+        struct_builder
+            .field_builder::<StringBuilder>(5)
+            .unwrap()
+            .append_option(if i % 10 == 0 {
+                None
+            } else {
+                Some(format!("string_{}", i))
+            });
+
+        struct_builder.append(true);
+    }
+    struct_builder.finish()
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let array_size = 8192;
+    let struct_array = create_struct_array(array_size);
+    let default_delimiter = ",";
+    let default_null_value = "";
+    c.bench_function("to_csv", |b| {
+        b.iter(|| {
+            black_box(struct_to_csv(&struct_array, default_delimiter, default_null_value).unwrap())
+        })
+    });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/native/spark-expr/src/csv_funcs/mod.rs b/native/spark-expr/src/csv_funcs/mod.rs
index 1f76ce7c2f..311b509297 100644
--- a/native/spark-expr/src/csv_funcs/mod.rs
+++ b/native/spark-expr/src/csv_funcs/mod.rs
@@ -17,4 +17,4 @@
 
 mod to_csv;
 
-pub use to_csv::ToCsv;
+pub use to_csv::{ToCsv, struct_to_csv};
diff --git a/native/spark-expr/src/csv_funcs/to_csv.rs b/native/spark-expr/src/csv_funcs/to_csv.rs
index a0b9e9ce33..d99e970cfa 100644
--- a/native/spark-expr/src/csv_funcs/to_csv.rs
+++ b/native/spark-expr/src/csv_funcs/to_csv.rs
@@ -15,9 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::{
-    as_boolean_array, as_largestring_array, as_string_array, Array, ArrayRef, StringBuilder,
-};
+use arrow::array::{as_boolean_array, as_largestring_array, as_string_array, as_struct_array, Array, ArrayRef, StringBuilder};
 use arrow::array::{RecordBatch, StructArray};
 use arrow::datatypes::{DataType, Schema};
 use datafusion::common::cast::{as_int16_array, as_int32_array, as_int64_array, as_int8_array};
@@ -103,10 +101,7 @@ impl PhysicalExpr for ToCsv {
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
         let input_value = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
 
-        let struct_array = input_value
-            .as_any()
-            .downcast_ref::<StructArray>()
-            .expect("A StructType is expected");
+        let struct_array = as_struct_array(&input_value);
 
         let result = struct_to_csv(struct_array, &self.delimiter, &self.null_value)?;
 
@@ -135,7 +130,7 @@ impl PhysicalExpr for ToCsv {
     }
 }
 
-fn struct_to_csv(array: &StructArray, delimiter: &str, null_value: &str) -> Result<ArrayRef> {
+pub fn struct_to_csv(array: &StructArray, delimiter: &str, null_value: &str) -> Result<ArrayRef> {
     let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 16);
     let mut csv_string = String::with_capacity(array.len() * 16);
 
diff --git a/spark/src/main/scala/org/apache/comet/serde/structs.scala b/spark/src/main/scala/org/apache/comet/serde/structs.scala
index c3f9e0eec0..f8d18e1dc9 100644
--- a/spark/src/main/scala/org/apache/comet/serde/structs.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/structs.scala
@@ -235,7 +235,7 @@ object CometStructsToCsv extends CometExpressionSerde[StructsToCsv] {
 
   override def getSupportLevel(expr: StructsToCsv): SupportLevel = {
     val isSupportedSchema = expr.inputSchema.fields
-      .forall(sf => QueryPlanSerde.supportedDataType(sf.dataType, allowComplex = false))
+      .forall(sf => QueryPlanSerde.supportedDataType(sf.dataType))
     if (!isSupportedSchema) {
       return Unsupported(Some(s"Unsupported data type: ${expr.inputSchema}"))
     }
@@ -249,15 +249,23 @@ object CometStructsToCsv extends CometExpressionSerde[StructsToCsv] {
     for {
       childProto <- exprToProtoInternal(expr.child, inputs, binding)
     } yield {
+      val optionsProto = options2Proto(expr.options)
       val toCsv = ExprOuterClass.ToCsv
         .newBuilder()
        .setChild(childProto)
-        .setDelimiter(expr.options.getOrElse("delimiter", ","))
-        .setQuote(expr.options.getOrElse("quote", "\""))
-        .setEscape(expr.options.getOrElse("escape", "\\"))
-        .setEscape(expr.options.getOrElse("nullValue", ""))
+        .setOptions(optionsProto)
         .build()
       ExprOuterClass.Expr.newBuilder().setToCsv(toCsv).build()
     }
   }
+
+  private def options2Proto(options: Map[String, String]): ExprOuterClass.CsvWriteOptions = {
+    ExprOuterClass.CsvWriteOptions
+      .newBuilder()
+      .setDelimiter(options.getOrElse("delimiter", ","))
+      .setQuote(options.getOrElse("quote", "\""))
+      .setEscape(options.getOrElse("escape", "\\"))
+      .setNullValue(options.getOrElse("nullValue", ""))
+      .build()
+  }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCsvExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCsvExpressionBenchmark.scala
index 08d27f3888..94288eb9cb 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCsvExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCsvExpressionBenchmark.scala
@@ -19,9 +19,7 @@
 
 package org.apache.spark.sql.benchmark
 
-import org.apache.spark.sql.benchmark.CometJsonExpressionBenchmark.{prepareTable, runExpressionBenchmark, withTempPath, withTempTable}
-import org.apache.spark.sql.benchmark.CometStringExpressionBenchmark.{spark, tbl}
-import org.apache.spark.sql.catalyst.expressions.{CsvToStructs, JsonToStructs}
+import org.apache.spark.sql.catalyst.expressions.CsvToStructs
 
 import org.apache.comet.CometConf
 
@@ -43,8 +41,9 @@ case class CsvExprConfig(
 // spotless:off
 /**
  * Benchmark to measure performance of Comet CSV expressions. To run this benchmark:
- * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCsvExpressionBenchmark`
- * Results will be written to "spark/benchmarks/CometCsvExpressionBenchmark-**results.txt".
+ * `SPARK_GENERATE_BENCHMARK_FILES=1 make
+ * benchmark-org.apache.spark.sql.benchmark.CometCsvExpressionBenchmark` Results will be written
+ * to "spark/benchmarks/CometCsvExpressionBenchmark-**results.txt".
  */
 // spotless:on
 object CometCsvExpressionBenchmark extends CometBenchmarkBase {

From 86c17e8259633d5325bc9c4bf8bcce7c686051f4 Mon Sep 17 00:00:00 2001
From: Kazantsev Maksim
Date: Wed, 7 Jan 2026 20:26:11 +0400
Subject: [PATCH 9/9] add more options

---
 native/proto/src/proto/expr.proto          |  7 ++++++
 native/spark-expr/src/csv_funcs/to_csv.rs  | 12 ++++++---
 .../org/apache/comet/serde/structs.scala   | 25 +++++++++++++++++--
 3 files changed, 39 insertions(+), 5 deletions(-)

diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto
index 1b6eb6b2ba..f3b27380be 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -286,6 +286,13 @@ message CsvWriteOptions {
   string quote = 2;
   string escape = 3;
   string null_value = 4;
+  bool quote_all = 5;
+  bool ignore_leading_white_space = 6;
+  bool ignore_trailing_white_space = 7;
+  string date_format = 8;
+  string timestamp_format = 9;
+  string timestamp_ntz_format = 10;
+  string timezone = 11;
 }
 
 enum BinaryOutputStyle {
diff --git a/native/spark-expr/src/csv_funcs/to_csv.rs b/native/spark-expr/src/csv_funcs/to_csv.rs
index d99e970cfa..68f09c7ebe 100644
--- a/native/spark-expr/src/csv_funcs/to_csv.rs
+++ b/native/spark-expr/src/csv_funcs/to_csv.rs
@@ -35,6 +35,7 @@ pub struct ToCsv {
     quote: String,
     escape: String,
     null_value: String,
+    quote_all: bool,
 }
 
 impl Hash for ToCsv {
@@ -44,6 +45,7 @@ impl Hash for ToCsv {
         self.quote.hash(state);
         self.escape.hash(state);
         self.null_value.hash(state);
+        self.quote_all.hash(state);
     }
 }
 
 impl PartialEq for ToCsv {
@@ -54,6 +56,7 @@
             && self.quote.eq(&other.quote)
             && self.escape.eq(&other.escape)
             && self.null_value.eq(&other.null_value)
+            && self.quote_all.eq(&other.quote_all)
     }
 }
 
 impl ToCsv {
@@ -64,6 +67,7 @@ impl ToCsv {
         quote: &str,
         escape: &str,
         null_value: &str,
+        quote_all: bool,
     ) -> Self {
         Self {
             expr,
@@ -71,6 +75,7 @@
             quote: quote.to_owned(),
             escape: escape.to_owned(),
             null_value: null_value.to_owned(),
+            quote_all,
         }
     }
 }
 
 impl Display for ToCsv {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "to_csv({}, delimiter={}, quote={}, escape={}, null_value={})",
-            self.expr, self.delimiter, self.quote, self.escape, self.null_value
+            "to_csv({}, delimiter={}, quote={}, escape={}, null_value={}, quote_all={})",
+            self.expr, self.delimiter, self.quote, self.escape, self.null_value, self.quote_all
         )
     }
 }
@@ -122,6 +127,7 @@ impl PhysicalExpr for ToCsv {
             &self.quote,
             &self.escape,
             &self.null_value,
+            self.quote_all,
         )))
     }
 
@@ -130,7 +136,7 @@ impl PhysicalExpr for ToCsv {
     }
 }
 
-pub fn struct_to_csv(array: &StructArray, delimiter: &str, null_value: &str) -> Result<ArrayRef> {
+pub fn struct_to_csv(array: &StructArray, delimiter: &str, null_value: &str, quote_all: bool) -> Result<ArrayRef> {
     let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 16);
     let mut csv_string = String::with_capacity(array.len() * 16);
 
diff --git a/spark/src/main/scala/org/apache/comet/serde/structs.scala b/spark/src/main/scala/org/apache/comet/serde/structs.scala
index f8d18e1dc9..f606c5aa24 100644
--- a/spark/src/main/scala/org/apache/comet/serde/structs.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/structs.scala
@@ -20,6 +20,7 @@ package org.apache.comet.serde
 
 import scala.jdk.CollectionConverters._
+import scala.util.Try
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, GetArrayStructFields, GetStructField, JsonToStructs, StructsToCsv, StructsToJson}
 import org.apache.spark.sql.types._
 
@@ -249,7 +250,7 @@ object CometStructsToCsv extends CometExpressionSerde[StructsToCsv] {
     for {
       childProto <- exprToProtoInternal(expr.child, inputs, binding)
     } yield {
-      val optionsProto = options2Proto(expr.options)
+      val optionsProto = options2Proto(expr.options, expr.timeZoneId)
       val toCsv = ExprOuterClass.ToCsv
         .newBuilder()
         .setChild(childProto)
@@ -259,13 +260,33 @@
     }
   }
 
-  private def options2Proto(options: Map[String, String]): ExprOuterClass.CsvWriteOptions = {
+  private def options2Proto(
+      options: Map[String, String],
+      timeZoneId: Option[String]): ExprOuterClass.CsvWriteOptions = {
     ExprOuterClass.CsvWriteOptions
       .newBuilder()
       .setDelimiter(options.getOrElse("delimiter", ","))
       .setQuote(options.getOrElse("quote", "\""))
       .setEscape(options.getOrElse("escape", "\\"))
       .setNullValue(options.getOrElse("nullValue", ""))
+      .setTimezone(timeZoneId.getOrElse("UTC"))
+      .setIgnoreLeadingWhiteSpace(options
+        .get("ignoreLeadingWhiteSpace")
+        .flatMap(ignoreLeadingWhiteSpace => Try(ignoreLeadingWhiteSpace.toBoolean).toOption)
+        .getOrElse(true))
+      .setIgnoreTrailingWhiteSpace(options
+        .get("ignoreTrailingWhiteSpace")
+        .flatMap(ignoreTrailingWhiteSpace => Try(ignoreTrailingWhiteSpace.toBoolean).toOption)
+        .getOrElse(true))
+      .setQuoteAll(options
+        .get("quoteAll")
+        .flatMap(quoteAll => Try(quoteAll.toBoolean).toOption)
+        .getOrElse(false))
+      .setDateFormat(options.getOrElse("dateFormat", "yyyy-MM-dd"))
+      .setTimestampFormat(options
+        .getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]"))
+      .setTimestampNtzFormat(options
+        .getOrElse("timestampNTZFormat", "yyyy-MM-dd'T'HH:mm:ss[.SSS]"))
       .build()
   }
 }
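Reviewer note (not part of the patches): the sketch below shows how the options serialized into the new CsvWriteOptions message are driven from the Spark side, using only the standard to_csv(Column, java.util.Map[String, String]) API. The session setup, column names, and option values are illustrative only; each map key corresponds to a field populated by CometStructsToCsv.options2Proto (nullValue -> null_value, quoteAll -> quote_all, and so on).

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct, to_csv}

object ToCsvOptionsExample {
  def main(args: Array[String]): Unit = {
    // Illustrative local session; in the Comet test suites this comes from CometTestBase.
    val spark = SparkSession.builder().master("local[1]").appName("to_csv options").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a,b", true), (2, null.asInstanceOf[String], false)).toDF("c0", "c1", "c2")

    // CSV write options recognised by Spark's to_csv; Comet maps them onto CsvWriteOptions.
    val csvOptions = Map(
      "delimiter" -> ";",
      "quote" -> "\"",
      "escape" -> "\\",
      "nullValue" -> "NULL",
      "quoteAll" -> "true",
      "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]")

    df.select(to_csv(struct(col("c0"), col("c1"), col("c2")), csvOptions.asJava).as("csv"))
      .show(false)

    spark.stop()
  }
}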