【spark系列6】spark delta写操作ACID事务前传--写文件基础类FileFormat/FileCommitProtocol分析

简介: 【spark系列6】spark delta写操作ACID事务前传--写文件基础类FileFormat/FileCommitProtocol分析

背景


本文基于delta 0.7.0

spark 3.0.1

我们知道spark或者mapreduce在写文件的时候么,都会写入的文件目录中写入一个临时目录_temporary,用来存储正在写入的文件,那么这是怎么实现的呢以及是怎么控制的,这部分了解了可以避免在多实例写同一个目录下的冲突问题,之后我们再分析一下delta是怎么实现spark多实例下怎么避免文件冲突,这部分是理解delta ACID事务的前提。


分析

 def write(
      sparkSession: SparkSession,
      plan: SparkPlan,
      fileFormat: FileFormat,
      committer: FileCommitProtocol,
      outputSpec: OutputSpec,
      hadoopConf: Configuration,
      partitionColumns: Seq[Attribute],
      bucketSpec: Option[BucketSpec],
      statsTrackers: Seq[WriteJobStatsTracker],
      options: Map[String, String])
    : Set[String] = {

因为delta是基于parquet实现的,

所以我们fileformat选择分析ParquetFileFormat,

而对于FileCommitProtocol,我们分析SQLHadoopMapReduceCommitProtocol

  1. 该write方法实现比较长,我们讲重点 :
committer.setupJob(job)

这个做一些job提交前的准备工作,比如设置jobId,taskId,设置OutputCommitter,OutputCommitter是用来。。

override def setupJob(jobContext: JobContext): Unit = {
    // Setup IDs
    val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
    val taskId = new TaskID(jobId, TaskType.MAP, 0)
    val taskAttemptId = new TaskAttemptID(taskId, 0)
    // Set up the configuration object
    jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
    jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
    jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
    jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
    jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)
    val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
    committer = setupCommitter(taskAttemptContext)
    committer.setupJob(jobContext)
  }

ParquetFileFormat对应的OutputCommitter是ParquetOutputCommitter,我们看一下方法:format.getOutputCommitter(context),ParquetOutputCommitter为:

public FileOutputCommitter(Path outputPath, 
                             TaskAttemptContext context) throws IOException {
    this(outputPath, (JobContext)context);
    if (outputPath != null) {
      workPath = getTaskAttemptPath(context, outputPath);
    }
  }

注意这里的workPath(全局变量)赋值为$outputPath/_temporary,在以下newTaskTempFile方法中会用到

接着进行setupJob操作:

