Spark读取变更Hudi数据集Schema实现分析

简介: Spark读取变更Hudi数据集Schema实现分析

1. 介绍

Hudi支持上层Hive/Presto/Spark查询引擎,其中使用Spark读取Hudi数据集方法非常简单,在spark-shell或应用代码中,通过 spark.sqlContext.read.format("org.apache.hudi").load便可加载Hudi数据集,本篇文章分析具体的实现。

2. 分析

2.1 源码梳理

Spark支持用户自定义的format来读取或写入文件,只需要实现对应的(RelationProvider、SchemaRelationProvider)等接口即可。而Hudi也自定义实现了 org.apache.hudi/ hudi来实现Spark对Hudi数据集的读写,Hudi中最重要的一个相关类为 DefaultSource,其实现了 CreatableRelationProvider#createRelation接口,并实现了读写逻辑。其中读逻辑实现的方法核心代码如下。

override def createRelation(sqlContext: SQLContext,
                              optParams: Map[String, String],
                              schema: StructType): BaseRelation = {
    // 合并参数
    val parameters = Map(QUERY_TYPE_OPT_KEY -> DEFAULT_QUERY_TYPE_OPT_VAL) ++ translateViewTypesToQueryTypes(optParams)
    // 必须提供path
    val path = parameters.get("path")
    if (path.isEmpty) {
      throw new HoodieException("'path' must be specified.")
    }
    if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
        // 添加HoodieROTablePathFilter
      sqlContext.sparkContext.hadoopConfiguration.setClass(
        "mapreduce.input.pathFilter.class",
        classOf[HoodieROTablePathFilter],
        classOf[org.apache.hadoop.fs.PathFilter]);
      // 解析Relation
      DataSource.apply(
        sparkSession = sqlContext.sparkSession,
        userSpecifiedSchema = Option(schema),
        className = "parquet",
        options = parameters)
        .resolveRelation()
    } else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
      // 增量Relation
      new IncrementalRelation(sqlContext, path.get, optParams, schema)
    } else {
      throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
    }
  }

可以看到,对于读优化视图(ReadOptmized),会添加 HoodieROTablePathFilter,其用于过滤Hudi数据集中的文件。而过滤主要逻辑在 HoodieROTablePathFilter#accept方法中, HoodieROTablePathFilter会处理Hudi数据集和非Hudi数据集,对于Hudi数据集而言,会选取分区路径下最新的提交的parquet文件。

接着通过 DataSource#resolveRelation方法来解析parquet文件,关键逻辑如下

val index = createInMemoryFileIndex(globbedPaths)
    val (resultDataSchema, resultPartitionSchema) =
        getOrInferFileFormatSchema(format, Some(index))

继续通过 DataSource#getOrInferFileFormatSchema方法解析,其中一段关键代码如下

format.inferSchema(
        sparkSession,
        caseInsensitiveOptions,
        tempFileIndex.allFiles())

此时会根据不同的文件类型,如Orc/Text/Parquet类型来继续推导schema,其中tempFileIndex.allFiles获取到之前通过 HoodieROTableFilter过滤出的所有最新提交的parquet文件, inferSchema方法的关键代码如下

val filesToTouch =
      if (shouldMergeSchemas) { // 是否需要合并文件的schema,不需要默认取第一个文件,否则合并多个文件
        val needMerged: Seq[FileStatus] =
          if (mergeRespectSummaries) {
            Seq.empty
          } else {
            filesByType.data
          }
        needMerged ++ filesByType.metadata ++ filesByType.commonMetadata
      } else {
        filesByType.commonMetadata.headOption
            .orElse(filesByType.metadata.headOption)
            .orElse(filesByType.data.headOption)
            .toSeq
      }
    // 获取schema
    ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)

可以看到,当不需要合并schema时,是否需要需要合并schema可通过 mergeSchema参数控制,当不需要时,默认获取的第一个文件,需要合并时,会 把所有文件的schema合并。其会影响spark查询结果,下面通过示例说明。

2.2 示例展示

2.2.1 schema配置

第一次插入时的schema如下

{
  "type":"record",
  "name":"Person",
  "fields":[{
     "name": "name",
     "type": "string"
  }, {
      "name": "age", 
      "type": "int"
  }, {
      "name": "ts", 
      "type": "string"
  }, {
      "name": "location",
      "type": "string"
  }
]}

第二次更新时的schema如下(新增了sex列)

{
  "type":"record",
  "name":"Person",
  "fields":[{
     "name": "name",
     "type": "string"
  }, {
      "name": "age", 
      "type": "int"
  }, {
      "name": "ts", 
      "type": "string"
  }, {
      "name": "location",
      "type": "string"
  }, {
      "name": "sex",
      "type": "string"
  }
]}

