基于Flink的高吞吐精确一致性入湖实现

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: AnalyticDB助力企业降本增效,构建企业级数据分析平台

1.概览

AnalyticDB MySQL高度兼容MySQL协议,支持毫秒级更新,亚秒级查询,可以对海量数据进行即时的多维分析透视和业务探索;而最新推出的AnalyticDB MySQL湖仓(下文简称ADB湖仓版)支持低成本离线处理能力完成数据的清洗加工,同时提供高性能在线分析能力完成数据的洞察探索,真正做到数据湖的规模,数据库的体验。帮助企业降本增效,构建企业级数据分析平台。

APS(ADB Pipeline Service)简介:ADB湖仓版在深化自身湖仓能力建设的同时,还推出了APS(ADB Pipeline Service)数据通道组件,为客户提供实时数据流服务实现数据低成本、低延迟入湖入仓。本文以数据源SLS如何通过APS实现高速精确一致性入湖为例,介绍相关的挑战和解决方法。在数据通道的构建上,我们选择Flink作为基础引擎。Flink作为业界熟知的大数据处理框架,其流批一体架构有助于我们处理多种场景。而ADB的湖则建设在Hudi之上,Hudi作为成熟的数据湖底座已经被多家大型企业实际应用,ADB在其上也已积累多年经验,如今ADB湖仓版把湖和仓进行深度融合提供更加一体化的解决方案。

数据入湖的精确一致性挑战:在入湖通道中,有可能出现异常,升级,扩缩容等场景导致链路重启,触发部分已处理数据从源端重放,导致在目标端出现重复数据。为解决此问题一种做法是配置业务主键,并使用Hudi的Upsert能力来达到幂等写入目的。但SLS入湖的吞吐目标是每秒GB级别(某个业务的需求是4GB/s)且需要控制成本,Hudi Upsert难以满足需求,而SLS数据本身就具有Append特征,因此采用Hudi的Append Only模式写入实现高吞吐,并用其他机制保证数据不重不丢的精确一致性。


2.端到端精确一致性的问题和解决方案

1.png

流计算的一致性保障一般包含如下几种:

2.png

精确一致的语义是所有一致性语义中要求最高的,但流计算中的Exactly-Once一般是指内部状态的精确一致性(Exactly-Once State Consistency),而业务需要的是端到端Exactly Once,即当出现异常Failover场景时,最终目标端的数据需要与源端保持一致,数据不重不丢。

2.1 端到端精确一致性问题

要实现精确一致性,就必然要考虑Failover场景,即当系统宕机任务重启时,如何恢复到某个一致性状态。Flink之所以称为Stateful Stream Processing是其可通过Checkpoint机制保存状态到后端存储,并在重启时从后端存储恢复到某个一致性状态。但在状态的恢复中,其仅仅保证了Flink自身的状态一致,而在包含源端、Flink、目的端这样的完整系统中,仍可能产生数据丢失或重复,导致端到端出现不一致。下面用一个字符连接的例子说明数据重复问题。

下图是一个字符串连接的处理过程,处理逻辑是从源端读取一个个的字符,并对它们进行连接,每连接一个字符就向目标端输出一次,最终输出a, ab, abc ...多个不重复的字符串。

本例的场景中,Flink的Checkpoint保存了已完成的字符连接ab,以及对应的源端位点(Checkpoint箭头指向的位点)。Current则指向当前正在处理的位点,此时已经向目标端输出了a, ab, abc。当发生异常重启时,Flink从Checkpoint恢复自身状态,回退位点并重新处理字符c,并再次向目标端输出abc,导致abc出现重复。

在这个例子中,Filnk通过Checkpoint恢复状态,因此不会出现abb,或者abcc这种重复处理字符的情况,也不会出现ac种丢失字符的情况,保证了其自身的Exactly-Once。但是在目标端出现了两个重复的abc,因此没有保证端到端的Exactly-Once

3.png

4.png

2.2 端到端精确一致性方案

Flink本身是一种复杂的分布式系统,其内部包含Source、Sink等算子,同时还存在Slot等并行关系,这样的系统要实现精确一致一般会想到两阶段提交,实际上Flink的Checkpoint就是一种两阶段提交的实现。

而在端到端中,Flink和Hudi又组成了另一个分布式系统,这个分布式系统要实现精确一致性,就需要另一套两阶段提交的实现(我们这里不讨论SLS端,因为在本场景中Flink不会改变SLS的状态,只利用SLS的位点重放能力即可)。因此端到端中,是由Flink和Flink + Hudi两套两阶段提交来保证精确一致性(见下图)。

5.png

Flink的Checkpoint两阶段实现不再赘述,后文会重点介绍Flink + Hudi两阶段提交的实现,定义出哪些是Precommit阶段,哪些是Commit阶段,同时发生异常时如何从故障中恢复保证Flink和Hudi的状态一致。例如Flink已经完成Checkpoint,而Hudi尚未完成Commit,如何恢复到一致性状态,这些在后续章节介绍。

3.SLS入湖链路端到端精确一致实现

下面介绍SLS入湖链路精确一致性的实现。整体架构如下,Hudi的组件是部署在Flink JobManager和TaskManager上的。SLS作为数据源,由Flink读取处理后,写出到Hudi表。因为SLS是多shard存储,因此会由Flink的多个Source算子并行读取。数据读取后通过Sink算子调用Hudi Worker写出到Hudi表。当然实际的链路中还会有Repartition,热点打散等逻辑,这些在图中做了简化。Flink Checkpoint的后端存储,以及Hudi数据的存储都是放在OSS上。

6.png

3.1 SLS Source算子

如何实现Source算子消费SLS数据已有大量介绍,此处不在赘述。这里介绍SLS的两种消费模式:消费组模式普通消费模式以及他们的区别。

消费组模式

顾名思义,多个消费者可注册到同一个消费组,SLS会自动把Shard分配给这些消费者来读取。其优点是由SLS的消费组来管理负载均衡。如下图左,消费组中首先注册了两个消费者,因此SLS把6个Shard均匀分配给这2个消费者。当有新的消费者注册(如下图右),则SLS会自动均衡,把部分Shard从旧消费者迁移到新消费者,称为shard transfer。

这种模式的优点是自动均衡,且在SLS Shard分裂/合并时会自动分配消费者。但该模式在我们的场景中却会引起问题。为保证精确一致,我们把SLS各Shard的当前位点保存在Flink Checkpoint中,运行中也是由各Slot上的Source算子持有当前消费位点。如果发生shard transfer,如何保证旧Slot上的算子不再消费,同时把位点转移给新Slot,这引入了新的一致性问题。尤其是大规模系统有数百个SLS Shard和数百个Flink Slot的情况下,很可能出现部分Source比其他Source先注册到SLS导致shard transfer不可避免。

7.png

 普通消费模式

这种模式就是调用SLS的SDK直接指定shard、offset来消费数据,而不是由SLS消费组进行分配,因此不会出现shard transfer。如下,因为Flink的Slot为3,因此可提前计算出每个消费者消费2个Shard并据此分配,即使Source 3尚未ready,也不会把Shard 5和6分配给Source 1和2。可以想象,为了负载均衡(例如某些TaskManager的负载过高时),仍然需要迁移shard,但此时迁移是我们主动触发的,状态更加可控,从而避免一致性问题。

8.png

3.2 Hudi Sink算子

下面介绍下Hudi提交的相关概念,以及如何与Flink配合实现两阶段提交和容错达到精确一致。

3.2.1 Hudi提交相关概念

时间轴和Instant

9.png

Hudi维护着一条Timeline,Instant是指某个时间点(Time)发起的对表的操作(Action)及表所处的状态(State)的集合。一个Instant可以理解为一个数据版本,Action可能是对表的Commit,Rollback或者Clean等操作,这些操作由Hudi保证了其原子性,因此Hudi的Instant实际类似于数据库中的事务和版本的概念。在图中我们用Start Transaction,Write Data,Commit这种类似数据库事务的方式来表达某个Instant的执行过程。Instant中,部分Action的含义如下:

  • Commit:将记录原子写入数据集
  • Rollback :当Commit不成功时进行回滚,其会删除在写入中产生的脏文件
  • Clean :删除数据集中不再需要的旧版本和文件

