Apache Hudi与Apache Flink更好地集成,最新方案了解下?

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Hudi与Apache Flink更好地集成,最新方案了解下?

1. 现有架构

现有Flink写Hudi架构如下

现有的架构存在如下瓶颈

InstantGeneratorOperator并发度为1,将限制高吞吐的消费,因为所有的split都将会打到一个线程内,网络IO会有很大压力;WriteProcessOperator算子根据分区处理输入数据,在单个分区处理,BUCKET逐一写入,磁盘IO也会有很大压力;通过checkpoint缓存数据,但checkpoint应该比较轻量级并且不应该有一些IO操作;FlinkHoodieIndex对per-job模式有效,不适用于其他Flink作业;

2. 改进方案

2.1 步骤1:移除并发度为1的算子

解决第一个瓶颈。

可以通过为写入算子实现一个算子协调器WriteOperatorCoordinator来避免使用并行度为1的算子InstantGeneratorOperator,协调器会基于checkpoint开始新的提交。

2.1.1 工作流

写方法首先会将数据缓存为一批HoodieRecord

当Flink checkpoint开始时,开始写一批数据,当一批数据写成功后,方法会通知StreamWriteOperaorCoordinator成功写入;

2.1.2 Exactly-once语义

通过缓存checkpoint之间的数据来实现exactly-once语义,算子协调器在触发checkpoint时会在Hoodie的timeline上创建一个新的instant,协调器总是会在其算子之前开始checkpoint,所以当方法开始checkpoint时,已经存在了REQUESTED HoodieInstant

方法处理线程开始阻塞数据缓存,然后checkpoint线程开始刷出之前缓存的数据,当刷出成功后,线程不再阻塞并且开始为新一轮的checkpoint缓存数据。

因为checkpoint失败会触发写回滚,实现了exactly-once语义。

2.1.3 容错

算子协调器在生成新的instant时会检查上一个instant的合法性,如果写入失败会进行回滚处理,算子协调器在提交写入状态时会进行多次重试以减少提交状态的失败概率。

注意:需要按照分区字段对输入数据进行分区以避免不同的线程写入相同的FileGroup,一般场景下时间字段为分区字段,所以sink task非常可能会有IO瓶颈,更灵活的方式是根据FileGroupId进行数据shuffle(步骤2解决)。

2.2 步骤2:更灵活的写入线程

解决第二个瓶颈。

对于每一个分区,WriteProcessOperator处理所有的逻辑,包括index/bucket/数据写入:

索引INSERT/UPDATE记录;使用PARTITIONER确定每条记录的Bucket(FileID)逐一写Bucket

第三步的单线程处理是瓶颈所在。为解决这个瓶颈,将WriteProcessOperator划分为FileIdAssignerBucketWriter

2.2.1 FileIdAssigner

FileIdAssigner对每条记录处理如下

BucketAssigner为每条记录创建一个分区写入Profile,其是分配BucketID(Partition Path+FileID)的关键;查找索引以确定记录是否为UPDATE,如果记录是UPDATE,那么关联已有的fileID,如果是INSERT,根据配置的文件大小确定fileID;向下游发送带有fileID的记录;

FileIdAssigner的输出记录可以通过fileID进一步shuffle到BucketWriter

2.2.2 BucketWriter

BucketWriter的输入为HoodieRecord,然后逐一写Bucket;

第二步需要重构已有的Flink客户端(HoodieFlinkWriteClient),当前代码中HoodieFlinkWriteClient将处理步骤二中的所有的任务,这种模式适用于Spark,但对Flink不太合适,对于Flink而言,需要做一些重构(移除index/bucket)以便让client更轻量级,专注于数据写入。

2.3 步骤3:Mini-batch模式写

解决第三个瓶颈。

