如何实现基于Flink的高吞吐、精确一致性数据入湖

本文涉及的产品
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
简介: APS(ADB Pipeline Service)简介:ADB湖仓版在深化自身湖仓能力建设的同时,还推出了APS(ADB Pipeline Service)数据通道组件,为客户提供实时数据流服务实现数据低成本、低延迟入湖入仓。本文以数据源SLS如何通过APS实现高速精确一致性入湖为例,介绍相关的挑战和解决方法。

1. 概览

AnalyticDB MySQL高度兼容MySQL协议,支持毫秒级更新,亚秒级查询,可以对海量数据进行即时的多维分析透视和业务探索;AnalyticDB MySQL湖仓版(下文简称ADB湖仓版)支持低成本离线处理能力完成数据的清洗加工,同时提供高性能在线分析能力完成数据的洞察探索,真正做到数据湖的规模,数据库的体验。帮助企业降本增效,构建企业级数据分析平台。


APS(ADB Pipeline Service)简介:ADB湖仓版在深化自身湖仓能力建设的同时,还推出了APS(ADB Pipeline Service)数据通道组件,为客户提供实时数据流服务实现数据低成本、低延迟入湖入仓。本文以数据源SLS如何通过APS实现高速精确一致性入湖为例,介绍相关的挑战和解决方法。在数据通道的构建上,我们选择Flink作为基础引擎。Flink作为业界熟知的大数据处理框架,其流批一体架构有助于我们处理多种场景。而ADB的湖则建设在Hudi之上,Hudi作为成熟的数据湖底座已经被多家大型企业实际应用,ADB在其上也已积累多年经验,如今ADB湖仓版把湖和仓进行深度融合提供更加一体化的解决方案。


数据入湖的精确一致性挑战:在入湖通道中,有可能出现异常,升级,扩缩容等场景导致链路重启,触发部分已处理数据从源端重放,导致在目标端出现重复数据。为解决此问题一种做法是配置业务主键,并使用Hudi的Upsert能力来达到幂等写入目的。但SLS入湖的吞吐目标是每秒GB级别(某个业务的需求是4GB/s)且需要控制成本,Hudi Upsert难以满足需求,而SLS数据本身就具有Append特征,因此采用Hudi的Append Only模式写入实现高吞吐,并用其他机制保证数据不重不丢的精确一致性。


2. 端到端精确一致性的问题和解决方案

                     

                        image.png

流计算的一致性保障一般包含如下几种:

                  image.png

精确一致的语义是所有一致性语义中要求最高的,但流计算中的Exactly-Once一般是指内部状态的精确一致性(Exactly-Once State Consistency),而业务需要的是端到端Exactly Once,即当出现异常Failover场景时,最终目标端的数据需要与源端保持一致,数据不重不丢。


2.1 端到端精确一致性问题

要实现精确一致性,就必然要考虑Failover场景,即当系统宕机任务重启时,如何恢复到某个一致性状态。Flink之所以称为Stateful Stream Processing是其可通过Checkpoint机制保存状态到后端存储,并在重启时从后端存储恢复到某个一致性状态。但在状态的恢复中,其仅仅保证了Flink自身的状态一致,而在包含源端、Flink、目的端这样的完整系统中,仍可能产生数据丢失或重复,导致端到端出现不一致。下面用一个字符连接的例子说明数据重复问题。


下图是一个字符串连接的处理过程,处理逻辑是从源端读取一个个的字符,并对它们进行连接,每连接一个字符就向目标端输出一次,最终输出a, ab, abc ...多个不重复的字符串。

本例的场景中,Flink的Checkpoint保存了已完成的字符连接ab,以及对应的源端位点(Checkpoint箭头指向的位点)。Current则指向当前正在处理的位点,此时已经向目标端输出了a, ab, abc。当发生异常重启时,Flink从Checkpoint恢复自身状态,回退位点并重新处理字符c,并再次向目标端输出abc,导致abc出现重复。


在这个例子中,Filnk通过Checkpoint恢复状态,因此不会出现abb,或者abcc这种重复处理字符的情况,也不会出现ac种丢失字符的情况,保证了其自身的Exactly-Once。但是在目标端出现了两个重复的abc,因此没有保证端到端的Exactly-Once。

                        image.png


                      image.png

2.2 端到端精确一致性方案

Flink本身是一种复杂的分布式系统,其内部包含Source、Sink等算子,同时还存在Slot等并行关系,这样的系统要实现精确一致一般会想到两阶段提交,实际上Flink的Checkpoint就是一种两阶段提交的实现。


而在端到端中,Flink和Hudi又组成了另一个分布式系统,这个分布式系统要实现精确一致性,就需要另一套两阶段提交的实现(我们这里不讨论SLS端,因为在本场景中Flink不会改变SLS的状态,只利用SLS的位点重放能力即可)。因此端到端中,是由Flink和Flink + Hudi两套两阶段提交来保证精确一致性(见下图)。


                            image.png


Flink的Checkpoint两阶段实现不再赘述,后文会重点介绍Flink + Hudi两阶段提交的实现,定义出哪些是Precommit阶段,哪些是Commit阶段,同时发生异常时如何从故障中恢复保证Flink和Hudi的状态一致。例如Flink已经完成Checkpoint,而Hudi尚未完成Commit,如何恢复到一致性状态,这些在后续章节介绍。


3. SLS入湖链路端到端精确一致实现

下面介绍SLS入湖链路精确一致性的实现。整体架构如下,Hudi的组件是部署在Flink JobManager和TaskManager上的。SLS作为数据源,由Flink读取处理后,写出到Hudi表。因为SLS是多shard存储,因此会由Flink的多个Source算子并行读取。数据读取后通过Sink算子调用Hudi Worker写出到Hudi表。当然实际的链路中还会有Repartition,热点打散等逻辑,这些在图中做了简化。Flink Checkpoint的后端存储,以及Hudi数据的存储都是放在OSS上。

            image.png


3.1 SLS Source算子

如何实现Source算子消费SLS数据已有大量介绍,此处不在赘述。这里介绍SLS的两种消费模式:消费组模式和普通消费模式以及他们的区别。


3.1.1 消费组模式

顾名思义,多个消费者可注册到同一个消费组,SLS会自动把Shard分配给这些消费者来读取。其优点是由SLS的消费组来管理负载均衡。如下图左,消费组中首先注册了两个消费者,因此SLS把6个Shard均匀分配给这2个消费者。当有新的消费者注册(如下图右),则SLS会自动均衡,把部分Shard从旧消费者迁移到新消费者,称为shard transfer。


这种模式的优点是自动均衡,且在SLS Shard分裂/合并时会自动分配消费者。但该模式在我们的场景中却会引起问题。为保证精确一致,我们把SLS各Shard的当前位点保存在Flink Checkpoint中,运行中也是由各Slot上的Source算子持有当前消费位点。如果发生shard transfer,如何保证旧Slot上的算子不再消费,同时把位点转移给新Slot,这引入了新的一致性问题。尤其是大规模系统有数百个SLS Shard和数百个Flink Slot的情况下,很可能出现部分Source比其他Source先注册到SLS导致shard transfer不可避免。

  image.png

3.1.2 普通消费模式

这种模式就是调用SLS的SDK直接指定shard、offset来消费数据,而不是由SLS消费组进行分配,因此不会出现shard transfer。如下,因为Flink的Slot为3,因此可提前计算出每个消费者消费2个Shard并据此分配,即使Source 3尚未ready,也不会把Shard 5和6分配给Source 1和2。可以想象,为了负载均衡(例如某些TaskManager的负载过高时),仍然需要迁移shard,但此时迁移是我们主动触发的,状态更加可控,从而避免一致性问题。

                image.png

3.1.3 Hudi Sink算子

下面介绍下Hudi提交的相关概念,以及如何与Flink配合实现两阶段提交和容错达到精确一致。

Hudi提交相关概念

时间轴和Instant

        image.png

Hudi维护着一条Timeline,Instant是指某个时间点(Time)发起的对表的操作(Action)及表所处的状态(State)的集合。一个Instant可以理解为一个数据版本,Action可能是对表的Commit,Rollback或者Clean等操作,这些操作由Hudi保证了其原子性,因此Hudi的Instant实际类似于数据库中的事务和版本的概念。在图中我们用Start Transaction,Write Data,Commit这种类似数据库事务的方式来表达某个Instant的执行过程。Instant中,部分Action的含义如下:

  • Commit:将记录原子写入数据集
  • Rollback :当Commit不成功时进行回滚,其会删除在写入中产生的脏文件
  • Clean :删除数据集中不再需要的旧版本和文件

