1 change: 1 addition & 0 deletions native/core/src/execution/operators/parquet_writer.rs
@@ -86,6 +86,7 @@ impl ParquetWriterExec {
compression: CompressionCodec,
partition_id: i32,
column_names: Vec<String>,
partition_columns: Vec<String>,
) -> Result<Self> {
// Preserve the input's partitioning so each partition writes its own file
let input_partitioning = input.output_partitioning().clone();
1 change: 1 addition & 0 deletions native/core/src/execution/planner.rs
@@ -1261,6 +1261,7 @@ impl PhysicalPlanner {
codec,
self.partition,
writer.column_names.clone(),
writer.partition_columns.clone(),
)?);

Ok((
1 change: 1 addition & 0 deletions native/proto/src/proto/operator.proto
@@ -245,6 +245,7 @@ message ParquetWriter {
optional string job_id = 6;
// Task attempt ID for this specific task
optional int32 task_attempt_id = 7;
// List of partition columns, to support dynamic partitioning mode
repeated string partition_columns = 8;
}

enum AggregateMode {
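For context, a minimal sketch (not part of this diff) of how the Scala serde layer can populate the new repeated field through the generated protobuf builder. `OperatorOuterClass.ParquetWriter.Builder` and `addAllPartitionColumns` follow the standard protobuf-java naming for a `repeated string` field; the helper name and its placement are hypothetical.

```scala
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.expressions.Attribute

import org.apache.comet.serde.OperatorOuterClass

// Sketch only: copy the partition column names (not Attribute.toString, which
// would include expression IDs) into the repeated partition_columns field.
def setPartitionColumns(
    builder: OperatorOuterClass.ParquetWriter.Builder,
    partitionColumns: Seq[Attribute]): OperatorOuterClass.ParquetWriter.Builder =
  builder.addAllPartitionColumns(partitionColumns.map(_.name).asJava)
```

This lines up with the `Vec<String>` the native `ParquetWriterExec` constructor now expects, one entry per partition column.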
@@ -20,21 +20,22 @@
package org.apache.comet.serde.operator

import java.util.Locale

import scala.jdk.CollectionConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.comet.{CometNativeExec, CometNativeWriteExec}
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, WriteFilesExec}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode

import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.{CometOperatorSerde, Incompatible, OperatorOuterClass, SupportLevel, Unsupported}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.serializeDataType

/**
* CometOperatorSerde implementation for DataWritingCommandExec that converts Parquet write
@@ -61,7 +62,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
}

if (cmd.partitionColumns.nonEmpty || cmd.staticPartitions.nonEmpty) {
return Unsupported(Some("Partitioned writes are not supported"))
return Incompatible(Some("Partitioned writes are not fully supported"))
}

if (cmd.query.output.exists(attr => DataTypeSupport.isComplexType(attr.dataType))) {
@@ -132,6 +133,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
.addAllColumnNames(cmd.query.output.map(_.name).asJava)
// Note: work_dir, job_id, and task_attempt_id will be set at execution time
// in CometNativeWriteExec, as they depend on the Spark task context
.addAllPartitionColumns(cmd.partitionColumns.map(_.name).asJava)
.build()

val writerOperator = Operator
@@ -175,14 +177,14 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
classOf[org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol]
val constructor =
committerClass.getConstructor(classOf[String], classOf[String], classOf[Boolean])
Some(
constructor
.newInstance(
jobId,
outputPath,
java.lang.Boolean.FALSE // dynamicPartitionOverwrite = false for now
)
.asInstanceOf[org.apache.spark.internal.io.FileCommitProtocol])

val isDynamicOverwriteModeEnabled = cmd.partitionColumns.nonEmpty &&
SQLConf.get.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC

Some(
constructor
.newInstance(jobId, outputPath, java.lang.Boolean.valueOf(isDynamicOverwriteModeEnabled))
.asInstanceOf[org.apache.spark.internal.io.FileCommitProtocol])
} catch {
case e: Exception =>
throw new SparkException(s"Could not instantiate FileCommitProtocol: ${e.getMessage}")
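As a reading aid, a hedged sketch of the decision the hunk above encodes: dynamic partition overwrite is requested from `SQLHadoopMapReduceCommitProtocol` only when the write has partition columns and the session's `spark.sql.sources.partitionOverwriteMode` is `dynamic`. The helper below is illustrative, not code from this change.

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode

// Sketch only: mirrors the flag passed to the commit protocol constructor above.
// STATIC mode (the default) keeps the existing behaviour of replacing the whole
// output path; DYNAMIC mode replaces only the partitions present in the new data.
def useDynamicPartitionOverwrite(partitionColumns: Seq[Attribute]): Boolean =
  partitionColumns.nonEmpty &&
    SQLConf.get.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
```

Users opt in with `spark.sql.sources.partitionOverwriteMode=dynamic`; otherwise the flag stays false and unpartitioned or static-mode writes behave as before.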
@@ -197,4 +197,32 @@ class CometParquetWriterSuite extends CometTestBase {
}
}
}

test("parquet write with mode overwrite") {
withTempPath { dir =>
val outputPath = new File(dir, "output.parquet").getAbsolutePath

withTempPath { inputDir =>
val inputPath = createTestData(inputDir)

withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true") {

val df = spark.read.parquet(inputPath)

// First write
df.repartition(2).write.parquet(outputPath)
// Second write (overwrite mode with a different record count to make sure we are not reading the same data)
df.limit(500).repartition(2).write.mode("overwrite").parquet(outputPath)
// Verify the data was overwritten
val resultDf = spark.read.parquet(outputPath)
assert(resultDf.count() == 500, "Expected 500 rows after overwrite")
}
}
}
}
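
  // A possible follow-up test, sketched under the assumption that partitioned native
  // writes can be exercised once the allow-incompat flag above is enabled and that
  // dynamic partition overwrite is honored end to end. The schema, row counts, and
  // test name are illustrative only and not part of this change.
  test("parquet write with dynamic partition overwrite (sketch)") {
    withTempPath { dir =>
      val outputPath = new File(dir, "output.parquet").getAbsolutePath

      withSQLConf(
        CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
        CometConf.COMET_EXEC_ENABLED.key -> "true",
        CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
        SQLConf.PARTITION_OVERWRITE_MODE.key -> "dynamic") {

        // 1000 rows spread over partitions p = 0..3 (250 rows each)
        val df = spark.range(1000).selectExpr("id", "id % 4 AS p")
        df.write.partitionBy("p").parquet(outputPath)

        // With dynamic overwrite, only partition p = 0 should be rewritten
        df.filter("p = 0").limit(100).write.mode("overwrite").partitionBy("p").parquet(outputPath)

        val result = spark.read.parquet(outputPath)
        // p = 1..3 keep their 250 rows each, p = 0 now holds 100
        assert(result.count() == 850, "Expected only partition p=0 to be overwritten")
      }
    }
  }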
}