Apache Spark Delta Lake 写数据使用及实现原理代码解析

简介: Apache Spark Delta Lake 写数据使用及实现原理代码解析 Delta Lake 写数据是其最基本的功能,而且其使用和现有的 Spark 写 Parquet 文件基本一致,在介绍 Delta Lake 实现原理之前先来看看如何使用它,具体使用如下: df.

Apache Spark Delta Lake 写数据使用及实现原理代码解析

Delta Lake 写数据是其最基本的功能,而且其使用和现有的 Spark 写 Parquet 文件基本一致,在介绍 Delta Lake 实现原理之前先来看看如何使用它,具体使用如下:

df.write.format("delta").save("/data/yangping.wyp/delta/test/")
 
//数据按照 dt 分区
df.write.format("delta").partitionBy("dt").save("/data/yangping.wyp/delta/test/")
 
// 覆盖之前的数据
df.write.format("delta").mode(SaveMode.Overwrite).save("/data/yangping.wyp/delta/test/")

大家可以看出,使用写 Delta 数据是非常简单的,这也是 Delte Lake 介绍的 100% 兼容 Spark。

Delta Lake 写数据原理

前面简单了解了如何使用 Delta Lake 来写数据,本小结我们将深入介绍 Delta Lake 是如何保证写数据的基本原理以及如何保证事务性。

得益于 Apache Spark 强大的数据源 API,我们可以很方便的给 Spark 添加任何数据源,Delta Lake 也不例外。Delta Lake 就是使用 DataSource V1 版本的 API 实现的一种新的数据源,我们调用 df.write.format("delta") 其实底层调用的是 org.apache.spark.sql.delta.sources.DeltaDataSource 类。为了简单起见,本文介绍的是 Delta Lake 批量写的实现,实时流写 Delta Lake 本文不涉及,后面有机会再介绍。 Delta Lake 批量写扩展了 org.apache.spark.sql.sources.CreatableRelationProvider 特质,并实现了其中的方法。我们调用上面的写数据方法首先会调用 DeltaDataSource 类的 createRelation 方法,它的具体实现如下:

override def createRelation(
    sqlContext: SQLContext,
    mode: SaveMode,
    parameters: Map[String, String],
    data: DataFrame): BaseRelation = {
 
  // 写数据的路径
  val path = parameters.getOrElse("path", {
    throw DeltaErrors.pathNotSpecifiedException
  })
 
  // 分区字段
  val partitionColumns = parameters.get(DeltaSourceUtils.PARTITIONING_COLUMNS_KEY)
    .map(DeltaDataSource.decodePartitioningColumns)
    .getOrElse(Nil)
 
 
  // 事务日志对象
  val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path)
 
  // 真正的写操作过程
  WriteIntoDelta(
    deltaLog = deltaLog,
    mode = mode,
    new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf),
    partitionColumns = partitionColumns,
    configuration = Map.empty,
    data = data).run(sqlContext.sparkSession)
 
  deltaLog.createRelation()
}

其中 mode 就是保持数据的模式,支持 Append、Overwrite、ErrorIfExists 以及 Ignore 等。parameters 这个传递的参数,比如分区字段、数据保存路径以及 Delta 支持的一些参数(replaceWhere、mergeSchema、overwriteSchema 等,具体参见 org.apache.spark.sql.delta.DeltaOptions);data 就是我们需要保存的数据。

createRelation 方法紧接着就是获取数据保存的路径,分区字段等信息。然后初始化 deltaLog,deltaLog 的初始化会做很多事情,比如会读取磁盘所有的事务日志(_delta_log 目录下),并构建最新事务日志的最新快照,里面可以拿到最新数据的版本。由于 deltaLog 的初始化成本比较高,所以 deltaLog 初始化完之后会缓存到 deltaLogCache 中,这是一个使用 Guava 的 CacheBuilder 类实现的一个缓存,缓存的数据保持一小时,缓存大小可以通过 delta.log.cacheSize 参数进行设置。只要写数据的路径是一样的,就只需要初始化一次 deltaLog,后面直接从缓存中拿即可。除非之前缓存的 deltaLog 被清理了,或者无效才会再次初始化。DeltaLog 类是 Delta Lake 中最重要的类之一,涉及的内容非常多,所以我们会单独使用一篇文章进行介绍。

