在上一篇文章中,我们讨论了 Hudi 表中的数据布局,并介绍了 CoW 和 MoR 两种表类型,以及它们各自的权衡。在此基础上我们现在将探讨 Hudi 中的读取操作是如何工作的。
有多种引擎(例如 Spark、Presto 和 Trino)与 Hudi 集成来执行分析查询。尽管集成 API 可能有所不同,但分布式查询引擎中的基本过程保持一致。此过程需要解释输入 SQL、创建在工作节点上执行的查询计划以及收集结果以返回给用户。
在这篇文章中,我选择 Spark 作为示例引擎来说明读取操作的流程,并提供代码片段来展示各种 Hudi 查询类型的用法。我将首先通过入门介绍 Spark 查询,然后深入研究 Hudi-Spark 集成点,最后解释不同的查询类型。
Spark 查询入门
Spark SQL是一个分布式SQL引擎,可以对大规模数据执行分析任务。典型的分析查询从用户提供的 SQL 开始,旨在从存储上的表中检索结果。Spark SQL 接受此输入并继续执行多个阶段,如下图所示。
在分析阶段,输入被解析、解析并转换为树结构,作为 SQL 语句的抽象。查询表目录以获取表名称和列类型等信息。
在逻辑优化步骤中,在逻辑层对树进行评估和优化。一些常见的优化包括谓词下推、模式裁剪和空传播。此步骤生成一个逻辑计划,概述查询所需的计算。由于它是逻辑表示,逻辑计划缺乏在实际节点上运行所需的细节。
物理规划充当逻辑层和物理层之间的桥梁。物理计划指定了执行计算的精确方式。例如,在逻辑计划中,可能有一个连接节点指示连接操作,而在物理计划中,连接操作可以指定为sort-merge连接或broadcast-hash连接,具体取决于相关表的大小估计。选择最佳物理计划用于代码生成和实际执行。
这三个阶段是 Catalyst Optimizer[1] 提供的功能。要进一步研究该主题可以探索此处[2]和此处[3]链接的精彩演讲。
在执行过程中,Spark 应用程序在称为 RDD(弹性分布式数据集)的基础数据结构上运行。RDD 是 JVM 对象的集合,这些对象是不可变的、跨节点分区的,并且由于跟踪数据沿袭信息而具有容错能力。当应用程序运行时,将执行计划的计算:RDD 被转换并执行操作以产生结果。这个过程通常也称为 RDD 的“物化”。
数据源API
当 Catalyst Optimizer 制定查询计划时,连接到数据源变得有利,可以将优化下推。Spark 的 DataSource API 旨在提供与各种数据源集成的可扩展性。有些源是开箱即用的,例如 JDBC、Hive 表和 Parquet 文件。Hudi 表由于特定的数据布局而代表了另一种类型的自定义数据源。
Spark-Hudi 读取流程
下图展示了Spark-Hudi读取流程中的一些关键接口和方法调用。
1. DefaultSource 作为集成的入口点,将数据源的格式定义为 org.apache.hudi
或 hudi
。它提供了一个 BaseRelation 实现,我将其设想为建立一个“关系”来简化表中的数据访问。
2. buildScan() 是一个核心 API,用于将过滤器传递到数据源以进行优化。Hudi定义了collectFileSplits()来收集相关文件。
3. collectFileSplits() 将所有过滤器传递给 FileIndex 对象,该对象有助于识别要读取的必要文件。
4. FileIndex 定位所有相关的 FileSlice 以进行进一步处理。
5. 识别 FileSlices 后调用 composeRDD()。
6. FileSlice 作为 RDD 加载和读取。对于 Parquet 中的基本文件等列式文件,此读取操作通过仅读取必要的列来最大限度地减少传输的字节。
7. RDD 从 API 返回,用于进一步规划和代码生成。
请注意上述步骤仅提供读取流程的高级概述,省略了读取模式支持和高级索引技术(例如使用元数据表跳过数据)等细节。
该流程对于 Spark 的所有 Hudi 查询类型都是通用的。在以下部分将解释各种查询类型的工作原理。除读取优化外,所有这些都适用于 CoW 和 MoR 表。
快照查询
这是读取 Hudi 表时的默认查询类型。它的目的是从表中检索最新记录,本质上捕获查询时表的“快照”。在 MoR 表上执行时,会发生日志文件与基本文件的合并,并导致一些性能影响。
启动带有 Hudi 依赖的 Spark SQL Shell 后可以运行这些 SQL 来设置一个 MoR 表,其中插入和更新了一条记录。
create table hudi_mor_example ( id int, name string, price double, ts bigint ) using hudi tblproperties ( type = 'mor', primaryKey = 'id', preCombineField = 'ts' ) location '/tmp/hudi_mor_example'; set hoodie.spark.sql.insert.into.operation=UPSERT; insert into hudi_mor_example select 1, 'foo', 10, 1000; insert into hudi_mor_example select 1, 'foo', 20, 2000; insert into hudi_mor_example select 1, 'foo', 30, 3000; 可以通过运行如下所示的 SELECT 语句来执行快照查询,它将检索记录的最新值。 spark-sql> select id, name, price, ts from hudi_mor_example; 1 foo 30.0 3000 Time taken: 0.161 seconds, Fetched 1 row(s)
读取优化 (RO) 查询
RO 查询类型被设计为较低的读取延迟与可能较旧的结果的权衡,因此,它专门适用于 MoR 表。进行此类查询时,collectFileSplits() 将仅获取 FileSlices 的基本文件(Parquet文件)。
上面提供的设置代码自动生成一个名为 hudi_mor_example_ro 的目录表,该表指定属性 hoodie.query.as.ro.table=true
。此属性指示查询引擎始终执行 RO 查询。运行下面的 SELECT 语句将返回记录的原始值,因为后续更新尚未应用于基本文件。
spark-sql> select id, name, price, ts from hudi_mor_example_ro; 1 foo 10.0 1000 Time taken: 0.114 seconds, Fetched 1 row(s) 时间旅行查询
通过指定时间戳,用户可以请求Hudi表在给定时间的历史快照。正如前面第 1 篇文章中所讨论的,FileSlices 与特定的提交时间相关联,因此支持过滤。执行时间旅行查询时,如果没有完全匹配,FileIndex 仅查找与指定时间相对应或早于指定时间的 FileSlice。
spark-sql> select id, name, price, ts from hudi_mor_example timestamp as of '20230905221619987'; 1 foo 30.0 3000 Time taken: 0.274 seconds, Fetched 1 row(s) spark-sql> select id, name, price, ts from hudi_mor_example timestamp as of '20230905221619986'; 1 foo 20.0 2000 Time taken: 0.241 seconds, Fetched 1 row(s)
第一个 SELECT 语句精确地在最新插入的 deltacommit 时间执行时间旅行查询,提供表的最新快照。第二个查询设置的时间戳早于最新插入的时间戳,从而生成倒数第二个插入的快照。
示例中的时间戳遵循 Hudi 时间线的格式"yyyyMMddHHmmssSSS"。也可以以"yyyy-MM-dd HH:mm:ss.SSS"或"yyyy-MM-dd"的形式设置。
增量查询
用户可以设置起始时间戳(带或不带结束时间戳)以检索指定时间窗口内更改的记录。如果没有设置结束时间,则时间窗口将包括最近的记录。Hudi 还通过在写入端启用附加日志并为增量读取器激活 CDC 模式来提供完整的更改数据捕获 (CDC) 功能。更多详细信息将在专门介绍增量处理的单独帖子中介绍。
回顾
在这篇文章中,我们概述了 Spark 的 Catalyst 优化器,探讨了 Hudi 如何实现 Spark DataSource API 来读取数据,并介绍了四种不同的 Hudi 查询类型。代码片段也可以在这里[4]找到。在接下来的文章中将演示写入流程以进一步加深我们对 Hudi 的理解。