Instant State一共有三种状态:

  • Requested:操作已被计划但未被执行,可以理解为Start Transaction
  • Inflight:操作正在进行,可理解为Write Data
  • Completed:操作完成,可理解为Commit

Instant的Time、Action和State都在元数据文件中描述,下图表示了时间轴上两个先后的Instant。Instant 1的.hoodie目录下的元数据文件描述了Instant的开始时间是2022-10-17 16:05:00,Action是Commit,State可以看到已完成提交(有20221017160500.commit文件),在表的分区目录下则是该Instant对应的parquet数据文件。而Instant 2则可以看到发生在第二天,且Action正在执行尚未提交。

10.png

Hudi提交过程

Hudi提交过程可以用下图理解。Hudi中存在两种角色,Coordinator负责发起Instant和完成提交,Worker负责写入数据。

11.png

  1. 当开启一个事务时,Coordinator会分配一个Instant并传递给所有worker;
  2. Worker开始写入数据;
  3. 开始提交时,Coordinator发送commit给各Worker。各Worker收到提交命令,flush data到持久化存储,并反馈自己的状态给Coordinator。Coordinator确认各Worker commit完成,然后在.hoodie目录中写commit文件完成全局提交。

3.2.2 Flink + Hudi的两阶段提交

了解了Hudi的写入和提交过程,它如何与Flink配合完成数据的写入和提交就可以用下图表达了。

12.png

  1. 开启一个Hudi Instant;
  2. 由Filnk Sink发送数据给Hudi Worker写出;
  3. 发生Flink Checkpoint时,则通过Sink算子通知Worker flush数据,同时持久化operator-state(operator-state属于Flink checkpoint框架的一部分,持久化Hudi Worker所处的Instant等信息);
  4. 当Flink完成Checkpoint的持久化,则通过notifyCheckpointComplete机制通知Hudi Coordinator提交该Instant。Hudi Coordinator此时完成最终提交,写commit文件,数据对外可见;
  5. 结束Instant后,会立即开启一个新的Instant,重启上述循环。

3.2.3 Flink + Hudi两阶段提交的容错处理

13.png

实际的提交可简化为如上的流程。从图中可见,1到3是Flink的Checkpoint逻辑,如果异常在这些步骤上发生,则认为Checkpoint失败,触发Job重启,从上一次Checkpoint恢复,相当于两阶段提交的Precommit阶段失败,事务回滚。当3到4之间发生异常,则会出现Flink和Hudi状态不一致。此时Flink认为Checkpoint已结束,而Hudi实际尚未提交。如果对此情况不做处理,则发生了数据丢失,因为Flink Checkpoint完毕后,SLS位点已经前移,而这部分数据在Hudi上并未完成提交,因此容错的重点是如何处理此阶段引起的一致性问题。解决方法是Flink Job重启并从Checkpoint恢复时,发现Hudi最新的Instant有未提交的写入,需要保证执行Recommit。Recommit的流程如下图所示。

14.png

之前已提到Hudi Worker在Checkpoint时除了flush data,还持久化了一个operator-state,在这里记录了Worker当时所处的Instant信息。Job从Checkpoint恢复时,Sink算子会读取operator-state,Hudi Worker从中恢复持久化的Instant信息;

Hudi worker汇报给自己的Instant给Coordinator;

Hudi Coordinator会从Instant Timeline中获取最新的Instant信息,并接收所有Worker的汇报;

如果Worker汇报的Instant与Timeline中最新的一样,且该Instant尚未提交,则触发Recommit。如果Worker汇报的Instant与最新的不同,则认为上一次Instant执行失败,这份数据对用户不可见,回滚掉即可。


可以想像下是否会存在重启时,部分Hudi worker在最新的Instant,而部分worker在旧的Instant的情况?答案是不会,因为Flink的Checkpoint就是相当于两阶段提交的Precommit阶段,如果Checkpoint完成则说明Hudi Precommit完成,所有Worker都处于最新Instant。如果Checkpoint失败,则重启时会回到上一个Checkpoint,此时Hudi worker所处的状态也是一致的,全部都回退到旧Instant。


4.总结

