Flink CDC + Hudi 海量数据入湖在顺丰的实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 覃立辉在 5.21 Flink CDC Meetup 的分享。

本文整理自顺丰大数据研发工程师覃立辉在 5月 21 日 Flink CDC Meetup 的演讲。主要内容包括:

  1. 顺丰数据集成背景
  2. Flink CDC 实践问题与优化
  3. 未来规划

点击查看直播回放 & 演讲PDF

一、顺丰数据集成背景

img

顺丰是快递物流服务提供商,主营业务包含了时效快递、经济快递、同城配送以及冷链运输等。

运输流程背后需要一系列系统的支持,比如订单管理系统、智慧物业系统、以及很多中转场、汽车或飞机上的很多传感器,都会产生大量数据。如果需要对这些数据进行数据分析,那么数据集成是其中很重要的一步。

img

顺丰的数据集成经历了几年的发展,主要分为两块,一块是离线数据集成,一块是实时数据集成。离线数据集成以 DataX 为主,本文主要介绍实时数据集成方案。

2017 年,基于 Jstorm + Canal 的方式实现了第一个版本的实时数据集成方案。但是此方案存在诸多问题,比如无法保证数据的一致性、吞吐率较低、难以维护。 2019 年,随着 Flink 社区的不断发展,它补齐了很多重要特性,因此基于 Flink + Canal 的方式实现了第二个版本的实时数据集成方案。但是此方案依然不够完美,经历了内部调研与实践,2022 年初,我们全面转向 Flink CDC 。

img

上图为 Flink + Canal 的实时数据入湖架构。

Flink 启动之后,首先读取当前的 Binlog 信息,标记为 StartOffset ,通过 select 方式将全量数据采集上来,发往下游 Kafka。全量采集完毕之后,再从 startOffset 采集增量的日志信息,发往 Kafka。最终 Kafka 的数据由 Spark 消费后写往 Hudi。

但是此架构存在以下三个问题:

  • 全量与增量数据存在重复:因为采集过程中不会进行锁表,如果在全量采集过程中有数据变更,并且采集到了这些数据,那么这些数据会与 Binlog 中的数据存在重复;
  • 需要下游进行 Upsert 或 Merge 写入才能剔除重复的数据,确保数据的最终一致性;
  • 需要两套计算引擎,再加上消息队列 Kafka 才能将数据写入到数据湖 Hudi 中,过程涉及组件多、链路长,且消耗资源大。

基于以上问题,我们整理出了数据集成的核心需求:

img

  1. 全量增量自动切换,并保证数据的准确性。Flink + Canal 的架构能实现全量和增量自动切换,但无法保证数据的准确性;
  2. 最大限度地减少对源数据库的影响,比如同步过程中尽量不使用锁、能流控等;
  3. 能在已存在的任务中添加新表的数据采集,这是非常核心的需求,因为在复杂的生产环境中,等所有表都准备好之后再进行数据集成会导致效率低下。此外,如果不能做到任务的合并,需要起很多次任务,采集很多次 Binlog 的数据,可能会导致 DB 机器带宽被打满;
  4. 能同时进行全量和增量日志采集,新增表不能暂停日志采集来确保数据的准确性,这种方式会给其他表日志采集带来延迟;
  5. 能确保数据在同一主键 ID 下按历史顺序发生,不会出现后发生的事件先发送到下游。

img

Flink CDC 很好地解决了业务痛点,并且在可扩展性、稳定性、社区活跃度方面都非常优秀。

  • 首先,它能无缝对接 Flink 生态,复用 Flink 众多 sink 能力,使用 Flink 数据清理转换的能力;
  • 其次,它能进行全量与增量自动切换,并且保证数据的准确性;
  • 第三,它能支持无锁读取、断点续传、水平扩展,特别是在水平扩展方面,理论上来说,给的资源足够多时,性能瓶颈一般不会出现在 CDC 侧,而是在于数据源/目标源是否能支持读/写这么多数据。

