Flink 1.15 新功能架构解析:高效稳定的通用增量 Checkpoint

简介: Generic Log-Based Incremental Checkpointing 功能介绍

作者 | 梅源(Yuan Mei)& Roman Khachatryan

流处理系统最重要的特性是端到端的延迟,端到端延迟是指开始处理输入数据到输出该数据产生的结果所需的时间。Flink,作为流式计算的标杆,其端到端延迟包括容错的快慢主要取决于检查点机制(Checkpointing),所以如何将 Checkpoint 做得高效稳定是 Flink 流计算的首要任务。我们在 “Flink 新一代流计算和容错——阶段总结和展望”[1] 一文中介绍了 Flink 从社区 1.12 版本开始所做的提升 Checkpointing 机制的努力,本文将着重介绍其中刚刚在 Flink 1.15 版本发布的 Generic Log-Based Incremental Checkpointing 这个功能。

一、概述

Generic Log-Based Incremental Checkpointing 的设计初衷是我们将全量的状态快照和增量的检查点机制分隔开,通过持续上传增量 Changelog 的方法,来确保每次 Checkpointing 可以稳定快速的完成,从而减小 Checkpointing 之间的间隔,提升 Flink系统端到端的延迟。拓展开来说,主要有如下三点提升:

  1. 更短的端到端延迟:尤其是对于 Transactional Sink。Transactional Sink 在 Checkpoint 完成的时候才能完成两阶段提交,因此减小 Checkpointing 的间隔意味着可以更频繁的提交,达到更短的端到端的延迟。
  2. 更稳定的 Checkpoint 完成时间:目前 Checkpoint 完成时间很大程度上取决于在 Checkpointing 时需要持久化的(增量)状态的大小。在新的设计中,我们通过持续上传增量,以达到减少 Checkpoint Flush 时所需要持久化的数据,来保证 Checkpoint 完成的稳定性。
  3. 容错恢复需要回滚的数据量更少:Checkpointing 之间的间隔越短,每次容错恢复后需要重新处理的数据就越少。

那是怎么做到的呢?我们知道影响 Flink Checkpointing 时间的主要因素有以下几点:

  1. Checkpoint Barrier 流动和对齐的速度;
  2. 将状态快照持久化到非易失性高可用存储(例如 S3)上所需要的时间。

对Flink Checkpoint 机制不太了解的读者可以参考 1

Flink 1.12 版本引入的 Unaligned Checkpoint[2] 和 1.14 版本中引入的 Buffer Debloating[3] 主要解决了上述第 1 个问题,尤其是在反压的情况下。更早之前引入的 Incremental Checkpoint[4] 是为了减少每次 Checkpointing 所需要持久化存储状态的大小,以减小第 2 个影响因素,但在实际中也不完全能做到:现有 Incremental Checkpoint 是基于 RocksDB 来完成的,RocksDB 出于空间放大和读性能的考虑会定期做 Compaction。Compaction 会产生新的、相对较大的文件,会增加上传所需要的时间。每一个执行 Flink 作业的物理节点(Task)至少有一个 RocksDB 实例,所以Checkpoint 被延迟的概率会随着物理节点增多而变大。这导致在 Flink 的大型作业中,几乎每次完成 Checkpointing 时都有可能会因为某个节点而延迟,如下图所示。

img

图1: 每次 Checkpoint 都可能因为某个节点上传文件缓慢而延迟

另外值得一提的是在现有的 Checkpointing 机制下,Task 只有在收到至少一个 Checkpoint Barrier 之后,才会做状态快照并且开始持久化状态快照到高可用存储,从而增加了 Checkpoint 完成时间,如下图所示。

img

图2: 在现有机制下,快照在 Checkpoint Barrier 到达之后才会开始上传

在新的设计中,我们通过持续上传增量 Changelog 的方法,可以避免这个限制,加速 Checkpoint 完成时间。下面我们来看看详细的设计。

二、设计

Generic Log-Based Incremental Checkpointing 的核心思想是引入 State Changelog(状态变化日志),这样可以更细粒度地持久化状态:

  1. 算子在更新状态的时候写双份,一份更新写入状态表 State Table 中,一份增量写入 State Changelog 中。
  2. Checkpoint 变成由两个部分组成,第一个部分是当前已经持久化的存在远端存储上的 State Table,第二个部分是增量的 State Changelog。
  3. State Table 的持久化和 Checkpointing 过程独立开来,会定期由 background thread 持久化,我们称为 Materialization(物化)的过程。
  4. 在做 Checkpoint 的时候,只要保证新增的 State Changelog 被持久化就可以了。

