
Comet writer should support 2PC and staging output #3015

@comphead

Description


What is the problem the feature request solves?

Spark Uses a Committer and Staging Paths for Writes
Apache Spark uses the FileCommitProtocol with staging paths to ensure atomic, fault-tolerant, and consistent distributed writes. Here's why:

1. The Problem: Distributed Writing is Complex
When multiple tasks write data in parallel across a cluster:

- Task failures can leave partial/corrupted data
- Speculative execution may create duplicate files
- Network failures can interrupt writes mid-stream
- Concurrent writes to the same location can conflict

2. The Solution: Two-Phase Commit Protocol
Spark implements a two-phase commit pattern borrowed from database systems:

Phase 1: Write to Staging (Task Commit)
Each task writes to a temporary staging location instead of the final destination.

Phase 2: Atomic Move (Job Commit)
Only after ALL tasks succeed does the driver atomically move files from staging to the final location.
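
A minimal sketch of the pattern, using only the Hadoop FileSystem API with made-up paths and a made-up job id (this is an illustration, not Spark's actual committer code):

import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical sketch of the two-phase pattern; not Spark's committer classes.
object TwoPhaseWriteSketch {
  val finalDir   = new Path("/data/output")                      // made-up output path
  val stagingDir = new Path(finalDir, ".spark-staging-job-123")  // made-up job id

  // Phase 1: every task writes only under stagingDir, never under finalDir.
  def taskOutputPath(partFile: String): Path = new Path(stagingDir, partFile)

  // Phase 2: the driver promotes staged files with renames, then drops staging.
  def commitJob(fs: FileSystem, partFiles: Seq[String]): Unit = {
    partFiles.foreach { name =>
      if (!fs.rename(new Path(stagingDir, name), new Path(finalDir, name))) {
        throw new IOException(s"Failed to commit $name")
      }
    }
    fs.delete(stagingDir, true) // clean up the now-empty staging directory
  }

  // Abort: delete everything under stagingDir; the final location is untouched.
  def abortJob(fs: FileSystem): Unit = {
    fs.delete(stagingDir, true)
  }
}

If any task fails, the driver calls abortJob and readers of the final location never see partial output.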

Currently, Comet just does naive direct writes without 2PC.
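
For contrast with the staged flow above, a rough illustration of why a direct write is fragile (hypothetical paths; this is not Comet's actual writer code):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustration only: a direct write makes partial output visible immediately.
object NaiveWriteSketch {
  def main(args: Array[String]): Unit = {
    val fs       = FileSystem.get(new Configuration())
    val finalDir = new Path("/data/output")

    // Visible to readers as soon as it is created.
    fs.create(new Path(finalDir, "part-00000.parquet")).close()

    // If the job dies here, /data/output holds a half-written dataset,
    // and a retry (or a speculative attempt) can leave duplicate files.
    fs.create(new Path(finalDir, "part-00001.parquet")).close()
  }
}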

Spark implementation

The staging directory comes from FileCommitProtocol.getStagingDir():

def getStagingDir(path: String, jobId: String): Path = {
  new Path(path, ".spark-staging-" + jobId)
}

Example: If writing to /data/output, staging is /data/output/.spark-staging-{jobId}
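
The leading dot matters: when listing data files, Hadoop/Spark input listing broadly skips paths whose names start with "_" or ".", so staged and temporary data is never picked up as table data. A minimal sketch of that convention (not the exact Hadoop/Spark source):

import org.apache.hadoop.fs.{Path, PathFilter}

// Sketch of the hidden-path convention used when listing input files:
// names starting with "_" or "." (e.g. ".spark-staging-<jobId>", "_temporary")
// are skipped, so staged data is never read as part of the table.
val hiddenPathFilter: PathFilter = new PathFilter {
  override def accept(p: Path): Boolean = {
    val name = p.getName
    !name.startsWith("_") && !name.startsWith(".")
  }
}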

How It Works: HadoopMapReduceCommitProtocol

Here's the complete flow:

Task Execution

Each task writes to staging:

override def newTaskTempFile(
    taskContext: TaskAttemptContext,
    dir: Option[String],
    spec: FileNameSpec): String = {
  val filename = getFilename(taskContext, spec)
  // The FileOutputCommitter provides a per-task-attempt work path;
  // fall back to the output path for other committers.
  val stagingDir: Path = committer match {
    case f: FileOutputCommitter =>
      new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
    case _ => new Path(path)
  }

  // `dir` is the partition subdirectory (e.g. "date=2024-01-01"), if any.
  dir.map { d =>
    new Path(new Path(stagingDir, d), filename).toString
  }.getOrElse {
    new Path(stagingDir, filename).toString
  }
}

File path example:

/data/output/.spark-staging-{jobId}/_temporary/{appAttemptId}/_temporary/{taskAttemptId}/part-00000.parquet
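
To make the composition concrete, here is a small standalone sketch with hypothetical job and attempt ids (in Spark the real work path comes from FileOutputCommitter.getWorkPath):

import org.apache.hadoop.fs.Path

// Hypothetical ids, matching the layout shown above.
val stagingDir   = new Path("/data/output/.spark-staging-job-123")
val appAttemptId = "0"
val taskAttempt  = "attempt_202401011234_0001_m_000000_0"
val workPath     = new Path(stagingDir, s"_temporary/$appAttemptId/_temporary/$taskAttempt")

// `dir` carries the partition subdirectory, if the write is partitioned.
val dir      = Some("date=2024-01-01")
val filename = "part-00000.parquet"

val staged = dir
  .map(d => new Path(new Path(workPath, d), filename))
  .getOrElse(new Path(workPath, filename))

println(staged)
// /data/output/.spark-staging-job-123/_temporary/0/_temporary/attempt_202401011234_0001_m_000000_0/date=2024-01-01/part-00000.parquet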

Task Commit

When a task completes successfully:

override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
  // Coordinated with the driver's OutputCommitCoordinator so that only one
  // attempt of each task commits (guards against speculative duplicates).
  SparkHadoopMapRedUtil.commitTask(committer, taskContext, ...)
  new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
}

Files move from task-specific temp to job-level staging:

/data/output/.spark-staging-{jobId}/part-00000.parquet

Job Commit

Only after ALL tasks succeed does the driver commit:

override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
  committer.commitJob(jobContext)
  
  if (hasValidPath) {
    val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
    
    // Move files from staging to final location
    for ((src, dst) <- filesToMove) {
      if (!fs.rename(new Path(src), new Path(dst))) {
        throw new IOException(s"Failed to rename $src to $dst")
      }
    }
    
    // Clean up staging directory
    fs.delete(stagingDir, true)
  }
}

Files move to final location:

/data/output/part-00000.parquet

Job Abort

If ANY task fails, the driver aborts:

override def abortJob(jobContext: JobContext): Unit = {
  try {
    committer.abortJob(jobContext, JobStatus.State.FAILED)
  } catch { case e: IOException => logWarning(...) }
  
  try {
    if (hasValidPath) {
      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
      fs.delete(stagingDir, true)  // Delete ALL staging data
    }
  } catch { case e: IOException => logWarning(...) }
}
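
Putting the pieces together, here is a stripped-down sketch of the call ordering a writer has to honor. The trait below is a simplification for illustration only, not Spark's FileCommitProtocol, whose exact signatures differ and vary across versions:

// Simplified stand-ins for illustration only.
trait CommitProtocolSketch {
  def setupJob(): Unit                                            // driver, before any task runs
  def setupTask(): Unit                                           // executor, before writing
  def newTaskTempFile(dir: Option[String], name: String): String  // staged path for the task
  def commitTask(): String                                        // executor, returns a commit message
  def abortTask(): Unit                                           // executor, on task failure
  def commitJob(taskCommits: Seq[String]): Unit                   // driver, only if ALL tasks committed
  def abortJob(): Unit                                            // driver, if any task failed
}

def runJob(protocol: CommitProtocolSketch, numTasks: Int): Seq[String] = {
  protocol.setupJob()
  val commits =
    try {
      (0 until numTasks).map { i =>
        protocol.setupTask()
        try {
          val staged = protocol.newTaskTempFile(None, f"part-$i%05d.parquet")
          // ... the task writes its rows to `staged` (under the staging dir) ...
          protocol.commitTask()
        } catch {
          case e: Throwable =>
            protocol.abortTask()
            throw e
        }
      }
    } catch {
      case e: Throwable =>
        protocol.abortJob() // wipes the staging directory; final output untouched
        throw e
    }
  protocol.commitJob(commits) // staged files become visible atomically here
  commits
}

The ordering is what gives atomicity: nothing under the final path changes until commitJob, and an abort only ever touches the staging directory.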

Describe the potential solution

No response

Additional context

No response
