What is the problem the feature request solves?
Spark Uses a Committer and Staging Paths for Writes
Apache Spark uses the FileCommitProtocol with staging paths to ensure atomic, fault-tolerant, and consistent distributed writes. Here's why:
1. The Problem: Distributed Writing is Complex
When multiple tasks write data in parallel across a cluster:
Task failures can leave partial/corrupted data
Speculative execution may create duplicate files
Network failures can interrupt writes mid-stream
Concurrent writes to the same location can conflict
2. The Solution: Two-Phase Commit Protocol
Spark implements a two-phase commit pattern borrowed from database systems:
Phase 1: Write to Staging (Task Commit)
Each task writes to a temporary staging location instead of the final destination.
Phase 2: Atomic Move (Job Commit)
Only after ALL tasks succeed does the driver atomically move files from staging to the final location.
Currently, Comet just does naive writes without a two-phase commit.
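To make the contrast concrete, here is a minimal sketch (not Comet or Spark code; the paths, job id, and file names are illustrative) of a direct write versus a staged write that is published by a rename at commit time:

import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val finalDir = new Path("/data/output")              // illustrative output path
val fs = FileSystem.get(new Configuration())

// Naive single-phase write: each task writes straight into the final directory,
// so a failed or speculative attempt can leave partial or duplicate part files
// visible to readers.
val naiveFile = new Path(finalDir, "part-00000.parquet")

// Two-phase write: each task writes under a job-scoped staging directory instead.
val jobId = UUID.randomUUID().toString
val stagingDir = new Path(finalDir, s".spark-staging-$jobId")
val stagedFile = new Path(stagingDir, "part-00000.parquet")

// Job commit: only after every task has succeeded, staged files are renamed into
// the final directory (a metadata-only operation on HDFS-like filesystems).
fs.rename(stagedFile, new Path(finalDir, stagedFile.getName))

// Job abort: the renames are skipped and the staging directory is deleted,
// so readers never observe partial output.
// fs.delete(stagingDir, true)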
Spark implementation
The staging directory comes from FileCommitProtocol.getStagingDir():
def getStagingDir(path: String, jobId: String): Path = {
  new Path(path, ".spark-staging-" + jobId)
}
Example: If writing to /data/output, staging is /data/output/.spark-staging-{jobId}
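For illustration (the job ids below are made up): because the staging directory is keyed by jobId, two jobs writing to the same output path stage their files separately and do not collide.

val stagingA = getStagingDir("/data/output", "job-A")  // /data/output/.spark-staging-job-A
val stagingB = getStagingDir("/data/output", "job-B")  // /data/output/.spark-staging-job-B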
How It Works: HadoopMapReduceCommitProtocol
Here's the complete flow:
A. Task Execution
Each task writes to staging:
override def newTaskTempFile(
    taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = {
  val filename = getFilename(taskContext, spec)
  val stagingDir: Path = committer match {
    case f: FileOutputCommitter =>
      new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
    case _ => new Path(path)
  }
  dir.map { d =>
    new Path(new Path(stagingDir, d), filename).toString
  }.getOrElse {
    new Path(stagingDir, filename).toString
  }
}
File path example:
/data/output/.spark-staging-{jobId}/_temporary/{appAttemptId}/_temporary/{taskAttemptId}/part-00000.parquet
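The same composition can be sketched outside Spark; the job id, attempt id, partition value, and file name below are illustrative, but the logic mirrors the dir.map branch above:

import org.apache.hadoop.fs.Path

// Illustrative values; in Spark these come from the committer and the task attempt context.
val stagingDir = new Path(
  "/data/output/.spark-staging-job42/_temporary/0/_temporary/attempt_202401010000_0001_m_000000_0")
val filename = "part-00000-c000.snappy.parquet"
val dir: Option[String] = Some("date=2024-01-01")   // partition sub-directory, if any

val taskTempFile = dir
  .map(d => new Path(new Path(stagingDir, d), filename).toString)
  .getOrElse(new Path(stagingDir, filename).toString)
// => .../attempt_.../date=2024-01-01/part-00000-c000.snappy.parquet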
B. Task Commit
When a task completes successfully:
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
  SparkHadoopMapRedUtil.commitTask(committer, taskContext, ...)
  new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
}
Files move from task-specific temp to job-level staging:
/data/output/.spark-staging-{jobId}/part-00000.parquet
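From a writer's point of view (roughly the integration surface a Comet native write would need), the per-task calls are a small, fixed sequence. A simplified sketch, with the actual row writing elided and using the FileNameSpec overload quoted above:

import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// Simplified executor-side flow; `committer` is the job's FileCommitProtocol
// instance and `taskContext` the Hadoop task attempt context.
def runTask(committer: FileCommitProtocol, taskContext: TaskAttemptContext): TaskCommitMessage = {
  committer.setupTask(taskContext)
  try {
    // Ask the protocol where to write; the returned path is inside staging.
    val tempFile = committer.newTaskTempFile(taskContext, None, FileNameSpec("", ".parquet"))
    // ... open tempFile with the real file writer and write the task's rows ...
    committer.commitTask(taskContext)    // task commit: task temp -> job-level staging
  } catch {
    case t: Throwable =>
      committer.abortTask(taskContext)   // drop this attempt's temporary files
      throw t
  }
}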
C. Job Commit
Only after ALL tasks succeed does the driver commit the job:
override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
  committer.commitJob(jobContext)
  if (hasValidPath) {
    val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
    // Move files from staging to final location
    for ((src, dst) <- filesToMove) {
      if (!fs.rename(new Path(src), new Path(dst))) {
        throw new IOException(s"Failed to rename $src to $dst")
      }
    }
    // Clean up staging directory
    fs.delete(stagingDir, true)
  }
}
Files move to the final location:
/data/output/part-00000.parquet
D. Job Abort
If ANY task fails, the driver aborts:
override def abortJob(jobContext: JobContext): Unit = {
  try {
    committer.abortJob(jobContext, JobStatus.State.FAILED)
  } catch { case e: IOException => logWarning(...) }
  try {
    if (hasValidPath) {
      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
      fs.delete(stagingDir, true) // Delete ALL staging data
    }
  } catch { case e: IOException => logWarning(...) }
}
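On the driver side, the orchestration that Comet would need to preserve looks roughly like the following; this is a simplified sketch, not Spark's actual FileFormatWriter code, and jobId, the job context, and the task-launching callback are placeholders:

import org.apache.hadoop.mapreduce.JobContext
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// `jobContext` and `runTasks` stand in for the real Hadoop job context and for
// launching the Spark tasks that write to staging and call commitTask.
def runWriteJob(
    jobContext: JobContext,
    outputPath: String,
    jobId: String,
    runTasks: FileCommitProtocol => Seq[TaskCommitMessage]): Unit = {
  // Instantiate a commit protocol; in Spark SQL the class name normally comes from configuration.
  val committer = FileCommitProtocol.instantiate(
    "org.apache.spark.internal.io.HadoopMapReduceCommitProtocol",  // example class name
    jobId, outputPath, dynamicPartitionOverwrite = false)

  committer.setupJob(jobContext)
  try {
    val taskCommits = runTasks(committer)          // phase 1: tasks write and commit to staging
    committer.commitJob(jobContext, taskCommits)   // phase 2: single publish step into outputPath
  } catch {
    case t: Throwable =>
      committer.abortJob(jobContext)               // delete the whole staging directory
      throw t
  }
}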
Describe the potential solution
No response
Additional context
No response