Instant State一共有三种状态:

  • Requested:操作已被计划但未被执行,可以理解为Start Transaction
  • Inflight:操作正在进行,可理解为Write Data
  • Completed:操作完成,可理解为Commit


Instant的Time、Action和State都在元数据文件中描述,下图表示了时间轴上两个先后的Instant。Instant 1的.hoodie目录下的元数据文件描述了Instant的开始时间是2022-10-17 16:05:00,Action是Commit,State可以看到已完成提交(有20221017160500.commit文件),在表的分区目录下则是该Instant对应的parquet数据文件。而Instant 2则可以看到发生在第二天,且Action正在执行尚未提交。


            image.png


Hudi提交过程

Hudi提交过程可以用下图理解。Hudi中存在两种角色,Coordinator负责发起Instant和完成提交,Worker负责写入数据。

  image.png

  1. 1.当开启一个事务时,Coordinator会分配一个Instant并传递给所有worker
  2. 2.Worker开始写入数据
  3. 3.开始提交时,Coordinator发送commit给各Worker。各Worker收到提交命令,flush data到持久化存储,并反馈自己的状态给Coordinator。Coordinator确认各Worker commit完成,然后在.hoodie目录中写commit文件完成全局提交。


Flink + Hudi的两阶段提交

了解了Hudi的写入和提交过程,它如何与Flink配合完成数据的写入和提交就可以用下图表达了。

image.png


  1. 1.开启一个Hudi Instant
  2. 2.由Filnk Sink发送数据给Hudi Worker写出
  3. 3.发生Flink Checkpoint时,则通过Sink算子通知Worker flush数据,同时持久化operator-state(operator-state属于Flink checkpoint框架的一部分,持久化Hudi Worker所处的Instant等信息)
  4. 4.当Flink完成Checkpoint的持久化,则通过notifyCheckpointComplete机制通知Hudi Coordinator提交该Instant。Hudi Coordinator此时完成最终提交,写commit文件,数据对外可见
  5. 5.结束Instant后,会立即开启一个新的Instant,重启上述循环

Flink + Hudi两阶段提交的容错处理

image.png


实际的提交可简化为如上的流程。从图中可见,1到3是Flink的Checkpoint逻辑,如果异常在这些步骤上发生,则认为Checkpoint失败,触发Job重启,从上一次Checkpoint恢复,相当于两阶段提交的Precommit阶段失败,事务回滚。当3到4之间发生异常,则会出现Flink和Hudi状态不一致。此时Flink认为Checkpoint已结束,而Hudi实际尚未提交。如果对此情况不做处理,则发生了数据丢失,因为Flink Checkpoint完毕后,SLS位点已经前移,而这部分数据在Hudi上并未完成提交,因此容错的重点是如何处理此阶段引起的一致性问题。

解决方法是Flink Job重启并从Checkpoint恢复时,发现Hudi最新的Instant有未提交的写入,需要保证执行Recommit。Recommit的流程如下图所示。

image.png


  1. 1.之前已提到Hudi Worker在Checkpoint时除了flush data,还持久化了一个operator-state,在这里记录了Worker当时所处的Instant信息。Job从Checkpoint恢复时,Sink算子会读取operator-state,Hudi Worker从中恢复持久化的Instant信息
  2. 2.Hudi worker汇报给自己的Instant给Coordinator
  3. 3.Hudi Coordinator会从Instant Timeline中获取最新的Instant信息,并接收所有Worker的汇报
  4. 4.如果Worker汇报的Instant与Timeline中最新的一样,且该Instant尚未提交,则触发Recommit。如果Worker汇报的Instant与最新的不同,则认为上一次Instant执行失败,这份数据对用户不可见,回滚掉即可。

