Flink CDC + Hudi 海量数据入湖在顺丰的实践

简介: 覃立辉在 5.21 Flink CDC Meetup 的分享。

本文整理自顺丰大数据研发工程师覃立辉在 5月 21 日 Flink CDC Meetup 的演讲。主要内容包括:

  1. 顺丰数据集成背景
  2. Flink CDC 实践问题与优化
  3. 未来规划

点击查看直播回放 & 演讲PDF

一、顺丰数据集成背景

img

顺丰是快递物流服务提供商,主营业务包含了时效快递、经济快递、同城配送以及冷链运输等。

运输流程背后需要一系列系统的支持,比如订单管理系统、智慧物业系统、以及很多中转场、汽车或飞机上的很多传感器,都会产生大量数据。如果需要对这些数据进行数据分析,那么数据集成是其中很重要的一步。

img

顺丰的数据集成经历了几年的发展,主要分为两块,一块是离线数据集成,一块是实时数据集成。离线数据集成以 DataX 为主,本文主要介绍实时数据集成方案。

2017 年,基于 Jstorm + Canal 的方式实现了第一个版本的实时数据集成方案。但是此方案存在诸多问题,比如无法保证数据的一致性、吞吐率较低、难以维护。 2019 年,随着 Flink 社区的不断发展,它补齐了很多重要特性,因此基于 Flink + Canal 的方式实现了第二个版本的实时数据集成方案。但是此方案依然不够完美,经历了内部调研与实践,2022 年初,我们全面转向 Flink CDC 。

img

上图为 Flink + Canal 的实时数据入湖架构。

Flink 启动之后,首先读取当前的 Binlog 信息,标记为 StartOffset ,通过 select 方式将全量数据采集上来,发往下游 Kafka。全量采集完毕之后,再从 startOffset 采集增量的日志信息,发往 Kafka。最终 Kafka 的数据由 Spark 消费后写往 Hudi。

但是此架构存在以下三个问题:

  • 全量与增量数据存在重复:因为采集过程中不会进行锁表,如果在全量采集过程中有数据变更,并且采集到了这些数据,那么这些数据会与 Binlog 中的数据存在重复;
  • 需要下游进行 Upsert 或 Merge 写入才能剔除重复的数据,确保数据的最终一致性;
  • 需要两套计算引擎,再加上消息队列 Kafka 才能将数据写入到数据湖 Hudi 中,过程涉及组件多、链路长,且消耗资源大。

基于以上问题,我们整理出了数据集成的核心需求:

img

  1. 全量增量自动切换,并保证数据的准确性。Flink + Canal 的架构能实现全量和增量自动切换,但无法保证数据的准确性;
  2. 最大限度地减少对源数据库的影响,比如同步过程中尽量不使用锁、能流控等;
  3. 能在已存在的任务中添加新表的数据采集,这是非常核心的需求,因为在复杂的生产环境中,等所有表都准备好之后再进行数据集成会导致效率低下。此外,如果不能做到任务的合并,需要起很多次任务,采集很多次 Binlog 的数据,可能会导致 DB 机器带宽被打满;
  4. 能同时进行全量和增量日志采集,新增表不能暂停日志采集来确保数据的准确性,这种方式会给其他表日志采集带来延迟;
  5. 能确保数据在同一主键 ID 下按历史顺序发生,不会出现后发生的事件先发送到下游。

img

Flink CDC 很好地解决了业务痛点,并且在可扩展性、稳定性、社区活跃度方面都非常优秀。

  • 首先,它能无缝对接 Flink 生态,复用 Flink 众多 sink 能力,使用 Flink 数据清理转换的能力;
  • 其次,它能进行全量与增量自动切换,并且保证数据的准确性;
  • 第三,它能支持无锁读取、断点续传、水平扩展,特别是在水平扩展方面,理论上来说,给的资源足够多时,性能瓶颈一般不会出现在 CDC 侧,而是在于数据源/目标源是否能支持读/写这么多数据。

