【spark系列8】spark delta读数据实现分析

简介: 【spark系列8】spark delta读数据实现分析

背景


本文基于delta 0.7.0

spark 3.0.1

我们之前的spark delta写操作ACID事务前传–写文件基础类FileFormat/FileCommitProtocol分析,spark delta写操作ACID事务实现分析分析了delta写数据的流程,这次我们分析一下delta是怎么读取数据的。


分析

override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val maybePath = parameters.getOrElse("path", {
      throw DeltaErrors.pathNotSpecifiedException
    })
    // Log any invalid options that are being passed in
    DeltaOptions.verifyOptions(CaseInsensitiveMap(parameters))
    val timeTravelByParams = DeltaDataSource.getTimeTravelVersion(parameters)
    DeltaTableV2(
      sqlContext.sparkSession,
      new Path(maybePath),
      timeTravelOpt = timeTravelByParams).toBaseRelation
  }
  1. DeltaOptions.verifyOptions进行参数校验,有效的参数如下:
val validOptionKeys : Set[String] = Set(
    REPLACE_WHERE_OPTION,
    MERGE_SCHEMA_OPTION,
    EXCLUDE_REGEX_OPTION,
    OVERWRITE_SCHEMA_OPTION,
    USER_METADATA_OPTION,
    MAX_FILES_PER_TRIGGER_OPTION,
    IGNORE_FILE_DELETION_OPTION,
    IGNORE_CHANGES_OPTION,
    IGNORE_DELETES_OPTION,
    OPTIMIZE_WRITE_OPTION,
    DATA_CHANGE_OPTION,
    "queryName",
    "checkpointLocation",
    "path",
    "timestampAsOf",
    "versionAsOf"
  )
  1. DeltaDataSource.getTimeTravelVersion根据指定的timestampAsOf或者versionAsOf获取指定的版本
  2. 直接调用DeltaTableV2的toBaseRelation方法:
def toBaseRelation: BaseRelation = {
    if (deltaLog.snapshot.version == -1) {
      val id = catalogTable.map(ct => DeltaTableIdentifier(table = Some(ct.identifier)))
        .getOrElse(DeltaTableIdentifier(path = Some(path.toString)))
      throw DeltaErrors.notADeltaTableException(id)
    }
    val partitionPredicates = DeltaDataSource.verifyAndCreatePartitionFilters(
      path.toString, deltaLog.snapshot, partitionFilters)
    // TODO(burak): We should pass in the snapshot here
    deltaLog.createRelation(partitionPredicates, timeTravelSpec)
  }

如果存在分区,则DeltaDataSource.verifyAndCreatePartitionFilter创建partitionPredicates


timeTravelSpec,这里优先选择用户指定的timeTravelByParams,否则通过DeltaDataSource.parsePathIdentifier选择path指定的version,格式如:/some/path/partition=1@v1234 或者/some/path/partition=1@yyyyMMddHHmmssSSS


直接调用deltaLog.createRelation:

