Apache Spark Delta Lake 删除使用及实现原理代码解析

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Apache Spark Delta Lake 删除使用及实现原理代码解析Delta Lake 的 Delete 功能是由 0.3.0 版本引入的。在介绍 Apache Spark Delta Lake 实现逻辑之前,我们先来看看如何使用 delete 这个功能。

Apache Spark Delta Lake 删除使用及实现原理代码解析

Delta Lake 的 Delete 功能是由 0.3.0 版本引入的。在介绍 Apache Spark Delta Lake 实现逻辑之前,我们先来看看如何使用 delete 这个功能。

Delta Lake 删除使用

Delta Lake 的官方文档为我们提供如何使用 Delete 的几个例子,参见这里,如下:

import io.delta.tables._
 
val iteblogDeltaTable = DeltaTable.forPath(spark, path)
 
// 删除 id 小于 4 的数据
iteblogDeltaTable.delete("id <= '4'")
 
import org.apache.spark.sql.functions._
import spark.implicits._
 
iteblogDeltaTable.delete($"id" <= "4")
 
// 删除所有的数据
iteblogDeltaTable.delete()

执行上面的 Delete 命令,如果确实删除了相应的数据,Delta Lake 会生成一个事务日志,内容类似下面的:

{"commitInfo":{"timestamp":1566978478414,"operation":"DELETE","operationParameters":{"predicate":"[\"(`id` <= CAST('4' AS BIGINT))\"]"},"readVersion":10,"isBlindAppend":false}}
{"remove":{"path":"dt=20190801/part-00000-ca73a0f4-fbeb-4ea8-9b9f-fa466a85724e.c000.snappy.parquet","deletionTimestamp":1566978478405,"dataChange":true}}
{"remove":{"path":"dt=20190803/part-00000-8e11f4cc-a7ac-47a1-8ce6-b9d87eaf6c51.c000.snappy.parquet","deletionTimestamp":1566978478405,"dataChange":true}}
{"add":{"path":"dt=20190801/part-00001-6ff11be3-22db-4ed2-bde3-a97d610fe11d.c000.snappy.parquet","partitionValues":{"dt":"20190801"},"size":429,"modificationTime":1566978478000,"dataChange":true}}

事务日志里面详细介绍了 Delete 执行的时间、删除的条件、需要删除的文件以及添加的文件等。
注意:执行 Delete 的时候,真实的数据其实并没有删除,只是在事务日志里面记录了,真正删除数据需要通过执行 vacuum 命令。

Delta Lake 删除是如何实现的

前面小结我们简单体验了一下 Delete 的使用,本小结将深入代码详细介绍 Delta Lake 的 Delete 是如何实现的。delete 的 API 是通过在 io.delta.tables.DeltaTable 类添加相应方法实现的,其中涉及删除的方法主要包括下面三个:

def delete(condition: String): Unit = {
  delete(functions.expr(condition))
}
 
def delete(condition: Column): Unit = {
  executeDelete(Some(condition.expr))
}
 
def delete(): Unit = {
  executeDelete(None)
}

这个就是我们在上面例子看到的 delete 支持的三种用法。这三个函数最终都是调用 io.delta.tables.execution.DeltaTableOperations#executeDelete 函数的,executeDelete 的实现如下:

protected def executeDelete(condition: Option[Expression]): Unit = {
  val delete = Delete(self.toDF.queryExecution.analyzed, condition)  
 
  // current DELETE does not support subquery,
  // and the reason why perform checking here is that
  // we want to have more meaningful exception messages,
  // instead of having some random msg generated by executePlan().
  subqueryNotSupportedCheck(condition, "DELETE")
 
  val qe = sparkSession.sessionState.executePlan(delete)
  val resolvedDelete = qe.analyzed.asInstanceOf[Delete]
  val deleteCommand = DeleteCommand(resolvedDelete)
  deleteCommand.run(sparkSession)
}

self.toDF.queryExecution.analyzed 这个就是我们输入 Delta Lake 表的 Analyzed Logical Plan,condition 就是我们执行删除操作的条件表达式(也就是上面例子的 id < = '4')。这个方法的核心就是初始化 DeleteCommand,然后调用 DeleteCommand 的 run 方法执行删除操作。DeleteCommand 类扩展自 Spark 的 RunnableCommand 特质,并实现其中的 run 方法,我们在 Spark 里面看到的 CREATE TABLE、ALTER TABLE、SHOE CREATE TABLE 等命令都是继承这个类的,所以 Delta Lake 的 delete、update 以及 Merge 也都是继承这个类。DeleteCommand 的 run 方法实现如下:

