Delta Lake中CDC的实现

简介: Delta Lake中CDC的实现

背景


本文基于delta 2.0.0


Delta是通过CDF(change data feed)来实现CDC(change data capture)。


CDF是能让表能够输出数据表变化的能力,CDC是能够捕获和识别数据的变化,并能够将变化的数据交给下游做进一步的处理。


我们来分析一下是怎么做到数据行级别的CDF的


分析


在设置delta.enableChangeDataFeed= true的前提下(在Enable change data fedd有提及),我们分析一下逻辑计划DeltaDelete对应的RunnableCommand DeleteCommand的Run方法:

 final override def run(sparkSession: SparkSession): Seq[Row] = {
    recordDeltaOperation(deltaLog, "delta.dml.delete") {
      deltaLog.assertRemovable()
      deltaLog.withNewTransaction { txn =>
        val deleteActions = performDelete(sparkSession, deltaLog, txn)
        if (deleteActions.nonEmpty) {
          txn.commit(deleteActions, DeltaOperations.Delete(condition.map(_.sql).toSeq))
        }
      }
      // Re-cache all cached plans(including this relation itself, if it's cached) that refer to
      // this data source relation.
      sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, target)
    }
    Seq.empty[Row]
  }

最重要的方法是performDelete,这里的performDelete方法会根据condition来做不同的操作:


  • 如果没有条件,那就是全部删除:
    case None =>
      // Case 1: Delete the whole table if the condition is true
      val allFiles = txn.filterFiles(Nil)
      numRemovedFiles = allFiles.size
      scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
      val (numBytes, numPartitions) = totalBytesAndDistinctPartitionValues(allFiles)
      numBytesRemoved = numBytes
      numFilesBeforeSkipping = numRemovedFiles
      numBytesBeforeSkipping = numBytes
      numFilesAfterSkipping = numRemovedFiles
      numBytesAfterSkipping = numBytes
      if (txn.metadata.partitionColumns.nonEmpty) {
        numPartitionsAfterSkipping = Some(numPartitions)
        numPartitionsRemovedFrom = Some(numPartitions)
        numPartitionsAddedTo = Some(0)
      }
      val operationTimestamp = System.currentTimeMillis()
      allFiles.map(_.removeWithTimestamp(operationTimestamp))

这里列举该表的所有文件,并把所有的文件标识为RemoveFile,并带上时间戳。


  • 如果是有筛选条件,则会根据筛选条件进行行级别的记录变更(这里还会根据情况具体分析):

  1. 1.能够从delta的元数据能够囊括过滤的条件,则只是在元数据层面进行修改:
   case Some(cond) =>
     val (metadataPredicates, otherPredicates) =
       DeltaTableUtils.splitMetadataAndDataPredicates(
         cond, txn.metadata.partitionColumns, sparkSession)
     numFilesBeforeSkipping = txn.snapshot.numOfFiles
     numBytesBeforeSkipping = txn.snapshot.sizeInBytes
     if (otherPredicates.isEmpty) {
       // Case 2: The condition can be evaluated using metadata only.
       //         Delete a set of files without the need of scanning any data files.
       val operationTimestamp = System.currentTimeMillis()
       val candidateFiles = txn.filterFiles(metadataPredicates)
       scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
       numRemovedFiles = candidateFiles.size
       numBytesRemoved = candidateFiles.map(_.size).sum
       numFilesAfterSkipping = candidateFiles.size
       val (numCandidateBytes, numCandidatePartitions) =
         totalBytesAndDistinctPartitionValues(candidateFiles)
       numBytesAfterSkipping = numCandidateBytes
       if (txn.metadata.partitionColumns.nonEmpty) {
         numPartitionsAfterSkipping = Some(numCandidatePartitions)
         numPartitionsRemovedFrom = Some(numCandidatePartitions)
         numPartitionsAddedTo = Some(0)
       }
       candidateFiles.map(_.removeWithTimestamp(operationTimestamp))
     }

DeltaTableUtils.splitMetadataAndDataPredicates是判断过滤条件是否是在元数据层能够囊括,如果可以的话,就通过valcandidateFiles = txn.filterFiles(metadataPredicates)来过滤出所有的文件,然后通过candidateFiles.map(_.removeWithTimestamp(operationTimestamp))在元数据层面进行删除.


