Background
This article is based on Delta 0.7.0 and Spark 3.0.1.
We know that when Spark or MapReduce writes files, it first stages them in a temporary directory named _temporary under the output directory, which holds the files that are still being written. How is this implemented, and how is it controlled? Understanding this helps avoid conflicts when multiple instances write to the same directory. After that, we will look at how Delta avoids file conflicts when multiple Spark instances write concurrently; this is a prerequisite for understanding Delta's ACID transactions.
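To see this in action, here is a minimal sketch (the local path /tmp/parquet-demo and the session settings are only examples): while the write is running, its output is staged under _temporary, and after the job commits only the final part files plus _SUCCESS remain in the output directory.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object TemporaryDirDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("temporary-dir-demo").getOrCreate()
    import spark.implicits._

    val outputPath = "/tmp/parquet-demo" // example path, adjust as needed

    // While this write is running, files are staged under
    // $outputPath/_temporary/... and only moved to $outputPath on commit.
    Seq((1, "a"), (2, "b")).toDF("id", "value")
      .write.mode("overwrite").parquet(outputPath)

    // After commitJob, _temporary has been cleaned up and only the
    // final part files (plus _SUCCESS) remain.
    val fs = new Path(outputPath).getFileSystem(spark.sparkContext.hadoopConfiguration)
    fs.listStatus(new Path(outputPath)).foreach(s => println(s.getPath))

    spark.stop()
  }
}
```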
Analysis

The entry point is FileFormatWriter.write:
```scala
def write(
    sparkSession: SparkSession,
    plan: SparkPlan,
    fileFormat: FileFormat,
    committer: FileCommitProtocol,
    outputSpec: OutputSpec,
    hadoopConf: Configuration,
    partitionColumns: Seq[Attribute],
    bucketSpec: Option[BucketSpec],
    statsTrackers: Seq[WriteJobStatsTracker],
    options: Map[String, String])
  : Set[String] = {
```
Because Delta is built on Parquet, we pick ParquetFileFormat as the FileFormat to analyze; for the FileCommitProtocol we analyze SQLHadoopMapReduceCommitProtocol.
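For reference, the committer passed into write is not constructed directly: the class name comes from the spark.sql.sources.commitProtocolClass config (default SQLHadoopMapReduceCommitProtocol) and is instantiated reflectively via FileCommitProtocol.instantiate. The sketch below shows roughly what that looks like; the jobId, outputPath, and the dynamicPartitionOverwrite flag are illustrative values, and it assumes FileCommitProtocol.instantiate is reachable from your code.

```scala
import org.apache.spark.internal.io.FileCommitProtocol

// Sketch only: InsertIntoHadoopFsRelationCommand does essentially this,
// using the class name configured in spark.sql.sources.commitProtocolClass.
val committer = FileCommitProtocol.instantiate(
  "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol",
  java.util.UUID.randomUUID().toString, // jobId (example value)
  "/tmp/parquet-demo",                  // outputPath (example value)
  false)                                // dynamicPartitionOverwrite
```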
- The write method is fairly long, so we only cover the key points:
committer.setupJob(job)
This does some preparation before the job is submitted, such as setting the jobId and taskId and setting up the OutputCommitter; the OutputCommitter is responsible for setting up, committing, and aborting the output of the job and of each task:
```scala
override def setupJob(jobContext: JobContext): Unit = {
  // Setup IDs
  val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
  val taskId = new TaskID(jobId, TaskType.MAP, 0)
  val taskAttemptId = new TaskAttemptID(taskId, 0)

  // Set up the configuration object
  jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
  jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
  jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
  jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
  jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)

  val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
  committer = setupCommitter(taskAttemptContext)
  committer.setupJob(jobContext)
}
```
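For context, the OutputCommitter mentioned above is Hadoop's output-commit hook interface. The skeleton below is a no-op sketch (not code from Spark or Parquet) that just lists the lifecycle methods a committer must provide and where they run:

```scala
import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, TaskAttemptContext}

// Skeleton only, to show the lifecycle hooks; the real implementations used
// here are FileOutputCommitter / ParquetOutputCommitter.
class NoOpOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = ()            // driver, before any task runs
  override def setupTask(taskContext: TaskAttemptContext): Unit = ()  // executor, before a task writes
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = () // executor, after a task finishes
  override def abortTask(taskContext: TaskAttemptContext): Unit = ()  // executor, on task failure
  // commitJob/abortJob have default implementations in OutputCommitter
}
```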
The OutputCommitter for ParquetFileFormat is ParquetOutputCommitter (see format.getOutputCommitter(context)). ParquetOutputCommitter extends FileOutputCommitter, whose constructor is:
```java
public FileOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
  this(outputPath, (JobContext) context);
  if (outputPath != null) {
    workPath = getTaskAttemptPath(context, outputPath);
  }
}
```
Note that workPath (a member field) is set here to a task-attempt directory under $outputPath/_temporary; it will be used in the newTaskTempFile method below.
Then setupJob is called:
```java
public void setupJob(JobContext context) throws IOException {
  if (hasOutputPath()) {
    Path jobAttemptPath = getJobAttemptPath(context);
    FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
    if (!fs.mkdirs(jobAttemptPath)) {
      LOG.error("Mkdirs failed to create " + jobAttemptPath);
    }
  } else {
    LOG.warn("Output Path is null in setupJob()");
  }
}
```
getJobAttemptPath resolves to a directory under $path/_temporary (where path is the output directory), and that directory is created here.
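To visualize what setupJob creates, the sketch below reconstructs the directory layout FileOutputCommitter uses; the attempt IDs are example values, and the same layout is shown again at the end of this article.

```scala
import org.apache.hadoop.fs.Path

// Illustrative reconstruction of the FileOutputCommitter layout
// (appAttemptId / taskAttemptId below are example values).
val outputPath    = new Path("/data/output")
val appAttemptId  = 0
val taskAttemptId = "attempt_20190219101232_0003_m_000005_0"

val jobAttemptPath  = new Path(outputPath, s"_temporary/$appAttemptId")
val taskAttemptPath = new Path(new Path(jobAttemptPath, "_temporary"), taskAttemptId)

println(jobAttemptPath)  // /data/output/_temporary/0
println(taskAttemptPath) // /data/output/_temporary/0/_temporary/attempt_20190219101232_0003_m_000005_0
```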
Next the tasks are submitted to run:
```scala
sparkSession.sparkContext.runJob(
  rddWithNonEmptyPartitions,
  (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
    executeTask(
      description = description,
      jobIdInstant = jobIdInstant,
      sparkStageId = taskContext.stageId(),
      sparkPartitionId = taskContext.partitionId(),
      sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
      committer,
      iterator = iter)
  },
  rddWithNonEmptyPartitions.partitions.indices,
  (index, res: WriteTaskResult) => {
    committer.onTaskCommit(res.commitMsg)
    ret(index) = res
  })
```
The key part is the executeTask method:
```scala
committer.setupTask(taskAttemptContext)

val dataWriter =
  if (sparkPartitionId != 0 && !iterator.hasNext) {
    // In case of empty job, leave first partition to save meta for file format like parquet.
    new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
  } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
    new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
  } else {
    new DynamicPartitionDataWriter(description, taskAttemptContext, committer)
  }
```
- For SQLHadoopMapReduceCommitProtocol, setupTask is implemented as follows:
```scala
committer = setupCommitter(taskContext)
committer.setupTask(taskContext)
addedAbsPathFiles = mutable.Map[String, String]()
partitionPaths = mutable.Set[String]()
```
committer.setupTask(taskContext) delegates to ParquetOutputCommitter, where it is an empty (no-op) implementation.
- Next, look at dataWriter, which does the actual data writing.
If there are no partition columns, it is a SingleDirectoryDataWriter:
```scala
class SingleDirectoryDataWriter(
    description: WriteJobDescription,
    taskAttemptContext: TaskAttemptContext,
    committer: FileCommitProtocol)
  extends FileFormatDataWriter(description, taskAttemptContext, committer) {
  private var fileCounter: Int = _
  private var recordsInFile: Long = _
  // Initialize currentWriter and statsTrackers
  newOutputWriter()

  private def newOutputWriter(): Unit = {
    recordsInFile = 0
    releaseResources()

    val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
    val currentPath = committer.newTaskTempFile(
      taskAttemptContext,
      None,
      f"-c$fileCounter%03d" + ext)

    currentWriter = description.outputWriterFactory.newInstance(
      path = currentPath,
      dataSchema = description.dataColumns.toStructType,
      context = taskAttemptContext)

    statsTrackers.foreach(_.newFile(currentPath))
  }

  override def write(record: InternalRow): Unit = {
    if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) {
      fileCounter += 1
      assert(fileCounter < MAX_FILE_COUNTER,
        s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")

      newOutputWriter()
    }

    currentWriter.write(record)
    statsTrackers.foreach(_.newRow(record))
    recordsInFile += 1
  }
}
```
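As a usage note on the rollover logic above: description.maxRecordsPerFile can be set through the maxRecordsPerFile write option (or the spark.sql.files.maxRecordsPerFile config), so a write like the sketch below (the path and numbers are examples) makes a single task roll over to several part files instead of one:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]").appName("max-records-demo").getOrCreate()

// With maxRecordsPerFile set, SingleDirectoryDataWriter bumps fileCounter and
// opens a new part file once recordsInFile reaches the limit.
spark.range(0, 1000000).toDF("id")
  .repartition(1)                           // single task, so rollover comes only from the limit
  .write.mode("overwrite")
  .option("maxRecordsPerFile", 100000L)     // example threshold
  .parquet("/tmp/parquet-demo-rollover")    // example path
```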
Where does it write the files?
```scala
val currentPath = committer.newTaskTempFile(
  taskAttemptContext,
  None,
  f"-c$fileCounter%03d" + ext)
```
This maps to the newTaskTempFile method of HadoopMapReduceCommitProtocol:
```scala
override def newTaskTempFile(
    taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
  val filename = getFilename(taskContext, ext)

  val stagingDir: Path = committer match {
    case _ if dynamicPartitionOverwrite =>
      assert(dir.isDefined,
        "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
      partitionPaths += dir.get
      this.stagingDir
    // For FileOutputCommitter it has its own staging path called "work path".
    case f: FileOutputCommitter =>
      new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
    case _ => new Path(path)
  }

  dir.map { d =>
    new Path(new Path(stagingDir, d), filename).toString
  }.getOrElse {
    new Path(stagingDir, filename).toString
  }
}
```
If dynamic partition overwrite is enabled (partitionOverwriteMode=dynamic), the staging directory is new Path(path, ".spark-staging-" + jobId).
If dynamic partition overwrite is not enabled and the committer is a subclass of FileOutputCommitter, the staging directory is the committer's workPath when it is set, otherwise path. Since the FileOutputCommitter constructor above already set workPath, the output ends up under $path/_temporary.
So the job's tasks write their data into that directory.
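Putting it together, getFilename builds the part-file name roughly as below; this is a sketch following HadoopMapReduceCommitProtocol.getFilename, where the jobId UUID and extension are example values and the exact layout can differ slightly between committers. The resulting name is placed under the staging directory chosen above.

```scala
// Sketch of the part-file naming scheme; all values are illustrative.
val split = 0                                      // task id within the stage
val jobId = "936a4f3b-8f71-48ce-960a-e60d3cf4488f" // example job UUID
val ext   = "-c000.snappy.parquet"                 // extension passed in by the data writer
val filename = f"part-$split%05d-$jobId$ext"
println(filename) // part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f-c000.snappy.parquet
```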
The analysis of DynamicPartitionDataWriter is similar and left to the reader; the only difference is that the partition information is appended to the path, so each task writes only into its own partition directory, as the sketch below illustrates.
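A minimal sketch of that nesting, mirroring the tail of newTaskTempFile above; the staging directory, partition directory, and file name are all illustrative values.

```scala
import org.apache.hadoop.fs.Path

// Illustrative only: how a partitioned file is nested under the staging directory.
val stagingDir = new Path("/data/output/_temporary/0/_temporary/attempt_20190219101232_0003_m_000005_0")
val dir        = Some("dt=2021-01-01/country=CN") // partition sub-directory built from the partition columns
val filename   = "part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f-c000.snappy.parquet"

val finalPath = dir
  .map(d => new Path(new Path(stagingDir, d), filename))
  .getOrElse(new Path(stagingDir, filename))
println(finalPath)
// .../_temporary/0/_temporary/attempt_.../dt=2021-01-01/country=CN/part-00000-...-c000.snappy.parquet
```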
The write loop and task commit (with abort on failure) look like this:
```scala
try {
  Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
    // Execute the task to write rows out and commit the task.
    while (iterator.hasNext) {
      dataWriter.write(iterator.next())
    }
    dataWriter.commit()
  })(catchBlock = {
    // If there is an error, abort the task
    dataWriter.abort()
    logError(s"Job $jobId aborted.")
  }, finallyBlock = {
    dataWriter.close()
  })
}
```
dataWriter.commit() is as follows:
```scala
override def commit(): WriteTaskResult = {
  releaseResources()
  val summary = ExecutedWriteSummary(
    updatedPartitions = updatedPartitions.toSet,
    stats = statsTrackers.map(_.getFinalStats()))
  WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
}
```
It first releases resources, i.e., closes the current writer.
Then it calls FileCommitProtocol.commitTask():
```scala
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
  val attemptId = taskContext.getTaskAttemptID
  logTrace(s"Commit task ${attemptId}")
  SparkHadoopMapRedUtil.commitTask(
    committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
  new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
}
```
SparkHadoopMapRedUtil.commitTask eventually calls FileOutputCommitter's commitTask method, which moves the files under $PATH/_temporary to $PATH.
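Conceptually, the move amounts to something like the sketch below; this is an illustration only with example paths, not the actual FileOutputCommitter code (which also has two commit-algorithm versions with slightly different timing).

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Conceptual sketch only: the real logic lives in FileOutputCommitter
// (commitTask / commitJob / mergePaths); the paths below are examples.
val conf = new Configuration()
val output = new Path("/data/output")
val committedTask = new Path(output, "_temporary/0/task_20190219101232_0003_m_000005")

val fs = output.getFileSystem(conf)
if (fs.exists(committedTask)) {
  // move every file produced by the task up to the final output directory
  fs.listStatus(committedTask).foreach { st =>
    fs.rename(st.getPath, new Path(output, st.getPath.getName))
  }
}
```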
Then the collected statistics are returned, in the following format:
```scala
case class BasicWriteTaskStats(
    numPartitions: Int,
    numFiles: Int,
    numBytes: Long,
    numRows: Long)
  extends WriteTaskStats
```
After that, committer.onTaskCommit(res.commitMsg) is called; for SQLHadoopMapReduceCommitProtocol the implementation is just:
logDebug(s"onTaskCommit($taskCommit)")
The next step is committer.commitJob(job, commitMsgs):
```scala
...
committer.commitJob(jobContext)
...
for ((src, dst) <- filesToMove) {
  fs.rename(new Path(src), new Path(dst))
}
...
fs.delete(stagingDir, true)
```
This mainly finishes and cleans up the job: the files produced by the tasks (the writers' temporary output) are moved into the path directory and the staging directory is deleted. At this point the files have actually been written to the path directory.
- Metrics recording
```scala
private[datasources] def processStats(
    statsTrackers: Seq[WriteJobStatsTracker],
    statsPerTask: Seq[Seq[WriteTaskStats]])
  : Unit = {

  val numStatsTrackers = statsTrackers.length
  assert(statsPerTask.forall(_.length == numStatsTrackers),
    s"""Every WriteTask should have produced one `WriteTaskStats` object for every tracker.
       |There are $numStatsTrackers statsTrackers, but some task returned
       |${statsPerTask.find(_.length != numStatsTrackers).get.length} results instead.
     """.stripMargin)

  val statsPerTracker = if (statsPerTask.nonEmpty) {
    statsPerTask.transpose
  } else {
    statsTrackers.map(_ => Seq.empty)
  }

  statsTrackers.zip(statsPerTracker).foreach {
    case (statsTracker, stats) => statsTracker.processStats(stats)
  }
}
```
This passes the job's metrics to the driver through the statsTrackers. The current statsTracker implementation is BasicWriteJobStatsTracker, which means the metrics are ultimately propagated as events over the listener bus, as in the following code:
```scala
class BasicWriteJobStatsTracker(
    serializableHadoopConf: SerializableConfiguration,
    @transient val metrics: Map[String, SQLMetric])
  extends WriteJobStatsTracker {
  ...
  override def processStats(stats: Seq[WriteTaskStats]): Unit = {
    ...
    metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
    metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(numPartitions)

    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
  }
}
```
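One way to observe these metrics from user code is a QueryExecutionListener that dumps the executed plan's SQLMetrics after a write. This is a sketch under the assumption that the write runs through a plan whose metrics are the ones filled in by BasicWriteJobStatsTracker (numFiles, numOutputBytes, numOutputRows, numParts); the path is an example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

val spark = SparkSession.builder()
  .master("local[2]").appName("write-metrics-demo").getOrCreate()
import spark.implicits._

spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // For a file write, these are the metrics BasicWriteJobStatsTracker filled in.
    qe.executedPlan.metrics.foreach { case (name, metric) =>
      println(s"$name = ${metric.value}")
    }
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
})

Seq((1, "a"), (2, "b")).toDF("id", "value")
  .write.mode("overwrite").parquet("/tmp/parquet-metrics-demo") // example path
```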
With that, we have walked through the whole Spark Parquet write path (some details were omitted).
The overall flow is as follows:
```
Instantiate the Job object
        |
        v
FileCommitProtocol.setupJob -> OutputCommitter.setupJob
  (pre-job preparation, e.g. creating the _temporary directory)
        |
        v
executeTask() -> FileCommitProtocol.setupTask -> OutputCommitter.setupTask
  (currently a no-op)
        |
        v
FileCommitProtocol.newTaskTempFile / newTaskTempFileAbsPath
  (build the task's temporary output path)
        |
        v
dataWriter.write()
        |
        v
dataWriter.commit() -> HadoopMapReduceCommitProtocol.commitTask
  (release resources and return the write metrics)
        |
        v
SparkHadoopMapRedUtil.commitTask
  (mv files under $PATH/_temporary to $PATH, with output commit coordination)
        |
        v
return the TaskCommitMessage (info about files written to extra absolute-path staging directories)
        |
        v
FileCommitProtocol.onTaskCommit
        |
        v
FileCommitProtocol.commitJob -> OutputCommitter.commitJob
  (clean up $PATH/_temporary and mv files from the extra staging directories to the final path)
        |
        v
processStats
  (process the written-file metrics)
```
So how does Spark write Delta data? The flow is exactly the same as above; the only difference is the FileCommitProtocol implementation. Go straight to TransactionalWrite.writeFiles:
```scala
def writeFiles(
    data: Dataset[_],
    writeOptions: Option[DeltaOptions],
    isOptimize: Boolean): Seq[AddFile] = {
  hasWritten = true
  ...
  val committer = getCommitter(outputPath)
  ...
    FileFormatWriter.write(
      sparkSession = spark,
      plan = physicalPlan,
      fileFormat = snapshot.fileFormat, // TODO doesn't support changing formats.
      committer = committer,
      outputSpec = outputSpec,
      hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration),
      partitionColumns = partitioningColumns,
      bucketSpec = None,
      statsTrackers = statsTrackers,
      options = Map.empty)
  }

  committer.addedStatuses
}
```
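For reference, any DataFrame write to a Delta table, like the minimal sketch below (the path is an example; the builder config follows the Delta 0.7.0 setup), ends up in this writeFiles method:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("delta-write-demo")
  // Delta 0.7.0 session config (required for SQL support; DataFrame writes
  // mainly need delta-core on the classpath).
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// This append goes through TransactionalWrite.writeFiles, which drives
// FileFormatWriter.write with a DelayedCommitProtocol committer.
spark.range(0, 100).toDF("id")
  .write.format("delta").mode("append")
  .save("/tmp/delta-demo") // example path
```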
The committer here is a DelayedCommitProtocol, created as:
new DelayedCommitProtocol("delta", outputPath.toString, None)
Let's look at the key DelayedCommitProtocol methods:
```scala
override def newTaskTempFile(
    taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
  val filename = getFileName(taskContext, ext)
  val partitionValues = dir.map(parsePartitions).getOrElse(Map.empty[String, String])
  val relativePath = randomPrefixLength.map { prefixLength =>
    getRandomPrefix(prefixLength) // Generate a random prefix as a first choice
  }.orElse {
    dir // or else write into the partition directory if it is partitioned
  }.map { subDir =>
    new Path(subDir, filename)
  }.getOrElse(new Path(filename)) // or directly write out to the output path

  addedFiles.append((partitionValues, relativePath.toUri.toString))
  new Path(path, relativePath).toString
}

override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
  if (addedFiles.nonEmpty) {
    val fs = new Path(path, addedFiles.head._2).getFileSystem(taskContext.getConfiguration)
    val statuses: Seq[AddFile] = addedFiles.map { f =>
      val filePath = new Path(path, new Path(new URI(f._2)))
      val stat = fs.getFileStatus(filePath)

      AddFile(f._2, f._1, stat.getLen, stat.getModificationTime, true)
    }

    new TaskCommitMessage(statuses)
  } else {
    new TaskCommitMessage(Nil)
  }
}

override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
  val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[AddFile]]).toArray
  addedStatuses ++= fileStatuses
}
```
The file name generated by newTaskTempFile contains a UUID.randomUUID.toString component, which reduces file-name collisions.
newTaskTempFile returns a path directly under the output directory (or its partition subdirectory), not under a _temporary directory.
commitTask only records the files that were added, as AddFile entries.
commitJob does not really commit the job; it only collects the AddFile entries into memory (addedStatuses).
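For illustration, one of those in-memory entries looks roughly like this; the field values are made up, and the real entries are built in commitTask above from the file status.

```scala
import org.apache.spark.sql.delta.actions.AddFile

// Illustrative values only; see commitTask above for how these are really built.
val added = AddFile(
  path = "dt=2021-01-01/part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f.c000.snappy.parquet",
  partitionValues = Map("dt" -> "2021-01-01"),
  size = 1024L,
  modificationTime = System.currentTimeMillis(),
  dataChange = true)
```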
In a follow-up we will analyze how Delta handles these AddFile entries to achieve transactionality.
Note that for the FileOutputCommitter-based path, the task output files live under ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName}, e.g. /data/_temporary/0/_temporary/attempt_20190219101232_0003_m_000005_0/part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f.c000.snappy.parquet