Skip to content

Conversation

@coderfender
Copy link
Contributor

@coderfender coderfender commented Dec 29, 2025

Which issue does this PR close?

Closes ##2970

Rationale for this change

What changes are included in this PR?

How are these changes tested?

@coderfender
Copy link
Contributor Author

coderfender commented Dec 29, 2025

@comphead this is the PR to support overwrite mode in comet. Please take a look whenever you get a chance

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @coderfender I think it would be also needed to support partititioned mode. Otherwise this approach will delete entire user data

The flow is like

User calls: df.write.mode("overwrite").save(path)
    ↓
DataFrameWriter.saveInternal() 
    ↓
InsertIntoHadoopFsRelationCommand.run()
    ↓
[Line 131] deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
    ↓
[Line 238] committer.deleteWithJob(fs, staticPrefixPath, true)
    ↓
[Line 183] fs.delete(path, recursive=true)  ← ACTUAL DELETION HAPPENS HERE

This Spark implementation now

    val doInsertion = if (mode == SaveMode.Append) {
      true
    } else {
      val pathExists = fs.exists(qualifiedOutputPath)
      (mode, pathExists) match {
        case (SaveMode.ErrorIfExists, true) =>
          throw QueryCompilationErrors.outputPathAlreadyExistsError(qualifiedOutputPath)
        case (SaveMode.Overwrite, true) =>
          if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
            false
          } else if (dynamicPartitionOverwrite) {
            // For dynamic partition overwrite, do not delete partition directories ahead.
            true
          } else {
            deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
            true
          }
        case (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
          true
        case (SaveMode.Ignore, exists) =>
          !exists
        case (s, exists) =>
          throw QueryExecutionErrors.saveModeUnsupportedError(s, exists)
      }
    }

@coderfender
Copy link
Contributor Author

Thank you @comphead . Let me work on updating the code to not delete entire user data and just clear out the effected partitions

@comphead
Copy link
Contributor

Thank you @comphead . Let me work on updating the code to not delete entire user data and just clear out the effected partitions

Thanks @coderfender Ideally if we can reuse Spark code

@coderfender
Copy link
Contributor Author

Sure .Thank you. Let me try and port spark code to not have duplication

@coderfender coderfender changed the title feat: Support overwrite mode comet feat: Support dynamic partition overwrite mode in comet Dec 30, 2025
@coderfender coderfender marked this pull request as draft December 30, 2025 19:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants