如何实现基于Flink的高吞吐、精确一致性数据入湖

本文涉及的产品
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
简介: APS(ADB Pipeline Service)简介:ADB湖仓版在深化自身湖仓能力建设的同时,还推出了APS(ADB Pipeline Service)数据通道组件,为客户提供实时数据流服务实现数据低成本、低延迟入湖入仓。本文以数据源SLS如何通过APS实现高速精确一致性入湖为例,介绍相关的挑战和解决方法。

1. 概览

AnalyticDB MySQL高度兼容MySQL协议,支持毫秒级更新,亚秒级查询,可以对海量数据进行即时的多维分析透视和业务探索;AnalyticDB MySQL湖仓版(下文简称ADB湖仓版)支持低成本离线处理能力完成数据的清洗加工,同时提供高性能在线分析能力完成数据的洞察探索,真正做到数据湖的规模,数据库的体验。帮助企业降本增效,构建企业级数据分析平台。


APS(ADB Pipeline Service)简介:ADB湖仓版在深化自身湖仓能力建设的同时,还推出了APS(ADB Pipeline Service)数据通道组件,为客户提供实时数据流服务实现数据低成本、低延迟入湖入仓。本文以数据源SLS如何通过APS实现高速精确一致性入湖为例,介绍相关的挑战和解决方法。在数据通道的构建上,我们选择Flink作为基础引擎。Flink作为业界熟知的大数据处理框架,其流批一体架构有助于我们处理多种场景。而ADB的湖则建设在Hudi之上,Hudi作为成熟的数据湖底座已经被多家大型企业实际应用,ADB在其上也已积累多年经验,如今ADB湖仓版把湖和仓进行深度融合提供更加一体化的解决方案。


数据入湖的精确一致性挑战:在入湖通道中,有可能出现异常,升级,扩缩容等场景导致链路重启,触发部分已处理数据从源端重放,导致在目标端出现重复数据。为解决此问题一种做法是配置业务主键,并使用Hudi的Upsert能力来达到幂等写入目的。但SLS入湖的吞吐目标是每秒GB级别(某个业务的需求是4GB/s)且需要控制成本,Hudi Upsert难以满足需求,而SLS数据本身就具有Append特征,因此采用Hudi的Append Only模式写入实现高吞吐,并用其他机制保证数据不重不丢的精确一致性。


2. 端到端精确一致性的问题和解决方案

                     

                        image.png

流计算的一致性保障一般包含如下几种:

                  image.png

精确一致的语义是所有一致性语义中要求最高的,但流计算中的Exactly-Once一般是指内部状态的精确一致性(Exactly-Once State Consistency),而业务需要的是端到端Exactly Once,即当出现异常Failover场景时,最终目标端的数据需要与源端保持一致,数据不重不丢。


2.1 端到端精确一致性问题

要实现精确一致性,就必然要考虑Failover场景,即当系统宕机任务重启时,如何恢复到某个一致性状态。Flink之所以称为Stateful Stream Processing是其可通过Checkpoint机制保存状态到后端存储,并在重启时从后端存储恢复到某个一致性状态。但在状态的恢复中,其仅仅保证了Flink自身的状态一致,而在包含源端、Flink、目的端这样的完整系统中,仍可能产生数据丢失或重复,导致端到端出现不一致。下面用一个字符连接的例子说明数据重复问题。


下图是一个字符串连接的处理过程,处理逻辑是从源端读取一个个的字符,并对它们进行连接,每连接一个字符就向目标端输出一次,最终输出a, ab, abc ...多个不重复的字符串。

本例的场景中,Flink的Checkpoint保存了已完成的字符连接ab,以及对应的源端位点(Checkpoint箭头指向的位点)。Current则指向当前正在处理的位点,此时已经向目标端输出了a, ab, abc。当发生异常重启时,Flink从Checkpoint恢复自身状态,回退位点并重新处理字符c,并再次向目标端输出abc,导致abc出现重复。


在这个例子中,Filnk通过Checkpoint恢复状态,因此不会出现abb,或者abcc这种重复处理字符的情况,也不会出现ac种丢失字符的情况,保证了其自身的Exactly-Once。但是在目标端出现了两个重复的abc,因此没有保证端到端的Exactly-Once。

                        image.png


                      image.png