紧接着初始化 WriteIntoDelta,WriteIntoDelta 扩展自 RunnableCommand,Delta Lake 中的更新、删除、合并都是扩展这个类的。初始化完 WriteIntoDelta 之后,就会调用 run 方法执行真正的写数据操作。WriteIntoDelta 的 run 方法实现如下:

override def run(sparkSession: SparkSession): Seq[Row] = {
    deltaLog.withNewTransaction { txn =>
      val actions = write(txn, sparkSession)
      val operation = DeltaOperations.Write(mode, Option(partitionColumns), options.replaceWhere)
      txn.commit(actions, operation)
    }
    Seq.empty
}

Delta Lake 所有的更新操作都是在事务中进行的,deltaLog.withNewTransaction 就是一个事务,withNewTransaction 的实现如下:

def withNewTransaction[T](thunk: OptimisticTransaction => T): T = {
  try {
    // 更新当前表事务日志的快照
    update()
    // 初始化乐观事务锁对象
    val txn = new OptimisticTransaction(this)
    // 开启事务
    OptimisticTransaction.setActive(txn)
    // 执行写数据操作
    thunk(txn)
  } finally {
    // 关闭事务
    OptimisticTransaction.clearActive()
  }
}

在开启事务之前,需要更新当前表事务的快照,因为在执行写数据之前,这张表可能已经被修改了,执行 update 操作之后,就可以拿到当前表的最新版本,紧接着开启乐观事务锁。thunk(txn) 就是需要执行的事务操作,对应 deltaLog.withNewTransaction 里面的所有代码。

我们回到上面的 run 方法。val actions = write(txn, sparkSession) 就是执行写数据的操作,它的实现如下:

  def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = {
    import sparkSession.implicits._
    // 如果不是第一次往表里面写数据,需要判断写数据的模式是否符合条件
    if (txn.readVersion > -1) {
      // This table already exists, check if the insert is valid.
      if (mode == SaveMode.ErrorIfExists) {
        throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath)
      } else if (mode == SaveMode.Ignore) {
        return Nil
      } else if (mode == SaveMode.Overwrite) {
        deltaLog.assertRemovable()
      }
    }
 
    // 更新表的模式,比如是否覆盖现有的模式,是否和现有的模式进行 merge
    updateMetadata(txn, data, partitionColumns, configuration, isOverwriteOperation)
 
    // 是否定义分区过滤条件
    val replaceWhere = options.replaceWhere
    val partitionFilters = if (replaceWhere.isDefined) {
      val predicates = parsePartitionPredicates(sparkSession, replaceWhere.get)
      if (mode == SaveMode.Overwrite) {
        verifyPartitionPredicates(
          sparkSession, txn.metadata.partitionColumns, predicates)
      }
      Some(predicates)
    } else {
      None
    }
 
    // 第一次写数据初始化事务日志的目录
    if (txn.readVersion < 0) {
      // Initialize the log path
      deltaLog.fs.mkdirs(deltaLog.logPath)
    }
 
    // 写数据到文件系统中
    val newFiles = txn.writeFiles(data, Some(options))
     
    val deletedFiles = (mode, partitionFilters) match {
       // 全量覆盖,直接拿出缓存在内存中最新事务日志快照里面的所有 AddFile 文件
      case (SaveMode.Overwrite, None) =>
        txn.filterFiles().map(_.remove)
      // 从事务日志快照中获取对应分区里面的所有 AddFile 文件
      case (SaveMode.Overwrite, Some(predicates)) =>
        // Check to make sure the files we wrote out were actually valid.
        val matchingFiles = DeltaLog.filterFileList(
          txn.metadata.partitionColumns, newFiles.toDF(), predicates).as[AddFile].collect()
        val invalidFiles = newFiles.toSet -- matchingFiles
        if (invalidFiles.nonEmpty) {
          val badPartitions = invalidFiles
            .map(_.partitionValues)
            .map { _.map { case (k, v) => s"$k=$v" }.mkString("/") }
            .mkString(", ")
          throw DeltaErrors.replaceWhereMismatchException(replaceWhere.get, badPartitions)
        }
 
        txn.filterFiles(predicates).map(_.remove)
      case _ => Nil
    }
 
    newFiles ++ deletedFiles
  }
}

如果 txn.readVersion == -1,说明是第一次写数据到 Delta Lake 表,所以当这个值大于 -1 的时候,需要判断一下写数据的操作是否合法。
由于 Delta Lake 底层使用的是 Parquet 格式,所以 Delta Lake 表也支持模式的增加合并等,这就是 updateMetadata 函数对应的操作。
因为 Delta Lake 表支持分区,所以我们可能在写数据的时候指定某个分区进行覆盖。
真正写数据的操作是 txn.writeFiles 函数执行的,具体实现如下:

