Flink 1.11 Unaligned Checkpoint 解析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 由于 Checkpoint 与反压的耦合,反压反过来也会作用于 Checkpoint,导致 Checkpoint 的种种问题。针对于此,Flink 在 1.11 引入 Unaligned Checkpint 来解耦 Checkpoint 机制与反压机制,优化高反压情况下的 Checkpoint 表现。

作者:林小铂@网易

作为 Flink 最基础也是最关键的容错机制,Checkpoint 快照机制很好地保证了 Flink 应用从异常状态恢复后的数据准确性。同时 Checkpoint 相关的 metrics 也是诊断 Flink 应用健康状态最为重要的指标,成功且耗时较短的 Checkpoint 表明作业运行状况良好,没有异常或反压。然而,由于 Checkpoint 与反压的耦合,反压反过来也会作用于 Checkpoint,导致 Checkpoint 的种种问题。

针对于此,Flink 在 1.11 引入 Unaligned Checkpint 来解耦 Checkpoint 机制与反压机制,优化高反压情况下的 Checkpoint 表现。

当前 Checkpoint 机制简述

相信不少读者对 Flink Checkpoint 基于 Chandy-Lamport 算法的分布式快照已经比较熟悉,该节简单回顾下算法的基础逻辑,熟悉算法的读者可放心跳过。

Chandy-Lamport 算法将分布式系统抽象成 DAG(暂时不考虑有闭环的图),节点表示进程,边表示两个进程间通信的管道。分布式快照的目的是记录下整个系统的状态,即可以分为节点的状态(进程的状态)和边的状态(信道的状态,即传输中的数据)。因为系统状态是由输入的消息序列驱动变化的,我们可以将输入的消息序列分为多个较短的子序列,图的每个节点或边先后处理完某个子序列后,都会进入同一个稳定的全局统状态。利用这个特性,系统的进程和信道在子序列的边界点分别进行本地快照,即使各部分的快照时间点不同,最终也可以组合成一个有意义的全局快照。

1.jpg图1. Checkpoint Barrier

从实现上看,Flink 通过在 DAG 数据源定时向数据流注入名为 Barrier 的特殊元素,将连续的数据流切分为多个有限序列,对应多个 Checkpoint 周期。每当接收到 Barrier,算子进行本地的 Checkpoint 快照,并在完成后异步上传本地快照,同时将 Barrier 以广播方式发送至下游。当某个 Checkpoint 的所有 Barrier 到达 DAG 末端且所有算子完成快照,则标志着全局快照的成功。

2.jpg图2. Barrier Alignment

在有多个输入 Channel 的情况下,为了数据准确性,算子会等待所有流的 Barrier 都到达之后才会开始本地的快照,这种机制被称为 Barrier 对齐。在对齐的过程中,算子只会继续处理的来自未出现 Barrier Channel 的数据,而其余 Channel 的数据会被写入输入队列,直至在队列满后被阻塞。当所有 Barrier 到达后,算子进行本地快照,输出 Barrier 到下游并恢复正常处理。

比起其他分布式快照,该算法的优势在于辅以 Copy-On-Write 技术的情况下不需要 “Stop The World” 影响应用吞吐量,同时基本不用持久化处理中的数据,只用保存进程的状态信息,大大减小了快照的大小。

Checkpoint 与反压的耦合

目前的 Checkpoint 算法在大多数情况下运行良好,然而当作业出现反压时,阻塞式的 Barrier 对齐反而会加剧作业的反压,甚至导致作业的不稳定。

首先, Chandy-Lamport 分布式快照的结束依赖于 Marker 的流动,而反压则会限制 Marker 的流动,导致快照的完成时间变长甚至超时。无论是哪种情况,都会导致 Checkpoint 的时间点落后于实际数据流较多。这时作业的计算进度是没有被持久化的,处于一个比较脆弱的状态,如果作业出于异常被动重启或者被用户主动重启,作业会回滚丢失一定的进度。如果 Checkpoint 连续超时且没有很好的监控,回滚丢失的进度可能高达一天以上,对于实时业务这通常是不可接受的。更糟糕的是,回滚后的作业落后的 Lag 更大,通常带来更大的反压,形成一个恶性循环。

其次,Barrier 对齐本身可能成为一个反压的源头,影响上游算子的效率,而这在某些情况下是不必要的。比如典型的情况是一个的作业读取多个 Source,分别进行不同的聚合计算,然后将计算完的结果分别写入不同的 Sink。通常来说,这些不同的 Sink 会复用公共的算子以减少重复计算,但并不希望不同 Source 间相互影响。