def createRelation(
   partitionFilters: Seq[Expression] = Nil,
   timeTravel: Option[DeltaTimeTravelSpec] = None): BaseRelation = {
 val versionToUse = timeTravel.map { tt =>
   val (version, accessType) = DeltaTableUtils.resolveTimeTravelVersion(
     spark.sessionState.conf, this, tt)
   val source = tt.creationSource.getOrElse("unknown")
   recordDeltaEvent(this, s"delta.timeTravel.$source", data = Map(
     "tableVersion" -> snapshot.version,
     "queriedVersion" -> version,
     "accessType" -> accessType
   ))
   version
 }
 /** Used to link the files present in the table into the query planner. */
 val snapshotToUse = versionToUse.map(getSnapshotAt(_)).getOrElse(snapshot)
 val fileIndex = TahoeLogFileIndex(
   spark, this, dataPath, snapshotToUse.metadata.schema, partitionFilters, versionToUse)
 new HadoopFsRelation(
   fileIndex,
   partitionSchema = snapshotToUse.metadata.partitionSchema,
   dataSchema = snapshotToUse.metadata.schema,
   bucketSpec = None,
   snapshotToUse.fileFormat,
   snapshotToUse.metadata.format.options)(spark) with InsertableRelation {
   def insert(data: DataFrame, overwrite: Boolean): Unit = {
     val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
     WriteIntoDelta(
       deltaLog = DeltaLog.this,
       mode = mode,
       new DeltaOptions(Map.empty[String, String], spark.sessionState.conf),
       partitionColumns = Seq.empty,
       configuration = Map.empty,
       data = data).run(spark)
   }
 }
  • . 通过指定版本获取对应的snapshot
  • . 构建TahoeLogFileIndex,因为这里构建的是HadoopFsRelation,所以我们关注TahoeLogFileIndex的inputfiles方法:
override def inputFiles: Array[String] = {
getSnapshot(stalenessAcceptable = false).filesForScan(
  projection = Nil, partitionFilters).files.map(f => absolutePath(f.path).toString).toArray
}

该方法调用了snapshot的filesForScan方法:

def filesForScan(projection: Seq[Attribute], filters: Seq[Expression]): DeltaScan = {
implicit val enc = SingleAction.addFileEncoder
val partitionFilters = filters.flatMap { filter =>
  DeltaTableUtils.splitMetadataAndDataPredicates(filter, metadata.partitionColumns, spark)._1
}
val files = DeltaLog.filterFileList(
  metadata.partitionSchema,
  allFiles.toDF(),
  partitionFilters).as[AddFile].collect()
DeltaScan(version = version, files, null, null, null)(null, null, null, null)
}

通过之前文章的分析,我们直到deltalog记录了AddFile和Remove记录,那现在读数据怎么读取呢?全部在allFiles方法。

重点看一下:allFiles方法:

def allFiles: Dataset[AddFile] = {
 val implicits = spark.implicits
 import implicits._
 state.where("add IS NOT NULL").select($"add".as[AddFile])
 }

这里调用了state方法,而它又调用了stateReconstruction方法,

private lazy val cachedState =
 cacheDS(stateReconstruction, s"Delta Table State #$version - $redactedPath")
 /** The current set of actions in this [[Snapshot]]. */
 def state: Dataset[SingleAction] = cachedState.getDS

stateReconstruction方法在checkpoint的时用到了,在这里也用到了,主要是重新构造文件状态,合并AddFile和RemoveFile:

private def stateReconstruction: Dataset[SingleAction] = {
 ...
 loadActions.mapPartitions { actions =>
     val hdpConf = hadoopConf.value.value
     actions.flatMap {
       _.unwrap match {
         case add: AddFile => Some(add.copy(path = canonicalizePath(add.path, hdpConf)).wrap)
         case rm: RemoveFile => Some(rm.copy(path = canonicalizePath(rm.path, hdpConf)).wrap)
         case other if other == null => None
         case other => Some(other.wrap)
       }
     }
    }
   ...
   .mapPartitions { iter =>
     val state = new InMemoryLogReplay(time)
     state.append(0, iter.map(_.unwrap))
     state.checkpoint.map(_.wrap)
   }
  }

而关键在于InMemoryLogReplay的append方法和checkpoint方法,这里做到了文件状态的合并:

  assert(currentVersion == -1 || version == currentVersion + 1,
   s"Attempted to replay version $version, but state is at $currentVersion")
 currentVersion = version
 actions.foreach {
   case a: SetTransaction =>
     transactions(a.appId) = a
   case a: Metadata =>
     currentMetaData = a
   case a: Protocol =>
     currentProtocolVersion = a
   case add: AddFile =>
     activeFiles(add.pathAsUri) = add.copy(dataChange = false)
     // Remove the tombstone to make sure we only output one `FileAction`.
     tombstones.remove(add.pathAsUri)
   case remove: RemoveFile =>
     activeFiles.remove(remove.pathAsUri)
     tombstones(remove.pathAsUri) = remove.copy(dataChange = false)
   case ci: CommitInfo => // do nothing
   case null => // Some crazy future feature. Ignore
  }
 }

重点就在case add: AddFile和 case remove: RemoveFile处理以及checkpoint方法,能够很好的合并文件状态。


再调用collect方法,返回DeltaScan,之后获取文件路径作为要处理的文件路径。


把TahoeLogFileIndex传入HadoopFsRelation得到最后的BaseRelation 返回

注意:spark读取delta格式整个流程和spark读取其他数据格式流程一致,主要区别在于读取数据之前,会把文件状态在内存中进行一次合并,这样只需要读取文件状态为Addfile的就行了


相关文章
|
6月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
210 1
Spark快速大数据分析PDF下载读书分享推荐
|
8月前
|
移动开发 分布式计算 Spark
Spark的几种去重的原理分析
Spark的几种去重的原理分析
163 0
|
3月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
215 2
|
3月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
136 0
|
6月前
|
弹性计算 分布式计算 Serverless
全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
【7月更文挑战第6天】全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
23749 42
|
7月前
|
机器学习/深度学习 数据采集 分布式计算
基于spark的大数据分析预测地震受灾情况的系统设计
基于spark的大数据分析预测地震受灾情况的系统设计
214 1
|
7月前
|
分布式计算 定位技术 Scala
使用spark基于出租车GPS数据实现车辆数量统计以及北京每个城区的车辆位置点数分析
使用spark基于出租车GPS数据实现车辆数量统计以及北京每个城区的车辆位置点数分析
140 0
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
199 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
83 0
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
57 0