新的设计中需要在做 Checkpoint 的时候上传的数据量变得很少,不仅可以把 Checkpoint 做得更稳定,还可以做得更高频。整个工作流程如下图所示:

img

img

img

图3: Generic Log-Based Incremental Checkpointing 工作流程

Generic Log-Based Incremental Checkpointing 类似传统数据库系统的 WAL 机制:

  1. 数据的增量更改(插入/更新/删除)会被写入到 Transaction Log 中。一旦这部分更改的日志被同步到持久存储中,我们就可以认为 Transaction 已经完成了。这个过程类似于上述方法中的 Checkpointing 的过程。
  2. 同时,为了方便数据查询,数据的更改也会异步持久化在数据表(Table)中。 一旦 Transaction Log 中的相关部分也在数据表中被持久化了,Transaction Log 中相关部分就可以删除了。这个过程类似于我们方法中的 State Table 持久化过程。

这种和 WAL 类似的机制可以有效提升 Checkpoint 完成的速度,但也带来一些额外的开销:

  1. 额外的网络 IO 和额外的 Changelog 持久存储开销;
  2. 缓存 Changelog 带来的额外的内存使用;
  3. 容错恢复需要额外的重放 Changelog 带来的潜在的恢复时间的增加。

我们在后面的 Benchmark 对比中,也会对这三方面的影响进行分析。特别对于第 3 点,额外的重放 Changelog 所带来的容错恢复时间增加会在一定程度上因为可以做更频繁的 Checkpoint 所弥补,因为更频繁的 Checkpoint 意味着容错恢复后需要回放的处理数据更少。

三、Changelog 存储(DSTL)

Generic Log-Based Incremental Checkpointing 的很重要的一个组件是 State Changelog 存储这个部分,我们称之为 Durable Short-term Log(DSTL,短存 Log)。DSTL 需要满足以下几个特性:

  • 短期持久化

    State Changelog 是组成 Checkpoint 的一个部分,所以也需要能持久化存储。同时,State Changelog 只需要保存从最近一次持久化 State Table 到当前做 Checkpoint 时的 Changelog,因此只需要保存很短时间(几分钟)的数据。

  • 写入频率远远大于读取频率

    只有在 Restore 或者 Rescale 的情况下才需要读取 Changelog,大部分情况下只有 append 操作,并且一旦写入,数据就不能再被修改。

  • 很短的写延迟

    引入 State Changelog 是为了能将 Checkpoint 做得更快(1s 以内)。因此,单次写请求需要至少能在期望的 Checkpoint 时间内完成。

  • 保证一致性

    如果我们有多个 State Changelog 的副本,就会产生多副本之间的一致性问题。一旦某个副本的 State Changelog 被持久化并被 JM 确认,恢复时需要以此副本为基准保证语义一致性。

从上面的特性也可以看出为什么我们将 Changelog 存储命名为 DSTL 短存 Log。

3.1 DSTL 方案的选择

DSTL 可以有多种方式实现,例如分布式日志(Kafka)、分布式文件系统(DFS),甚至是数据库。在 Flink 1.15 发布的 Generic Log-Based Incremental Checkpointing MVP 版本中,我们选择 DFS 来实现 DSTL,基于如下考虑:

  1. 没有额外的外部依赖:目前 Flink Checkpoint 持久化在 DFS 中,所以以 DFS 来实现 DSTL没有引入额外的外部组件。
  2. 没有额外的状态管理:目前的设计方案中 DSTL 的状态管理是和 Flink Checkpointing 机制整合在一起的,所以也不需要额外的状态管理。
  3. DFS 原生提供持久化和一致性保证:如果实现多副本分布式日志,这些都是额外需要考虑的成本。

另一方面,使用 DFS 有以下缺点:

  1. 更高的延迟:DFS 相比于写入本地盘的分布式日志系统来讲一般来说有更高的延迟。
  2. 网络 I/O 限制:大部分 DFS 供应商出于成本的考虑都会对单用户 DFS 写入限流限速,极端情况有可能会造成网络过载。

经过一些初步实验,我们认为目前大部分 DFS 实现(例如 S3,HDFS 等)的性能可以满足 80% 的用例,后面的 Benchmark 会提供更多数据。

3.2 DSTL 架构

下图以 RocksDB 为例展示了基于 DFS 的 DSTL 架构图。状态更新通过 Changelog State Backend 双写,一份写到 RocksDB,另一份写到 DSTL。RocksDB 会定期进行 Materialization,也就是将当前的 SST 文件 上传到 DFS;而 DSTL 会将 state change 持续写入 DFS,并在 Checkpointing 的时候完成 flush,这样 Checkpoint 完成时间只取决于所需 flush 的数据量。需要注意的是 Materialization 完全独立于 Checkpointing 的过程,并且 Materialization 也可以比 Checkpointing 的频率慢很多,系统默认值是 10 分钟。

