From eb3e84309504f652892fa3e3021c694216b9de10 Mon Sep 17 00:00:00 2001 From: Aly Date: Sat, 27 Dec 2025 21:30:55 +0200 Subject: [PATCH 1/5] feat: validate parquet writer_version at config time Add ParquetWriterVersion enum to validate writer_version values ("1.0" or "2.0") immediately when setting via SET command or proto deserialization, instead of failing later during parquet file writing. - Create ParquetWriterVersion enum with FromStr, Display, ConfigField - Change ParquetOptions.writer_version from String to ParquetWriterVersion - Remove parse_version_string function (validation now happens earlier) - Update proto conversions to validate during deserialization - Add test for early validation This ensures invalid values are caught immediately with clear error messages, following the same pattern as ExplainFormat. --- datafusion/common/src/config.rs | 56 ++++++++- .../common/src/file_options/parquet_writer.rs | 26 ++-- datafusion/common/src/lib.rs | 2 +- datafusion/common/src/parquet_config.rs | 113 ++++++++++++++++++ datafusion/proto-common/src/from_proto/mod.rs | 4 +- datafusion/proto-common/src/to_proto/mod.rs | 2 +- .../proto/src/logical_plan/file_formats.rs | 6 +- .../tests/cases/roundtrip_logical_plan.rs | 3 +- 8 files changed, 187 insertions(+), 25 deletions(-) create mode 100644 datafusion/common/src/parquet_config.rs diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 2bea2ec5a4526..0d3e61baf9c2d 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -23,6 +23,7 @@ use arrow_ipc::CompressionType; use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties}; use crate::error::_config_err; use crate::format::{ExplainAnalyzeLevel, ExplainFormat}; +use crate::parquet_config::ParquetWriterVersion; use crate::parsers::CompressionTypeVariant; use crate::utils::get_available_parallelism; use crate::{DataFusionError, Result}; @@ -742,7 +743,7 @@ config_namespace! 
{ /// (writing) Sets parquet writer version /// valid values are "1.0" and "2.0" - pub writer_version: String, default = "1.0".to_string() + pub writer_version: ParquetWriterVersion, default = ParquetWriterVersion::default() /// (writing) Skip encoding the embedded arrow metadata in the KV_meta /// @@ -3455,4 +3456,57 @@ mod tests { let parsed_metadata = table_config.parquet.key_value_metadata; assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into()))); } + #[cfg(feature = "parquet")] + #[test] + fn test_parquet_writer_version_validation() { + use crate::{config::ConfigOptions, parquet_config::ParquetWriterVersion}; + + let mut config = ConfigOptions::default(); + + // Valid values should work + config + .set("datafusion.execution.parquet.writer_version", "1.0") + .unwrap(); + assert_eq!( + config.execution.parquet.writer_version, + ParquetWriterVersion::V1_0 + ); + + config + .set("datafusion.execution.parquet.writer_version", "2.0") + .unwrap(); + assert_eq!( + config.execution.parquet.writer_version, + ParquetWriterVersion::V2_0 + ); + + // Invalid value should error immediately at SET time + // Currently this will succeed (no validation), but after adding enum validation, + // this should fail with a clear error message + let result = config.set("datafusion.execution.parquet.writer_version", "3.0"); + assert!( + result.is_err(), + "Setting invalid writer_version '3.0' should fail at SET time" + ); + let error_msg = result.unwrap_err().to_string(); + assert!( + error_msg.contains("writer version") + || error_msg.contains("1.0") + || error_msg.contains("2.0"), + "Error message should mention valid writer version values. Got: {error_msg}" + ); + + // Test case-insensitive (should work for valid values) + config + .set("datafusion.execution.parquet.writer_version", "1.0") + .unwrap(); + + // Another invalid value + let result2 = + config.set("datafusion.execution.parquet.writer_version", "invalid"); + assert!( + result2.is_err(), + "Setting invalid writer_version 'invalid' should fail at SET time" + ); + } } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 8aa0134d09ec8..6db0f3efb78cb 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -33,7 +33,7 @@ use parquet::{ metadata::KeyValue, properties::{ DEFAULT_STATISTICS_ENABLED, EnabledStatistics, WriterProperties, - WriterPropertiesBuilder, WriterVersion, + WriterPropertiesBuilder, }, }, schema::types::ColumnPath, @@ -214,7 +214,7 @@ impl ParquetOptions { let mut builder = WriterProperties::builder() .set_data_page_size_limit(*data_pagesize_limit) .set_write_batch_size(*write_batch_size) - .set_writer_version(parse_version_string(writer_version.as_str())?) 
+ .set_writer_version((*writer_version).into()) .set_dictionary_page_size_limit(*dictionary_page_size_limit) .set_statistics_enabled( statistics_enabled @@ -373,17 +373,6 @@ pub fn parse_compression_string( } } -pub(crate) fn parse_version_string(str_setting: &str) -> Result { - let str_setting_lower: &str = &str_setting.to_lowercase(); - match str_setting_lower { - "1.0" => Ok(WriterVersion::PARQUET_1_0), - "2.0" => Ok(WriterVersion::PARQUET_2_0), - _ => Err(DataFusionError::Configuration(format!( - "Unknown or unsupported parquet writer version {str_setting} \ - valid options are 1.0 and 2.0" - ))), - } -} pub(crate) fn parse_statistics_string(str_setting: &str) -> Result { let str_setting_lower: &str = &str_setting.to_lowercase(); @@ -405,6 +394,7 @@ mod tests { #[cfg(feature = "parquet_encryption")] use crate::config::ConfigFileEncryptionProperties; use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions}; + use crate::parquet_config::ParquetWriterVersion; use parquet::basic::Compression; use parquet::file::properties::{ BloomFilterProperties, DEFAULT_BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_NDV, @@ -431,16 +421,16 @@ mod tests { fn parquet_options_with_non_defaults() -> ParquetOptions { let defaults = ParquetOptions::default(); - let writer_version = if defaults.writer_version.eq("1.0") { - "2.0" + let writer_version = if defaults.writer_version.eq(&ParquetWriterVersion::V1_0) { + ParquetWriterVersion::V2_0 } else { - "1.0" + ParquetWriterVersion::V1_0 }; ParquetOptions { data_pagesize_limit: 42, write_batch_size: 42, - writer_version: writer_version.into(), + writer_version, compression: Some("zstd(22)".into()), dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)), dictionary_page_size_limit: 42, @@ -548,7 +538,7 @@ mod tests { // global options data_pagesize_limit: props.dictionary_page_size_limit(), write_batch_size: props.write_batch_size(), - writer_version: format!("{}.0", props.writer_version().as_num()), + writer_version: props.writer_version().into(), dictionary_page_size_limit: props.dictionary_page_size_limit(), max_row_group_size: props.max_row_group_size(), created_by: props.created_by().to_string(), diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 3bec9bd35cbd0..d9b579595659f 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -61,7 +61,7 @@ pub mod test_util; pub mod tree_node; pub mod types; pub mod utils; - +pub mod parquet_config; /// Reexport arrow crate pub use arrow; pub use column::Column; diff --git a/datafusion/common/src/parquet_config.rs b/datafusion/common/src/parquet_config.rs new file mode 100644 index 0000000000000..de4067c2d1459 --- /dev/null +++ b/datafusion/common/src/parquet_config.rs @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{self, Display}; +use std::str::FromStr; + +use crate::config::{ConfigField, Visit}; +use crate::error::{DataFusionError, Result}; + +/// Parquet writer version options for controlling the Parquet file format version +/// +/// This enum validates parquet writer version values at configuration time, +/// ensuring only valid versions ("1.0" or "2.0") can be set via `SET` commands +/// or proto deserialization. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ParquetWriterVersion { + /// Parquet format version 1.0 + V1_0, + /// Parquet format version 2.0 + V2_0, +} + +/// Implement parsing strings to `ParquetWriterVersion` +impl FromStr for ParquetWriterVersion { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "1.0" => Ok(ParquetWriterVersion::V1_0), + "2.0" => Ok(ParquetWriterVersion::V2_0), + other => Err(DataFusionError::Configuration(format!( + "Invalid parquet writer version: {other}. Expected one of: 1.0, 2.0" + ))), + } + } +} + +impl Display for ParquetWriterVersion { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + ParquetWriterVersion::V1_0 => "1.0", + ParquetWriterVersion::V2_0 => "2.0", + }; + write!(f, "{s}") + } +} + +impl Default for ParquetWriterVersion { + fn default() -> Self { + ParquetWriterVersion::V1_0 + } +} + +impl ConfigField for ParquetWriterVersion { + fn visit(&self, v: &mut V, key: &str, description: &'static str) { + v.some(key, self, description) + } + + fn set(&mut self, _: &str, value: &str) -> Result<()> { + *self = ParquetWriterVersion::from_str(value)?; + Ok(()) + } +} + +/// Convert `ParquetWriterVersion` to parquet crate's `WriterVersion` +/// +/// This conversion is infallible since `ParquetWriterVersion` only contains +/// valid values that have been validated at configuration time. +#[cfg(feature = "parquet")] +impl From for parquet::file::properties::WriterVersion { + fn from(value: ParquetWriterVersion) -> Self { + match value { + ParquetWriterVersion::V1_0 => { + parquet::file::properties::WriterVersion::PARQUET_1_0 + } + ParquetWriterVersion::V2_0 => { + parquet::file::properties::WriterVersion::PARQUET_2_0 + } + } + } +} + +/// Convert parquet crate's `WriterVersion` to `ParquetWriterVersion` +/// +/// This is used when converting from existing parquet writer properties, +/// such as when reading from proto or test code. 
+#[cfg(feature = "parquet")] +impl From for ParquetWriterVersion { + fn from(version: parquet::file::properties::WriterVersion) -> Self { + match version { + parquet::file::properties::WriterVersion::PARQUET_1_0 => { + ParquetWriterVersion::V1_0 + } + parquet::file::properties::WriterVersion::PARQUET_2_0 => { + ParquetWriterVersion::V2_0 + } + } + } +} diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 7cb7a92031427..e8e71c3884586 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -951,7 +951,9 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { force_filter_selections: value.force_filter_selections, data_pagesize_limit: value.data_pagesize_limit as usize, write_batch_size: value.write_batch_size as usize, - writer_version: value.writer_version.clone(), + writer_version: value.writer_version.parse().map_err(|e| { + DataFusionError::Internal(format!("Failed to parse writer_version: {e}")) + })?, compression: value.compression_opt.clone().map(|opt| match opt { protobuf::parquet_options::CompressionOpt::Compression(v) => Some(v), }).unwrap_or(None), diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index dfa136717f3ad..fee3656482005 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -860,7 +860,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { force_filter_selections: value.force_filter_selections, data_pagesize_limit: value.data_pagesize_limit as u64, write_batch_size: value.write_batch_size as u64, - writer_version: value.writer_version.clone(), + writer_version: value.writer_version.to_string(), compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression), dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled), dictionary_page_size_limit: value.dictionary_page_size_limit as u64, diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 87ce4d524f61b..401d21ce73085 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -381,7 +381,7 @@ mod parquet { force_filter_selections: global_options.global.force_filter_selections, data_pagesize_limit: global_options.global.data_pagesize_limit as u64, write_batch_size: global_options.global.write_batch_size as u64, - writer_version: global_options.global.writer_version.clone(), + writer_version: global_options.global.writer_version.to_string(), compression_opt: global_options.global.compression.map(|compression| { parquet_options::CompressionOpt::Compression(compression) }), @@ -477,7 +477,9 @@ mod parquet { force_filter_selections: proto.force_filter_selections, data_pagesize_limit: proto.data_pagesize_limit as usize, write_batch_size: proto.write_batch_size as usize, - writer_version: proto.writer_version.clone(), + writer_version: proto.writer_version.parse().expect(" + Invalid parquet writer version in proto, expected '1.0' or '2.0' + "), compression: proto.compression_opt.as_ref().map(|opt| match opt { parquet_options::CompressionOpt::Compression(compression) => compression.clone(), }), diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 77676fc2fd2d9..969aab0ff8596 100644 --- 
a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -31,6 +31,7 @@ use datafusion::datasource::listing::{ use datafusion::execution::options::ArrowReadOptions; use datafusion::optimizer::Optimizer; use datafusion::optimizer::optimize_unions::OptimizeUnions; +use datafusion_common::parquet_config::ParquetWriterVersion; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_functions_aggregate::sum::sum_distinct; use prost::Message; @@ -464,7 +465,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> { parquet_format.global.bloom_filter_on_read = true; parquet_format.global.created_by = "DataFusion Test".to_string(); - parquet_format.global.writer_version = "PARQUET_2_0".to_string(); + parquet_format.global.writer_version = ParquetWriterVersion::V2_0; parquet_format.global.write_batch_size = 111; parquet_format.global.data_pagesize_limit = 222; parquet_format.global.data_page_row_count_limit = 333; From a7119107dd7f8eeb1363c5e25be6bb4c2f8a8f75 Mon Sep 17 00:00:00 2001 From: Aly Date: Sat, 27 Dec 2025 21:37:21 +0200 Subject: [PATCH 2/5] refactor: rename ParquetWriterVersion to DFWriterVersion Align naming with planned convention for validation enums. --- datafusion/common/src/config.rs | 10 ++--- .../common/src/file_options/parquet_writer.rs | 8 ++-- datafusion/common/src/parquet_config.rs | 44 +++++++++---------- .../tests/cases/roundtrip_logical_plan.rs | 4 +- 4 files changed, 33 insertions(+), 33 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 0d3e61baf9c2d..c99a76ea5a25e 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -23,7 +23,7 @@ use arrow_ipc::CompressionType; use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties}; use crate::error::_config_err; use crate::format::{ExplainAnalyzeLevel, ExplainFormat}; -use crate::parquet_config::ParquetWriterVersion; +use crate::parquet_config::DFWriterVersion; use crate::parsers::CompressionTypeVariant; use crate::utils::get_available_parallelism; use crate::{DataFusionError, Result}; @@ -743,7 +743,7 @@ config_namespace! 
{ /// (writing) Sets parquet writer version /// valid values are "1.0" and "2.0" - pub writer_version: ParquetWriterVersion, default = ParquetWriterVersion::default() + pub writer_version: DFWriterVersion, default = DFWriterVersion::default() /// (writing) Skip encoding the embedded arrow metadata in the KV_meta /// @@ -3459,7 +3459,7 @@ mod tests { #[cfg(feature = "parquet")] #[test] fn test_parquet_writer_version_validation() { - use crate::{config::ConfigOptions, parquet_config::ParquetWriterVersion}; + use crate::{config::ConfigOptions, parquet_config::DFWriterVersion}; let mut config = ConfigOptions::default(); @@ -3469,7 +3469,7 @@ mod tests { .unwrap(); assert_eq!( config.execution.parquet.writer_version, - ParquetWriterVersion::V1_0 + DFWriterVersion::V1_0 ); config @@ -3477,7 +3477,7 @@ mod tests { .unwrap(); assert_eq!( config.execution.parquet.writer_version, - ParquetWriterVersion::V2_0 + DFWriterVersion::V2_0 ); // Invalid value should error immediately at SET time diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 6db0f3efb78cb..be00754c05119 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -394,7 +394,7 @@ mod tests { #[cfg(feature = "parquet_encryption")] use crate::config::ConfigFileEncryptionProperties; use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions}; - use crate::parquet_config::ParquetWriterVersion; + use crate::parquet_config::DFWriterVersion; use parquet::basic::Compression; use parquet::file::properties::{ BloomFilterProperties, DEFAULT_BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_NDV, @@ -421,10 +421,10 @@ mod tests { fn parquet_options_with_non_defaults() -> ParquetOptions { let defaults = ParquetOptions::default(); - let writer_version = if defaults.writer_version.eq(&ParquetWriterVersion::V1_0) { - ParquetWriterVersion::V2_0 + let writer_version = if defaults.writer_version.eq(&DFWriterVersion::V1_0) { + DFWriterVersion::V2_0 } else { - ParquetWriterVersion::V1_0 + DFWriterVersion::V1_0 }; ParquetOptions { diff --git a/datafusion/common/src/parquet_config.rs b/datafusion/common/src/parquet_config.rs index de4067c2d1459..0cd70f621b44e 100644 --- a/datafusion/common/src/parquet_config.rs +++ b/datafusion/common/src/parquet_config.rs @@ -27,21 +27,21 @@ use crate::error::{DataFusionError, Result}; /// ensuring only valid versions ("1.0" or "2.0") can be set via `SET` commands /// or proto deserialization. #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum ParquetWriterVersion { +pub enum DFWriterVersion { /// Parquet format version 1.0 V1_0, /// Parquet format version 2.0 V2_0, } -/// Implement parsing strings to `ParquetWriterVersion` -impl FromStr for ParquetWriterVersion { +/// Implement parsing strings to `DFWriterVersion` +impl FromStr for DFWriterVersion { type Err = DataFusionError; fn from_str(s: &str) -> Result { match s.to_lowercase().as_str() { - "1.0" => Ok(ParquetWriterVersion::V1_0), - "2.0" => Ok(ParquetWriterVersion::V2_0), + "1.0" => Ok(DFWriterVersion::V1_0), + "2.0" => Ok(DFWriterVersion::V2_0), other => Err(DataFusionError::Configuration(format!( "Invalid parquet writer version: {other}. 
Expected one of: 1.0, 2.0" ))), @@ -49,64 +49,64 @@ impl FromStr for ParquetWriterVersion { } } -impl Display for ParquetWriterVersion { +impl Display for DFWriterVersion { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let s = match self { - ParquetWriterVersion::V1_0 => "1.0", - ParquetWriterVersion::V2_0 => "2.0", + DFWriterVersion::V1_0 => "1.0", + DFWriterVersion::V2_0 => "2.0", }; write!(f, "{s}") } } -impl Default for ParquetWriterVersion { +impl Default for DFWriterVersion { fn default() -> Self { - ParquetWriterVersion::V1_0 + DFWriterVersion::V1_0 } } -impl ConfigField for ParquetWriterVersion { +impl ConfigField for DFWriterVersion { fn visit(&self, v: &mut V, key: &str, description: &'static str) { v.some(key, self, description) } fn set(&mut self, _: &str, value: &str) -> Result<()> { - *self = ParquetWriterVersion::from_str(value)?; + *self = DFWriterVersion::from_str(value)?; Ok(()) } } -/// Convert `ParquetWriterVersion` to parquet crate's `WriterVersion` +/// Convert `DFWriterVersion` to parquet crate's `WriterVersion` /// -/// This conversion is infallible since `ParquetWriterVersion` only contains +/// This conversion is infallible since `DFWriterVersion` only contains /// valid values that have been validated at configuration time. #[cfg(feature = "parquet")] -impl From for parquet::file::properties::WriterVersion { - fn from(value: ParquetWriterVersion) -> Self { +impl From for parquet::file::properties::WriterVersion { + fn from(value: DFWriterVersion) -> Self { match value { - ParquetWriterVersion::V1_0 => { + DFWriterVersion::V1_0 => { parquet::file::properties::WriterVersion::PARQUET_1_0 } - ParquetWriterVersion::V2_0 => { + DFWriterVersion::V2_0 => { parquet::file::properties::WriterVersion::PARQUET_2_0 } } } } -/// Convert parquet crate's `WriterVersion` to `ParquetWriterVersion` +/// Convert parquet crate's `WriterVersion` to `DFWriterVersion` /// /// This is used when converting from existing parquet writer properties, /// such as when reading from proto or test code. 
#[cfg(feature = "parquet")] -impl From for ParquetWriterVersion { +impl From for DFWriterVersion { fn from(version: parquet::file::properties::WriterVersion) -> Self { match version { parquet::file::properties::WriterVersion::PARQUET_1_0 => { - ParquetWriterVersion::V1_0 + DFWriterVersion::V1_0 } parquet::file::properties::WriterVersion::PARQUET_2_0 => { - ParquetWriterVersion::V2_0 + DFWriterVersion::V2_0 } } } diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 969aab0ff8596..e1f81e8668c72 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -31,7 +31,7 @@ use datafusion::datasource::listing::{ use datafusion::execution::options::ArrowReadOptions; use datafusion::optimizer::Optimizer; use datafusion::optimizer::optimize_unions::OptimizeUnions; -use datafusion_common::parquet_config::ParquetWriterVersion; +use datafusion_common::parquet_config::DFWriterVersion; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_functions_aggregate::sum::sum_distinct; use prost::Message; @@ -465,7 +465,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> { parquet_format.global.bloom_filter_on_read = true; parquet_format.global.created_by = "DataFusion Test".to_string(); - parquet_format.global.writer_version = ParquetWriterVersion::V2_0; + parquet_format.global.writer_version = DFWriterVersion::V2_0; parquet_format.global.write_batch_size = 111; parquet_format.global.data_pagesize_limit = 222; parquet_format.global.data_page_row_count_limit = 333; From 11a8fe16b4fc993bd5f82b5df2095c7cd74d5400 Mon Sep 17 00:00:00 2001 From: Aly Date: Sat, 27 Dec 2025 22:29:34 +0200 Subject: [PATCH 3/5] chore: fix clippy and fmt warnings --- datafusion/common/src/file_options/parquet_writer.rs | 1 - datafusion/common/src/lib.rs | 2 +- datafusion/common/src/parquet_config.rs | 9 ++------- 3 files changed, 3 insertions(+), 9 deletions(-) diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index be00754c05119..6679bdfc634a6 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -373,7 +373,6 @@ pub fn parse_compression_string( } } - pub(crate) fn parse_statistics_string(str_setting: &str) -> Result { let str_setting_lower: &str = &str_setting.to_lowercase(); match str_setting_lower { diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index d9b579595659f..acb725774f499 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -51,6 +51,7 @@ pub mod instant; pub mod metadata; pub mod nested_struct; mod null_equality; +pub mod parquet_config; pub mod parsers; pub mod pruning; pub mod rounding; @@ -61,7 +62,6 @@ pub mod test_util; pub mod tree_node; pub mod types; pub mod utils; -pub mod parquet_config; /// Reexport arrow crate pub use arrow; pub use column::Column; diff --git a/datafusion/common/src/parquet_config.rs b/datafusion/common/src/parquet_config.rs index 0cd70f621b44e..89444731d037e 100644 --- a/datafusion/common/src/parquet_config.rs +++ b/datafusion/common/src/parquet_config.rs @@ -26,9 +26,10 @@ use crate::error::{DataFusionError, Result}; /// This enum validates parquet writer version values at configuration time, /// ensuring only valid versions ("1.0" or "2.0") can be set via `SET` commands /// or proto deserialization. 
-#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] pub enum DFWriterVersion { /// Parquet format version 1.0 + #[default] V1_0, /// Parquet format version 2.0 V2_0, @@ -59,12 +60,6 @@ impl Display for DFWriterVersion { } } -impl Default for DFWriterVersion { - fn default() -> Self { - DFWriterVersion::V1_0 - } -} - impl ConfigField for DFWriterVersion { fn visit(&self, v: &mut V, key: &str, description: &'static str) { v.some(key, self, description) From cf66a8edf358dda4a4e958e1003c31dbd448d7ce Mon Sep 17 00:00:00 2001 From: Aly Date: Tue, 30 Dec 2025 16:22:32 +0200 Subject: [PATCH 4/5] fix:editing test comments in config.rs --- datafusion/common/src/config.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index c99a76ea5a25e..d23067b966e1c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -3481,8 +3481,6 @@ mod tests { ); // Invalid value should error immediately at SET time - // Currently this will succeed (no validation), but after adding enum validation, - // this should fail with a clear error message let result = config.set("datafusion.execution.parquet.writer_version", "3.0"); assert!( result.is_err(), From c8806805108df9178c3109bd79c359c3862f8761 Mon Sep 17 00:00:00 2001 From: Aly Date: Tue, 30 Dec 2025 18:06:58 +0200 Subject: [PATCH 5/5] refactor: rename to DFParquetWriterVersion and simplify test --- datafusion/common/src/config.rs | 40 +++++-------------- .../common/src/file_options/parquet_writer.rs | 9 +++-- datafusion/common/src/parquet_config.rs | 40 +++++++++---------- .../proto/src/logical_plan/file_formats.rs | 1 + .../tests/cases/roundtrip_logical_plan.rs | 4 +- 5 files changed, 39 insertions(+), 55 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index d23067b966e1c..95a02147438b0 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -23,7 +23,7 @@ use arrow_ipc::CompressionType; use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties}; use crate::error::_config_err; use crate::format::{ExplainAnalyzeLevel, ExplainFormat}; -use crate::parquet_config::DFWriterVersion; +use crate::parquet_config::DFParquetWriterVersion; use crate::parsers::CompressionTypeVariant; use crate::utils::get_available_parallelism; use crate::{DataFusionError, Result}; @@ -743,7 +743,7 @@ config_namespace! 
{ /// (writing) Sets parquet writer version /// valid values are "1.0" and "2.0" - pub writer_version: DFWriterVersion, default = DFWriterVersion::default() + pub writer_version: DFParquetWriterVersion, default = DFParquetWriterVersion::default() /// (writing) Skip encoding the embedded arrow metadata in the KV_meta /// @@ -3459,7 +3459,7 @@ mod tests { #[cfg(feature = "parquet")] #[test] fn test_parquet_writer_version_validation() { - use crate::{config::ConfigOptions, parquet_config::DFWriterVersion}; + use crate::{config::ConfigOptions, parquet_config::DFParquetWriterVersion}; let mut config = ConfigOptions::default(); @@ -3469,7 +3469,7 @@ mod tests { .unwrap(); assert_eq!( config.execution.parquet.writer_version, - DFWriterVersion::V1_0 + DFParquetWriterVersion::V1_0 ); config @@ -3477,34 +3477,16 @@ mod tests { .unwrap(); assert_eq!( config.execution.parquet.writer_version, - DFWriterVersion::V2_0 + DFParquetWriterVersion::V2_0 ); // Invalid value should error immediately at SET time - let result = config.set("datafusion.execution.parquet.writer_version", "3.0"); - assert!( - result.is_err(), - "Setting invalid writer_version '3.0' should fail at SET time" - ); - let error_msg = result.unwrap_err().to_string(); - assert!( - error_msg.contains("writer version") - || error_msg.contains("1.0") - || error_msg.contains("2.0"), - "Error message should mention valid writer version values. Got: {error_msg}" - ); - - // Test case-insensitive (should work for valid values) - config - .set("datafusion.execution.parquet.writer_version", "1.0") - .unwrap(); - - // Another invalid value - let result2 = - config.set("datafusion.execution.parquet.writer_version", "invalid"); - assert!( - result2.is_err(), - "Setting invalid writer_version 'invalid' should fail at SET time" + let err = config + .set("datafusion.execution.parquet.writer_version", "3.0") + .unwrap_err(); + assert_eq!( + err.to_string(), + "Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. 
Expected one of: 1.0, 2.0" ); } } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 6679bdfc634a6..196cb96f3832d 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -393,7 +393,7 @@ mod tests { #[cfg(feature = "parquet_encryption")] use crate::config::ConfigFileEncryptionProperties; use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions}; - use crate::parquet_config::DFWriterVersion; + use crate::parquet_config::DFParquetWriterVersion; use parquet::basic::Compression; use parquet::file::properties::{ BloomFilterProperties, DEFAULT_BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_NDV, @@ -420,10 +420,11 @@ mod tests { fn parquet_options_with_non_defaults() -> ParquetOptions { let defaults = ParquetOptions::default(); - let writer_version = if defaults.writer_version.eq(&DFWriterVersion::V1_0) { - DFWriterVersion::V2_0 + let writer_version = if defaults.writer_version.eq(&DFParquetWriterVersion::V1_0) + { + DFParquetWriterVersion::V2_0 } else { - DFWriterVersion::V1_0 + DFParquetWriterVersion::V1_0 }; ParquetOptions { diff --git a/datafusion/common/src/parquet_config.rs b/datafusion/common/src/parquet_config.rs index 89444731d037e..9d6d7a88566a7 100644 --- a/datafusion/common/src/parquet_config.rs +++ b/datafusion/common/src/parquet_config.rs @@ -27,7 +27,7 @@ use crate::error::{DataFusionError, Result}; /// ensuring only valid versions ("1.0" or "2.0") can be set via `SET` commands /// or proto deserialization. #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] -pub enum DFWriterVersion { +pub enum DFParquetWriterVersion { /// Parquet format version 1.0 #[default] V1_0, @@ -35,14 +35,14 @@ pub enum DFWriterVersion { V2_0, } -/// Implement parsing strings to `DFWriterVersion` -impl FromStr for DFWriterVersion { +/// Implement parsing strings to `DFParquetWriterVersion` +impl FromStr for DFParquetWriterVersion { type Err = DataFusionError; fn from_str(s: &str) -> Result { match s.to_lowercase().as_str() { - "1.0" => Ok(DFWriterVersion::V1_0), - "2.0" => Ok(DFWriterVersion::V2_0), + "1.0" => Ok(DFParquetWriterVersion::V1_0), + "2.0" => Ok(DFParquetWriterVersion::V2_0), other => Err(DataFusionError::Configuration(format!( "Invalid parquet writer version: {other}. Expected one of: 1.0, 2.0" ))), @@ -50,58 +50,58 @@ impl FromStr for DFWriterVersion { } } -impl Display for DFWriterVersion { +impl Display for DFParquetWriterVersion { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let s = match self { - DFWriterVersion::V1_0 => "1.0", - DFWriterVersion::V2_0 => "2.0", + DFParquetWriterVersion::V1_0 => "1.0", + DFParquetWriterVersion::V2_0 => "2.0", }; write!(f, "{s}") } } -impl ConfigField for DFWriterVersion { +impl ConfigField for DFParquetWriterVersion { fn visit(&self, v: &mut V, key: &str, description: &'static str) { v.some(key, self, description) } fn set(&mut self, _: &str, value: &str) -> Result<()> { - *self = DFWriterVersion::from_str(value)?; + *self = DFParquetWriterVersion::from_str(value)?; Ok(()) } } -/// Convert `DFWriterVersion` to parquet crate's `WriterVersion` +/// Convert `DFParquetWriterVersion` to parquet crate's `WriterVersion` /// -/// This conversion is infallible since `DFWriterVersion` only contains +/// This conversion is infallible since `DFParquetWriterVersion` only contains /// valid values that have been validated at configuration time. 
#[cfg(feature = "parquet")] -impl From for parquet::file::properties::WriterVersion { - fn from(value: DFWriterVersion) -> Self { +impl From for parquet::file::properties::WriterVersion { + fn from(value: DFParquetWriterVersion) -> Self { match value { - DFWriterVersion::V1_0 => { + DFParquetWriterVersion::V1_0 => { parquet::file::properties::WriterVersion::PARQUET_1_0 } - DFWriterVersion::V2_0 => { + DFParquetWriterVersion::V2_0 => { parquet::file::properties::WriterVersion::PARQUET_2_0 } } } } -/// Convert parquet crate's `WriterVersion` to `DFWriterVersion` +/// Convert parquet crate's `WriterVersion` to `DFParquetWriterVersion` /// /// This is used when converting from existing parquet writer properties, /// such as when reading from proto or test code. #[cfg(feature = "parquet")] -impl From for DFWriterVersion { +impl From for DFParquetWriterVersion { fn from(version: parquet::file::properties::WriterVersion) -> Self { match version { parquet::file::properties::WriterVersion::PARQUET_1_0 => { - DFWriterVersion::V1_0 + DFParquetWriterVersion::V1_0 } parquet::file::properties::WriterVersion::PARQUET_2_0 => { - DFWriterVersion::V2_0 + DFParquetWriterVersion::V2_0 } } } diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 401d21ce73085..436a06493766d 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -477,6 +477,7 @@ mod parquet { force_filter_selections: proto.force_filter_selections, data_pagesize_limit: proto.data_pagesize_limit as usize, write_batch_size: proto.write_batch_size as usize, + // TODO: Consider changing to TryFrom to avoid panic on invalid proto data writer_version: proto.writer_version.parse().expect(" Invalid parquet writer version in proto, expected '1.0' or '2.0' "), diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index e1f81e8668c72..bcfda648b53e5 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -31,7 +31,7 @@ use datafusion::datasource::listing::{ use datafusion::execution::options::ArrowReadOptions; use datafusion::optimizer::Optimizer; use datafusion::optimizer::optimize_unions::OptimizeUnions; -use datafusion_common::parquet_config::DFWriterVersion; +use datafusion_common::parquet_config::DFParquetWriterVersion; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_functions_aggregate::sum::sum_distinct; use prost::Message; @@ -465,7 +465,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> { parquet_format.global.bloom_filter_on_read = true; parquet_format.global.created_by = "DataFusion Test".to_string(); - parquet_format.global.writer_version = DFWriterVersion::V2_0; + parquet_format.global.writer_version = DFParquetWriterVersion::V2_0; parquet_format.global.write_batch_size = 111; parquet_format.global.data_pagesize_limit = 222; parquet_format.global.data_page_row_count_limit = 333;
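
After this series, writer_version is validated when the option is set (via a SET command, ConfigOptions::set, or proto deserialization) rather than when the Parquet file is written. A minimal usage sketch of the end state, assuming the public paths introduced above (datafusion_common::config::ConfigOptions and datafusion_common::parquet_config::DFParquetWriterVersion) and the parquet feature enabled:

use datafusion_common::config::ConfigOptions;
use datafusion_common::parquet_config::DFParquetWriterVersion;

fn configure_writer_version() -> datafusion_common::Result<()> {
    let mut config = ConfigOptions::default();

    // Valid values parse straight into the enum at SET time.
    config.set("datafusion.execution.parquet.writer_version", "2.0")?;
    assert_eq!(
        config.execution.parquet.writer_version,
        DFParquetWriterVersion::V2_0
    );

    // Invalid values now fail immediately with a Configuration error,
    // instead of surfacing later while writing the Parquet file.
    let err = config
        .set("datafusion.execution.parquet.writer_version", "3.0")
        .unwrap_err();
    assert!(err.to_string().contains("Invalid parquet writer version"));

    Ok(())
}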