【spark系列7】spark delta写操作ACID事务实现分析

本文涉及的产品
函数计算FC,每月15万CU 3个月
简介: 【spark系列7】spark delta写操作ACID事务实现分析

背景


本文基于delta 0.7.0

spark 3.0.1

我们之前的spark delta写操作ACID事务前传–写文件基础类FileFormat/FileCommitProtocol分析分析了delta写数据的流程,但是还没分析deltalog 写数据的流程,这部分也是实现ACID的核心部分。


背景


本文基于delta 0.7.0

spark 3.0.1

我们之前的spark delta写操作ACID事务前传–写文件基础类FileFormat/FileCommitProtocol分析分析了delta写数据的流程,但是还没分析deltalog 写数据的流程,这部分也是实现ACID的核心部分。


##分析


直接到WriteIntoDelta.run

override def run(sparkSession: SparkSession): Seq[Row] = {
    deltaLog.withNewTransaction { txn =>
      val actions = write(txn, sparkSession)
      val operation = DeltaOperations.Write(mode, Option(partitionColumns),
        options.replaceWhere, options.userMetadata)
      txn.commit(actions, operation)
    }
    Seq.empty
  }

我们来看一下deltaLog.withNewTrancation方法 :

 def withNewTransaction[T](thunk: OptimisticTransaction => T): T = {
    try {
      update()
      val txn = new OptimisticTransaction(this)
      OptimisticTransaction.setActive(txn)
      thunk(txn)
    } finally {
      OptimisticTransaction.clearActive()
    }
  }
  1. 首先update方法直接同步调用updateInternal用来更新当前deltalog的snapshot,具体的updateInternal如下:
val segment = getLogSegmentForVersion(currentSnapshot.logSegment.checkpointVersion)
          if (segment.version == currentSnapshot.version) {
            // Exit early if there is no new file
            lastUpdateTimestamp = clock.getTimeMillis()
            return currentSnapshot
          }
          logInfo(s"Loading version ${segment.version}" +
            segment.checkpointVersion.map(v => s"starting from checkpoint version $v."))
          val newSnapshot = createSnapshot(
            segment,
            minFileRetentionTimestamp,
            segment.lastCommitTimestamp)
            ...
          currentSnapshot.uncache()
          currentSnapshot = newSnapshot

首先先通过getLogSegmentForVersion方法获取当前最新的snapshot,之后更新到内存

  1. 设置OptimisticTransaction,并在当前事务中执行当前语句
val actions = write(txn, sparkSession)
      val operation = DeltaOperations.Write(mode, Option(partitionColumns),
        options.replaceWhere, options.userMetadata)
      txn.commit(actions, operation)

val atcions = write(txn, sparksession)我们已经在spark delta写操作ACID事务前传–写文件基础类FileFormat/FileCommitProtocol分析分析了,即会返回Seq[AddAction],而实际的数据文件已经存储到了文件目录下