public void setupJob(JobContext context) throws IOException {
    if (hasOutputPath()) {
      Path jobAttemptPath = getJobAttemptPath(context);
      FileSystem fs = jobAttemptPath.getFileSystem(
          context.getConfiguration());
      if (!fs.mkdirs(jobAttemptPath)) {
        LOG.error("Mkdirs failed to create " + jobAttemptPath);
      }
    } else {
      LOG.warn("Output Path is null in setupJob()");

而getJobAttemptPath中引用到$path/_temporary目录(其中path是文件输出目录),且建立该目录

接下来是进行任务的提交:

sparkSession.sparkContext.runJob(
        rddWithNonEmptyPartitions,
        (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
          executeTask(
            description = description,
            jobIdInstant = jobIdInstant,
            sparkStageId = taskContext.stageId(),
            sparkPartitionId = taskContext.partitionId(),
            sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
            committer,
            iterator = iter)
        },
        rddWithNonEmptyPartitions.partitions.indices,
        (index, res: WriteTaskResult) => {
          committer.onTaskCommit(res.commitMsg)
          ret(index) = res
        })

其中重点看看executeTask方法:

committer.setupTask(taskAttemptContext)
    val dataWriter =
      if (sparkPartitionId != 0 && !iterator.hasNext) {
        // In case of empty job, leave first partition to save meta for file format like parquet.
        new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
      } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
        new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
      } else {
        new DynamicPartitionDataWriter(description, taskAttemptContext, committer)
      }
  • 对于SQLHadoopMapReduceCommitProtocol:setupTask实现如下:
committer = setupCommitter(taskContext)
    committer.setupTask(taskContext)
    addedAbsPathFiles = mutable.Map[String, String]()
    partitionPaths = mutable.Set[String]()

而committer.setupTask(taskContext),对应到ParquetOutputCommitter为空实现,

  • 之后看数据写入的最终执行者dataWriter,
    如果是没有分区,则是SingleDirectoryDataWriter:
class SingleDirectoryDataWriter(
    description: WriteJobDescription,
    taskAttemptContext: TaskAttemptContext,
    committer: FileCommitProtocol)
  extends FileFormatDataWriter(description, taskAttemptContext, committer) {
  private var fileCounter: Int = _
  private var recordsInFile: Long = _
  // Initialize currentWriter and statsTrackers
  newOutputWriter()
  private def newOutputWriter(): Unit = {
    recordsInFile = 0
    releaseResources()
    val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
    val currentPath = committer.newTaskTempFile(
      taskAttemptContext,
      None,
      f"-c$fileCounter%03d" + ext)
    currentWriter = description.outputWriterFactory.newInstance(
      path = currentPath,
      dataSchema = description.dataColumns.toStructType,
      context = taskAttemptContext)
    statsTrackers.foreach(_.newFile(currentPath))
  }
  override def write(record: InternalRow): Unit = {
    if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) {
      fileCounter += 1
      assert(fileCounter < MAX_FILE_COUNTER,
        s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")
      newOutputWriter()
    }
    currentWriter.write(record)
    statsTrackers.foreach(_.newRow(record))
    recordsInFile += 1
  }
}

这里写文件是哪里呢?

    val currentPath = committer.newTaskTempFile(
      taskAttemptContext,
      None,
      f"-c$fileCounter%03d" + ext)

对应到HadoopMapReduceCommitProtocol到newTaskTempFile方法为:

override def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
    val filename = getFilename(taskContext, ext)
    val stagingDir: Path = committer match {
      case _ if dynamicPartitionOverwrite =>
        assert(dir.isDefined,
          "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
        partitionPaths += dir.get
        this.stagingDir
      // For FileOutputCommitter it has its own staging path called "work path".
      case f: FileOutputCommitter =>
        new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
      case _ => new Path(path)
    }
    dir.map { d =>
      new Path(new Path(stagingDir, d), filename).toString
    }.getOrElse {
      new Path(stagingDir, filename).toString
    }
  }

如果开启partitionOverwriteMode,则设置为new Path(path, “.spark-staging-” + jobId)

如果没有开启partitionOverwriteMode,且FileOutputCommitter的子类,如果workpath存在则设置为workPath,否则为path,注意我们之前FileOutputCommitter构造方法中已经设置了workPath,所以最终的输出目录为$path/_temporary


所以job向该目录写入数据。

DynamicPartitionDataWriter的分析,读者可以进行类似的分析,只不过目录则加了分区信息,只写入自己的分区目录中


如果写入成功的话执行如下:

try {
      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
        // Execute the task to write rows out and commit the task.
        while (iterator.hasNext) {
          dataWriter.write(iterator.next())
        }
        dataWriter.commit()
      })(catchBlock = {
        // If there is an error, abort the task
        dataWriter.abort()
        logError(s"Job $jobId aborted.")
      }, finallyBlock = {
        dataWriter.close()
      })

dataWriter.commit()如下:

override def commit(): WriteTaskResult = {
    releaseResources()
    val summary = ExecutedWriteSummary(
      updatedPartitions = updatedPartitions.toSet,
      stats = statsTrackers.map(_.getFinalStats()))
    WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
  }

首先会释放资源,也就是关闭writer

之后调用FileCommitProtocol.commitTask();

 override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
    val attemptId = taskContext.getTaskAttemptID
    logTrace(s"Commit task ${attemptId}")
    SparkHadoopMapRedUtil.commitTask(
      committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
    new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
  }

而SparkHadoopMapRedUtil.commitTask最终调用FileOutputCommitter的commitTask方法把 P A T H / t e m p o r a r y 下 文 件 m v 到 PATH/_temporary下文件mv到 PATH/