2.2 端到端精确一致性方案

Flink本身是一种复杂的分布式系统,其内部包含Source、Sink等算子,同时还存在Slot等并行关系,这样的系统要实现精确一致一般会想到两阶段提交,实际上Flink的Checkpoint就是一种两阶段提交的实现。


而在端到端中,Flink和Hudi又组成了另一个分布式系统,这个分布式系统要实现精确一致性,就需要另一套两阶段提交的实现(我们这里不讨论SLS端,因为在本场景中Flink不会改变SLS的状态,只利用SLS的位点重放能力即可)。因此端到端中,是由Flink和Flink + Hudi两套两阶段提交来保证精确一致性(见下图)。


                            image.png


Flink的Checkpoint两阶段实现不再赘述,后文会重点介绍Flink + Hudi两阶段提交的实现,定义出哪些是Precommit阶段,哪些是Commit阶段,同时发生异常时如何从故障中恢复保证Flink和Hudi的状态一致。例如Flink已经完成Checkpoint,而Hudi尚未完成Commit,如何恢复到一致性状态,这些在后续章节介绍。


3. SLS入湖链路端到端精确一致实现

下面介绍SLS入湖链路精确一致性的实现。整体架构如下,Hudi的组件是部署在Flink JobManager和TaskManager上的。SLS作为数据源,由Flink读取处理后,写出到Hudi表。因为SLS是多shard存储,因此会由Flink的多个Source算子并行读取。数据读取后通过Sink算子调用Hudi Worker写出到Hudi表。当然实际的链路中还会有Repartition,热点打散等逻辑,这些在图中做了简化。Flink Checkpoint的后端存储,以及Hudi数据的存储都是放在OSS上。

            image.png


3.1 SLS Source算子

如何实现Source算子消费SLS数据已有大量介绍,此处不在赘述。这里介绍SLS的两种消费模式:消费组模式和普通消费模式以及他们的区别。


3.1.1 消费组模式

顾名思义,多个消费者可注册到同一个消费组,SLS会自动把Shard分配给这些消费者来读取。其优点是由SLS的消费组来管理负载均衡。如下图左,消费组中首先注册了两个消费者,因此SLS把6个Shard均匀分配给这2个消费者。当有新的消费者注册(如下图右),则SLS会自动均衡,把部分Shard从旧消费者迁移到新消费者,称为shard transfer。


这种模式的优点是自动均衡,且在SLS Shard分裂/合并时会自动分配消费者。但该模式在我们的场景中却会引起问题。为保证精确一致,我们把SLS各Shard的当前位点保存在Flink Checkpoint中,运行中也是由各Slot上的Source算子持有当前消费位点。如果发生shard transfer,如何保证旧Slot上的算子不再消费,同时把位点转移给新Slot,这引入了新的一致性问题。尤其是大规模系统有数百个SLS Shard和数百个Flink Slot的情况下,很可能出现部分Source比其他Source先注册到SLS导致shard transfer不可避免。

  image.png

3.1.2 普通消费模式

这种模式就是调用SLS的SDK直接指定shard、offset来消费数据,而不是由SLS消费组进行分配,因此不会出现shard transfer。如下,因为Flink的Slot为3,因此可提前计算出每个消费者消费2个Shard并据此分配,即使Source 3尚未ready,也不会把Shard 5和6分配给Source 1和2。可以想象,为了负载均衡(例如某些TaskManager的负载过高时),仍然需要迁移shard,但此时迁移是我们主动触发的,状态更加可控,从而避免一致性问题。

                image.png

3.1.3 Hudi Sink算子

下面介绍下Hudi提交的相关概念,以及如何与Flink配合实现两阶段提交和容错达到精确一致。

Hudi提交相关概念

时间轴和Instant

        image.png

