基于 Apache Hudi 构建增量和无限回放事件流的 OLAP 平台

本文涉及的产品
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
简介: 我们将讨论在构建流数据平台时如何利用 Hudi 的两个最令人难以置信的能力。增量消费--每 30 分钟处理一次数据,并在我们的组织内构建每小时级别的OLAP平台事件流的无限回放--利用 Hudi 的提交时间线在超级便宜的云对象存储(如 AWS S3)中存储 10 天的事件流(想象一个具有 10 天保留期的 kafka 主题)具有部分记录更新的自定义 Hudi Payload 类

1. 摘要


在本博客中,我们将讨论在构建流数据平台时如何利用 Hudi 的两个最令人难以置信的能力。

  • 增量消费--每 30 分钟处理一次数据,并在我们的组织内构建每小时级别的OLAP平台
  • 事件流的无限回放--利用 Hudi 的提交时间线在超级便宜的云对象存储(如 AWS S3)中存储 10 天的事件流(想象一个具有 10 天保留期的 kafka 主题)
  • 具有部分记录更新的自定义 Hudi Payload 类


2. 当前状态


2.1 问题说明

对于大多数业务需要手动干预以通过查看 KPI 和数据趋势来决定下一组操作用例以及其他不太实时的用例,我们需要具有成本效益和高性能的近实时系统。

但是我们在数据湖中获得的数据通常以 D -1 的每日批处理粒度出现,即使我们每天不止一次地运行这些日常批处理数据处理系统以获取当前 D 的最新数据,这些批处理系统的固有局限性也无助于我们解决近实时业务用例。


2.2 挑战

在将批处理数据摄取到我们的数据湖时,我们支持 S3 的数据集在每日更新日期分区上进行分区。即使我们每天多次运行这些批处理系统,我们从上游 Kafka 或 RDBMS 应用程序数据库中提取的最新批处理也会附加到 S3 数据集中当前日期的分区中。

当下游系统想要从我们的 S3 数据集中获取这些最新记录时,它需要重新处理当天的所有记录,因为下游进程无法在不扫描整个数据分区的情况下从增量记录中找出已处理的记录。

此外如果我们按小时(而不是每日分区)对 S3 数据集进行分区,那么这会将分区粒度设置为每小时间隔。任何试图以低于一小时(例如最后 x 分钟)的粒度获取最新更新的下游作业都必须在每次运行时再次重新处理每小时数据分区,即这些批处理源将错过解决近实时用例所需的关键增量数据消费。


2.3 无限播放事件流

现在回到帮助我们解决这些挑战的 Apache Hudi 的特性,让我们首先尝试了解commit(提交)和commit timeline(提交时间线)如何影响增量消费和事件流保留/回放。

Hudi 维护了在不同时刻在表上执行的所有操作的时间表,这些commit(提交)包含有关作为 upsert 的一部分插入或重写的部分文件的信息,我们称之为 Hudi 的提交时间线。

对于每个 Hudi 表,我们可以选择指定要保留多少历史提交,要保留的默认提交是 10 次,即在 10 次提交之后,第 11 次提交将另外运行一个清理服务,该服务将清除第一次提交历史记录。

清理commit(提交)时,清理程序会清理与该提交对应的部分文件的过时版本,相关数据被保留,因为过时的文件中的所有数据无论如何都存在于新版本的文件中,这里重要的是我们可以触发快照查询来获取数据的最新状态,但我们将无法对已清理的提交运行增量查询来获取增量数据。

简而言之,如果清除了commit(提交),我们就失去了从该commit(提交)回放事件流的能力,但是我们仍然可以从任何尚未清理的commit(提交)中回放事件流。

在我们的例子中,我们将 Hudi 表配置为保留 10K 提交,从而为我们提供 10 天的增量读取能力(类似于保留 10 天的 kafka 主题)

我们保留的历史提交数量越多,我们就越有能力及时返回并重放事件流。


3. 每小时 OLAP


让我快速展示一下我们的端到端消息 OLAP 计算管道与 10 天事件流的架构

60.jpg

在 kafka 层,我们的 kafka 输入源每个都有 1 天的主题保留期。

在摄取层,我们有 Spark 结构化流作业,从 kafka 源读取数据并将微批处理写入 S3 支持的 Hudi 表。 这是我们配置为保持 10k 提交以启用 10 天事件流播放的地方。

.option("hoodie.cleaner.commits.retained", 10000)
.option("hoodie.keep.max.commits", 10002)
.option("hoodie.keep.min.commits", 10001)

计算层由我们当前每 30 分钟运行一次的批处理 Spark 作业组成,并重新处理我们在过去 60 分钟内摄取到 Hudi 表中的所有事件。 每小时 OLAP 作业读取两个跨国表和可选的 N 维表,并将它们全部连接起来以准备我们的 OLAP 增量DataFrame。

我们每 30 分钟处理一次 60 分钟的数据,以增强表连接的一致性。

有趣的是生产系统中通常不建议保留 1 天的 kafka 保留期,但是我们能够进行这种权衡以节省一些 SSD 和 Kafka 代理成本,因为无论如何我们都可以通过 S3 支持的 Hudi 表实现 10 天的事件流播放能力。


4. 部分记录更新


上面的管道显示了我们如何通过读取和合并两个增量上游数据源来创建每小时增量 OLAP。