t


emporary下文件mv到PATH下

之后返回统计的数值,数据格式如下:

case class BasicWriteTaskStats(
    numPartitions: Int,
    numFiles: Int,
    numBytes: Long,
    numRows: Long)
  extends WriteTaskStats

之后会committer.onTaskCommit(res.commitMsg)操作,

对于SQLHadoopMapReduceCommitProtocol的实现为:

logDebug(s"onTaskCommit($taskCommit)")


下一步committer.commitJob(job, commitMsgs):

...
 committer.commitJob(jobContext)
 ...
  for ((src, dst) <- filesToMove) {
        fs.rename(new Path(src), new Path(dst))
      }
 ...
 fs.delete(stagingDir, true)

这里主要涉及清理job,以及把task所产生的文件(writer输出的临时文件)移动到path目录下,且清理临时目录,至此文件真正的写入到了path目录下

  1. 指标记录
private[datasources] def processStats(
      statsTrackers: Seq[WriteJobStatsTracker],
      statsPerTask: Seq[Seq[WriteTaskStats]])
  : Unit = {
    val numStatsTrackers = statsTrackers.length
    assert(statsPerTask.forall(_.length == numStatsTrackers),
      s"""Every WriteTask should have produced one `WriteTaskStats` object for every tracker.
         |There are $numStatsTrackers statsTrackers, but some task returned
         |${statsPerTask.find(_.length != numStatsTrackers).get.length} results instead.
       """.stripMargin)
    val statsPerTracker = if (statsPerTask.nonEmpty) {
      statsPerTask.transpose
    } else {
      statsTrackers.map(_ => Seq.empty)
    }
    statsTrackers.zip(statsPerTracker).foreach {
      case (statsTracker, stats) => statsTracker.processStats(stats)
    }
  }

主要是把刚才job的指标通过statsTrackers传给driver,而目前的statsTracker实现类为BasicWriteJobStatsTracker,也就是说最终会通过listenerbus以事件的形式传播,

如下代码:

class BasicWriteJobStatsTracker(
    serializableHadoopConf: SerializableConfiguration,
    @transient val metrics: Map[String, SQLMetric])
  extends WriteJobStatsTracker {
   ...
  override def processStats(stats: Seq[WriteTaskStats]): Unit = {
    ...
    metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
    metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(numPartitions)
    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
  }
}

主要是把刚才job的指标通过statsTrackers传给driver,而目前的statsTracker实现类为BasicWriteJobStatsTracker,也就是说最终会通过listenerbus以事件的形式传播,

如下代码:

class BasicWriteJobStatsTracker(
    serializableHadoopConf: SerializableConfiguration,
    @transient val metrics: Map[String, SQLMetric])
  extends WriteJobStatsTracker {
   ...
  override def processStats(stats: Seq[WriteTaskStats]): Unit = {
    ...
    metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
    metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(numPartitions)
    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
  }
}

至此整个spark parquet write文件的数据流程我们就已经全部过了一遍了,部分细节没有展示。

最终的数据流如下:

实例化Job对象
      |
      v
FileCommitProtocol.setupJob -> OutputCommitter.setupJob  进行作业运行前的准备,如建立临时目录_temporary等
      |
      v
executeTask()-> FileCommitProtocol.setupTask -> OutputCommitter.setupTask 目前为空实现 
                |
                v
         FileCommitProtocol.newTaskTempFile/newTaskTempFileAbsPath 建立写任务的临时目录
                |
                v
         dataWriter.write() 
                |
                v
         dataWriter.commit() 释放资源以及返回写入文件的指标信息 -> HadoopMapReduceCommitProtocol.commitTask
                          |
                          v
                 SparkHadoopMapRedUtil.commitTask 完成mv $PATH/_temporary文件 到$PATH目录,以及做outputCommitCoordination
                           |
                           v
                 返回需要额外临时目录的信息  
      |
      v
FileCommitProtocol.onTaskCommit
      |
      v
FileCommitProtocol.commitJob -> OutputCommitter.commitJob 清理$PATH/_temporary目录且把写额外临时目录下的文件mv到最终path目录下
      |
      v