final override def run(sparkSession: SparkSession): Seq[Row] = {
  recordDeltaOperation(tahoeFileIndex.deltaLog, "delta.dml.delete") {
    // 获取事务日志持有对象
    val deltaLog = tahoeFileIndex.deltaLog
    // 检查 Delta Lake 表是否支持删除操作
    deltaLog.assertRemovable()
    // 开启新事务,执行删除操作。
    deltaLog.withNewTransaction { txn =>
      performDelete(sparkSession, deltaLog, txn)
    }
    // Re-cache all cached plans(including this relation itself, if it's cached) that refer to
    // this data source relation.
    sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, target)
  }
 
  Seq.empty[Row]
}

Delta Lake 表允许用户设置成 appendOnly(通过 spark.databricks.delta.properties.defaults.appendOnly 参数设置),也就是只允许追加操作,所以如果我们执行删除之前需要做一些校验。校验通过之后开始执行删除操作,由于删除操作是需要保证原子性的,所以这个操作需要在事务里面进行,withNewTransaction 的实现如下:

def withNewTransaction[T](thunk: OptimisticTransaction => T): T = {
  try {
    // 更新当前表事务日志的快照
    update()
    // 初始化乐观事务锁对象
    val txn = new OptimisticTransaction(this)
    // 开启事务
    OptimisticTransaction.setActive(txn)
    // 执行写数据操作
    thunk(txn)
  } finally {
    // 关闭事务
    OptimisticTransaction.clearActive()
  }
}

在开启事务之前,需要更新当前表事务日志的快照,因为在执行删除操作表之前,这张表可能已经被修改了,执行 update 操作之后,就可以拿到当前表的最新版本,紧接着开启乐观事务锁。thunk(txn) 这个就是执行我们上面的 performDelete(sparkSession, deltaLog, txn) 方法。Delta Lake 删除的整个核心就在 performDelete 方法里面了。

如果某个文件里面有数据需要删除,那么这个文件会被标记为删除,然后这个文件里面不需要删除的数据需要重新写到一个新文件里面。那么在 performDelete 方法里面我们就需要知道哪些数据需要删除,这些数据对应的文件在哪里以及是否需要些事务日志。Delta Lake 将删除实现分为三大情况:

  • 1、如果执行 delete 的时候并没有传递相关的删除条件,也就是上面例子的 iteblogDeltaTable.delete(),这时候其实就是删除当前 Delta Lake 表的所有数据。那这种情况最好处理了,只需要直接删除 Delta Lake 表对应的所有文件即可;
  • 2、如果执行 delete 的时候传递了相关删除条件,而这个删除条件只是分区字段,比如 dt 是 Delta Lake 表的分区字段,然后我们执行了 iteblogDeltaTable.delete("dt = '20190828'") 这样相关的删除操作,那么我们可以直接从缓存在内存中的快照(snapshot, 也就是通过上面的 update() 函数初始化的)拿到需要删除哪些文件,直接删除即可,而且不需要执行数据重写操作。
  • 3、最后一种情况就是用户删除的时候含有一些非分区字段的过滤条件,这时候我们就需要扫描底层数据,获取需要删除的数据在哪个文件里面,这又分两种情况:

    • 3.1、Delta Lake 表并不存在我们需要删除的数据,这时候不需要做任何操作,直接返回,就连事务日志都不用记录;
    • 3.2、这种情况是最复杂的,我们需要计算需要删除的数据在哪个文件里面,然后把对应的文件里面不需要删除的数据重写到新的文件里面(如果没有,就不生成新文件),最后记录事务日志。

为了加深印象,我画了一张图希望大家能够理解上面的过程。
屏幕快照 2019-08-29 下午3.41.58.png
上图中每个绿色的框代表一个分区目录下的文件,红色代表标记为删除的文件,也就是事务日志中使用 remove 标记的文件,紫色代表移除需要删除的数据之后新生成的文件,也就是事务日志里面使用 add 标记的文件。

下面我们来详细分析删除的操作。