2. 如果不仅仅是从元数据层面进行过滤,则会过滤出所有满足条件的数据:

      val candidateFiles = txn.filterFiles(metadataPredicates ++ otherPredicates)
      numFilesAfterSkipping = candidateFiles.size
      val (numCandidateBytes, numCandidatePartitions) =
        totalBytesAndDistinctPartitionValues(candidateFiles)
      numBytesAfterSkipping = numCandidateBytes
      if (txn.metadata.partitionColumns.nonEmpty) {
        numPartitionsAfterSkipping = Some(numCandidatePartitions)
      }
      val nameToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, candidateFiles)
      val fileIndex = new TahoeBatchFileIndex(
        sparkSession, "delete", candidateFiles, deltaLog, deltaLog.dataPath, txn.snapshot)
      // Keep everything from the resolved target except a new TahoeFileIndex
      // that only involves the affected files instead of all files.
      val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex)
      val data = Dataset.ofRows(sparkSession, newTarget)
      val deletedRowCount = metrics("numDeletedRows")
      val deletedRowUdf = udf { () =>
        deletedRowCount += 1
        true
      }.asNondeterministic()
      val filesToRewrite =
        withStatusCode("DELTA", FINDING_TOUCHED_FILES_MSG) {
          if (candidateFiles.isEmpty) {
            Array.empty[String]
          } else {
            data
              .filter(new Column(cond))
              .filter(deletedRowUdf())
              .select(new Column(InputFileName())).distinct()
              .as[String].collect()
          }
        }
      numRemovedFiles = filesToRewrite.length
      scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000

val candidateFiles = txn.filterFiles(metadataPredicates ++ otherPredicates) 这一步是过滤出满足条件的文件,大概思路是根据列的统计信息来过滤出不存在的文件,从而山选出可能存在的文件,


val nameToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, candidateFiles) 得到具体的文件路径


val data = Dataset.ofRows(sparkSession, newTarget)这一步是替换了fileIndex之后的dataset