val operation = DeltaOperations.Write(mode, Option(partitionColumns)记录了这是一个delta write Operation

txn.commit(actions, operation) 是该提交delta log的关键:

def commit(actions: Seq[Action], op: DeltaOperations.Operation): Long = recordDeltaOperation(
      deltaLog,
      "delta.commit") {
    commitStartNano = System.nanoTime()
    val version = try {
      // Try to commit at the next version.
      var finalActions = prepareCommit(actions, op)
      // Find the isolation level to use for this commit
      val noDataChanged = actions.collect { case f: FileAction => f.dataChange }.forall(_ == false)
      val isolationLevelToUse = if (noDataChanged) {
        // If no data has changed (i.e. its is only being rearranged), then SnapshotIsolation
        // provides Serializable guarantee. Hence, allow reduced conflict detection by using
        // SnapshotIsolation of what the table isolation level is.
        SnapshotIsolation
      } else {
        Serializable
      }
      val isBlindAppend = {
        val dependsOnFiles = readPredicates.nonEmpty || readFiles.nonEmpty
        val onlyAddFiles =
          finalActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])
        onlyAddFiles && !dependsOnFiles
      }
      if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED)) {
        commitInfo = CommitInfo(
          clock.getTimeMillis(),
          op.name,
          op.jsonEncodedValues,
          Map.empty,
          Some(readVersion).filter(_ >= 0),
          None,
          Some(isBlindAppend),
          getOperationMetrics(op),
          getUserMetadata(op))
        finalActions = commitInfo +: finalActions
      }
      // Register post-commit hooks if any
      lazy val hasFileActions = finalActions.collect { case f: FileAction => f }.nonEmpty
      if (DeltaConfigs.SYMLINK_FORMAT_MANIFEST_ENABLED.fromMetaData(metadata) && hasFileActions) {
        registerPostCommitHook(GenerateSymlinkManifest)
      }
      val commitVersion = doCommit(snapshot.version + 1, finalActions, 0, isolationLevelToUse)
      logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")
      postCommit(commitVersion, finalActions)
      commitVersion
    } catch {
      case e: DeltaConcurrentModificationException =>
        recordDeltaEvent(deltaLog, "delta.commit.conflict." + e.conflictType)
        throw e
      case NonFatal(e) =>
        recordDeltaEvent(
          deltaLog, "delta.commit.failure", data = Map("exception" -> Utils.exceptionString(e)))
        throw e
    }
    runPostCommitHooks(version, actions)
    version
  }

prepareCommit用来做一些提交前的检查,以及增加一些actions,

如果是第一次提交还得增加Protocol,如{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}