可以想像下是否会存在重启时,部分Hudi worker在最新的Instant,而部分worker在旧的Instant的情况?答案是不会,因为Flink的Checkpoint就是相当于两阶段提交的Precommit阶段,如果Checkpoint完成则说明Hudi Precommit完成,所有Worker都处于最新Instant。如果Checkpoint失败,则重启时会回到上一个Checkpoint,此时Hudi worker所处的状态也是一致的,全部都回退到旧Instant。


4. 总结

在数据入湖异常时的Failover处理中,Source通过Checkpoint中持久化的SLS位点,不会重放已处理的数据,保证数据不重,Sink通过Flink和Hudi配合实现的两阶段提交和Recommit机制,保证数据不丢,最终实现Exactly-Once。经过实测这套机制对性能的影响约在3% ~ 5%,以极小的代价保证精确一致性的情况下,实现了高吞吐实时入湖。在某个海量日志入湖项目中,日常吞吐达到3GB/s,峰值吞吐达到5GB/s,数据通道稳定运行,并配合ADB湖仓版的离在线一体化引擎,实现了用户的数据实时入湖,离在线一体化分析需求。

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
24天前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
11天前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
90 4
|
19天前
|
关系型数据库 MySQL 数据库
RDS MySQL灾备服务协同解决方案构建问题之数据库备份数据的云上云下迁移如何解决
RDS MySQL灾备服务协同解决方案构建问题之数据库备份数据的云上云下迁移如何解决
|
14天前
|
SQL 存储 缓存
MySQL是如何保证数据不丢失的?
文章详细阐述了InnoDB存储引擎中Buffer Pool与DML操作的关系。在执行插入、更新或删除操作时,InnoDB为了减少磁盘I/O,会在Buffer Pool中缓存数据页进行操作,随后将更新后的“脏页”刷新至磁盘。为防止服务宕机导致数据丢失,InnoDB采用了日志先行(WAL)机制,通过将DML操作记录为Redo Log并异步刷新到磁盘,结合双写机制和合理的日志刷新策略,确保数据的持久性和一致性。尽管如此,仍需合理配置参数以平衡性能与数据安全性。
MySQL是如何保证数据不丢失的?
|
9天前
|
缓存 NoSQL 关系型数据库
MySQL与Redis缓存一致性的实现与挑战
在现代软件开发中,MySQL作为关系型数据库管理系统,广泛应用于数据存储;而Redis则以其高性能的内存数据结构存储特性,常被用作缓存层来提升数据访问速度。然而,当MySQL与Redis结合使用时,确保两者之间的数据一致性成为了一个重要且复杂的挑战。本文将从技术角度分享MySQL与Redis缓存一致性的实现方法及其面临的挑战。
31 2
|
21天前
|
SQL 数据库 流计算
Flink CDC数据读取问题之一致性如何解决
Flink CDC 使用Change Data Capture (CDC)技术从数据库捕获变更事件,并利用Flink的流处理能力确保数据读取一致性。相较于传统工具,它具备全增量一体化数据集成能力,满足实时性需求。在实践中解决了高效数据同步、稳定同步大量表数据等问题。应用场景包括实时数据同步、实时数据集成等。快速上手需学习基本概念与实践操作。未来发展方向包括提升效率与稳定性,并依据用户需求持续优化。
66 1
|
24天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之ODPS数据怎么Merge到MySQL数据库
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
24天前
|
DataWorks 关系型数据库 MySQL
DataWorks产品使用合集之mysql节点如何插入数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
11天前
|
存储 关系型数据库 MySQL
|
12天前
|
SQL 关系型数据库 MySQL
SQL Server、MySQL、PostgreSQL:主流数据库SQL语法异同比较——深入探讨数据类型、分页查询、表创建与数据插入、函数和索引等关键语法差异,为跨数据库开发提供实用指导
【8月更文挑战第31天】SQL Server、MySQL和PostgreSQL是当今最流行的关系型数据库管理系统,均使用SQL作为查询语言,但在语法和功能实现上存在差异。本文将比较它们在数据类型、分页查询、创建和插入数据以及函数和索引等方面的异同,帮助开发者更好地理解和使用这些数据库。尽管它们共用SQL语言,但每个系统都有独特的语法规则,了解这些差异有助于提升开发效率和项目成功率。
71 0

相关产品

  • 云原生数据仓库AnalyticDB MySQL版