二、Flink CDC 实践问题与优化

img

上图为 Flink CDC 2.0 的架构原理。 它基于 FLIP-27 实现,核心步骤如下:

  1. Enumerator 先将全量数据拆分成多个 SnapshotSplit,然后按照上图中第一步将 SnapshotSplit 发送给 SourceReader 执行。执行过程中会对数据进行修正来保证数据的一致性;
  2. SnapshotSplit 读取完成后向 Enumerator 汇报已读取完成的块信息;
  3. 重复执行 (1) (2) 两个步骤,直到将全量数据读取完毕;
  4. 全量数据读取完毕之后,Enumerator 会根据之前全量完成的 split 信息, 构造一个 BinlogSplit。 发送给 SourceRead 执行,读取增量日志数据。

问题一:新增表会停止 Binlog 日志流

img

在已存在的任务中添加新表是非常重要的需求, Flink CDC 2.0 也支持了这一功能。但是为了确保数据的一致性,Flink CDC 2.0 在新增表的流程中,需要停止 Binlog 日志流的读取,再进行新增表的全量数据读取。等新增表的全量数据读取完毕之后,再将之前停止的 Binlog 任务重新启动。这也意味着新增表会影响其他表的日志采集进度。然而我们希望全量和增量两个任务能够同时进行,为了解决这一问题,我们对 Flink CDC 进行了拓展,支持了全量和增量日志流并行读取,步骤如下:

img

  1. 程序启动后,在 Enumerator 中创建 BinlogSplit ,放在分配列表的第一位,分配给 SourceReader 执行增量数据采集;
  2. 与原有的全量数据采集一样,Enumerator 将全量采集切分成多个 split 块,然后将切分好的块分配给 SourceReader 去执行全量数据的采集;
  3. 全量数据采集完成之后,SourceReader 向 Enumerator 汇报已经完成的全量数据采集块的信息;
  4. 重复 (2) (3) 步,将全量的表采集完毕。

以上就是第一次启动任务,全量与增量日志并行读取的流程。新增表后,并行读取实现步骤如下:

img

  1. 恢复任务时,Flink CDC 会从 state 中获取用户新表的配置信息;
  2. 通过对比用户配置信息与状态信息,捕获到要新增的表。对于 BinlogSplit 任务,会增加新表 binlog 数据的采集;对于 Enumerator 任务,会对新表进行全量切分;
  3. Enumerator 将切分好的 SnapshotSplit 分配给 SourceReader 执行全量数据采集;
  4. 重复步骤 (3),直到所有全量数据读取完毕。

然而,实现全量和增量日志并行读取后,又出现了数据冲突问题。

img

如上图所示, Flink CDC 在读取全量数据之前,会先读取当前 Binlog 的位置信息,将其标记为 LW,接着通过 select 的方式读取全量数据,读取到上图中 s1、s2、 s3、s4 四条数据。再读取当前的 Binlog 位置,标记为 HW, 然后将 LW 和 HW 中变更的数据 merge 到之前全量采集上来的数据中。经过一系列操作后,最终全量采集到的数据是 s1、s2、s3、s4 和 s5。

而增量采集的进程也会读取 Binlog 中的日志信息,会将 LW 和 HW 中的 s2、s2、s4、s5 四条数据发往下游。

上述整个流程中存在两个问题:首先,数据多取,存在数据重复,上图中红色标识即存在重复的数据;其次,全量和增量在两个不同的线程中,也有可能是在两个不同的 JVM 中,因此先发往下游的数据可能是全量数据,也有可能是增量数据,意味着同一主键 ID 到达下游的先后顺序不是按历史顺序,与核心需求不符。

针对数据冲突问题,我们提供了基于 GTID 实现的处理方案。

img

首先,为全量数据打上 Snapshot 标签,增量数据打上 Binlog 标签;其次,为全量数据补充一个高水位 GTID 信息,而增量数据本身携带有 GTID 信息,因此不需要补充。将数据下发,下游会接上一个 KeyBy 算子,再接上数据冲突处理算子,数据冲突的核心是保证发往下游的数据不重复,并且按历史顺序产生。