在数据入湖异常时的Failover处理中,Source通过Checkpoint中持久化的SLS位点,不会重放已处理的数据,保证数据不重,Sink通过Flink和Hudi配合实现的两阶段提交和Recommit机制,保证数据不丢,最终实现Exactly-Once。经过实测这套机制对性能的影响约在3% ~ 5%,以极小的代价保证精确一致性的情况下,实现了高吞吐实时入湖。在某个海量日志入湖项目中,日常吞吐达到3GB/s,峰值吞吐达到5GB/s,数据通道稳定运行,并配合ADB湖仓版的离在线一体化引擎,实现了用户的数据实时入湖,离在线一体化分析需求。

除了精确一致性外,为实现高吞吐写和查,数据通道中还有自动热点打散,小文件合并等诸多挑战,将在后续文章中进行介绍。


AnalyticDB MySQL湖仓版已正式上线商用,对于低成本离线处理ETL有需求,同时又需要使用高性能在线分析支撑BI报表/交互式查询/APP应用的用户,欢迎购买和体验!

另外对湖仓版感兴趣的客户也依然可以填写问卷来进行试用,点击此处填写AnalyticDB MySQL 3.0湖仓版试用申请

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
SQL 缓存 Java
Flink SQL 核心解密 —— 提升吞吐的利器 MicroBatch
之前我们在 Flink SQL 中支持了 MiniBatch, 在支持高吞吐场景发挥了重要作用。今年我们在 Flink SQL 性能优化中一项重要的改进就是升级了微批模型,我们称之为 MicroBatch,也叫 MiniBatch2.0。
5389 0
|
5月前
|
SQL 数据库 流计算
Flink CDC数据读取问题之一致性如何解决
Flink CDC 使用Change Data Capture (CDC)技术从数据库捕获变更事件,并利用Flink的流处理能力确保数据读取一致性。相较于传统工具,它具备全增量一体化数据集成能力,满足实时性需求。在实践中解决了高效数据同步、稳定同步大量表数据等问题。应用场景包括实时数据同步、实时数据集成等。快速上手需学习基本概念与实践操作。未来发展方向包括提升效率与稳定性,并依据用户需求持续优化。
162 1
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之全量和增量同步数据的一致性、不丢失和不重复读取可以通过什么方式保证
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何确保多并发sink同时更新Redis值时,数据能按事件时间有序地更新并且保持一致性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
消息中间件 存储 算法
Flink---13、容错机制(检查点(保存、恢复、算法、配置)、状态一致性、端到端精确一次)
Flink---13、容错机制(检查点(保存、恢复、算法、配置)、状态一致性、端到端精确一次)
|
存储 关系型数据库 MySQL
如何实现基于Flink的高吞吐、精确一致性数据入湖
APS(ADB Pipeline Service)简介:ADB湖仓版在深化自身湖仓能力建设的同时,还推出了APS(ADB Pipeline Service)数据通道组件,为客户提供实时数据流服务实现数据低成本、低延迟入湖入仓。本文以数据源SLS如何通过APS实现高速精确一致性入湖为例,介绍相关的挑战和解决方法。
|
消息中间件 存储 缓存
【Flink】(九)状态一致性、端到端的精确一次(ecactly-once)保证2
【Flink】(九)状态一致性、端到端的精确一次(ecactly-once)保证2
466 0
【Flink】(九)状态一致性、端到端的精确一次(ecactly-once)保证2
|
消息中间件 分布式计算 Kafka
【Flink】(九)状态一致性、端到端的精确一次(ecactly-once)保证1
【Flink】(九)状态一致性、端到端的精确一次(ecactly-once)保证1
328 0
【Flink】(九)状态一致性、端到端的精确一次(ecactly-once)保证1
|
消息中间件 分布式计算 Kafka
Flink之状态一致性
当在分布式系统中引入状态时,自然也引入了一致性问题。 一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到 的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对 最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏 掉的计数还是重复计数?
Flink之状态一致性
|
消息中间件 存储 传感器
Flink状态管理与状态一致性(长文)
状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。下面的几个场景都需要使用流处理的状态功能: 数据流中的数据有重复,想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。 对一个时间窗口内的数据进行聚合分析,分析一个小时内某项指标的75分位或99分位的数值。
260 0
Flink状态管理与状态一致性(长文)