Spark读取变更Hudi数据集Schema实现分析

简介: Spark读取变更Hudi数据集Schema实现分析

1. 介绍

Hudi支持上层Hive/Presto/Spark查询引擎,其中使用Spark读取Hudi数据集方法非常简单,在spark-shell或应用代码中,通过 spark.sqlContext.read.format("org.apache.hudi").load便可加载Hudi数据集,本篇文章分析具体的实现。

2. 分析

2.1 源码梳理

Spark支持用户自定义的format来读取或写入文件,只需要实现对应的(RelationProvider、SchemaRelationProvider)等接口即可。而Hudi也自定义实现了 org.apache.hudi/ hudi来实现Spark对Hudi数据集的读写,Hudi中最重要的一个相关类为 DefaultSource,其实现了 CreatableRelationProvider#createRelation接口,并实现了读写逻辑。其中读逻辑实现的方法核心代码如下。

override def createRelation(sqlContext: SQLContext,
                              optParams: Map[String, String],
                              schema: StructType): BaseRelation = {
    // 合并参数
    val parameters = Map(QUERY_TYPE_OPT_KEY -> DEFAULT_QUERY_TYPE_OPT_VAL) ++ translateViewTypesToQueryTypes(optParams)
    // 必须提供path
    val path = parameters.get("path")
    if (path.isEmpty) {
      throw new HoodieException("'path' must be specified.")
    }
    if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
        // 添加HoodieROTablePathFilter
      sqlContext.sparkContext.hadoopConfiguration.setClass(
        "mapreduce.input.pathFilter.class",
        classOf[HoodieROTablePathFilter],
        classOf[org.apache.hadoop.fs.PathFilter]);
      // 解析Relation
      DataSource.apply(
        sparkSession = sqlContext.sparkSession,
        userSpecifiedSchema = Option(schema),
        className = "parquet",
        options = parameters)
        .resolveRelation()
    } else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
      // 增量Relation
      new IncrementalRelation(sqlContext, path.get, optParams, schema)
    } else {
      throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
    }
  }

可以看到,对于读优化视图(ReadOptmized),会添加 HoodieROTablePathFilter,其用于过滤Hudi数据集中的文件。而过滤主要逻辑在 HoodieROTablePathFilter#accept方法中, HoodieROTablePathFilter会处理Hudi数据集和非Hudi数据集,对于Hudi数据集而言,会选取分区路径下最新的提交的parquet文件。

接着通过 DataSource#resolveRelation方法来解析parquet文件,关键逻辑如下

val index = createInMemoryFileIndex(globbedPaths)
    val (resultDataSchema, resultPartitionSchema) =
        getOrInferFileFormatSchema(format, Some(index))

继续通过 DataSource#getOrInferFileFormatSchema方法解析,其中一段关键代码如下

format.inferSchema(
        sparkSession,
        caseInsensitiveOptions,
        tempFileIndex.allFiles())

此时会根据不同的文件类型,如Orc/Text/Parquet类型来继续推导schema,其中tempFileIndex.allFiles获取到之前通过 HoodieROTableFilter过滤出的所有最新提交的parquet文件, inferSchema方法的关键代码如下

val filesToTouch =
      if (shouldMergeSchemas) { // 是否需要合并文件的schema,不需要默认取第一个文件,否则合并多个文件
        val needMerged: Seq[FileStatus] =
          if (mergeRespectSummaries) {
            Seq.empty
          } else {
            filesByType.data
          }
        needMerged ++ filesByType.metadata ++ filesByType.commonMetadata
      } else {
        filesByType.commonMetadata.headOption
            .orElse(filesByType.metadata.headOption)
            .orElse(filesByType.data.headOption)
            .toSeq
      }
    // 获取schema
    ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)

可以看到,当不需要合并schema时,是否需要需要合并schema可通过 mergeSchema参数控制,当不需要时,默认获取的第一个文件,需要合并时,会 把所有文件的schema合并。其会影响spark查询结果,下面通过示例说明。

2.2 示例展示

2.2.1 schema配置

第一次插入时的schema如下

{
  "type":"record",
  "name":"Person",
  "fields":[{
     "name": "name",
     "type": "string"
  }, {
      "name": "age", 
      "type": "int"
  }, {
      "name": "ts", 
      "type": "string"
  }, {
      "name": "location",
      "type": "string"
  }
]}

第二次更新时的schema如下(新增了sex列)

{
  "type":"record",
  "name":"Person",
  "fields":[{
     "name": "name",
     "type": "string"
  }, {
      "name": "age", 
      "type": "int"
  }, {
      "name": "ts", 
      "type": "string"
  }, {
      "name": "location",
      "type": "string"
  }, {
      "name": "sex",
      "type": "string"
  }
]}