如果下发的是全量采集到的数据,且此前没有 Binlog 数据下发,则将这条数据的 GTID 存储到 state 并把这条数据下发;如果 state 不为空且此条记录的 GTID 大于等于状态中的 GTID ,也将这条数据的 GTID 存储到 state 并把这条数据下发;

通过这种方式,很好地解决了数据冲突的问题,最终输出到下游的数据是不重复且按历史顺序发生的。

img

然而,新的问题又产生了。在处理算法中可以看出,为了确保数据的不重复并且按历史顺序下发,会将所有记录对应的 GTID 信息存储在状态中,导致状态一直递增。

清理状态一般首选 TTL,但 TTL 难以控制时间,且无法将数据完全清理掉。第二种方式是手动清理,全量表完成之后,可以下发一条记录告诉下游清理 state 中的数据。

解决了以上所有问题,并行读取的最终方案如下图所示。

img

首先,给数据打上四种标签,分别代表不同的状态:

  • SNAPSHOT:全量采集到的数据信息。
  • STATE_BINLOG:还未完成全量采集, Binlog 已采集到这张表的变更数据。
  • BINLOG:全量数据采集完毕之后,Binlog 再采集到这张表的变更数据。
  • TABLE_FINISHED:全量数据采集完成之后通知下游,可以清理 state。

具体实现步骤如下:

  1. 分配 Binlog ,此时 Binlog 采集到的数据都为 STATE_BINLOG 标签;
  2. 分配 SnapshotSplit 任务,此时全量采集到的数据都为 SNAPSHOT 标签;
  3. Enumerator 实时监控表的状态,某一张表执行完成并完成 checkpoint 后,通知 Binlog 任务。Binlog 任务收到通知后,将此表后续采集到的 Binlog 信息都打上 BINLOG 标签;此外,它还会构造一条 TABLE_FINISHED 记录发往下游做处理;
  4. 数据采集完成后,除了接上数据冲突处理算子,此处还新增了一个步骤:从主流中筛选出来的 TABLE_FINISHED 事件记录,通过广播的方式将其发往下游,下游根据具体信息清理对应表的状态信息。

问题二:写 Hudi 时存在数据倾斜

img

如上图,Flink CDC 采集三张表数据的时候,会先读取完 tableA 的全量数据,再读取tableB 的全量数据。读取 tableA 的过程中,下游只有 tableA 的 sink 有数据流入。

img

我们通过多表混合读取的方式来解决数据倾斜的问题。

引入多表混合之前,Flink CDC 读取完 tableA 的所有 chunk,再读取 tableB 的所有 chunk。实现了多表混合读取后,读取的顺序变为读取 tableA 的 chunk1、tableB 的 chunk1、tableC 的 chunk1,再读取 tableA 的 chunk2,以此类推,最终很好地解决了下游 sink 数据倾斜的问题,保证每个 sink 都有数据流入。

img

我们对多表混合读取的性能进行了测试,由 TPCC 工具构造的测试数据,读取了 4。张表,总并行度为 8,每个 sink 的并行度为 2,写入时间由原来的 46 分钟降至 20 分钟,性能提升 2.3 倍。

需要注意的是,如果 sink 的并行度和总并行度相等,则性能不会有明显提升,多表混合读取主要的作用是更快地获取到每张表下发的数据。

问题三:需要用户手动指定 schema 信息

img

用户手动执行 DB schema 与 sink 之间 schema 映射关系,开发效率低,耗时长且容易出错。

img

为了降低用户的使用门槛,提升开发效率,我们实现了 Oracle catalog ,让用户能以低代码的方式、无需指定 DB schema 信息与 sink schema 信息的映射关系,即可通过 Flink CDC 将数据写入到 Hudi。

三、未来规划

img