然而这些增量数据处理有其自身的挑战。 可能会发生在两个上游表中,对于主键,我们在其中一个数据源中获得更新,但在另一个数据源中没有,我们称之为不匹配的交易问题。

下面的插图试图帮助我们理解这一挑战,并看看我们实施的解决方案。

61.jpg

在这里,表A和B都有一些对应的匹配事务和一些不匹配的事务。使用内部连接将简单地忽略不匹配的事务,这些事务可能永远不会流入我们的基础 OLAP。相反使用外连接会将不匹配的事务合并到我们的每小时增量数据加载中。但是使用外连接会将缺失的列值添加为 null,现在这些空值将需要单独处理。

在使用默认有效负载类将此每小时增量数据更新到基础 Hudi OLAP 时,它将简单地用我们准备的每小时增量数据中的新记录覆盖基础 Hudi OLAP 中的记录。但是通过这种方式,当我们用传入记录中的空列值覆盖现有记录时,我们将丢失现有记录中可能已经存在的信息。因此为了解决这个问题,我们提供了我们的自定义部分行更新有效负载类,同时将外部连接的每小时增量数据插入到基础 Hudi OLAP。有效负载类定义了控制我们在更新记录时如何合并新旧记录的函数。

我们的自定义有效负载类比较存储和传入记录的所有列,并通过将一条记录中的空列与另一条记录中的非空列重叠来返回一条新记录。

因此即使只有一个上游表得到了更新,我们的自定义有效负载类也会使用这个部分可用的新信息,它会返回包含部分更新信息的完全最新记录。

由于存储和部分行更新记录的主键和分区键相同,因此 Hudi upsert 操作会自动更新旧记录,从而为我们提供基本 OLAP 的去重和一致视图。

有关如何编写自己的有效负载类的更多技术细节。


5. 结语


结合这三个概念,即增量消费、增量每小时 OLAP 处理和自定义部分行更新有效负载类,我们为我们的独角兽初创公司构建了一个强大的流处理平台,以使其一直扩展成为一个百角兽组织。

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
相关文章
|
1月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
77 5
|
1月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
67 4
|
2月前
|
消息中间件 分布式计算 大数据
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
68 5
|
1月前
|
存储 数据挖掘 数据处理
巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践
随着数据湖技术的发展,企业纷纷探索其优化潜力。本文分享了巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践。Paimon 支持流式和批处理,提供高性能、统一的数据访问和流批一体的优势。通过示例代码和实践经验,展示了如何高效处理实时数据,解决了数据一致性和故障恢复等挑战。
110 61
|
1月前
|
消息中间件 人工智能 监控
Paimon x StarRocks 助力喜马拉雅直播实时湖仓构建
本文由喜马拉雅直播业务与仓库建设负责人王琛撰写,介绍了喜马拉雅直播业务的数据仓库架构迭代升级。文章重点分享了基于 Flink + Paimon + StarRocks 实现实时湖仓的架构及其成效,通过分钟级别的收入监控、实时榜单生成、流量监测和盈亏预警,大幅提升了运营效率与决策质量,并为未来的业务扩展和 AI 项目打下坚实基础。
175 5
Paimon x StarRocks 助力喜马拉雅直播实时湖仓构建
|
2月前
|
Java 大数据 数据库连接
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
31 2
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
|
1月前
|
SQL 存储 数据挖掘
快速入门:利用AnalyticDB构建实时数据分析平台
【10月更文挑战第22天】在大数据时代,实时数据分析成为了企业和开发者们关注的焦点。传统的数据仓库和分析工具往往无法满足实时性要求,而AnalyticDB(ADB)作为阿里巴巴推出的一款实时数据仓库服务,凭借其强大的实时处理能力和易用性,成为了众多企业的首选。作为一名数据分析师,我将在本文中分享如何快速入门AnalyticDB,帮助初学者在短时间内掌握使用AnalyticDB进行简单数据分析的能力。
41 2
|
2月前
|
存储 SQL 分布式计算
湖仓一体架构深度解析:构建企业级数据管理与分析的新基石
【10月更文挑战第7天】湖仓一体架构深度解析:构建企业级数据管理与分析的新基石
105 1
|
2月前
|
存储 SQL 缓存
Apache Doris 3.0 里程碑版本|存算分离架构升级、湖仓一体再进化
从 3.0 系列版本开始,Apache Doris 开始支持存算分离模式,用户可以在集群部署时选择采用存算一体模式或存算分离模式。基于云原生存算分离的架构,用户可以通过多计算集群实现查询负载间的物理隔离以及读写负载隔离,并借助对象存储或 HDFS 等低成本的共享存储系统来大幅降低存储成本。
Apache Doris 3.0 里程碑版本|存算分离架构升级、湖仓一体再进化
|
1月前
|
缓存 监控 大数据
构建高可用AnalyticDB集群:最佳实践
【10月更文挑战第25天】在大数据时代,数据仓库和分析平台的高可用性变得尤为重要。作为阿里巴巴推出的一款完全托管的PB级实时数据仓库服务,AnalyticDB(ADB)凭借其高性能、易扩展和高可用的特点,成为众多企业的首选。本文将从我个人的角度出发,分享如何构建和维护高可用性的AnalyticDB集群,确保系统在各种情况下都能稳定运行。
28 0

热门文章

最新文章

推荐镜像

更多