Hudi使用MOR模式。

2.2.2 插入/更新核心配置

写记录核心配置如下

df.write().format("org.apache.hudi").
        option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), "MERGE_ON_READ").
        option("hoodie.insert.shuffle.parallelism", "10").
        option("hoodie.upsert.shuffle.parallelism", "10").
        option("hoodie.delete.shuffle.parallelism", "10").
        option("hoodie.bulkinsert.shuffle.parallelism", "10").
        option("hoodie.datasource.write.recordkey.field", "name").
        option("hoodie.datasource.write.partitionpath.field", "location").
        option("hoodie.datasource.write.precombine.field", "ts").
        option("hoodie.table.name", "hudi_mor_table").
        mode(Overwrite).
        save("D:/hudi_mor_table");

更新记录核心配置如下

df.write().format("org.apache.hudi").
          option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), "MERGE_ON_READ").
          option("hoodie.insert.shuffle.parallelism", "10").
          option("hoodie.upsert.shuffle.parallelism", "10").
          option("hoodie.delete.shuffle.parallelism", "10").
          option("hoodie.bulkinsert.shuffle.parallelism", "10").
          option("hoodie.datasource.write.recordkey.field", "name").
          option("hoodie.datasource.write.partitionpath.field", "location").
          option("hoodie.datasource.write.precombine.field", "ts").
          option("hoodie.keep.max.commits", "5").
          option("hoodie.keep.min.commits", "4").
          option("hoodie.cleaner.commits.retained", "3").
          option("hoodie.table.name", "hudi_mor_table").
          mode(Append).
          save("D:/hudi_mor_table");
2.2.3 插入/更新实际数据设置

第一次插入实际数据为

{\"name\":\"yuan1", \"ts\": \"1574297893837\", \"age\": 1, \"location\": \"beijing\"}

当第二次更新实际数据为

{\"name\":\"yuan1", \"ts\": \"1574297893837\", \"age\": 1, \"location\": \"beijing\", \"sex\": \"male\"}

即第二次会更新一次写入的数据,那么使用如下代码显示数据时

spark.sqlContext().read().format("org.apache.hudi").load("D:/hudi_mor_table" + "/*").show();

那么会发现结果包含了新增的sex列,未更新的值为null

当第二次更新实际数据为

{\"name\":\"yuan1", \"ts\": \"1574297893837\", \"age\": 1, \"location\": \"beijing1\", \"sex\": \"male\"}

即第二次会写入不同的分区,即不会更新第一次写入的数据,那么查询数据时,会发现查询的结果不会出现新增的sex列

当使用如下代码显示数据时,设置合并schema参数,即会合并多个分区下的最新的parquet的schema。

spark.sqlContext().read().format("org.apache.hudi").option("mergeSchema", "true").load("D:/hudi_mor_table" + "/*").show();

会发现查询的结果出现了新增的sex列

3. 总结

当使用Spark查询Hudi数据集时,当数据的schema新增时,会获取单个分区的parquet文件来推导出schema,若变更schema后未更新该分区数据,那么新增的列是不会显示,否则会显示该新增的列;若未更新该分区的记录时,那么新增的列也不会显示,可通过 mergeSchema来控制合并不同分区下parquet文件的schema,从而可达到显示新增列的目的。

目录
相关文章
|
3月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
159 0
|
3月前
|
机器学习/深度学习 SQL 分布式计算
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
|
4月前
|
SQL 分布式计算 HIVE
Spark数据倾斜问题分析和解决
Spark数据倾斜问题分析和解决
43 0
|
1月前
|
存储 分布式计算 Spark
实战|使用Spark Streaming写入Hudi
实战|使用Spark Streaming写入Hudi
42 0
|
1月前
|
消息中间件 分布式计算 Serverless
CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark
CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark
53 2
|
2月前
|
分布式计算 大数据 Java
Spark 大数据实战:基于 RDD 的大数据处理分析
Spark 大数据实战:基于 RDD 的大数据处理分析
122 0
|
3月前
|
机器学习/深度学习 数据采集 分布式计算
Spark回归分析与特征工程
Spark回归分析与特征工程
|
4月前
|
SQL 分布式计算 大数据
Hudi数据湖技术引领大数据新风口(三)解决spark模块依赖冲突
Hudi数据湖技术引领大数据新风口(三)解决spark模块依赖冲突
102 0
|
4月前
|
分布式计算 BI 双11
StructredStreaming+Kafka+Mysql(Spark实时计算| 天猫双十一实时报表分析)
StructredStreaming+Kafka+Mysql(Spark实时计算| 天猫双十一实时报表分析)
39 0
|
4月前
|
消息中间件 分布式计算 大数据
【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)
【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)
76 0