Hudi维护着一条Timeline,Instant是指某个时间点(Time)发起的对表的操作(Action)及表所处的状态(State)的集合。一个Instant可以理解为一个数据版本,Action可能是对表的Commit,Rollback或者Clean等操作,这些操作由Hudi保证了其原子性,因此Hudi的Instant实际类似于数据库中的事务和版本的概念。在图中我们用Start Transaction,Write Data,Commit这种类似数据库事务的方式来表达某个Instant的执行过程。Instant中,部分Action的含义如下:

  • Commit:将记录原子写入数据集
  • Rollback :当Commit不成功时进行回滚,其会删除在写入中产生的脏文件
  • Clean :删除数据集中不再需要的旧版本和文件

Instant State一共有三种状态:

  • Requested:操作已被计划但未被执行,可以理解为Start Transaction
  • Inflight:操作正在进行,可理解为Write Data
  • Completed:操作完成,可理解为Commit


Instant的Time、Action和State都在元数据文件中描述,下图表示了时间轴上两个先后的Instant。Instant 1的.hoodie目录下的元数据文件描述了Instant的开始时间是2022-10-17 16:05:00,Action是Commit,State可以看到已完成提交(有20221017160500.commit文件),在表的分区目录下则是该Instant对应的parquet数据文件。而Instant 2则可以看到发生在第二天,且Action正在执行尚未提交。


            image.png


Hudi提交过程

Hudi提交过程可以用下图理解。Hudi中存在两种角色,Coordinator负责发起Instant和完成提交,Worker负责写入数据。

  image.png

  1. 1.当开启一个事务时,Coordinator会分配一个Instant并传递给所有worker
  2. 2.Worker开始写入数据
  3. 3.开始提交时,Coordinator发送commit给各Worker。各Worker收到提交命令,flush data到持久化存储,并反馈自己的状态给Coordinator。Coordinator确认各Worker commit完成,然后在.hoodie目录中写commit文件完成全局提交。


Flink + Hudi的两阶段提交

了解了Hudi的写入和提交过程,它如何与Flink配合完成数据的写入和提交就可以用下图表达了。

image.png


  1. 1.开启一个Hudi Instant
  2. 2.由Filnk Sink发送数据给Hudi Worker写出
  3. 3.发生Flink Checkpoint时,则通过Sink算子通知Worker flush数据,同时持久化operator-state(operator-state属于Flink checkpoint框架的一部分,持久化Hudi Worker所处的Instant等信息)
  4. 4.当Flink完成Checkpoint的持久化,则通过notifyCheckpointComplete机制通知Hudi Coordinator提交该Instant。Hudi Coordinator此时完成最终提交,写commit文件,数据对外可见
  5. 5.结束Instant后,会立即开启一个新的Instant,重启上述循环

Flink + Hudi两阶段提交的容错处理

image.png


实际的提交可简化为如上的流程。从图中可见,1到3是Flink的Checkpoint逻辑,如果异常在这些步骤上发生,则认为Checkpoint失败,触发Job重启,从上一次Checkpoint恢复,相当于两阶段提交的Precommit阶段失败,事务回滚。当3到4之间发生异常,则会出现Flink和Hudi状态不一致。此时Flink认为Checkpoint已结束,而Hudi实际尚未提交。如果对此情况不做处理,则发生了数据丢失,因为Flink Checkpoint完毕后,SLS位点已经前移,而这部分数据在Hudi上并未完成提交,因此容错的重点是如何处理此阶段引起的一致性问题。

解决方法是Flink Job重启并从Checkpoint恢复时,发现Hudi最新的Instant有未提交的写入,需要保证执行Recommit。Recommit的流程如下图所示。

image.png


  1. 1.之前已提到Hudi Worker在Checkpoint时除了flush data,还持久化了一个operator-state,在这里记录了Worker当时所处的Instant信息。Job从Checkpoint恢复时,Sink算子会读取operator-state,Hudi Worker从中恢复持久化的Instant信息
  2. 2.Hudi worker汇报给自己的Instant给Coordinator
  3. 3.Hudi Coordinator会从Instant Timeline中获取最新的Instant信息,并接收所有Worker的汇报
  4. 4.如果Worker汇报的Instant与Timeline中最新的一样,且该Instant尚未提交,则触发Recommit。如果Worker汇报的Instant与最新的不同,则认为上一次Instant执行失败,这份数据对用户不可见,回滚掉即可。

