作者:林小铂@网易
作为 Flink 最基础也是最关键的容错机制,Checkpoint 快照机制很好地保证了 Flink 应用从异常状态恢复后的数据准确性。同时 Checkpoint 相关的 metrics 也是诊断 Flink 应用健康状态最为重要的指标,成功且耗时较短的 Checkpoint 表明作业运行状况良好,没有异常或反压。然而,由于 Checkpoint 与反压的耦合,反压反过来也会作用于 Checkpoint,导致 Checkpoint 的种种问题。
针对于此,Flink 在 1.11 引入 Unaligned Checkpint 来解耦 Checkpoint 机制与反压机制,优化高反压情况下的 Checkpoint 表现。
当前 Checkpoint 机制简述
相信不少读者对 Flink Checkpoint 基于 Chandy-Lamport 算法的分布式快照已经比较熟悉,该节简单回顾下算法的基础逻辑,熟悉算法的读者可放心跳过。
Chandy-Lamport 算法将分布式系统抽象成 DAG(暂时不考虑有闭环的图),节点表示进程,边表示两个进程间通信的管道。分布式快照的目的是记录下整个系统的状态,即可以分为节点的状态(进程的状态)和边的状态(信道的状态,即传输中的数据)。因为系统状态是由输入的消息序列驱动变化的,我们可以将输入的消息序列分为多个较短的子序列,图的每个节点或边先后处理完某个子序列后,都会进入同一个稳定的全局统状态。利用这个特性,系统的进程和信道在子序列的边界点分别进行本地快照,即使各部分的快照时间点不同,最终也可以组合成一个有意义的全局快照。
图1. Checkpoint Barrier
从实现上看,Flink 通过在 DAG 数据源定时向数据流注入名为 Barrier 的特殊元素,将连续的数据流切分为多个有限序列,对应多个 Checkpoint 周期。每当接收到 Barrier,算子进行本地的 Checkpoint 快照,并在完成后异步上传本地快照,同时将 Barrier 以广播方式发送至下游。当某个 Checkpoint 的所有 Barrier 到达 DAG 末端且所有算子完成快照,则标志着全局快照的成功。
图2. Barrier Alignment
在有多个输入 Channel 的情况下,为了数据准确性,算子会等待所有流的 Barrier 都到达之后才会开始本地的快照,这种机制被称为 Barrier 对齐。在对齐的过程中,算子只会继续处理的来自未出现 Barrier Channel 的数据,而其余 Channel 的数据会被写入输入队列,直至在队列满后被阻塞。当所有 Barrier 到达后,算子进行本地快照,输出 Barrier 到下游并恢复正常处理。
比起其他分布式快照,该算法的优势在于辅以 Copy-On-Write 技术的情况下不需要 “Stop The World” 影响应用吞吐量,同时基本不用持久化处理中的数据,只用保存进程的状态信息,大大减小了快照的大小。
Checkpoint 与反压的耦合
目前的 Checkpoint 算法在大多数情况下运行良好,然而当作业出现反压时,阻塞式的 Barrier 对齐反而会加剧作业的反压,甚至导致作业的不稳定。
首先, Chandy-Lamport 分布式快照的结束依赖于 Marker 的流动,而反压则会限制 Marker 的流动,导致快照的完成时间变长甚至超时。无论是哪种情况,都会导致 Checkpoint 的时间点落后于实际数据流较多。这时作业的计算进度是没有被持久化的,处于一个比较脆弱的状态,如果作业出于异常被动重启或者被用户主动重启,作业会回滚丢失一定的进度。如果 Checkpoint 连续超时且没有很好的监控,回滚丢失的进度可能高达一天以上,对于实时业务这通常是不可接受的。更糟糕的是,回滚后的作业落后的 Lag 更大,通常带来更大的反压,形成一个恶性循环。
其次,Barrier 对齐本身可能成为一个反压的源头,影响上游算子的效率,而这在某些情况下是不必要的。比如典型的情况是一个的作业读取多个 Source,分别进行不同的聚合计算,然后将计算完的结果分别写入不同的 Sink。通常来说,这些不同的 Sink 会复用公共的算子以减少重复计算,但并不希望不同 Source 间相互影响。
图3. Barrier Alignment 阻塞上游 Task
假设一个作业要分别统计 A 和 B 两个业务线的以天为粒度指标,同时还需要统计所有业务线以周为单位的指标,拓扑如上图所示。如果 B 业务线某天的业务量突涨,使得 Checkpoint Barrier 有延迟,那么会导致公用的 Window Aggregate 进行 Barrier 对齐,进而阻塞业务 A 的 FlatMap,最终令业务 A 的计算也出现延迟。
当然这种情况可以通过拆分作业等方式优化,但难免引入更多开发维护成本,而且更重要的是这本来就符合 Flink 用户常规的开发思路,应该在框架内尽量减小出现用户意料之外的行为的可能性。
Unaligned Checkpoint
为了解决这个问题,Flink 在 1.11 版本引入了 Unaligned Checkpoint 的特性。要理解 Unaligned Checkpoint 的原理,首先需要了解 Chandy-Lamport 论文中对于 Marker 处理规则的描述:
图4. Chandy-Lamport Marker 处理
其中关键是 if q has not recorded its state,也就是接收到 Marker 时算子是否已经进行过本地快照。一直以来 Flink 的 Aligned Checkpoint 通过 Barrier 对齐,将本地快照延迟至所有 Barrier 到达,因而这个条件是永真的,从而巧妙地避免了对算子输入队列的状态进行快照,但代价是比较不可控的 Checkpoint 时长和吞吐量的降低。实际上这和 Chandy-Lamport 算法是有一定出入的。
举个例子,假设我们对两个数据流进行 equal-join,输出匹配上的元素。按照 Flink Aligned Checkpoint 的方式,系统的状态变化如下(图中不同颜色的元素代表属于不同的 Checkpoint 周期):
图5. Aligned Checkpoint 状态变化
- 图 a: 输入 Channel 1 存在 3 个元素,其中 2 在 Barrier 前面;Channel 2 存在 4 个元素,其中 2、9、7在 Barrier 前面。
- 图 b: 算子分别读取 Channel 一个元素,输出 2。随后接收到 Channel 1 的 Barrier,停止处理 Channel 1 后续的数据,只处理 Channel 2 的数据。
- 图 c: 算子再消费 2 个自 Channel 2 的元素,接收到 Barrier,开始本地快照并输出 Barrier。
对于相同的情况,Chandy-Lamport 算法的状态变化如下:
图6. Chandy-Lamport 状态变化
- 图 a: 同上。
- 图 b: 算子分别处理两个 Channel 一个元素,输出结果 2。此后接收到 Channel 1 的 Barrier,算子开始本地快照记录自己的状态,并输出 Barrier。
- 图 c: 算子继续正常处理两个 Channel 的输入,输出 9。特别的地方是 Channel 2 后续元素会被保存下来,直到 Channel 2 的 Barrier 出现(即 Channel 2 的 9 和 7)。保存的数据会作为 Channel 的状态成为快照的一部分。
两者的差异主要可以总结为两点:
- 快照的触发是在接收到第一个 Barrier 时还是在接收到最后一个 Barrier 时。
- 是否需要阻塞已经接收到 Barrier 的 Channel 的计算。
从这两点来看,新的 Unaligned Checkpoint 将快照的触发改为第一个 Barrier 且取消阻塞 Channel 的计算,算法上与 Chandy-Lamport 基本一致,同时在实现细节方面结合 Flink 的定位做了几个改进。
首先,不同于 Chandy-Lamport 模型的只需要考虑算子输入 Channel 的状态,Flink 的算子有输入和输出两种 Channel,在快照时两者的状态都需要被考虑。
其次,无论在 Chandy-Lamport 还是 Flink Aligned Checkpoint 算法中,Barrier 都必须遵循其在数据流中的位置,算子需要等待 Barrier 被实际处理才开始快照。而 Unaligned Checkpoint 改变了这个设定,允许算子优先摄入并优先输出 Barrier。如此一来,第一个到达 Barrier 会在算子的缓存数据队列(包括输入 Channel 和输出 Channel)中往前跳跃一段距离,而被”插队”的数据和其他输入 Channel 在其 Barrier 之前的数据会被写入快照中(图中黄色部分)。
图7. Barrier 越过数据
这样的主要好处是,如果本身算子的处理就是瓶颈,Chandy-Lamport 的 Barrier 仍会被阻塞,但 Unaligned Checkpoint 则可以在 Barrier 进入输入 Channel 就马上开始快照。这可以从很大程度上加快 Barrier 流经整个 DAG 的速度,从而降低 Checkpoint 整体时长。
回到之前的例子,用 Unaligned Checkpoint 来实现,状态变化如下:
图8. Unaligned-Checkpoint 状态变化
- 图 a: 输入 Channel 1 存在 3 个元素,其中 2 在 Barrier 前面;Channel 2 存在 4 个元素,其中 2、9、7在 Barrier 前面。输出 Channel 已存在结果数据 1。
- 图 b: 算子优先处理输入 Channel 1 的 Barrier,开始本地快照记录自己的状态,并将 Barrier 插到输出 Channel 末端。
- 图 c: 算子继续正常处理两个 Channel 的输入,输出 2、9。同时算子会将 Barrier 越过的数据(即输入 Channel 1 的 2 和输出 Channel 的 1)写入 Checkpoint,并将输入 Channel 2 后续早于 Barrier 的数据(即 2、9、7)持续写入 Checkpoint。
比起 Aligned Checkpoint 中不同 Checkpoint 周期的数据以算子快照为界限分隔得很清晰,Unaligned Checkpoint 进行快照和输出 Barrier 时,部分本属于当前 Checkpoint 的输入数据还未计算(因此未反映到当前算子状态中),而部分属于当前 Checkpoint 的输出数据却落到 Barrier 之后(因此未反映到下游算子的状态中)。
这也正是 Unaligned 的含义: 不同 Checkpoint 周期的数据没有对齐,包括不同输入 Channel 之间的不对齐,以及输入和输出间的不对齐。而这部分不对齐的数据会被快照记录下来,以在恢复状态时重放。换句话说,从 Checkpoint 恢复时,不对齐的数据并不能由 Source 端重放的数据计算得出,同时也没有反映到算子状态中,但因为它们会被 Checkpoint 恢复到对应 Channel 中,所以依然能提供只计算一次的准确结果。
当然,Unaligned Checkpoint 并不是百分百优于 Aligned Checkpoint,它会带来的已知问题就有:
- 由于要持久化缓存数据,State Size 会有比较大的增长,磁盘负载会加重。
- 随着 State Size 增长,作业恢复时间可能增长,运维管理难度增加。
目前看来,Unaligned Checkpoint 更适合容易产生高反压同时又比较重要的复杂作业。对于像数据 ETL 同步等简单作业,更轻量级的 Aligned Checkpoint 显然是更好的选择。
总结
Flink 1.11 的 Unaligned Checkpoint 主要解决在高反压情况下作业难以完成 Checkpoint 的问题,同时它以磁盘资源为代价,避免了 Checkpoint 可能带来的阻塞,有利于提升 Flink 的资源利用率。随着流计算的普及,未来的 Flink 应用大概会越来越复杂,在未来经过实战打磨完善后 Unaligned Checkpoint 很有可能会取代 Aligned Checkpoint 成为 Flink 的默认 Checkpoint 策略。
参考:
- Flink Docs: Data Streaming Fault Tolerance
https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html
- Checkpointing Under Backpressure
- Flink Checkpoint 问题排查实用指南
https://zhuanlan.zhihu.com/p/87131964
作者介绍:
林小铂,网易游戏高级开发工程师,负责游戏数据中心实时平台的开发及运维工作,目前专注于 Apache Flink 的开发及应用。探究问题本来就是一种乐趣。