diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 2bea2ec5a4526..95a02147438b0 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -23,6 +23,7 @@ use arrow_ipc::CompressionType; use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties}; use crate::error::_config_err; use crate::format::{ExplainAnalyzeLevel, ExplainFormat}; +use crate::parquet_config::DFParquetWriterVersion; use crate::parsers::CompressionTypeVariant; use crate::utils::get_available_parallelism; use crate::{DataFusionError, Result}; @@ -742,7 +743,7 @@ config_namespace! { /// (writing) Sets parquet writer version /// valid values are "1.0" and "2.0" - pub writer_version: String, default = "1.0".to_string() + pub writer_version: DFParquetWriterVersion, default = DFParquetWriterVersion::default() /// (writing) Skip encoding the embedded arrow metadata in the KV_meta /// @@ -3455,4 +3456,37 @@ mod tests { let parsed_metadata = table_config.parquet.key_value_metadata; assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into()))); } + #[cfg(feature = "parquet")] + #[test] + fn test_parquet_writer_version_validation() { + use crate::{config::ConfigOptions, parquet_config::DFParquetWriterVersion}; + + let mut config = ConfigOptions::default(); + + // Valid values should work + config + .set("datafusion.execution.parquet.writer_version", "1.0") + .unwrap(); + assert_eq!( + config.execution.parquet.writer_version, + DFParquetWriterVersion::V1_0 + ); + + config + .set("datafusion.execution.parquet.writer_version", "2.0") + .unwrap(); + assert_eq!( + config.execution.parquet.writer_version, + DFParquetWriterVersion::V2_0 + ); + + // Invalid value should error immediately at SET time + let err = config + .set("datafusion.execution.parquet.writer_version", "3.0") + .unwrap_err(); + assert_eq!( + err.to_string(), + "Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0" + ); + } } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 8aa0134d09ec8..196cb96f3832d 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -33,7 +33,7 @@ use parquet::{ metadata::KeyValue, properties::{ DEFAULT_STATISTICS_ENABLED, EnabledStatistics, WriterProperties, - WriterPropertiesBuilder, WriterVersion, + WriterPropertiesBuilder, }, }, schema::types::ColumnPath, @@ -214,7 +214,7 @@ impl ParquetOptions { let mut builder = WriterProperties::builder() .set_data_page_size_limit(*data_pagesize_limit) .set_write_batch_size(*write_batch_size) - .set_writer_version(parse_version_string(writer_version.as_str())?) + .set_writer_version((*writer_version).into()) .set_dictionary_page_size_limit(*dictionary_page_size_limit) .set_statistics_enabled( statistics_enabled @@ -373,18 +373,6 @@ pub fn parse_compression_string( } } -pub(crate) fn parse_version_string(str_setting: &str) -> Result { - let str_setting_lower: &str = &str_setting.to_lowercase(); - match str_setting_lower { - "1.0" => Ok(WriterVersion::PARQUET_1_0), - "2.0" => Ok(WriterVersion::PARQUET_2_0), - _ => Err(DataFusionError::Configuration(format!( - "Unknown or unsupported parquet writer version {str_setting} \ - valid options are 1.0 and 2.0" - ))), - } -} - pub(crate) fn parse_statistics_string(str_setting: &str) -> Result { let str_setting_lower: &str = &str_setting.to_lowercase(); match str_setting_lower { @@ -405,6 +393,7 @@ mod tests { #[cfg(feature = "parquet_encryption")] use crate::config::ConfigFileEncryptionProperties; use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions}; + use crate::parquet_config::DFParquetWriterVersion; use parquet::basic::Compression; use parquet::file::properties::{ BloomFilterProperties, DEFAULT_BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_NDV, @@ -431,16 +420,17 @@ mod tests { fn parquet_options_with_non_defaults() -> ParquetOptions { let defaults = ParquetOptions::default(); - let writer_version = if defaults.writer_version.eq("1.0") { - "2.0" + let writer_version = if defaults.writer_version.eq(&DFParquetWriterVersion::V1_0) + { + DFParquetWriterVersion::V2_0 } else { - "1.0" + DFParquetWriterVersion::V1_0 }; ParquetOptions { data_pagesize_limit: 42, write_batch_size: 42, - writer_version: writer_version.into(), + writer_version, compression: Some("zstd(22)".into()), dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)), dictionary_page_size_limit: 42, @@ -548,7 +538,7 @@ mod tests { // global options data_pagesize_limit: props.dictionary_page_size_limit(), write_batch_size: props.write_batch_size(), - writer_version: format!("{}.0", props.writer_version().as_num()), + writer_version: props.writer_version().into(), dictionary_page_size_limit: props.dictionary_page_size_limit(), max_row_group_size: props.max_row_group_size(), created_by: props.created_by().to_string(), diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 2dbc08688652f..df6659c6f843c 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -51,6 +51,7 @@ pub mod instant; pub mod metadata; pub mod nested_struct; mod null_equality; +pub mod parquet_config; pub mod parsers; pub mod pruning; pub mod rounding; @@ -61,7 +62,6 @@ pub mod test_util; pub mod tree_node; pub mod types; pub mod utils; - /// Reexport arrow crate pub use arrow; pub use column::Column; diff --git a/datafusion/common/src/parquet_config.rs b/datafusion/common/src/parquet_config.rs new file mode 100644 index 0000000000000..9d6d7a88566a7 --- /dev/null +++ b/datafusion/common/src/parquet_config.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{self, Display}; +use std::str::FromStr; + +use crate::config::{ConfigField, Visit}; +use crate::error::{DataFusionError, Result}; + +/// Parquet writer version options for controlling the Parquet file format version +/// +/// This enum validates parquet writer version values at configuration time, +/// ensuring only valid versions ("1.0" or "2.0") can be set via `SET` commands +/// or proto deserialization. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum DFParquetWriterVersion { + /// Parquet format version 1.0 + #[default] + V1_0, + /// Parquet format version 2.0 + V2_0, +} + +/// Implement parsing strings to `DFParquetWriterVersion` +impl FromStr for DFParquetWriterVersion { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "1.0" => Ok(DFParquetWriterVersion::V1_0), + "2.0" => Ok(DFParquetWriterVersion::V2_0), + other => Err(DataFusionError::Configuration(format!( + "Invalid parquet writer version: {other}. Expected one of: 1.0, 2.0" + ))), + } + } +} + +impl Display for DFParquetWriterVersion { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + DFParquetWriterVersion::V1_0 => "1.0", + DFParquetWriterVersion::V2_0 => "2.0", + }; + write!(f, "{s}") + } +} + +impl ConfigField for DFParquetWriterVersion { + fn visit(&self, v: &mut V, key: &str, description: &'static str) { + v.some(key, self, description) + } + + fn set(&mut self, _: &str, value: &str) -> Result<()> { + *self = DFParquetWriterVersion::from_str(value)?; + Ok(()) + } +} + +/// Convert `DFParquetWriterVersion` to parquet crate's `WriterVersion` +/// +/// This conversion is infallible since `DFParquetWriterVersion` only contains +/// valid values that have been validated at configuration time. +#[cfg(feature = "parquet")] +impl From for parquet::file::properties::WriterVersion { + fn from(value: DFParquetWriterVersion) -> Self { + match value { + DFParquetWriterVersion::V1_0 => { + parquet::file::properties::WriterVersion::PARQUET_1_0 + } + DFParquetWriterVersion::V2_0 => { + parquet::file::properties::WriterVersion::PARQUET_2_0 + } + } + } +} + +/// Convert parquet crate's `WriterVersion` to `DFParquetWriterVersion` +/// +/// This is used when converting from existing parquet writer properties, +/// such as when reading from proto or test code. +#[cfg(feature = "parquet")] +impl From for DFParquetWriterVersion { + fn from(version: parquet::file::properties::WriterVersion) -> Self { + match version { + parquet::file::properties::WriterVersion::PARQUET_1_0 => { + DFParquetWriterVersion::V1_0 + } + parquet::file::properties::WriterVersion::PARQUET_2_0 => { + DFParquetWriterVersion::V2_0 + } + } + } +} diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 7cb7a92031427..e8e71c3884586 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -951,7 +951,9 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { force_filter_selections: value.force_filter_selections, data_pagesize_limit: value.data_pagesize_limit as usize, write_batch_size: value.write_batch_size as usize, - writer_version: value.writer_version.clone(), + writer_version: value.writer_version.parse().map_err(|e| { + DataFusionError::Internal(format!("Failed to parse writer_version: {e}")) + })?, compression: value.compression_opt.clone().map(|opt| match opt { protobuf::parquet_options::CompressionOpt::Compression(v) => Some(v), }).unwrap_or(None), diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index dfa136717f3ad..fee3656482005 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -860,7 +860,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { force_filter_selections: value.force_filter_selections, data_pagesize_limit: value.data_pagesize_limit as u64, write_batch_size: value.write_batch_size as u64, - writer_version: value.writer_version.clone(), + writer_version: value.writer_version.to_string(), compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression), dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled), dictionary_page_size_limit: value.dictionary_page_size_limit as u64, diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 87ce4d524f61b..436a06493766d 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -381,7 +381,7 @@ mod parquet { force_filter_selections: global_options.global.force_filter_selections, data_pagesize_limit: global_options.global.data_pagesize_limit as u64, write_batch_size: global_options.global.write_batch_size as u64, - writer_version: global_options.global.writer_version.clone(), + writer_version: global_options.global.writer_version.to_string(), compression_opt: global_options.global.compression.map(|compression| { parquet_options::CompressionOpt::Compression(compression) }), @@ -477,7 +477,10 @@ mod parquet { force_filter_selections: proto.force_filter_selections, data_pagesize_limit: proto.data_pagesize_limit as usize, write_batch_size: proto.write_batch_size as usize, - writer_version: proto.writer_version.clone(), + // TODO: Consider changing to TryFrom to avoid panic on invalid proto data + writer_version: proto.writer_version.parse().expect(" + Invalid parquet writer version in proto, expected '1.0' or '2.0' + "), compression: proto.compression_opt.as_ref().map(|opt| match opt { parquet_options::CompressionOpt::Compression(compression) => compression.clone(), }), diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 77676fc2fd2d9..bcfda648b53e5 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -31,6 +31,7 @@ use datafusion::datasource::listing::{ use datafusion::execution::options::ArrowReadOptions; use datafusion::optimizer::Optimizer; use datafusion::optimizer::optimize_unions::OptimizeUnions; +use datafusion_common::parquet_config::DFParquetWriterVersion; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_functions_aggregate::sum::sum_distinct; use prost::Message; @@ -464,7 +465,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> { parquet_format.global.bloom_filter_on_read = true; parquet_format.global.created_by = "DataFusion Test".to_string(); - parquet_format.global.writer_version = "PARQUET_2_0".to_string(); + parquet_format.global.writer_version = DFParquetWriterVersion::V2_0; parquet_format.global.write_batch_size = 111; parquet_format.global.data_pagesize_limit = 222; parquet_format.global.data_page_row_count_limit = 333;