二、Flink CDC 实践问题与优化

img

上图为 Flink CDC 2.0 的架构原理。 它基于 FLIP-27 实现,核心步骤如下:

  1. Enumerator 先将全量数据拆分成多个 SnapshotSplit,然后按照上图中第一步将 SnapshotSplit 发送给 SourceReader 执行。执行过程中会对数据进行修正来保证数据的一致性;
  2. SnapshotSplit 读取完成后向 Enumerator 汇报已读取完成的块信息;
  3. 重复执行 (1) (2) 两个步骤,直到将全量数据读取完毕;
  4. 全量数据读取完毕之后,Enumerator 会根据之前全量完成的 split 信息, 构造一个 BinlogSplit。 发送给 SourceRead 执行,读取增量日志数据。

问题一:新增表会停止 Binlog 日志流

img

在已存在的任务中添加新表是非常重要的需求, Flink CDC 2.0 也支持了这一功能。但是为了确保数据的一致性,Flink CDC 2.0 在新增表的流程中,需要停止 Binlog 日志流的读取,再进行新增表的全量数据读取。等新增表的全量数据读取完毕之后,再将之前停止的 Binlog 任务重新启动。这也意味着新增表会影响其他表的日志采集进度。然而我们希望全量和增量两个任务能够同时进行,为了解决这一问题,我们对 Flink CDC 进行了拓展,支持了全量和增量日志流并行读取,步骤如下:

img

  1. 程序启动后,在 Enumerator 中创建 BinlogSplit ,放在分配列表的第一位,分配给 SourceReader 执行增量数据采集;
  2. 与原有的全量数据采集一样,Enumerator 将全量采集切分成多个 split 块,然后将切分好的块分配给 SourceReader 去执行全量数据的采集;
  3. 全量数据采集完成之后,SourceReader 向 Enumerator 汇报已经完成的全量数据采集块的信息;
  4. 重复 (2) (3) 步,将全量的表采集完毕。

以上就是第一次启动任务,全量与增量日志并行读取的流程。新增表后,并行读取实现步骤如下:

img

  1. 恢复任务时,Flink CDC 会从 state 中获取用户新表的配置信息;
  2. 通过对比用户配置信息与状态信息,捕获到要新增的表。对于 BinlogSplit 任务,会增加新表 binlog 数据的采集;对于 Enumerator 任务,会对新表进行全量切分;
  3. Enumerator 将切分好的 SnapshotSplit 分配给 SourceReader 执行全量数据采集;
  4. 重复步骤 (3),直到所有全量数据读取完毕。

然而,实现全量和增量日志并行读取后,又出现了数据冲突问题。

img

如上图所示, Flink CDC 在读取全量数据之前,会先读取当前 Binlog 的位置信息,将其标记为 LW,接着通过 select 的方式读取全量数据,读取到上图中 s1、s2、 s3、s4 四条数据。再读取当前的 Binlog 位置,标记为 HW, 然后将 LW 和 HW 中变更的数据 merge 到之前全量采集上来的数据中。经过一系列操作后,最终全量采集到的数据是 s1、s2、s3、s4 和 s5。

而增量采集的进程也会读取 Binlog 中的日志信息,会将 LW 和 HW 中的 s2、s2、s4、s5 四条数据发往下游。

上述整个流程中存在两个问题:首先,数据多取,存在数据重复,上图中红色标识即存在重复的数据;其次,全量和增量在两个不同的线程中,也有可能是在两个不同的 JVM 中,因此先发往下游的数据可能是全量数据,也有可能是增量数据,意味着同一主键 ID 到达下游的先后顺序不是按历史顺序,与核心需求不符。

针对数据冲突问题,我们提供了基于 GTID 实现的处理方案。

img

首先,为全量数据打上 Snapshot 标签,增量数据打上 Binlog 标签;其次,为全量数据补充一个高水位 GTID 信息,而增量数据本身携带有 GTID 信息,因此不需要补充。将数据下发,下游会接上一个 KeyBy 算子,再接上数据冲突处理算子,数据冲突的核心是保证发往下游的数据不重复,并且按历史顺序产生。