BucketWriterCoordinator开始时会开始一个新的instant(不同于步骤1和步骤2中从新的checkpoint开始)新的checkpoint开始时,BucketWriter会阻塞并且刷出缓存数据,有异步线程消费缓存数据(在第一个版本中是在#processElement方法中刷出数据)并刷出。对于BucketWriteCoordinator,如果checkpoint的数据写入成功(获取一个checkpoint成功通知),检查并提交INFLIGHT状态的instant,同时还是新的instant。

2.3.1 Exactly-once语义

为提高吞吐,当checkpoint线程开始刷出缓存数据时,处理线程不再阻塞数据的缓存。当checkpoint失败触发回滚操作时,会有一些重复的数据,但是在UPSERT操作下语义依然正确。

当支持一条条记录写入而非一批记录时,可以支持Exactly-Once语义。

2.3.2 容错

在进行checkpoint时,不再阻塞数据缓存,因此很可能有一个mini-batch缓存刷出,当checkpoint失败时,会重新消费之前的缓存数据,会重复写入该缓存数据。

当checkpoint完成时,协调器检查并提交上一次instant,同时开始新的instant。当发生错误时,将会回滚写入的数据,这意味着一个Hoodie Instant可能会跨不同的checkpoint。如果一个checkpoint超时,那么下一次checkpoint将会刷出剩余的缓存数据。

2.4 步骤四:新的索引

解决第四个瓶颈。

新的索引基于BloomFilter索引,其步骤如下

从state中查找一条记录是否为UPDATE,如果为INSERT,不做任何处理;如果记录是UPDATE,使用BloomFilter索引查找候选文件,查找这些文件并且将所有的index信息放入状态;

当所有文件都被加载后,则可标识为纯状态模式,后面可以仅仅只查询状态即可。

新的索引可适用于不同的Flink作业写入;

3. 兼容性

算子协调器在Flink 1.11引入,为兼容低于1.11版本,需要添加一个不使用算子协调器的pipeline

input operator => the instant generator => fileID assigner => bucket writer => commit sink

其中使用了instant generator替换协调器。

注意该pipeline无法使用mini-batch模式,因为没有组件协调mini-batch,也无法控制算子checkpoint的通知顺序,所以无法在checkpoint完成后开始新的instant。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
39
分享
相关文章
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
265 2
ClickHouse与大数据生态集成:Spark & Flink 实战
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
166 43
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
372 12
Flink CDC YAML:面向数据集成的 API 设计
企业级API集成方案:基于阿里云函数计算调用DeepSeek全解析
DeepSeek R1 是一款先进的大规模深度学习模型,专为自然语言处理等复杂任务设计。它具备高效的架构、强大的泛化能力和优化的参数管理,适用于文本生成、智能问答、代码生成和数据分析等领域。阿里云平台提供了高性能计算资源、合规与数据安全、低延迟覆盖和成本效益等优势,支持用户便捷部署和调用 DeepSeek R1 模型,确保快速响应和稳定服务。通过阿里云百炼模型服务,用户可以轻松体验满血版 DeepSeek R1,并享受免费试用和灵活的API调用方式。
118 12
方案实践测评 | DataWorks集成Hologres构建一站式高性能的OLAP数据分析
DataWorks在任务开发便捷性、任务运行速度、产品使用门槛等方面都表现出色。在数据处理场景方面仍有改进和扩展的空间,通过引入更多的智能技术、扩展数据源支持、优化任务调度和可视化功能以及提升团队协作效率,DataWorks将能够为企业提供更全面、更高效的数据处理解决方案。
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
103 1
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
本文我们将聚焦企业最普遍使用的 JSON 数据,分别介绍业界传统方案以及 Apache Doris 半结构化数据存储分析的三种方案,并通过图表直观展示这些方案的优势与不足。同时,结合具体应用场景,分享不同需求场景下的使用方式,帮助用户快速选择最合适的 JSON 数据存储及分析方案。
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
828 2
Flink CDC:新一代实时数据集成框架
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
87 0
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
74 0

热门文章

最新文章

推荐镜像

更多