3.jpg图3. Barrier Alignment 阻塞上游 Task

假设一个作业要分别统计 A 和 B 两个业务线的以天为粒度指标,同时还需要统计所有业务线以周为单位的指标,拓扑如上图所示。如果 B 业务线某天的业务量突涨,使得 Checkpoint Barrier 有延迟,那么会导致公用的 Window Aggregate 进行 Barrier 对齐,进而阻塞业务 A 的 FlatMap,最终令业务 A 的计算也出现延迟。

当然这种情况可以通过拆分作业等方式优化,但难免引入更多开发维护成本,而且更重要的是这本来就符合 Flink 用户常规的开发思路,应该在框架内尽量减小出现用户意料之外的行为的可能性。

Unaligned Checkpoint

为了解决这个问题,Flink 在 1.11 版本引入了 Unaligned Checkpoint 的特性。要理解 Unaligned Checkpoint 的原理,首先需要了解 Chandy-Lamport 论文中对于 Marker 处理规则的描述:

4.jpg
图4. Chandy-Lamport Marker 处理

其中关键是 if q has not recorded its state,也就是接收到 Marker 时算子是否已经进行过本地快照。一直以来 Flink 的 Aligned Checkpoint 通过 Barrier 对齐,将本地快照延迟至所有 Barrier 到达,因而这个条件是永真的,从而巧妙地避免了对算子输入队列的状态进行快照,但代价是比较不可控的 Checkpoint 时长和吞吐量的降低。实际上这和 Chandy-Lamport 算法是有一定出入的。

举个例子,假设我们对两个数据流进行 equal-join,输出匹配上的元素。按照 Flink Aligned Checkpoint 的方式,系统的状态变化如下(图中不同颜色的元素代表属于不同的 Checkpoint 周期):

6.jpg
图5. Aligned Checkpoint 状态变化

  • 图 a: 输入 Channel 1 存在 3 个元素,其中 2 在 Barrier 前面;Channel 2 存在 4 个元素,其中 2、9、7在 Barrier 前面。
  • 图 b: 算子分别读取 Channel 一个元素,输出 2。随后接收到 Channel 1 的 Barrier,停止处理 Channel 1 后续的数据,只处理 Channel 2 的数据。
  • 图 c: 算子再消费 2 个自 Channel 2 的元素,接收到 Barrier,开始本地快照并输出 Barrier。

对于相同的情况,Chandy-Lamport 算法的状态变化如下:

7.jpg
图6. Chandy-Lamport 状态变化

  • 图 a: 同上。
  • 图 b: 算子分别处理两个 Channel 一个元素,输出结果 2。此后接收到 Channel 1 的 Barrier,算子开始本地快照记录自己的状态,并输出 Barrier。
  • 图 c: 算子继续正常处理两个 Channel 的输入,输出 9。特别的地方是 Channel 2 后续元素会被保存下来,直到 Channel 2 的 Barrier 出现(即 Channel 2 的 9 和 7)。保存的数据会作为 Channel 的状态成为快照的一部分。

两者的差异主要可以总结为两点:

  1. 快照的触发是在接收到第一个 Barrier 时还是在接收到最后一个 Barrier 时。
  2. 是否需要阻塞已经接收到 Barrier 的 Channel 的计算。

从这两点来看,新的 Unaligned Checkpoint 将快照的触发改为第一个 Barrier 且取消阻塞 Channel 的计算,算法上与 Chandy-Lamport 基本一致,同时在实现细节方面结合 Flink 的定位做了几个改进。

首先,不同于 Chandy-Lamport 模型的只需要考虑算子输入 Channel 的状态,Flink 的算子有输入和输出两种 Channel,在快照时两者的状态都需要被考虑。

其次,无论在 Chandy-Lamport 还是 Flink Aligned Checkpoint 算法中,Barrier 都必须遵循其在数据流中的位置,算子需要等待 Barrier 被实际处理才开始快照。而 Unaligned Checkpoint 改变了这个设定,允许算子优先摄入并优先输出 Barrier。如此一来,第一个到达 Barrier 会在算子的缓存数据队列(包括输入 Channel 和输出 Channel)中往前跳跃一段距离,而被”插队”的数据和其他输入 Channel 在其 Barrier 之前的数据会被写入快照中(图中黄色部分)。

8.jpg
图7. Barrier 越过数据

这样的主要好处是,如果本身算子的处理就是瓶颈,Chandy-Lamport 的 Barrier 仍会被阻塞,但 Unaligned Checkpoint 则可以在 Barrier 进入输入 Channel 就马上开始快照。这可以从很大程度上加快 Barrier 流经整个 DAG 的速度,从而降低 Checkpoint 整体时长。

