Apache Hudi从零到一:深入研究读取流程和查询类型(二)

简介: Apache Hudi从零到一:深入研究读取流程和查询类型(二)

在上一篇文章中,我们讨论了 Hudi 表中的数据布局,并介绍了 CoW 和 MoR 两种表类型,以及它们各自的权衡。在此基础上我们现在将探讨 Hudi 中的读取操作是如何工作的。

有多种引擎(例如 Spark、Presto 和 Trino)与 Hudi 集成来执行分析查询。尽管集成 API 可能有所不同,但分布式查询引擎中的基本过程保持一致。此过程需要解释输入 SQL、创建在工作节点上执行的查询计划以及收集结果以返回给用户。

在这篇文章中,我选择 Spark 作为示例引擎来说明读取操作的流程,并提供代码片段来展示各种 Hudi 查询类型的用法。我将首先通过入门介绍 Spark 查询,然后深入研究 Hudi-Spark 集成点,最后解释不同的查询类型。

Spark 查询入门

Spark SQL是一个分布式SQL引擎,可以对大规模数据执行分析任务。典型的分析查询从用户提供的 SQL 开始,旨在从存储上的表中检索结果。Spark SQL 接受此输入并继续执行多个阶段,如下图所示。

在分析阶段,输入被解析、解析并转换为树结构,作为 SQL 语句的抽象。查询表目录以获取表名称和列类型等信息。

在逻辑优化步骤中,在逻辑层对树进行评估和优化。一些常见的优化包括谓词下推、模式裁剪和空传播。此步骤生成一个逻辑计划,概述查询所需的计算。由于它是逻辑表示,逻辑计划缺乏在实际节点上运行所需的细节。

物理规划充当逻辑层和物理层之间的桥梁。物理计划指定了执行计算的精确方式。例如,在逻辑计划中,可能有一个连接节点指示连接操作,而在物理计划中,连接操作可以指定为sort-merge连接或broadcast-hash连接,具体取决于相关表的大小估计。选择最佳物理计划用于代码生成和实际执行。

这三个阶段是 Catalyst Optimizer[1] 提供的功能。要进一步研究该主题可以探索此处[2]此处[3]链接的精彩演讲。

在执行过程中,Spark 应用程序在称为 RDD(弹性分布式数据集)的基础数据结构上运行。RDD 是 JVM 对象的集合,这些对象是不可变的、跨节点分区的,并且由于跟踪数据沿袭信息而具有容错能力。当应用程序运行时,将执行计划的计算:RDD 被转换并执行操作以产生结果。这个过程通常也称为 RDD 的“物化”。

数据源API

当 Catalyst Optimizer 制定查询计划时,连接到数据源变得有利,可以将优化下推。Spark 的 DataSource API 旨在提供与各种数据源集成的可扩展性。有些源是开箱即用的,例如 JDBC、Hive 表和 Parquet 文件。Hudi 表由于特定的数据布局而代表了另一种类型的自定义数据源。

Spark-Hudi 读取流程

下图展示了Spark-Hudi读取流程中的一些关键接口和方法调用。

1. DefaultSource 作为集成的入口点,将数据源的格式定义为 org.apache.hudihudi。它提供了一个 BaseRelation 实现,我将其设想为建立一个“关系”来简化表中的数据访问。

2. buildScan() 是一个核心 API,用于将过滤器传递到数据源以进行优化。Hudi定义了collectFileSplits()来收集相关文件。

3. collectFileSplits() 将所有过滤器传递给 FileIndex 对象,该对象有助于识别要读取的必要文件。

4. FileIndex 定位所有相关的 FileSlice 以进行进一步处理。

5. 识别 FileSlices 后调用 composeRDD()。

6. FileSlice 作为 RDD 加载和读取。对于 Parquet 中的基本文件等列式文件,此读取操作通过仅读取必要的列来最大限度地减少传输的字节。

7. RDD 从 API 返回,用于进一步规划和代码生成。

请注意上述步骤仅提供读取流程的高级概述,省略了读取模式支持和高级索引技术(例如使用元数据表跳过数据)等细节。

该流程对于 Spark 的所有 Hudi 查询类型都是通用的。在以下部分将解释各种查询类型的工作原理。除读取优化外,所有这些都适用于 CoW 和 MoR 表。

快照查询

这是读取 Hudi 表时的默认查询类型。它的目的是从表中检索最新记录,本质上捕获查询时表的“快照”。在 MoR 表上执行时,会发生日志文件与基本文件的合并,并导致一些性能影响。

启动带有 Hudi 依赖的 Spark SQL Shell 后可以运行这些 SQL 来设置一个 MoR 表,其中插入和更新了一条记录。