img

图4: 以 RocksDB 为例基于 DFS 的 DSTL 架构图

这里还有几个问题值得补充讨论一下:

  • 状态清理问题

    前面有提到在新的架构中,一个 Checkpoint 由两部分组成:1)State Table 和 2)State Change Log。这两部分都需要按需清理。1)这个部分的清理复用 Flink 已有的 Checkpoint 机制;2)这个部分的清理相对较复杂,特别是 State Change Log 在当前的设计中为了避免小文件的问题,是以 TM 为粒度的。在当前的设计中,我们分两个部分来清理 State Change Log:一是 Change Log 本身的数据需要在 State Table 物化后删除其相对应的部分;二是 Change Log 中成为 Checkpoint 的部分的清理融合进已有的 Flink Checkpoint 清理机制[4]

  • DFS 相关问题

    • 小文件问题

      DFS 的一个问题是每个 Checkpoint 会创建很多小文件,并且因为 Changleog State Backend 可以提供更高频的 Checkpoint,小文件问题会成为瓶颈。为了缓解这种情况,我们将同一个 Task Manager 上同一作业的所有 State Change 写到同一个文件中。因此,同一个 Task Manager 会共享同一个 State Change Log。

    • 长尾延迟问题

      为了解决 DFS 高长尾延迟问题,DFS 写入请求会在允许超时时间(默认为 1 秒)内无法完成时重试。

四、Benchmark 测试结果分析

Generic Log-Based Incremental Checkpointing 对于 Checkpoint 速度和稳定性的提升取决于以下几个因素:

  1. State Change Log 增量的部分与全量状态大小之比,增量越小越好。
  2. 不间断上传状态增量的能力。这个和状态访问模式相关,极端情况下,如果算子只在 Checkpointing 前更新 Flink State Table 的话,Changelog 起不到太大作用。
  3. 能够对来自多个 Task 的 changelog 分组批量上传的能力。Changelog 分组批量写 DFS 可以减少需要创建的文件数量并降低 DFS 负载,从而提高稳定性。
  4. 底层 State Backend 在刷磁盘前对同一个 key 的 更新的去重能力。因为 state change log 保存的是状态更新,而不是最终值,底层 State Backend 这种能力会增大 Changelog 增量与 State Table 全量状态大小之比。
  5. 写持久存储 DFS 的速度,写的速度越快 Changelog 所带来的提升越不明显。

4.1 Benchmark 配置

在 Benchmark 实验中,我们使用如下配置:

  • 算子并行度:50
  • 运行时间:21h
  • State Backend:RocksDB (Incremental Checkpoint Enabled)
  • 持久存储:S3 (Presto plugin)
  • 机器型号:AWS m5.xlarge(4 slots per TM)
  • Checkpoint 间隔: 10ms
  • State Table Materialization 间隔:3m
  • Input Rate:50K Events /s

4.2 ValueState Workload

我们第一部分的实验,主要针对每次更新的 Key 值都不一样的负载;这种负载因为上述第 2 点和第 4 点的原因,Changelog 的提升是比较明显的:Checkpoint 完成时间缩短了 10 倍(99.9 pct),Checkpoint 大小增加 30%,恢复时间增加 66% - 225%,如下表所示。

img

表1: 基于 ValueState Workload 的 Changelog 各项指标对比

下面我们来更详细的看一下 Checkpoint Size 这个部分:

img

表2: 基于 ValueState Workload 的 Changelog(开启/关闭)的 Checkpoint 相关指标对比

  • Checkpointed Data Size 是指在收到 Checkpoint Barrier,Checkpointing 过程开始后上传数据的大小。对于 Changelog 来说,大部分数据在 Checkpointing 过程开始前就已经上传了,所以这就是为什么开启 Changelog 时这个指标要比关闭时小得多的原因。
  • Full Checkpoint Data Size 是构成 Checkpoint 的所有文件的总大小,也包括与之前 Checkpoint 共享的文件。与通常的 Checkpoint 相比,Changelog 的格式没有被压缩过也不够紧凑,因此占用更多空间。

4.3 Window Workload

这里使用的是 Sliding Window。如下表所示,Changelog 对 checkpoint 完成时间加速 3 倍左右;但存储放大要高得多(消耗的空间接近 45 倍):

img

表3: 基于 Window Workload 的 Changelog(开启/关闭)的 Checkpoint 相关指标对比