如metadata变化了,还得增加newMetadata,如 {"metaData":{"id":"2b2457e3-ce74-4897-abbd-04a94692304a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1609398723678}}

如果配置了spark.databricks.delta.commitInfo.enabled(默认是true)则还会增加commitInfo信息等,如{"commitInfo":{"timestamp":1609400013646,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"306","numOutputRows":"0"}}},

如果做了Presto / Athena兼容,还会注册GenerateSymlinkManifest postCommitHook,在commit成功后还会进行调用

doCommit(snapshot.version + 1, finalActions, 0, isolationLevelToUse) 同步提交最终的action到deltalog:

   protected def doCommit(
      attemptVersion: Long,
      actions: Seq[Action],
      attemptNumber: Int,
      isolationLevel: IsolationLevel): Long = deltaLog.lockInterruptibly {
    try {
      ...
      deltaLog.store.write(
        deltaFile(deltaLog.logPath, attemptVersion),
        actions.map(_.json).toIterator)
      val commitTime = System.nanoTime()
      val postCommitSnapshot = deltaLog.update()
      if (postCommitSnapshot.version < attemptVersion) {
        recordDeltaEvent(deltaLog, "delta.commit.inconsistentList", data = Map(
          "committedVersion" -> attemptVersion,
          "currentVersion" -> postCommitSnapshot.version
        ))
        throw new IllegalStateException(
          s"The committed version is $attemptVersion " +
            s"but the current version is ${postCommitSnapshot.version}.")
      }
      // Post stats
      var numAbsolutePaths = 0
      var pathHolder: Path = null
      val distinctPartitions = new mutable.HashSet[Map[String, String]]
      val adds = actions.collect {
        case a: AddFile =>
          pathHolder = new Path(new URI(a.path))
          if (pathHolder.isAbsolute) numAbsolutePaths += 1
          distinctPartitions += a.partitionValues
          a
      }
      ...
      attemptVersion
    } catch {
      case e: java.nio.file.FileAlreadyExistsException =>
        checkAndRetry(attemptVersion, actions, attemptNumber, isolationLevel)
    }
  }

deltaLog.store.write(

deltaFile(deltaLog.logPath, attemptVersion),

actions.map(_.json).toIterator)` 方法直接调用HDFSLogStore的write方法,而最终调用writeInternal方法,这里attemptVersion是当前的version+1

我们看一下writeInternal方法:

  private def writeInternal(path: Path, actions: Iterator[String], overwrite: Boolean): Unit = {
  val fc: FileContext = try {
    getFileContext(path)
  } catch {
    case e: IOException if e.getMessage.contains(noAbstractFileSystemExceptionMessage) =>
      val newException = DeltaErrors.incorrectLogStoreImplementationException(sparkConf, e)
      logError(newException.getMessage, newException.getCause)
      throw newException
  }
  if (!overwrite && fc.util.exists(path)) {
    // This is needed for the tests to throw error with local file system
    throw new FileAlreadyExistsException(path.toString)
  }
  val tempPath = createTempPath(path)
  var streamClosed = false // This flag is to avoid double close
  var renameDone = false // This flag is to save the delete operation in most of cases.
  val stream = fc.create(
    tempPath, EnumSet.of(CREATE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
  try {
    actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(stream.write)
    stream.close()
    streamClosed = true
    try {
      val renameOpt = if (overwrite) Options.Rename.OVERWRITE else Options.Rename.NONE
      fc.rename(tempPath, path, renameOpt)
      renameDone = true
      // TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved
      tryRemoveCrcFile(fc, tempPath)
    } catch {
      case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
        throw new FileAlreadyExistsException(path.toString)
    }
  } finally {
    if (!streamClosed) {
      stream.close()
    }
    if (!renameDone) {
      fc.delete(tempPath, false)
    }
  }
}
如果文件存在,则抛出异常。否则写入该log文件,这里注意:如果是本地文件的话,需要加同步进行rename操作,因为本地文件的rename操作,即使目标文件存在了也不会报异常,其他的文件类型,则不需要加同步。
如果deltaLog.store.write没有发生异常,则获取最新的snaphost,进行记录,返回传入的attemptVersion,
如果发生了异常,则执行checkAndRetry(attemptVersion, actions, attemptNumber, isolationLevel),进行重试,重试的时候,我们得查找以下从事务提交到失败这段时间的由其他程序提交的Actions,如果和当前actions没有冲突则继续提交,否则抛出异常

postCommit 进行checkpoint操作,每隔 10 个提交,Delta Lake 会在 _delta_log 子目录下自动生成一个 Parquet 格式的 checkpoint 文件,便于快速replays,且清除过期的deltalog,默认保存30天(这里会把addfile和removefile合并了,比如先ad

d A文件,之后remove A 合并完了A就没记录了):

def checkpoint(): Unit = recordDeltaOperation(this, "delta.checkpoint") {
    val checkpointMetaData = checkpoint(snapshot)
    val json = JsonUtils.toJson(checkpointMetaData)
    store.write(LAST_CHECKPOINT, Iterator(json), overwrite = true)
    doLogCleanup()
  } 
  1. runPostCommitHooks 如果注册了postCommitHooks,就执行

至此整个delta写deltalog的流程就结束了,

整个流程如下:

update 获取最新的snapshot
     |
     v
write()写入delta data
    |
    v
commit 事务提交 -> prepareCommit 用来做一些提交前的检查,以及增加一些actions 
                         |
                         v
                    doCommit 真正写入deltalog,会一直重试直到冲突
                         |
                         v
                    postCommit 进行checkpoint操作,合并Addfile和RemoveFile,便于快速replays,且清除过期的delta log
                         |
                         v
                    runPostCommitHooks 如果存在hook则执行对应的hook
   |
   v
  结束


相关实践学习
【文生图】一键部署Stable Diffusion基于函数计算
本实验教你如何在函数计算FC上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。函数计算提供一定的免费额度供用户使用。本实验答疑钉钉群:29290019867
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
相关文章
|
5月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
184 1
Spark快速大数据分析PDF下载读书分享推荐
|
2月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
153 2
|
2月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
86 0
|
5月前
|
弹性计算 分布式计算 Serverless
全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
【7月更文挑战第6天】全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
23717 42
|
5月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6月前
|
机器学习/深度学习 数据采集 分布式计算
基于spark的大数据分析预测地震受灾情况的系统设计
基于spark的大数据分析预测地震受灾情况的系统设计
163 1
|
6月前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之spark客户端执行时,报错,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6月前
|
分布式计算 定位技术 Scala
使用spark基于出租车GPS数据实现车辆数量统计以及北京每个城区的车辆位置点数分析
使用spark基于出租车GPS数据实现车辆数量统计以及北京每个城区的车辆位置点数分析
120 0