可以想像下是否会存在重启时,部分Hudi worker在最新的Instant,而部分worker在旧的Instant的情况?答案是不会,因为Flink的Checkpoint就是相当于两阶段提交的Precommit阶段,如果Checkpoint完成则说明Hudi Precommit完成,所有Worker都处于最新Instant。如果Checkpoint失败,则重启时会回到上一个Checkpoint,此时Hudi worker所处的状态也是一致的,全部都回退到旧Instant。


4. 总结

在数据入湖异常时的Failover处理中,Source通过Checkpoint中持久化的SLS位点,不会重放已处理的数据,保证数据不重,Sink通过Flink和Hudi配合实现的两阶段提交和Recommit机制,保证数据不丢,最终实现Exactly-Once。经过实测这套机制对性能的影响约在3% ~ 5%,以极小的代价保证精确一致性的情况下,实现了高吞吐实时入湖。在某个海量日志入湖项目中,日常吞吐达到3GB/s,峰值吞吐达到5GB/s,数据通道稳定运行,并配合ADB湖仓版的离在线一体化引擎,实现了用户的数据实时入湖,离在线一体化分析需求。

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
1天前
|
SQL 关系型数据库 MySQL
实时数仓 Hologres操作报错合集之Flink CTAS Source(Mysql) 表字段从可空改为非空的原因是什么
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
2天前
|
关系型数据库 MySQL 数据库
MySQL 复制A的表结构和数据到表B
在MySQL中复制表A至表B可通过不同方法实现。一种是先用`CREATE TABLE B LIKE A;`复制结构,再用`INSERT INTO B SELECT * FROM A;`填充数据。另一种更简便的方法是直接使用`CREATE TABLE B AS SELECT * FROM A;`一次性完成结构和数据的复制。还有一种高级方法是通过`SHOW CREATE TABLE A;`获取表A的创建语句,手动调整后创建表B,如有需要再用`INSERT INTO ... SELECT`复制数据。注意权限问题、跨数据库复制时需指定数据库名,以及大表复制时可能影响性能。
|
5天前
|
消息中间件 数据采集 关系型数据库
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
20 1
|
5天前
|
数据采集 关系型数据库 MySQL
大数据-业务数据采集-FlinkCDC The MySQL server is not configured to use a ROW binlog_format
大数据-业务数据采集-FlinkCDC The MySQL server is not configured to use a ROW binlog_format
15 1
|
8天前
|
固态存储 关系型数据库 MySQL
"惊!20亿数据秒速入MySQL,揭秘数据库极速插入的黑科技,你不可不知的绝密技巧!"
【8月更文挑战第11天】面对20亿级数据量,高效插入MySQL成为挑战。本文探讨优化策略:合理设计数据库减少不必要的字段和索引;使用批量插入减少网络往返;优化硬件如SSD和内存及调整MySQL配置;并行处理加速插入;附Python示例代码实现分批导入。这些方法将有效提升大规模数据处理能力。
23 2
|
12天前
|
SQL 关系型数据库 MySQL
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
34 1
|
18天前
|
存储 SQL 关系型数据库
(二十三)MySQL分表篇:该如何将月增上亿条数据的单表处理方案优雅落地?
前面《分库分表的正确姿势》、《分库分表的后患问题》两篇中,对数据库的分库分表技术进行了全面阐述,但前两篇大多属于方法论,并不存在具体的实战实操,而只有理论没有实践的技术永远都属纸上谈兵,所以接下来会再开几个单章对分库分表各类方案进行落地。
|
20天前
|
SQL 关系型数据库 MySQL
MySQL删除表数据、清空表命令(truncate、drop、delete 区别)
MySQL删除表数据、清空表命令(truncate、drop、delete区别) 使用原则总结如下: 当你不需要该表时(删除数据和结构),用drop; 当你仍要保留该表、仅删除所有数据表内容时,用truncate; 当你要删除部分记录、且希望能回滚的话,用delete;
|
1天前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
8 0
|
7天前
|
关系型数据库 MySQL
MySQL——删除重复数据
MySQL——删除重复数据
12 0

相关产品

  • 云原生数据仓库AnalyticDB MySQL版