private def performDelete(
   sparkSession: SparkSession, deltaLog: DeltaLog, txn: OptimisticTransaction) = {
 import sparkSession.implicits._
 
 var numTouchedFiles: Long = 0
 var numRewrittenFiles: Long = 0
 var scanTimeMs: Long = 0
 var rewriteTimeMs: Long = 0
 
 val startTime = System.nanoTime()
 val numFilesTotal = deltaLog.snapshot.numOfFiles
 
 val deleteActions: Seq[Action] = condition match {
   // 对应上面情况1,delete 操作没有传递任何条件
   case None =>
     // 直接将内存中快照里面所有的 AddFile 文件拿出来
     val allFiles = txn.filterFiles(Nil)
 
     numTouchedFiles = allFiles.size
     scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
 
     val operationTimestamp = System.currentTimeMillis()
     // 将 AddFile 标记成 RemoveFile
     allFiles.map(_.removeWithTimestamp(operationTimestamp))
   // 传递了删除过滤条件,对应上面情况2、3
   case Some(cond) =>
     // metadataPredicates 为分区删除条件
     // otherPredicates 为其他删除条件
     val (metadataPredicates, otherPredicates) =
       DeltaTableUtils.splitMetadataAndDataPredicates(
         cond, txn.metadata.partitionColumns, sparkSession)
 
     // 如果只有分区删除条件,也就是 dt= "20190828" 这样的过滤条件,对应上面情况2
     if (otherPredicates.isEmpty) {
       val operationTimestamp = System.currentTimeMillis()
       // 从快照中拿出符合这个分区条件的 AddFile 文件
       val candidateFiles = txn.filterFiles(metadataPredicates)
 
       scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
       numTouchedFiles = candidateFiles.size
 
       // 将 AddFile 标记成 RemoveFile
       candidateFiles.map(_.removeWithTimestamp(operationTimestamp))
     } else { // 对应上面情况3,含有其他字段的删除条件,这时候我们需要扫描底层数据获取这些删除数据所在的文件
       // 找到删除的数据潜在的 AddFile 文件列表
       val candidateFiles = txn.filterFiles(metadataPredicates ++ otherPredicates)
 
       numTouchedFiles = candidateFiles.size
       val nameToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, candidateFiles)
 
       val fileIndex = new TahoeBatchFileIndex(
         sparkSession, "delete", candidateFiles, deltaLog, tahoeFileIndex.path, txn.snapshot)
       // Keep everything from the resolved target except a new TahoeFileIndex
       // that only involves the affected files instead of all files.
       val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex)
 
       // 这个就是潜在需要删除的文件对应的 Dataset
       val data = Dataset.ofRows(sparkSession, newTarget)
       val filesToRewrite =
         withStatusCode("DELTA", s"Finding files to rewrite for DELETE operation") {
           // 没有需要潜在删除的 AddFile 文件
           if (numTouchedFiles == 0) {
             Array.empty[String]
           } else {
             // 找到删除数据所在的文件
             data.filter(new Column(cond)).select(new Column(InputFileName())).distinct()
               .as[String].collect()
           }
         }
 
       scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
       // 对应上面情况3.1,如果没有找到需要删除的数据所在文件,那么删除的文件就是 Nil,不需要做事务日志
       if (filesToRewrite.isEmpty) {
         Nil
       } else {  
         // 对应上面情况3.2,找到我们需要删除的文件列表,
         // 那我们需要将需要删除文件里面不用删除的数据重新写到新文件
         // Do the second pass and just read the affected files
         val baseRelation = buildBaseRelation(
           sparkSession, txn, "delete", tahoeFileIndex.path, filesToRewrite, nameToAddFileMap)
         // Keep everything from the resolved target except a new TahoeFileIndex
         // that only involves the affected files instead of all files.
         val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location)
 
         val targetDF = Dataset.ofRows(sparkSession, newTarget)
         // 将删除过滤条件取反,也就是 id > 10 变成 id <= 10
         val filterCond = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
         // 拿到潜在删除文件中不需要删除的数据
         val updatedDF = targetDF.filter(new Column(filterCond))
 
         // rewrittenFiles 就是新增的文件
         val rewrittenFiles = withStatusCode(
           "DELTA", s"Rewriting ${filesToRewrite.size} files for DELETE operation") {
           // 开始将潜在需要删除文件里面不需要删除的数据写入到新文件
           txn.writeFiles(updatedDF)
         }
 
         numRewrittenFiles = rewrittenFiles.size
         rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs
 
         val operationTimestamp = System.currentTimeMillis()
         // 需要删除的文件和新增的文件集合
         removeFilesFromPaths(deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) 
          ++ rewrittenFiles
       }
     }
 }
 
 // 如果匹配到需要删除的文件,那么需要记录事务日志
 if (deleteActions.nonEmpty) {
   // 写事务日志,也就是写到 _delta_log 目录下,这个我们在前面分析了。
   txn.commit(deleteActions, DeltaOperations.Delete(condition.map(_.sql).toSeq))
 }
 
 recordDeltaEvent(
   deltaLog,
   "delta.dml.delete.stats",
   data = DeleteMetric(
     condition = condition.map(_.sql).getOrElse("true"),
     numFilesTotal,
     numTouchedFiles,
     numRewrittenFiles,
     scanTimeMs,
     rewriteTimeMs)
 )

上面注释已经很清楚说明了 Delta Lake 的删除过程了。从上面的执行过程也可以看出,Delta Lake 删除操作的代价还是挺高的,所以官方也建议删除数据的时候提供分区过滤条件,这样可以避免扫描全表的数据。

写在最后

为了营造一个开放的Cassandra技术交流环境,社区建立了微信公众号和钉钉群。为广大用户提供专业的技术分享及问答,定期开展专家技术直播,欢迎大家加入。另云Cassandra免费火爆公测中,欢迎试用:https://www.aliyun.com/product/cds