def writeFiles(
      data: Dataset[_],
      writeOptions: Option[DeltaOptions],
      isOptimize: Boolean): Seq[AddFile] = {
    hasWritten = true
 
    val spark = data.sparkSession
    val partitionSchema = metadata.partitionSchema
    val outputPath = deltaLog.dataPath
 
    val (queryExecution, output) = normalizeData(data, metadata.partitionColumns)
    val partitioningColumns =
      getPartitioningColumns(partitionSchema, output, output.length < data.schema.size)
 
    // 获取 DelayedCommitProtocol,里面可以设置写文件的名字,
    // commitTask 和 commitJob 等做一些事情
    val committer = getCommitter(outputPath)
 
    val invariants = Invariants.getFromSchema(metadata.schema, spark)
 
    SQLExecution.withNewExecutionId(spark, queryExecution) {
      val outputSpec = FileFormatWriter.OutputSpec(
        outputPath.toString,
        Map.empty,
        output)
 
      val physicalPlan = DeltaInvariantCheckerExec(queryExecution.executedPlan, invariants)
 
      FileFormatWriter.write(
        sparkSession = spark,
        plan = physicalPlan,
        fileFormat = snapshot.fileFormat,
        committer = committer,
        outputSpec = outputSpec,
        hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration),
        partitionColumns = partitioningColumns,
        bucketSpec = None,
        statsTrackers = Nil,
        options = Map.empty)
    }
 
    // 返回新增的文件
    committer.addedStatuses
}

Delta Lake 写操作最终调用 Spark 的 FileFormatWriter.write 方法进行的,通过这个方法的复用将我们真正的数据写入到 Delta Lake 表里面去了。
在 Delta Lake 中,如果是新增文件则会在事务日志中使用 AddFile 类记录相关的信息,AddFile 持久化到事务日志里面的内容如下:

{"add":{"path":"dt=20190801/part-00001-bdff67f3-c70f-4817-898d-15a73c93271a.c000.snappy.parquet","partitionValues":{"dt":"20190801"},"size":429,"modificationTime":1566990855000,"dataChange":true}}

可以看出 AddFile 里面记录了新增文件的保存路径,分区信息,新增的文件大小,修改时间等信息。如果是删除文件,也会在事务日志里面记录这个删除操作,对应的就是使用 RemoveFile 类存储,RemoveFile 持久化到事务日志里面的内容如下:

{"remove":{"path":"dt=20190801/part-00001-7f3fe89d-e55b-4848-93ea-4133b5d406d6.c000.snappy.parquet","deletionTimestamp":1566990856332,"dataChange":true}}

RemoveFile 里面保存了删除文件的路径,删除时间等信息。如果新增一个文件,再删除一个文件,那么最新的事务日志快照里面只会保存删除这个文件的记录。从这里面也可以看出, Delta Lake 删除、新增 ACID 是针对文件级别的。

上面的写操作肯定会产生新的文件,所以写操作之后就需要拿到新增的文件(val newFiles = txn.writeFiles(data, Some(options)) )newFiles(AddFile) 和需要删除的文件(RemoveFile)。针对那些文件需要删除需要做一些判断,主要分两种情况(具体参见 write 方法里面的):

  • 如果是全表覆盖,则直接从缓存在内存中最新的事务日志快照中拿出所有 AddFile 文件,然后将其标记为 RemoveFile;
  • 如果是分区内的覆盖,则从缓存在内存中最新的事务日志快照中拿出对应分区下的 AddFile 文件,然后将其标记为 RemoveFile。

最后 write 方法返回新增的文件和需要删除的文件(newFiles ++ deletedFiles),这些文件最终需要记录到事务日志里面去。关于事务日志是如何写进去的请参见这篇文章的详细分析。

写在最后

为了营造一个开放的Cassandra技术交流环境,社区建立了微信公众号和钉钉群。为广大用户提供专业的技术分享及问答,定期开展专家技术直播,欢迎大家加入。另云Cassandra免费火爆公测中,欢迎试用:https://www.aliyun.com/product/cds

xxx

