Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use arrow_ipc::CompressionType;
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
use crate::error::_config_err;
use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
use crate::parquet_config::DFParquetWriterVersion;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
Expand Down Expand Up @@ -742,7 +743,7 @@ config_namespace! {

/// (writing) Sets parquet writer version
/// valid values are "1.0" and "2.0"
pub writer_version: String, default = "1.0".to_string()
pub writer_version: DFParquetWriterVersion, default = DFParquetWriterVersion::default()

/// (writing) Skip encoding the embedded arrow metadata in the KV_meta
///
Expand Down Expand Up @@ -3455,4 +3456,37 @@ mod tests {
let parsed_metadata = table_config.parquet.key_value_metadata;
assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
}
#[cfg(feature = "parquet")]
#[test]
fn test_parquet_writer_version_validation() {
    use crate::{config::ConfigOptions, parquet_config::DFParquetWriterVersion};

    const KEY: &str = "datafusion.execution.parquet.writer_version";
    let mut config = ConfigOptions::default();

    // Every valid version string is accepted by SET and mapped onto the
    // matching strongly-typed enum variant.
    let valid = [
        ("1.0", DFParquetWriterVersion::V1_0),
        ("2.0", DFParquetWriterVersion::V2_0),
    ];
    for (text, expected) in valid {
        config.set(KEY, text).unwrap();
        assert_eq!(config.execution.parquet.writer_version, expected);
    }

    // An unsupported version must be rejected eagerly, at SET time,
    // with a descriptive configuration error.
    let err = config.set(KEY, "3.0").unwrap_err();
    assert_eq!(
        err.to_string(),
        "Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0"
    );
}
}
28 changes: 9 additions & 19 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use parquet::{
metadata::KeyValue,
properties::{
DEFAULT_STATISTICS_ENABLED, EnabledStatistics, WriterProperties,
WriterPropertiesBuilder, WriterVersion,
WriterPropertiesBuilder,
},
},
schema::types::ColumnPath,
Expand Down Expand Up @@ -214,7 +214,7 @@ impl ParquetOptions {
let mut builder = WriterProperties::builder()
.set_data_page_size_limit(*data_pagesize_limit)
.set_write_batch_size(*write_batch_size)
.set_writer_version(parse_version_string(writer_version.as_str())?)
.set_writer_version((*writer_version).into())
.set_dictionary_page_size_limit(*dictionary_page_size_limit)
.set_statistics_enabled(
statistics_enabled
Expand Down Expand Up @@ -373,18 +373,6 @@ pub fn parse_compression_string(
}
}

pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
let str_setting_lower: &str = &str_setting.to_lowercase();
match str_setting_lower {
"1.0" => Ok(WriterVersion::PARQUET_1_0),
"2.0" => Ok(WriterVersion::PARQUET_2_0),
_ => Err(DataFusionError::Configuration(format!(
"Unknown or unsupported parquet writer version {str_setting} \
valid options are 1.0 and 2.0"
))),
}
}

pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
let str_setting_lower: &str = &str_setting.to_lowercase();
match str_setting_lower {
Expand All @@ -405,6 +393,7 @@ mod tests {
#[cfg(feature = "parquet_encryption")]
use crate::config::ConfigFileEncryptionProperties;
use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
use crate::parquet_config::DFParquetWriterVersion;
use parquet::basic::Compression;
use parquet::file::properties::{
BloomFilterProperties, DEFAULT_BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_NDV,
Expand All @@ -431,16 +420,17 @@ mod tests {

fn parquet_options_with_non_defaults() -> ParquetOptions {
let defaults = ParquetOptions::default();
let writer_version = if defaults.writer_version.eq("1.0") {
"2.0"
let writer_version = if defaults.writer_version.eq(&DFParquetWriterVersion::V1_0)
{
DFParquetWriterVersion::V2_0
} else {
"1.0"
DFParquetWriterVersion::V1_0
};

ParquetOptions {
data_pagesize_limit: 42,
write_batch_size: 42,
writer_version: writer_version.into(),
writer_version,
compression: Some("zstd(22)".into()),
dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
dictionary_page_size_limit: 42,
Expand Down Expand Up @@ -548,7 +538,7 @@ mod tests {
// global options
data_pagesize_limit: props.dictionary_page_size_limit(),
write_batch_size: props.write_batch_size(),
writer_version: format!("{}.0", props.writer_version().as_num()),
writer_version: props.writer_version().into(),
dictionary_page_size_limit: props.dictionary_page_size_limit(),
max_row_group_size: props.max_row_group_size(),
created_by: props.created_by().to_string(),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub mod instant;
pub mod metadata;
pub mod nested_struct;
mod null_equality;
pub mod parquet_config;
pub mod parsers;
pub mod pruning;
pub mod rounding;
Expand All @@ -61,7 +62,6 @@ pub mod test_util;
pub mod tree_node;
pub mod types;
pub mod utils;

/// Reexport arrow crate
pub use arrow;
pub use column::Column;
Expand Down
108 changes: 108 additions & 0 deletions datafusion/common/src/parquet_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt::{self, Display};
use std::str::FromStr;

use crate::config::{ConfigField, Visit};
use crate::error::{DataFusionError, Result};

/// Parquet writer version options for controlling the Parquet file format version
///
/// This enum validates parquet writer version values at configuration time,
/// ensuring only valid versions ("1.0" or "2.0") can be set via `SET` commands
/// or proto deserialization.
//
// `Hash` is derived alongside the other common traits so this small `Copy`
// value type can be used as a map/set key (Rust API guidelines: eagerly
// derive common traits on public types).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum DFParquetWriterVersion {
    /// Parquet format version 1.0 (the default writer version)
    #[default]
    V1_0,
    /// Parquet format version 2.0
    V2_0,
}

/// Implement parsing strings to `DFParquetWriterVersion`
impl FromStr for DFParquetWriterVersion {
    type Err = DataFusionError;

    /// Parses `"1.0"` or `"2.0"` into the corresponding writer version.
    ///
    /// # Errors
    ///
    /// Returns [`DataFusionError::Configuration`] for any other input,
    /// echoing the offending value verbatim in the message.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Valid versions are purely numeric ("1.0" / "2.0"), so case
        // normalization cannot change which inputs are accepted. Matching
        // directly avoids allocating a lowercased copy and reports the
        // caller's original input (not a lowercased one) on error.
        match s {
            "1.0" => Ok(DFParquetWriterVersion::V1_0),
            "2.0" => Ok(DFParquetWriterVersion::V2_0),
            other => Err(DataFusionError::Configuration(format!(
                "Invalid parquet writer version: {other}. Expected one of: 1.0, 2.0"
            ))),
        }
    }
}

impl Display for DFParquetWriterVersion {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self {
DFParquetWriterVersion::V1_0 => "1.0",
DFParquetWriterVersion::V2_0 => "2.0",
};
write!(f, "{s}")
}
}

impl ConfigField for DFParquetWriterVersion {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    /// Replaces the current value by parsing `value`; anything other than
    /// "1.0" / "2.0" is rejected with a configuration error at SET time.
    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = value.parse()?;
        Ok(())
    }
}

/// Convert `DFParquetWriterVersion` to parquet crate's `WriterVersion`
///
/// This conversion is infallible since `DFParquetWriterVersion` only contains
/// valid values that have been validated at configuration time.
#[cfg(feature = "parquet")]
impl From<DFParquetWriterVersion> for parquet::file::properties::WriterVersion {
    fn from(value: DFParquetWriterVersion) -> Self {
        use parquet::file::properties::WriterVersion;

        match value {
            DFParquetWriterVersion::V1_0 => WriterVersion::PARQUET_1_0,
            DFParquetWriterVersion::V2_0 => WriterVersion::PARQUET_2_0,
        }
    }
}

/// Convert parquet crate's `WriterVersion` to `DFParquetWriterVersion`
///
/// This is used when converting from existing parquet writer properties,
/// such as when reading from proto or test code.
#[cfg(feature = "parquet")]
impl From<parquet::file::properties::WriterVersion> for DFParquetWriterVersion {
    fn from(version: parquet::file::properties::WriterVersion) -> Self {
        use parquet::file::properties::WriterVersion;

        match version {
            WriterVersion::PARQUET_1_0 => Self::V1_0,
            WriterVersion::PARQUET_2_0 => Self::V2_0,
        }
    }
}
4 changes: 3 additions & 1 deletion datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,9 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
force_filter_selections: value.force_filter_selections,
data_pagesize_limit: value.data_pagesize_limit as usize,
write_batch_size: value.write_batch_size as usize,
writer_version: value.writer_version.clone(),
writer_version: value.writer_version.parse().map_err(|e| {
DataFusionError::Internal(format!("Failed to parse writer_version: {e}"))
})?,
compression: value.compression_opt.clone().map(|opt| match opt {
protobuf::parquet_options::CompressionOpt::Compression(v) => Some(v),
}).unwrap_or(None),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto-common/src/to_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
force_filter_selections: value.force_filter_selections,
data_pagesize_limit: value.data_pagesize_limit as u64,
write_batch_size: value.write_batch_size as u64,
writer_version: value.writer_version.clone(),
writer_version: value.writer_version.to_string(),
compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
dictionary_page_size_limit: value.dictionary_page_size_limit as u64,
Expand Down
7 changes: 5 additions & 2 deletions datafusion/proto/src/logical_plan/file_formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ mod parquet {
force_filter_selections: global_options.global.force_filter_selections,
data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
write_batch_size: global_options.global.write_batch_size as u64,
writer_version: global_options.global.writer_version.clone(),
writer_version: global_options.global.writer_version.to_string(),
compression_opt: global_options.global.compression.map(|compression| {
parquet_options::CompressionOpt::Compression(compression)
}),
Expand Down Expand Up @@ -477,7 +477,10 @@ mod parquet {
force_filter_selections: proto.force_filter_selections,
data_pagesize_limit: proto.data_pagesize_limit as usize,
write_batch_size: proto.write_batch_size as usize,
writer_version: proto.writer_version.clone(),
// TODO: Consider changing to TryFrom to avoid panic on invalid proto data
writer_version: proto.writer_version.parse().expect("
Invalid parquet writer version in proto, expected '1.0' or '2.0'
"),
Comment on lines +481 to +483
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll probably need to look into this in the future, to change this to a TryFrom instead of having this panic

Copy link
Author

@AlyAbdelmoneim AlyAbdelmoneim Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll probably need to look into this in the future, to change this to a TryFrom instead of having this panic

I left a TODO comment for that now

compression: proto.compression_opt.as_ref().map(|opt| match opt {
parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
}),
Expand Down
3 changes: 2 additions & 1 deletion datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datafusion::datasource::listing::{
use datafusion::execution::options::ArrowReadOptions;
use datafusion::optimizer::Optimizer;
use datafusion::optimizer::optimize_unions::OptimizeUnions;
use datafusion_common::parquet_config::DFParquetWriterVersion;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_functions_aggregate::sum::sum_distinct;
use prost::Message;
Expand Down Expand Up @@ -464,7 +465,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {

parquet_format.global.bloom_filter_on_read = true;
parquet_format.global.created_by = "DataFusion Test".to_string();
parquet_format.global.writer_version = "PARQUET_2_0".to_string();
parquet_format.global.writer_version = DFParquetWriterVersion::V2_0;
parquet_format.global.write_batch_size = 111;
parquet_format.global.data_pagesize_limit = 222;
parquet_format.global.data_page_row_count_limit = 333;
Expand Down