xxx

目录
相关文章
|
1月前
|
Java Apache C++
别再手写RPC了,Apache Thrift帮你自动生成RPC客户端及服务端代码
Thrift 是一个轻量级、跨语言的远程服务调用框架,由 Facebook 开发并贡献给 Apache。它通过 IDL 生成多种语言的 RPC 服务端和客户端代码,支持 C++、Java、Python 等。Thrift 的主要特点包括开发速度快、接口维护简单、学习成本低和多语言支持。广泛应用于 Cassandra、Hadoop 等开源项目及 Facebook、百度等公司。
别再手写RPC了,Apache Thrift帮你自动生成RPC客户端及服务端代码
|
1月前
|
存储 安全 Java
系统安全架构的深度解析与实践:Java代码实现
【11月更文挑战第1天】系统安全架构是保护信息系统免受各种威胁和攻击的关键。作为系统架构师,设计一套完善的系统安全架构不仅需要对各种安全威胁有深入理解,还需要熟练掌握各种安全技术和工具。
108 10
|
1月前
|
前端开发 JavaScript 开发者
揭秘前端高手的秘密武器:深度解析递归组件与动态组件的奥妙,让你代码效率翻倍!
【10月更文挑战第23天】在Web开发中,组件化已成为主流。本文深入探讨了递归组件与动态组件的概念、应用及实现方式。递归组件通过在组件内部调用自身,适用于处理层级结构数据,如菜单和树形控件。动态组件则根据数据变化动态切换组件显示,适用于不同业务逻辑下的组件展示。通过示例,展示了这两种组件的实现方法及其在实际开发中的应用价值。
36 1
|
2月前
|
机器学习/深度学习 人工智能 算法
揭开深度学习与传统机器学习的神秘面纱:从理论差异到实战代码详解两者间的选择与应用策略全面解析
【10月更文挑战第10天】本文探讨了深度学习与传统机器学习的区别,通过图像识别和语音处理等领域的应用案例,展示了深度学习在自动特征学习和处理大规模数据方面的优势。文中还提供了一个Python代码示例,使用TensorFlow构建多层感知器(MLP)并与Scikit-learn中的逻辑回归模型进行对比,进一步说明了两者的不同特点。
80 2
|
2月前
|
存储 搜索推荐 数据库
运用LangChain赋能企业规章制度制定:深入解析Retrieval-Augmented Generation(RAG)技术如何革新内部管理文件起草流程,实现高效合规与个性化定制的完美结合——实战指南与代码示例全面呈现
【10月更文挑战第3天】构建公司规章制度时,需融合业务实际与管理理论,制定合规且促发展的规则体系。尤其在数字化转型背景下,利用LangChain框架中的RAG技术,可提升规章制定效率与质量。通过Chroma向量数据库存储规章制度文本,并使用OpenAI Embeddings处理文本向量化,将现有文档转换后插入数据库。基于此,构建RAG生成器,根据输入问题检索信息并生成规章制度草案,加快更新速度并确保内容准确,灵活应对法律与业务变化,提高管理效率。此方法结合了先进的人工智能技术,展现了未来规章制度制定的新方向。
39 3
|
2月前
|
SQL 监控 关系型数据库
SQL错误代码1303解析与处理方法
在SQL编程和数据库管理中,遇到错误代码是常有的事,其中错误代码1303在不同数据库系统中可能代表不同的含义
|
2月前
|
SQL 安全 关系型数据库
SQL错误代码1303解析与解决方案:深入理解并应对权限问题
在数据库管理和开发过程中,遇到错误代码是常见的事情,每个错误代码都代表着一种特定的问题
|
3月前
|
敏捷开发 安全 测试技术
软件测试的艺术:从代码到用户体验的全方位解析
本文将深入探讨软件测试的重要性和实施策略,通过分析不同类型的测试方法和工具,展示如何有效地提升软件质量和用户满意度。我们将从单元测试、集成测试到性能测试等多个角度出发,详细解释每种测试方法的实施步骤和最佳实践。此外,文章还将讨论如何通过持续集成和自动化测试来优化测试流程,以及如何建立有效的测试团队来应对快速变化的市场需求。通过实际案例的分析,本文旨在为读者提供一套系统而实用的软件测试策略,帮助读者在软件开发过程中做出更明智的决策。
|
2月前
|
Java Apache
Apache POI java对excel表格进行操作(读、写) 有代码!!!
文章提供了使用Apache POI库在Java中创建和读取Excel文件的详细代码示例,包括写入数据到Excel和从Excel读取数据的方法。
48 0
|
3月前
|
SQL 人工智能 机器人
遇到的代码部份解析
/ 模拟后端返回的数据
19 0

推荐镜像

更多