Hudi使用MOR模式。

2.2.2 插入/更新核心配置

写记录核心配置如下

df.write().format("org.apache.hudi").
        option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), "MERGE_ON_READ").
        option("hoodie.insert.shuffle.parallelism", "10").
        option("hoodie.upsert.shuffle.parallelism", "10").
        option("hoodie.delete.shuffle.parallelism", "10").
        option("hoodie.bulkinsert.shuffle.parallelism", "10").
        option("hoodie.datasource.write.recordkey.field", "name").
        option("hoodie.datasource.write.partitionpath.field", "location").
        option("hoodie.datasource.write.precombine.field", "ts").
        option("hoodie.table.name", "hudi_mor_table").
        mode(Overwrite).
        save("D:/hudi_mor_table");

更新记录核心配置如下

df.write().format("org.apache.hudi").
          option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), "MERGE_ON_READ").
          option("hoodie.insert.shuffle.parallelism", "10").
          option("hoodie.upsert.shuffle.parallelism", "10").
          option("hoodie.delete.shuffle.parallelism", "10").
          option("hoodie.bulkinsert.shuffle.parallelism", "10").
          option("hoodie.datasource.write.recordkey.field", "name").
          option("hoodie.datasource.write.partitionpath.field", "location").
          option("hoodie.datasource.write.precombine.field", "ts").
          option("hoodie.keep.max.commits", "5").
          option("hoodie.keep.min.commits", "4").
          option("hoodie.cleaner.commits.retained", "3").
          option("hoodie.table.name", "hudi_mor_table").
          mode(Append).
          save("D:/hudi_mor_table");
2.2.3 插入/更新实际数据设置

第一次插入实际数据为

{\"name\":\"yuan1", \"ts\": \"1574297893837\", \"age\": 1, \"location\": \"beijing\"}

当第二次更新实际数据为

{\"name\":\"yuan1", \"ts\": \"1574297893837\", \"age\": 1, \"location\": \"beijing\", \"sex\": \"male\"}

即第二次会更新一次写入的数据,那么使用如下代码显示数据时

spark.sqlContext().read().format("org.apache.hudi").load("D:/hudi_mor_table" + "/*").show();

那么会发现结果包含了新增的sex列,未更新的值为null

当第二次更新实际数据为

{\"name\":\"yuan1", \"ts\": \"1574297893837\", \"age\": 1, \"location\": \"beijing1\", \"sex\": \"male\"}

即第二次会写入不同的分区,即不会更新第一次写入的数据,那么查询数据时,会发现查询的结果不会出现新增的sex列

当使用如下代码显示数据时,设置合并schema参数,即会合并多个分区下的最新的parquet的schema。

spark.sqlContext().read().format("org.apache.hudi").option("mergeSchema", "true").load("D:/hudi_mor_table" + "/*").show();

会发现查询的结果出现了新增的sex列

3. 总结

当使用Spark查询Hudi数据集时,当数据的schema新增时,会获取单个分区的parquet文件来推导出schema,若变更schema后未更新该分区数据,那么新增的列是不会显示,否则会显示该新增的列;若未更新该分区的记录时,那么新增的列也不会显示,可通过 mergeSchema来控制合并不同分区下parquet文件的schema,从而可达到显示新增列的目的。

目录
相关文章
|
5月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
189 1
Spark快速大数据分析PDF下载读书分享推荐
|
2月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
177 2
|
2月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
108 0
|
5月前
|
弹性计算 分布式计算 Serverless
全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
【7月更文挑战第6天】全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
23729 42
|
5月前
|
分布式计算 数据处理 流计算
实时计算 Flink版产品使用问题之使用Spark ThriftServer查询同步到Hudi的数据时,如何实时查看数据变化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 分布式计算 HIVE
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
机器学习/深度学习 数据采集 分布式计算
基于spark的大数据分析预测地震受灾情况的系统设计
基于spark的大数据分析预测地震受灾情况的系统设计
173 1
|
6月前
|
分布式计算 定位技术 Scala
使用spark基于出租车GPS数据实现车辆数量统计以及北京每个城区的车辆位置点数分析
使用spark基于出租车GPS数据实现车辆数量统计以及北京每个城区的车辆位置点数分析
125 0
|
分布式计算 Hadoop Java
使用spark3操作hudi数据湖初探
本文介绍使用spark3操作hudi数据湖初探
使用spark3操作hudi数据湖初探
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
141 2
ClickHouse与大数据生态集成:Spark & Flink 实战