Full Checkpoint Data 存储空间放大主要原因来自于:

  1. 对于 Sliding Window 算子,每条数据会加到多个滑动窗口中,因此为造成多次更新。Changelog 的写放大问题会更大。
  2. 前面有提到,如果底层 State Backend(比如 RocksDB)在刷磁盘前对同一个 key 的 更新去重能力越强,则快照的大小相对于 Changelog 会越小。在 Sliding Window 算子的极端情况下,滑动窗口会因为失效被清理。如果更新和清理发生在同一个 Checkpoint 之内,则很可能该窗口中的数据不包含在快照中。这也意味着清除窗口的速度越快,快照的大小就可能越小。

五、结论

Flink 1.15 版本实现了 Generic Log-Based Incremental Checkpointing 的 MVP 版本。这个版本基于 DFS 可以提供秒级左右的 Checkpoint 时间,并极大的提升了 Checkpoint 稳定性,但一定程度上也增加了空间的成本,本质上是用空间换时间。1.16 版本将进一步完善使其生产可用,比如我们可以通过 Local Recovery 和文件缓存来加速恢复时间。另一个方面,Changelog State Backend 接口是通用的,我们可以用同样的接口对接更快的存储来实现更短的延迟,例如 Apache Bookkeeper。 除此之外,我们正在研究 Changelog 的其他应用,例如将 Changelog 应用于 Sink 来实现通用的端到端的 exactly-once 等。

附录

如果您想试用 Generic Log-Based Incremental Checkpointing 的话,可以在 flink-conf.yaml 中进行如下简单的设置:

state.backend.changelog.enabled: true

state.backend.changelog.storage: filesystem 

dstl.dfs.base-path: <location similar to state.checkpoints.dir>

完整的设置文档可以参考 5

致谢

我们感谢 Stephan Ewen 提出了这个功能的最初设想,也感谢 Piotr Nowojski, Yu Li 和 Yun Tang 的讨论和代码 Review。

[1] https://flink-learning.org.cn/article/detail/47e7326c71e6f792c0bb4687c7d152a0

[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints

[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment

[4] https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html

[5] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#enabling-changelog


Flink CDC Meetup 视频回顾 & PPT 下载

O1CN01yaX99t1HxtVhJJwGU_!!6000000000825-2-tps-2300-1000.png"

更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

O1CN01tmtpiy1iazJYZdixL_!!6000000004430-2-tps-899-548.png"

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
9月前
|
机器学习/深度学习 运维 监控
实时异常检测实战:Flink+PAI 算法模型服务化架构设计
本文深入探讨了基于 Apache Flink 与阿里云 PAI 构建的实时异常检测系统。内容涵盖技术演进、架构设计、核心模块实现及金融、工业等多领域实战案例,解析流处理、模型服务化、状态管理等关键技术,并提供性能优化与高可用方案,助力企业打造高效智能的实时异常检测平台。
834 1
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
549 3
|
11月前
|
SQL 运维 Java
蚂蚁 Flink 实时计算编译任务 Koupleless 架构改造
本文介绍了对Flink实时计算编译任务的Koupleless架构改造。为解决进程模型带来的响应慢、资源消耗大等问题,团队将进程模型改为线程模型,并借助Koupleless的类加载隔离能力实现版本和包的隔离。通过动态装配Plugin及其Classpath,以及Biz运行时仅对依赖Plugin可见的设计,大幅优化了编译任务的性能。结果表明,新架构使编译耗时降低50%,吞吐量提升5倍以上。
蚂蚁 Flink 实时计算编译任务 Koupleless 架构改造
|
存储 分布式计算 API
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
735 0
|
SQL 消息中间件 Kafka
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
本文介绍了阿里云实时数仓Hologres负责人姜伟华在Flink Forward Asia 2024上的分享,涵盖实时数仓的发展历程、从实时数仓到实时湖仓的演进,以及总结。文章通过三代实时数仓架构的演变,详细解析了Lambda架构、Kafka实时数仓分层+OLAP、Hologres实时数仓分层复用等方案,并探讨了未来从实时数仓到实时湖仓的演进方向。最后,结合实际案例和Demo展示了Hologres + Flink + Paimon在实时湖仓中的应用,帮助用户根据业务需求选择合适的方案。
1667 20
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
|
SQL 消息中间件 Serverless
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
375 4
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
542 16
|
SQL 存储 数据库
【赵渝强老师】基于Flink的流批一体架构
本文介绍了Flink如何实现流批一体的系统架构,包括数据集成、数仓架构和数据湖的流批一体方案。Flink通过统一的开发规范和SQL支持,解决了传统架构中的多套技术栈、数据链路冗余和数据口径不一致等问题,提高了开发效率和数据一致性。
946 7
|
消息中间件 监控 Java
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
308 1
|
7月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
691 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多
  • DNS