如果下发的是全量采集到的数据,且此前没有 Binlog 数据下发,则将这条数据的 GTID 存储到 state 并把这条数据下发;如果 state 不为空且此条记录的 GTID 大于等于状态中的 GTID ,也将这条数据的 GTID 存储到 state 并把这条数据下发;

通过这种方式,很好地解决了数据冲突的问题,最终输出到下游的数据是不重复且按历史顺序发生的。

img

然而,新的问题又产生了。在处理算法中可以看出,为了确保数据的不重复并且按历史顺序下发,会将所有记录对应的 GTID 信息存储在状态中,导致状态一直递增。

清理状态一般首选 TTL,但 TTL 难以控制时间,且无法将数据完全清理掉。第二种方式是手动清理,全量表完成之后,可以下发一条记录告诉下游清理 state 中的数据。

解决了以上所有问题,并行读取的最终方案如下图所示。

img

首先,给数据打上四种标签,分别代表不同的状态:

  • SNAPSHOT:全量采集到的数据信息。
  • STATE_BINLOG:还未完成全量采集, Binlog 已采集到这张表的变更数据。
  • BINLOG:全量数据采集完毕之后,Binlog 再采集到这张表的变更数据。
  • TABLE_FINISHED:全量数据采集完成之后通知下游,可以清理 state。

具体实现步骤如下:

  1. 分配 Binlog ,此时 Binlog 采集到的数据都为 STATE_BINLOG 标签;
  2. 分配 SnapshotSplit 任务,此时全量采集到的数据都为 SNAPSHOT 标签;
  3. Enumerator 实时监控表的状态,某一张表执行完成并完成 checkpoint 后,通知 Binlog 任务。Binlog 任务收到通知后,将此表后续采集到的 Binlog 信息都打上 BINLOG 标签;此外,它还会构造一条 TABLE_FINISHED 记录发往下游做处理;
  4. 数据采集完成后,除了接上数据冲突处理算子,此处还新增了一个步骤:从主流中筛选出来的 TABLE_FINISHED 事件记录,通过广播的方式将其发往下游,下游根据具体信息清理对应表的状态信息。

问题二:写 Hudi 时存在数据倾斜

img

如上图,Flink CDC 采集三张表数据的时候,会先读取完 tableA 的全量数据,再读取tableB 的全量数据。读取 tableA 的过程中,下游只有 tableA 的 sink 有数据流入。

img

我们通过多表混合读取的方式来解决数据倾斜的问题。

引入多表混合之前,Flink CDC 读取完 tableA 的所有 chunk,再读取 tableB 的所有 chunk。实现了多表混合读取后,读取的顺序变为读取 tableA 的 chunk1、tableB 的 chunk1、tableC 的 chunk1,再读取 tableA 的 chunk2,以此类推,最终很好地解决了下游 sink 数据倾斜的问题,保证每个 sink 都有数据流入。

img

我们对多表混合读取的性能进行了测试,由 TPCC 工具构造的测试数据,读取了 4。张表,总并行度为 8,每个 sink 的并行度为 2,写入时间由原来的 46 分钟降至 20 分钟,性能提升 2.3 倍。

需要注意的是,如果 sink 的并行度和总并行度相等,则性能不会有明显提升,多表混合读取主要的作用是更快地获取到每张表下发的数据。

问题三:需要用户手动指定 schema 信息

img

用户手动执行 DB schema 与 sink 之间 schema 映射关系,开发效率低,耗时长且容易出错。

img

为了降低用户的使用门槛,提升开发效率,我们实现了 Oracle catalog ,让用户能以低代码的方式、无需指定 DB schema 信息与 sink schema 信息的映射关系,即可通过 Flink CDC 将数据写入到 Hudi。

三、未来规划

img