val filesToRewrite = ... 是得到需要重写的文件

 val baseRelation = buildBaseRelation(
 sparkSession, txn, "delete", deltaLog.dataPath, filesToRewrite, nameToAddFileMap)
  // Keep everything from the resolved target except a new TahoeFileIndex
  // that only involves the affected files instead of all files.
 val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location)
 val targetDF = Dataset.ofRows(sparkSession, newTarget)
 val filterCond = Not(EqualNullSafe(cond, Literal.TrueLiteral))
 val rewrittenActions = rewriteFiles(txn, targetDF, filterCond, filesToRewrite.length)
 val (changeFiles, rewrittenFiles) = rewrittenActions
     .partition(_.isInstanceOf[AddCDCFile])
     ...
 val operationTimestamp = System.currentTimeMillis()
 removeFilesFromPaths(deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++
 rewrittenActions   

val rewrittenActions = rewriteFiles(txn, targetDF, filterCond, filesToRewrite.length) 会根据每一行是否满足条件来增加一个名为“_change_type”值为“delete”的列:

    baseData
   .filter(numTouchedRowsUdf())
   .withColumn(
     CDC_TYPE_COLUMN_NAME,
     new Column(
       If(filterCondition, typedLit[String](CDC_TYPE_NOT_CDC).expr,
       lit(CDC_TYPE_DELETE).expr)
     )
  ...
  txn.writeFiles(dfToWrite)

1.这里的 txn.writeFiles(dfToWrite) 会进行根据是否存在“_change_type” 列来插入名为“__is_cdc”, 值为“true或false”的列,同时也会增加分区列“__is_cdc”


注意在txn.writeFiles(dfToWrite)如下代码块中:

 val committer = getCommitter(outputPath)

会得到DelayedCommitProtocol对象,这个对象里的newTaskTempFile方法,会对CDC的数据做额外处理:

  } else if (subDir.startsWith(cdcPartitionTrue)) {
    val cleanedSubDir = cdcPartitionTrueRegex.replaceFirstIn(subDir, CDC_LOCATION)
    new Path(cleanedSubDir, filename)
  1. 其中CDC_LOCATION的值为“_change_data”,这样所有有变化的数据就存在了“_change_data”目录下了.

  2. 2.在writeFiles方法最后会返回两种元数据文件,如下:
  val resultFiles = committer.addedStatuses.map { a =>
  a.copy(stats = optionalStatsTracker.map(
   _.recordedStats(new Path(new URI(a.path)).getName)).getOrElse(a.stats))
   }
  resultFiles.toSeq ++ committer.changeFiles
  • addedStatuses返回的是AddFile文件,这表明是没有改动的文件,

  • changeFiles返回的是AddCDCFile文件,这表明的是满足条件的需要被delete的文件
    其实这两种文件的区分最终还是在commitTask方法中,

commitTask中的buildActionFromAddedFile方法会根据__is_cdc=true来区分AddCDCFileAddFile文件, 如果存在,则是AddCDCFile,否则是AddFile,代码如下:

val partitioning = f._1.filter { case (k, v) => k != CDC_PARTITION_COL }
f._1.get(CDC_PARTITION_COL) match {
 case Some("true") =>
   val partitioning = f._1.filter { case (k, v) => k != CDC_PARTITION_COL }
   AddCDCFile(f._2, partitioning, stat.getLen)
 case _ =>
   val addFile = AddFile(f._2, partitioning, stat.getLen, stat.getModificationTime, true)
   addFile

最终会被commitTask方法调用,最终传递到Driver端,这样rewrite就会返回两种元数据文件。请注意这里并还没有进行元数据的操作,真正的元数据的操作在 txn.commit


val (changeFiles, rewrittenFiles) = rewrittenActions.partition(_.isInstanceOf[AddCDCFile])根据两种元数据文件的不同进行metrics级别的记录


removeFilesFromPaths(deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++ rewrittenActions这一步主要是在元数据层面删除根据条件过滤出来的数据,因为该数据已经根据用户的条件已经处理完了。


结论


根据以上的分析,可以知道目前的CDF只是在Delte层级做了反馈,如果说想要在Flink层达到CDC的效果,还得有个中间层,把delta里的CDF的数据给读取出来,转换Flink 内部形式的ChangelogMode CDC格式(比如说INSERT("+I", (byte) 0),DELETE("-D", (byte) 3))


相关文章
|
2月前
|
存储 SQL Apache
Apache Hudi与Delta Lake对比
Apache Hudi与Delta Lake对比
37 0
|
存储 SQL JSON
Delta Lake、Hudi与Iceberg详解
Delta Lake、Hudi与Iceberg详解
840 0
Delta Lake、Hudi与Iceberg详解
|
2月前
|
分布式计算 测试技术 Apache
Apache Hudi vs Delta Lake:透明TPC-DS Lakehouse性能基准
Apache Hudi vs Delta Lake:透明TPC-DS Lakehouse性能基准
38 4
|
SQL 存储 分布式计算
数据湖揭秘—Delta Lake
Delta Lake 是 DataBricks 公司开源的、用于构建湖仓架构的存储框架。能够支持 Spark,Flink,Hive,PrestoDB,Trino 等查询/计算引擎。作为一个开放格式的存储层,它在提供了批流一体的同时,为湖仓架构提供可靠的,安全的,高性能的保证。
3840 7
数据湖揭秘—Delta Lake
|
存储 分布式计算 DataWorks
基于Delta lake、Hudi格式的湖仓一体方案
Delta Lake 和 Hudi 是流行的开放格式的存储层,为数据湖同时提供流式和批处理的操作,这允许我们在数据湖上直接运行 BI 等应用,让数据分析师可以即时查询新的实时数据,从而对您的业务产生即时的洞察。MaxCompute 在湖仓一体架构中,通过支持 Delta Lake 和 Hudi 在数据湖中提供数据仓库性能。
1354 0
基于Delta lake、Hudi格式的湖仓一体方案
|
SQL 分布式计算 搜索推荐
《 Delta Lake 数据湖专题系列5讲》文章回顾
《Delta Lake 数据湖专题系列5讲》由阿里云 DDI 团队翻译整理自大数据技术公司 Databricks 针对数据湖 Delta Lake 系列技术文章。阅读完此系列文章可以帮助您达到入门级,对数据湖 Lakehouse 有整体上的认识和应用,掌握理论知识体系。
《 Delta Lake 数据湖专题系列5讲》文章回顾
|
SQL 消息中间件 JSON
Delta Lake在Soul的应用实践
传统离线数仓模式下,日志入库前首要阶段便是ETL,我们面临如下问题:天级ETL任务耗时久,影响下游依赖的产出时间;凌晨占用资源庞大,任务高峰期抢占大量集群资源;ETL任务稳定性不佳且出错需凌晨解决、影响范围大。为了解决天级ETL逐渐尖锐的问题,所以这次我们选择了近来逐渐进入大家视野的数据湖架构,基于阿里云EMR的Delta Lake,我们进一步打造优化实时数仓结构,提升部分业务指标实时性,满足更多更实时的业务需求。
Delta Lake在Soul的应用实践
|
SQL JSON 分布式计算
不通过 Spark 获取 Delta Lake Snapshot
Delta Lake 进行数据删除或更新操作时实际上只是对被删除数据文件做了一个 remove 标记,在进行 vacuum 前并不会进行物理删除,因此一些例如在 web 上获取元数据或进行部分数据展示的操作如果直接从表路径下获取 parquet 文件信息,读到的可能是历史已经被标记删除的数据。
不通过 Spark 获取 Delta Lake Snapshot
|
SQL 分布式计算 大数据
Delta Lake Presto Integration & Manifests 机制
Delta 0.5 已于上周发布,增加了不少新特性,这篇文章主要讲解其 Presto Integration 和 Manifests 机制。
Delta Lake Presto Integration & Manifests 机制
|
SQL 分布式计算 大数据
Delta Lake 分区表覆盖写入操作
Delta Lake当前版本(0.5)只支持API操作的,但是实现 Insert SQL 语法也不难,需要注意的是 Delta Lake 中的分区表覆盖写入操作。
Delta Lake 分区表覆盖写入操作