回到之前的例子,用 Unaligned Checkpoint 来实现,状态变化如下:

9.jpg
图8. Unaligned-Checkpoint 状态变化

  • 图 a: 输入 Channel 1 存在 3 个元素,其中 2 在 Barrier 前面;Channel 2 存在 4 个元素,其中 2、9、7在 Barrier 前面。输出 Channel 已存在结果数据 1。
  • 图 b: 算子优先处理输入 Channel 1 的 Barrier,开始本地快照记录自己的状态,并将 Barrier 插到输出 Channel 末端。
  • 图 c: 算子继续正常处理两个 Channel 的输入,输出 2、9。同时算子会将 Barrier 越过的数据(即输入 Channel 1 的 2 和输出 Channel 的 1)写入 Checkpoint,并将输入 Channel 2 后续早于 Barrier 的数据(即 2、9、7)持续写入 Checkpoint。

比起 Aligned Checkpoint 中不同 Checkpoint 周期的数据以算子快照为界限分隔得很清晰,Unaligned Checkpoint 进行快照和输出 Barrier 时,部分本属于当前 Checkpoint 的输入数据还未计算(因此未反映到当前算子状态中),而部分属于当前 Checkpoint 的输出数据却落到 Barrier 之后(因此未反映到下游算子的状态中)。

这也正是 Unaligned 的含义: 不同 Checkpoint 周期的数据没有对齐,包括不同输入 Channel 之间的不对齐,以及输入和输出间的不对齐。而这部分不对齐的数据会被快照记录下来,以在恢复状态时重放。换句话说,从 Checkpoint 恢复时,不对齐的数据并不能由 Source 端重放的数据计算得出,同时也没有反映到算子状态中,但因为它们会被 Checkpoint 恢复到对应 Channel 中,所以依然能提供只计算一次的准确结果。

当然,Unaligned Checkpoint 并不是百分百优于 Aligned Checkpoint,它会带来的已知问题就有:

  1. 由于要持久化缓存数据,State Size 会有比较大的增长,磁盘负载会加重。
  2. 随着 State Size 增长,作业恢复时间可能增长,运维管理难度增加。

目前看来,Unaligned Checkpoint 更适合容易产生高反压同时又比较重要的复杂作业。对于像数据 ETL 同步等简单作业,更轻量级的 Aligned Checkpoint 显然是更好的选择。

总结

Flink 1.11 的 Unaligned Checkpoint 主要解决在高反压情况下作业难以完成 Checkpoint 的问题,同时它以磁盘资源为代价,避免了 Checkpoint 可能带来的阻塞,有利于提升 Flink 的资源利用率。随着流计算的普及,未来的 Flink 应用大概会越来越复杂,在未来经过实战打磨完善后 Unaligned Checkpoint 很有可能会取代 Aligned Checkpoint 成为 Flink 的默认 Checkpoint 策略。

参考:

  1. Flink Docs: Data Streaming Fault Tolerance

https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html

  1. Checkpointing Under Backpressure

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Checkpointing-under-backpressure-td31616.html

  1. Flink Checkpoint 问题排查实用指南

https://zhuanlan.zhihu.com/p/87131964

作者介绍:

林小铂,网易游戏高级开发工程师,负责游戏数据中心实时平台的开发及运维工作,目前专注于 Apache Flink 的开发及应用。探究问题本来就是一种乐趣。

相关文章
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
137 3
|
4天前
|
存储 监控 算法
Flink 四大基石之 Checkpoint 使用详解
Flink 的 Checkpoint 机制通过定期插入 Barrier 将数据流切分并进行快照,确保故障时能从最近的 Checkpoint 恢复,保障数据一致性。Checkpoint 分为精确一次和至少一次两种语义,前者确保每个数据仅处理一次,后者允许重复处理但不会丢失数据。此外,Flink 提供多种重启策略,如固定延迟、失败率和无重启策略,以应对不同场景。SavePoint 是手动触发的 Checkpoint,用于作业升级和迁移。Checkpoint 执行流程包括 Barrier 注入、算子状态快照、Barrier 对齐和完成 Checkpoint。
56 20
|
19天前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
74 14
|
5月前
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用问题之mini-cluster模式下,怎么指定checkpoint的时间间隔
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
存储 监控 Serverless
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决
|
5月前
|
缓存 流计算
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
|
5月前
|
分布式计算 流计算
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
|
4月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
2月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1544 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
5天前
|
消息中间件 关系型数据库 MySQL
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
112 0
Flink CDC 在阿里云实时计算Flink版的云上实践

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多