第一, 支持 schema 信息变更同步。比如数据源发生了 schema 信息变更,能够将其同步到 Kafka 和 Hudi 中;支持平台接入更多数据源类型,增强稳定性,实现更多应用场景的落地。

第二, 支持 SQL 化的方式,使用 Flink CDC 将数据同步到 Hudi 中,降低用户的使用门槛。

第三, 希望技术更开放,与社区共同成长,为社区贡献出自己的一份力量。

问答

Q:断点续传采集如何处理?

A:断点续传有两种,分为全量和 Binlog。但它们都是基于 Flink state 的能力,同步的过程中会将进度存储到 state 中。如果失败了,下一次再从 state 中恢复即可。

Q:MySQL 在监控多表使用 SQL 写入 Hudi 表中的时候,存在多个 job,维护很麻烦,如何通过单 job 同步整库?

A:我们基于 GTID 的方式对 Flink CDC 进行了拓展,支持任务中新增表,且不影响其他表的采集进度。不考虑新增表影响到其他表进度的情况下,也可以基于 Flink CDC 2.2 做新增表的能力。

Q:顺丰这些特性会在 CDC 开源版本中实现吗?

A:目前我们的方案还存在一些局限性,比如必须用 MySQL 的 GTID,需要下游有数据冲突处理的算子,因此较难实现在社区中开源。

Q:Flink CDC 2.0 新增表支持全量 + 增量吗?

A:是的。

Q:GTID 去重算子会不会成为性能瓶颈?

A:经过实践,不存在性能瓶颈,它只是做了一些数据的判断和过滤。

点击查看直播回放 & 演讲PDF


更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

O1CN01tmtpiy1iazJYZdixL_!!6000000004430-2-tps-899-548.png"

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
6月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
2662 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
|
8月前
|
资源调度 Kubernetes 流计算
Flink在B站的大规模云原生实践
本文基于哔哩哔哩资深开发工程师丁国涛在Flink Forward Asia 2024云原生专场的分享,围绕Flink On K8S的实践展开。内容涵盖五个部分:背景介绍、功能及稳定性优化、性能优化、运维优化和未来展望。文章详细分析了从YARN迁移到K8S的优势与挑战,包括资源池统一、环境一致性改进及隔离性提升,并针对镜像优化、Pod异常处理、启动速度优化等问题提出解决方案。此外,还探讨了多机房容灾、负载均衡及潮汐混部等未来发展方向,为Flink云原生化提供了全面的技术参考。
492 9
Flink在B站的大规模云原生实践
|
9月前
|
数据采集 SQL canal
Amoro + Flink CDC 数据融合入湖新体验
本文总结了货拉拉高级大数据开发工程师陈政羽在Flink Forward Asia 2024上的分享,聚焦Flink CDC在货拉拉的应用与优化。内容涵盖CDC应用现状、数据入湖新体验、入湖优化及未来规划。文中详细分析了CDC在多业务场景中的实践,包括数据采集平台化、稳定性建设,以及面临的文件碎片化、Schema演进等挑战。同时介绍了基于Apache Amoro的湖仓融合架构,通过自优化服务解决小文件问题,提升数据新鲜度与读写平衡。未来将深化Paimon与Amoro的结合,打造更高效的入湖生态与自动化优化方案。
530 1
Amoro + Flink CDC 数据融合入湖新体验
|
9月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
1600 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
8月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
消息中间件 缓存 关系型数据库
Flink CDC产品常见问题之upsert-kafka增加参数报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
Oracle 关系型数据库 MySQL
flink cdc 插件问题之报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
监控 关系型数据库 MySQL
Flink CDC产品常见问题之使用3.0测试mysql到starrocks启动报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
Java 关系型数据库 MySQL
Flink CDC有见这个报错不?
【2月更文挑战第29天】Flink CDC有见这个报错不?
283 2
|
存储 关系型数据库 MySQL
Flink CDC产品常见问题之写hudi的时候报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。

相关产品

  • 实时计算 Flink版