第一, 支持 schema 信息变更同步。比如数据源发生了 schema 信息变更,能够将其同步到 Kafka 和 Hudi 中;支持平台接入更多数据源类型,增强稳定性,实现更多应用场景的落地。

第二, 支持 SQL 化的方式,使用 Flink CDC 将数据同步到 Hudi 中,降低用户的使用门槛。

第三, 希望技术更开放,与社区共同成长,为社区贡献出自己的一份力量。

问答

Q:断点续传采集如何处理?

A:断点续传有两种,分为全量和 Binlog。但它们都是基于 Flink state 的能力,同步的过程中会将进度存储到 state 中。如果失败了,下一次再从 state 中恢复即可。

Q:MySQL 在监控多表使用 SQL 写入 Hudi 表中的时候,存在多个 job,维护很麻烦,如何通过单 job 同步整库?

A:我们基于 GTID 的方式对 Flink CDC 进行了拓展,支持任务中新增表,且不影响其他表的采集进度。不考虑新增表影响到其他表进度的情况下,也可以基于 Flink CDC 2.2 做新增表的能力。

Q:顺丰这些特性会在 CDC 开源版本中实现吗?

A:目前我们的方案还存在一些局限性,比如必须用 MySQL 的 GTID,需要下游有数据冲突处理的算子,因此较难实现在社区中开源。

Q:Flink CDC 2.0 新增表支持全量 + 增量吗?

A:是的。

Q:GTID 去重算子会不会成为性能瓶颈?

A:经过实践,不存在性能瓶颈,它只是做了一些数据的判断和过滤。

点击查看直播回放 & 演讲PDF


更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

O1CN01tmtpiy1iazJYZdixL_!!6000000004430-2-tps-899-548.png"

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
11天前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
34 9
|
2月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
587 1
Flink CDC:新一代实时数据集成框架
|
2月前
|
消息中间件 canal 数据采集
Flink CDC 在货拉拉的落地与实践
陈政羽在Apache Asia Community Over Code 2024上分享了《货拉拉在Flink CDC生产实践落地》。文章介绍了货拉拉业务背景、技术选型及其在实时数据采集中的挑战与解决方案,详细阐述了Flink CDC的技术优势及在稳定性、兼容性等方面的应用成果。通过实际案例展示了Flink CDC在提升数据采集效率、降低延迟等方面的显著成效,并展望了未来发展方向。
538 14
Flink CDC 在货拉拉的落地与实践
|
3月前
|
Oracle 关系型数据库 新能源
Flink CDC 在新能源制造业的实践
本文撰写自某新能源企业的研发工程师 单葛尧 老师。本文详细介绍该新能源企业的大数据平台中 CDC 技术架构选型和 Flink CDC 的最佳实践。
446 13
Flink CDC 在新能源制造业的实践
|
SQL Kubernetes Cloud Native
开发者社区精选直播合集(三十六)| Flink实践合集
Flink 作为业界公认为最好的流计算引擎,不仅仅局限于做流处理,而是一套兼具流、批、机器学习等多种计算功能的大数据引擎,以其高吞吐低延时的优异实时计算能力、支持海量数据的亚秒级快速响应帮助企业和开发者实现数据算力升级,并成为阿里、腾讯、滴滴、美团、字节跳动、Netflix、Lyft 等国内外知名公司建设实时计算平台的首选。
开发者社区精选直播合集(三十六)|  Flink实践合集
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
4月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
821 7
阿里云实时计算Flink在多行业的应用和实践
|
16天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
690 10
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
3月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
13天前
|
SQL 运维 数据可视化
阿里云实时计算Flink版产品体验测评
阿里云实时计算Flink基于Apache Flink构建,提供一站式实时大数据分析平台,支持端到端亚秒级实时数据分析,适用于实时大屏、实时报表、实时ETL和风控监测等场景,具备高性价比、开发效率、运维管理和企业安全等优势。

相关产品

  • 实时计算 Flink版