目录
相关文章
存储 数据管理 物联网
412 0
|
11月前
|
算法 PyTorch 算法框架/工具
昇腾 msmodelslim w8a8量化代码解析
msmodelslim w8a8量化算法原理和代码解析
911 5
|
11月前
|
传感器 人工智能 监控
反向寻车系统怎么做?基本原理与系统组成解析
本文通过反向寻车系统的核心组成部分与技术分析,阐述反向寻车系统的工作原理,适用于适用于商场停车场、医院停车场及火车站停车场等。如需获取智慧停车场反向寻车技术方案前往文章最下方获取,如有项目合作及技术交流欢迎私信作者。
878 2
|
11月前
|
负载均衡 JavaScript 前端开发
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
11月前
|
机器学习/深度学习 数据可视化 PyTorch
深入解析图神经网络注意力机制:数学原理与可视化实现
本文深入解析了图神经网络(GNNs)中自注意力机制的内部运作原理,通过可视化和数学推导揭示其工作机制。文章采用“位置-转移图”概念框架,并使用NumPy实现代码示例,逐步拆解自注意力层的计算过程。文中详细展示了从节点特征矩阵、邻接矩阵到生成注意力权重的具体步骤,并通过四个类(GAL1至GAL4)模拟了整个计算流程。最终,结合实际PyTorch Geometric库中的代码,对比分析了核心逻辑,为理解GNN自注意力机制提供了清晰的学习路径。
736 7
深入解析图神经网络注意力机制:数学原理与可视化实现
|
11月前
|
机器学习/深度学习 缓存 自然语言处理
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
Tiktokenizer 是一款现代分词工具,旨在高效、智能地将文本转换为机器可处理的离散单元(token)。它不仅超越了传统的空格分割和正则表达式匹配方法,还结合了上下文感知能力,适应复杂语言结构。Tiktokenizer 的核心特性包括自适应 token 分割、高效编码能力和出色的可扩展性,使其适用于从聊天机器人到大规模文本分析等多种应用场景。通过模块化设计,Tiktokenizer 确保了代码的可重用性和维护性,并在分词精度、处理效率和灵活性方面表现出色。此外,它支持多语言处理、表情符号识别和领域特定文本处理,能够应对各种复杂的文本输入需求。
1367 6
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
|
11月前
|
传感器 监控 Java
Java代码结构解析:类、方法、主函数(1分钟解剖室)
### Java代码结构简介 掌握Java代码结构如同拥有程序世界的建筑蓝图,类、方法和主函数构成“黄金三角”。类是独立的容器,承载成员变量和方法;方法实现特定功能,参数控制输入环境;主函数是程序入口。常见错误包括类名与文件名不匹配、忘记static修饰符和花括号未闭合。通过实战案例学习电商系统、游戏角色控制和物联网设备监控,理解类的作用、方法类型和主函数任务,避免典型错误,逐步提升编程能力。 **脑图速记法**:类如太空站,方法即舱段;main是发射台,static不能换;文件名对仗,括号要成双;参数是坐标,void不返航。
438 5
|
12月前
|
安全 算法 网络协议
解析:HTTPS通过SSL/TLS证书加密的原理与逻辑
HTTPS通过SSL/TLS证书加密,结合对称与非对称加密及数字证书验证实现安全通信。首先,服务器发送含公钥的数字证书,客户端验证其合法性后生成随机数并用公钥加密发送给服务器,双方据此生成相同的对称密钥。后续通信使用对称加密确保高效性和安全性。同时,数字证书验证服务器身份,防止中间人攻击;哈希算法和数字签名确保数据完整性,防止篡改。整个流程保障了身份认证、数据加密和完整性保护。
|
12月前
|
开发框架 监控 JavaScript
解锁鸿蒙装饰器:应用、原理与优势全解析
ArkTS提供了多维度的状态管理机制。在UI开发框架中,与UI相关联的数据可以在组件内使用,也可以在不同组件层级间传递,比如父子组件之间、爷孙组件之间,还可以在应用全局范围内传递或跨设备传递。
386 2
|
12月前
|
Java 数据库 开发者
详细介绍SpringBoot启动流程及配置类解析原理
通过对 Spring Boot 启动流程及配置类解析原理的深入分析,我们可以看到 Spring Boot 在启动时的灵活性和可扩展性。理解这些机制不仅有助于开发者更好地使用 Spring Boot 进行应用开发,还能够在面对问题时,迅速定位和解决问题。希望本文能为您在 Spring Boot 开发过程中提供有效的指导和帮助。
1493 12

推荐镜像

更多