1. 介绍
Hudi支持上层Hive/Presto/Spark查询引擎,其中使用Spark读取Hudi数据集方法非常简单,在spark-shell或应用代码中,通过 spark.sqlContext.read.format("org.apache.hudi").load
便可加载Hudi数据集,本篇文章分析具体的实现。
2. 分析
2.1 源码梳理
Spark支持用户自定义的format来读取或写入文件,只需要实现对应的(RelationProvider、SchemaRelationProvider)等接口即可。而Hudi也自定义实现了 org.apache.hudi
/ hudi
来实现Spark对Hudi数据集的读写,Hudi中最重要的一个相关类为 DefaultSource
,其实现了 CreatableRelationProvider#createRelation
接口,并实现了读写逻辑。其中读逻辑实现的方法核心代码如下。
override def createRelation(sqlContext: SQLContext, optParams: Map[String, String], schema: StructType): BaseRelation = { // 合并参数 val parameters = Map(QUERY_TYPE_OPT_KEY -> DEFAULT_QUERY_TYPE_OPT_VAL) ++ translateViewTypesToQueryTypes(optParams) // 必须提供path val path = parameters.get("path") if (path.isEmpty) { throw new HoodieException("'path' must be specified.") } if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) { // 添加HoodieROTablePathFilter sqlContext.sparkContext.hadoopConfiguration.setClass( "mapreduce.input.pathFilter.class", classOf[HoodieROTablePathFilter], classOf[org.apache.hadoop.fs.PathFilter]); // 解析Relation DataSource.apply( sparkSession = sqlContext.sparkSession, userSpecifiedSchema = Option(schema), className = "parquet", options = parameters) .resolveRelation() } else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) { // 增量Relation new IncrementalRelation(sqlContext, path.get, optParams, schema) } else { throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY)) } }
可以看到,对于读优化视图(ReadOptmized),会添加 HoodieROTablePathFilter
,其用于过滤Hudi数据集中的文件。而过滤主要逻辑在 HoodieROTablePathFilter#accept
方法中, HoodieROTablePathFilter
会处理Hudi数据集和非Hudi数据集,对于Hudi数据集而言,会选取分区路径下最新的提交的parquet文件。
接着通过 DataSource#resolveRelation
方法来解析parquet文件,关键逻辑如下
val index = createInMemoryFileIndex(globbedPaths) val (resultDataSchema, resultPartitionSchema) = getOrInferFileFormatSchema(format, Some(index))
继续通过 DataSource#getOrInferFileFormatSchema
方法解析,其中一段关键代码如下
format.inferSchema( sparkSession, caseInsensitiveOptions, tempFileIndex.allFiles())
此时会根据不同的文件类型,如Orc/Text/Parquet类型来继续推导schema,其中tempFileIndex.allFiles获取到之前通过 HoodieROTableFilter
过滤出的所有最新提交的parquet文件, inferSchema
方法的关键代码如下
val filesToTouch = if (shouldMergeSchemas) { // 是否需要合并文件的schema,不需要默认取第一个文件,否则合并多个文件 val needMerged: Seq[FileStatus] = if (mergeRespectSummaries) { Seq.empty } else { filesByType.data } needMerged ++ filesByType.metadata ++ filesByType.commonMetadata } else { filesByType.commonMetadata.headOption .orElse(filesByType.metadata.headOption) .orElse(filesByType.data.headOption) .toSeq } // 获取schema ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
可以看到,当不需要合并schema时,是否需要需要合并schema可通过 mergeSchema
参数控制,当不需要时,默认获取的第一个文件,需要合并时,会 把所有文件的schema合并。其会影响spark查询结果,下面通过示例说明。
2.2 示例展示
2.2.1 schema配置
第一次插入时的schema如下
{ "type":"record", "name":"Person", "fields":[{ "name": "name", "type": "string" }, { "name": "age", "type": "int" }, { "name": "ts", "type": "string" }, { "name": "location", "type": "string" } ]}
第二次更新时的schema如下(新增了sex列)
{ "type":"record", "name":"Person", "fields":[{ "name": "name", "type": "string" }, { "name": "age", "type": "int" }, { "name": "ts", "type": "string" }, { "name": "location", "type": "string" }, { "name": "sex", "type": "string" } ]}
Hudi使用MOR模式。
2.2.2 插入/更新核心配置
写记录核心配置如下
df.write().format("org.apache.hudi"). option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), "MERGE_ON_READ"). option("hoodie.insert.shuffle.parallelism", "10"). option("hoodie.upsert.shuffle.parallelism", "10"). option("hoodie.delete.shuffle.parallelism", "10"). option("hoodie.bulkinsert.shuffle.parallelism", "10"). option("hoodie.datasource.write.recordkey.field", "name"). option("hoodie.datasource.write.partitionpath.field", "location"). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.table.name", "hudi_mor_table"). mode(Overwrite). save("D:/hudi_mor_table");
更新记录核心配置如下
df.write().format("org.apache.hudi"). option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), "MERGE_ON_READ"). option("hoodie.insert.shuffle.parallelism", "10"). option("hoodie.upsert.shuffle.parallelism", "10"). option("hoodie.delete.shuffle.parallelism", "10"). option("hoodie.bulkinsert.shuffle.parallelism", "10"). option("hoodie.datasource.write.recordkey.field", "name"). option("hoodie.datasource.write.partitionpath.field", "location"). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.keep.max.commits", "5"). option("hoodie.keep.min.commits", "4"). option("hoodie.cleaner.commits.retained", "3"). option("hoodie.table.name", "hudi_mor_table"). mode(Append). save("D:/hudi_mor_table");
2.2.3 插入/更新实际数据设置
第一次插入实际数据为
{\"name\":\"yuan1", \"ts\": \"1574297893837\", \"age\": 1, \"location\": \"beijing\"}
当第二次更新实际数据为
{\"name\":\"yuan1", \"ts\": \"1574297893837\", \"age\": 1, \"location\": \"beijing\", \"sex\": \"male\"}
即第二次会更新一次写入的数据,那么使用如下代码显示数据时
spark.sqlContext().read().format("org.apache.hudi").load("D:/hudi_mor_table" + "/*").show();
那么会发现结果包含了新增的sex列,未更新的值为null。
当第二次更新实际数据为
{\"name\":\"yuan1", \"ts\": \"1574297893837\", \"age\": 1, \"location\": \"beijing1\", \"sex\": \"male\"}
即第二次会写入不同的分区,即不会更新第一次写入的数据,那么查询数据时,会发现查询的结果不会出现新增的sex列。
当使用如下代码显示数据时,设置合并schema参数,即会合并多个分区下的最新的parquet的schema。
spark.sqlContext().read().format("org.apache.hudi").option("mergeSchema", "true").load("D:/hudi_mor_table" + "/*").show();
会发现查询的结果出现了新增的sex列。
3. 总结
当使用Spark查询Hudi数据集时,当数据的schema新增时,会获取单个分区的parquet文件来推导出schema,若变更schema后未更新该分区数据,那么新增的列是不会显示,否则会显示该新增的列;若未更新该分区的记录时,那么新增的列也不会显示,可通过 mergeSchema
来控制合并不同分区下parquet文件的schema,从而可达到显示新增列的目的。