processStats,处理写入的文件指标

那对应到delta中,spark写入delta数据是怎么写入的呢?其实流程和以上的流程一模一样,唯一不同的是FileCommitProtocol类的实现,直接到TransactionalWrite.writeFiles:

def writeFiles(
      data: Dataset[_],
      writeOptions: Option[DeltaOptions],
      isOptimize: Boolean): Seq[AddFile] = {
    hasWritten = true
    ...
    val committer = getCommitter(outputPath)
    ...
      FileFormatWriter.write(
        sparkSession = spark,
        plan = physicalPlan,
        fileFormat = snapshot.fileFormat, // TODO doesn't support changing formats.
        committer = committer,
        outputSpec = outputSpec,
        hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration),
        partitionColumns = partitioningColumns,
        bucketSpec = None,
        statsTrackers = statsTrackers,
        options = Map.empty)
    }
    committer.addedStatuses
  }

而这里的commiter为DelayedCommitProtocol,如下:

    new DelayedCommitProtocol("delta", outputPath.toString, None)

我们来看一下DelayedCommitProtocol方法:

override def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
    val filename = getFileName(taskContext, ext)
    val partitionValues = dir.map(parsePartitions).getOrElse(Map.empty[String, String])
    val relativePath = randomPrefixLength.map { prefixLength =>
      getRandomPrefix(prefixLength) // Generate a random prefix as a first choice
    }.orElse {
      dir // or else write into the partition directory if it is partitioned
    }.map { subDir =>
      new Path(subDir, filename)
    }.getOrElse(new Path(filename)) // or directly write out to the output path
    addedFiles.append((partitionValues, relativePath.toUri.toString))
    new Path(path, relativePath).toString
  }
  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
    if (addedFiles.nonEmpty) {
      val fs = new Path(path, addedFiles.head._2).getFileSystem(taskContext.getConfiguration)
      val statuses: Seq[AddFile] = addedFiles.map { f =>
        val filePath = new Path(path, new Path(new URI(f._2)))
        val stat = fs.getFileStatus(filePath)
        AddFile(f._2, f._1, stat.getLen, stat.getModificationTime, true)
      }
      new TaskCommitMessage(statuses)
    } else {
      new TaskCommitMessage(Nil)
    }
  }
  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
    val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[AddFile]]).toArray
    addedStatuses ++= fileStatuses
  }

其中newTaskTempFile生成的文件中多了一个UUID.randomUUID.toString,这能减少文件的冲突

newTaskTempFile目前直接是返回了输出目录,而不是_temporary目录

commitTask只是记录增加的文件

commitJob并没有真正的提交job,只是把AddFile保存到了内存中

后续我们会分析delta怎么处理AddFile,从而做到事务性

注意task输出的文件目录为:
${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName}
如:/data/_temporary/0/_temporary/attempt_20190219101232_0003_m_000005_0/part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f.c000.snappy.parquet


相关文章
|
4月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
174 1
Spark快速大数据分析PDF下载读书分享推荐
|
26天前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
119 2
|
1月前
|
JSON 分布式计算 大数据
大数据-85 Spark 集群 RDD创建 RDD-Action Key-Value RDD详解 RDD的文件输入输出
大数据-85 Spark 集群 RDD创建 RDD-Action Key-Value RDD详解 RDD的文件输入输出
31 1
|
1月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
69 0
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
31 0
|
4月前
|
弹性计算 分布式计算 Serverless
全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
【7月更文挑战第6天】全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
23709 42
|
4月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
SQL JSON 分布式计算
不通过 Spark 获取 Delta Lake Snapshot
Delta Lake 进行数据删除或更新操作时实际上只是对被删除数据文件做了一个 remove 标记,在进行 vacuum 前并不会进行物理删除,因此一些例如在 web 上获取元数据或进行部分数据展示的操作如果直接从表路径下获取 parquet 文件信息,读到的可能是历史已经被标记删除的数据。
不通过 Spark 获取 Delta Lake Snapshot
下一篇
无影云桌面