create table hudi_mor_example (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
) location '/tmp/hudi_mor_example';
set hoodie.spark.sql.insert.into.operation=UPSERT;
insert into hudi_mor_example select 1, 'foo', 10, 1000;
insert into hudi_mor_example select 1, 'foo', 20, 2000;
insert into hudi_mor_example select 1, 'foo', 30, 3000;
可以通过运行如下所示的 SELECT 语句来执行快照查询,它将检索记录的最新值。
spark-sql> select id, name, price, ts from hudi_mor_example;
1       foo     30.0    3000
Time taken: 0.161 seconds, Fetched 1 row(s)

读取优化 (RO) 查询

RO 查询类型被设计为较低的读取延迟与可能较旧的结果的权衡,因此,它专门适用于 MoR 表。进行此类查询时,collectFileSplits() 将仅获取 FileSlices 的基本文件(Parquet文件)。

上面提供的设置代码自动生成一个名为 hudi_mor_example_ro 的目录表,该表指定属性 hoodie.query.as.ro.table=true。此属性指示查询引擎始终执行 RO 查询。运行下面的 SELECT 语句将返回记录的原始值,因为后续更新尚未应用于基本文件。

spark-sql> select id, name, price, ts from hudi_mor_example_ro;
1       foo     10.0    1000
Time taken: 0.114 seconds, Fetched 1 row(s)
时间旅行查询

通过指定时间戳,用户可以请求Hudi表在给定时间的历史快照。正如前面第 1 篇文章中所讨论的,FileSlices 与特定的提交时间相关联,因此支持过滤。执行时间旅行查询时,如果没有完全匹配,FileIndex 仅查找与指定时间相对应或早于指定时间的 FileSlice。

spark-sql> select id, name, price, ts from hudi_mor_example timestamp as of '20230905221619987';
1       foo     30.0    3000
Time taken: 0.274 seconds, Fetched 1 row(s)
spark-sql> select id, name, price, ts from hudi_mor_example timestamp as of '20230905221619986';
1       foo     20.0    2000
Time taken: 0.241 seconds, Fetched 1 row(s)

第一个 SELECT 语句精确地在最新插入的 deltacommit 时间执行时间旅行查询,提供表的最新快照。第二个查询设置的时间戳早于最新插入的时间戳,从而生成倒数第二个插入的快照。

示例中的时间戳遵循 Hudi 时间线的格式"yyyyMMddHHmmssSSS"。也可以以"yyyy-MM-dd HH:mm:ss.SSS"或"yyyy-MM-dd"的形式设置。

增量查询

用户可以设置起始时间戳(带或不带结束时间戳)以检索指定时间窗口内更改的记录。如果没有设置结束时间,则时间窗口将包括最近的记录。Hudi 还通过在写入端启用附加日志并为增量读取器激活 CDC 模式来提供完整的更改数据捕获 (CDC) 功能。更多详细信息将在专门介绍增量处理的单独帖子中介绍。

回顾

在这篇文章中,我们概述了 Spark 的 Catalyst 优化器,探讨了 Hudi 如何实现 Spark DataSource API 来读取数据,并介绍了四种不同的 Hudi 查询类型。代码片段也可以在这里[4]找到。在接下来的文章中将演示写入流程以进一步加深我们对 Hudi 的理解。

目录
相关文章
|
1月前
|
存储 Apache
Apache Hudi Savepoint实现分析
Apache Hudi Savepoint实现分析
36 0
|
1月前
|
Apache
Apache Hudi Rollback实现分析
Apache Hudi Rollback实现分析
26 0
|
1月前
|
存储 SQL 分布式计算
使用Amazon EMR和Apache Hudi在S3上插入,更新,删除数据
使用Amazon EMR和Apache Hudi在S3上插入,更新,删除数据
116 0
|
1月前
|
存储 分布式计算 Hadoop
一文了解Apache Hudi架构、工具和最佳实践
一文了解Apache Hudi架构、工具和最佳实践
117 0
|
1月前
|
SQL 分布式计算 NoSQL
使用Apache Hudi和Debezium构建健壮的CDC管道
使用Apache Hudi和Debezium构建健壮的CDC管道
19 0
|
1月前
|
存储 SQL 消息中间件
Apache Hudi:统一批和近实时分析的存储和服务
Apache Hudi:统一批和近实时分析的存储和服务
36 0
|
1月前
|
分布式计算 API Apache
解锁Apache Hudi删除记录新姿势
解锁Apache Hudi删除记录新姿势
42 0
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
484 5
|
1月前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1425 1
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
1月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1368 1
官宣|Apache Flink 1.19 发布公告

推荐镜像

更多