Flink(十二)【容错机制】(3)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: Flink(十二)【容错机制】

Flink(十二)【容错机制】(2)https://developer.aliyun.com/article/1532255

3. 通用增量 checkpoint(changelog)

       在 Flink 1.15 之前,只有 RocksDB 支持增量快照。 不同于产生一个包含所有数据的全量备份,增量快照只包含自上一次快照完成后被修改的记录,因此可以显著减少快照完成的耗时。

RocksDB 状态后端启用增量 checkpoint:

       从 Flink 1.15 开始,不管是 hashmap 还是 rocksdb 状态后端都可以通过开启 changelog 实现通用的增量 checkpoint。

我们可以在 Flink 官网看到对 增量快照的解释:

执行过程

1. 有状态的算子任务将状态更改写入变更日志:

这里的 Stateful Changelog 就是变更日志,它记录了一些操作,比如原本的检查点数据为 1,2,3 现在变为了 1,2,3,4 它就会记录 +4 ,代表增加了一个数据 4。

State Table 就是操作后的状态(但它不是 checkpoint)。

Stateful Changelog 会实时同步到检查点存储当中。

2. 状态物化:状态表定期保存,独立于检查点

状态表默认 10 分钟保存一次,可以在配置文件中指定。状态表不是存在检查点的,而是独立于检查点之外的其他地方。

3. 状态物化后,状态变更日志就可以被截断到相应的点

所谓截断就是清理历史的状态操作日志,用新的操作日志替换。

state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem
# 存储 changelog 数据
dstl.dfs.base-path: hdfs://hadoop102:8020/changelog
execution.checkpointing.max-concurrent-checkpoints: 1
 
execution.savepoint.restore-mode: CLAIM

注意事项

目前为实验性功能,开启后可能会造成资源消耗巨大:

  • HDFS 上保存的文件数过多
  • 消耗更多的 IO 带宽用于上传变更日志
  • 更多的 CPU 用于序列化状态更改
  • TaskManager 使用更多内存来缓存状态更改

使用限制:

  • checkpoint 的最大并发数必须为1
  • 从 Flink 1.15 开始,只有文件系统的存储类型实现可用(memory 内存存储还在测试阶段)
  • 不支持 NO_CLAIM 模式

使用方式:

1)配置文件指定:

state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem
# 存储 changelog 数据
dstl.dfs.base-path: hdfs://hadoop102:8080/changelog
execution.checkpointing.max-concurrent-checkpoints: 1
executopn.savepoint-restore-mode: CLAIM

2)代码中设置

引入依赖(打包的时候是不需要打包进去的):

 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-statebackend-changelog</artifactId>
     <version>${flink.version}</version>
     <scope>runtime</scope>
 </dependency>

开启 changelog:

// 开启 cheangelog 需要设置检查点的最大并发为 1
checkpointConfig.setMaxConcurrentCheckpoints(1);
env.enableChangelogStateBackend(true);

4. 最终检查点

如果数据源是有界的,就可能出现部分 task 已经处理完所有数据变成finished的状态,不继续工作。从 Flink 1.14开始这些 finished 状态的 task,也可以继续执行检查点。自 1.15 起默认启用此功能,并且可以通过功能标志禁用它(一般我们肯定是不希望关掉的):

Configuration conf = new Configuration();
// 从 Flink1.15 开始默认启用(true)
conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

1.5、保存点(Savepoint)

       除了检查点(checkpoint)外,Flink 还提供了另一个非常独特的镜像保存功能——保存点(Savepoint)。

       从名称就可以看出,这也是一个存盘的备份,它的原理和算法与检查点完全相同,只是多了一些额外的元数据。

       事实上,保存点就是通过检查点的机制来创建流式作业状态的一致性镜像(consistent image)的。保存点中的状态快照,是以算子 ID 和状态名称组织起来的,相当于一个键值对。从保存点启动应用程序时,Flink 会将保存点的状态数据重新分配给相应的算子任务。

1. 保存点的用途

       保存点与检查点最大的区别,就是触发的时机(检查点就像CSDN草稿的自动保存,而保存点就像我们手动的保存草稿)。检查点是由 Flink 自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。因此两者尽管原理一致,但用途就有所差别了:检查点主要用来做故障恢复,是容错机制的核心;保存点则更加灵活,可以用来做有计划的手动备份和恢复。保存点可以当作一个强大的运维工具来使用。我们可以在需要的时候创建一个保存点,然后停止应用,做一些处理调整之后再从保存点重启。它适用的具体场景有:

  • 版本管理和归档存储:对重要的节点进行手动备份,设置为某一版本,归档(archive)存储应用程序的状态。
  • 更新 Flink 版本:目前 Flink 的底层架构已经非常稳定,所以当 Flink 版本升级时,程序本身一般是兼容的。这时不需要重新执行所有的计算,只要创建一个保存点,停掉应用、升级 Flink 后,从保存点重启就可以继续处理了。
  • 更新应用程序:我们不仅可以在应用程序不变的时候,更新 Flink 版本;还可以直接更新应用程序。前提是程序必须是兼容的,也就是说更改之后的程序,状态的拓扑结构和数据类型都是不变的,这样才能正常从之前的保存点去加载。这个功能非常有用。我们可以及时修复应用程序中的逻辑 bug,更新之后接着继续处理;也可以用于有不同业务逻辑的场景,比如 A/B 测试等等。
  • 调整并行度:如果应用运行的过程中,发现需要的资源不足或已经有了大量剩余,也可以通过从保存点重启的方式,将应用程序的并行度增大或减小。
  • 暂停应用程序:有时候我们不需要调整集群或者更新程序,只是单纯地希望把应用暂停、释放一些资源来处理更重要的应用程序。使用保存点就可以灵活实现应用的暂停和重启,可以对有限的集群资源做最好的优化配置。

需要注意的是,保存点能够在程序更改的时候依然兼容,前提是状态的拓扑结构(比如原先是 source —> map ——> sum——>sink 之后变成了 source —> map ——> process——>sink 那么愚笨sum中的状态肯定不在了,因为这条算子链的结构已经变了)和数据类型(比如原本sum中存的是 ValueState 类型 之后变成了 MapState,这种也恢复不了)不变。我们知道保存点中状态都是以算子 ID-状态名称这样的 key-value 组织起来的,算子ID 可以在代码中直接调用 SingleOutputStreamOperator 的.uid()方法来进行指定:

DataStream<String> stream = env
 .addSource(new StatefulSource())
 .uid("source-id")
 .map(new StatefulMapper())
 .uid("mapper-id")
 .print();

对于没有设置 ID 的算子,Flink 默认会自动进行设置,所以在重新启动应用后可能会导致ID 不同而无法兼容以前的状态。所以为了方便后续的维护,强烈建议在程序中为每一个算子手动指定 ID

2. 使用保存点

保存点的使用非常简单,我们可以使用命令行工具来创建保存点,也可以从保存点恢复作业。

(1)创建保存点

要在命令行中为运行的作业创建一个保存点镜像,只需要执行:

bin/flink savepoint :jobId [:targetDirectory]

这里 jobId 需要填充要做镜像保存的作业 ID,目标路径 targetDirectory 可选,表示保存点存储的路径。

对于保存点的默认路径,可以通过配置文件 flink-conf.yaml 中的 state.savepoints.dir 项来设定:

state.savepoints.dir: hdfs:///flink/savepoints

当然对于单独的作业,我们也可以在程序代码中通过执行环境来设置:

env.setDefaultSavepointDir("hdfs:///flink/savepoints");

由于创建保存点一般都是希望更改环境之后重启,所以创建之后往往紧接着就是停掉作业的操作。除了对运行的作业创建保存点,我们也可以在停掉一个作业时直接创建保存点:

bin/flink stop --savepointPath [:targetDirectory] :jobId
(2)从保存点重启应用

我们已经知道,提交启动一个 Flink 作业,使用的命令是 flink run;现在要从保存点重启一个应用,其实本质是一样的:

bin/flink run -s :savepointPath [:runArgs]

这里只要增加一个-s 参数,指定保存点的路径就可以了,其他启动时的参数还是完全一样的,如果是基于 yarn 的运行模式还需要加上 -yid application-id。在使用 web UI 进行作业提交时,可以填入的参数除了入口类、并行度和运行参数,还有一个“Savepoint Path”,这就是从保存点启动应用的配置。

3. 使用保存点恢复状态以及切换状态后端

Flink1.17 版本还提供了使用保存点切换状态后端,比如我们原本是 rocksdb 状态后端,想改成 hashmap 状态后端。也就是使用 savepoint 恢复状态的时候,去更换状态后端。需要注意的是,不要在代码中指定状态后端了,通过配置文件来配置或者 -D 参数配置。

// yarn 模式
bin/flink run-application -d -t yarn-application -Dstate.backend=hashmap -c com.lyh.test.wc xxx.jar
// 不使用 yarn 模式
bin/flink run-application -d -Dstate.backend=hashmap -c com.lyh.test.wc xxx.jar

关闭程序并指定状态保存点:

我们这里用的是 cancel ,但其实我们更加推荐用 stop。

这次我们再次重启应用,并指定保存点:

为了再次验证是否是从保存点恢复,我们在 netcat 中输入 a (之前已经统计过一次了,如果保存点成功导入,结果将会输出 (a,2))

修改状态后端(这里我没有用 yarn 模式):

bin/flink run-application -d -Dstate.backend=rocksdb -s hdfs://hadoop102:8020/sp/savepoint-7ca51f-5e1c4815e549 -C com.lyh.checkpoint.SavepointDemo.class ./jobs/FlinkStudy-1.0-SNAPSHOT.jar

如果是yarn模式直接加一个 -t yarn-application 就好了。

4. 使用 checkpoint 恢复状态

同样,也可以从 checkpoint 来进行状态的恢复,但是注意,使用 checkpoint恢复的话不能切换状态后端,但是恢复命令还是一样的,都是指定恢复路径来重启应用:

// 路径必须指定到 checkpoint 的 chk-id 目录
bin/flink run-application -d -Dstate.backend=rocksdb -s hdfs://hadoop102:8020/chk/22516b874d99d1478983a9a5b248c6bf/chk-175 -C com.lyh.checkpoint.SavepointDemo.class ./jobs/FlinkStudy-1.0-SNAPSHOT.jar

我们说不能切换状态后端,但是这里指定 -Dstate.backend=rocksdb 并不影响,因为作业本来现在状态本来就是rocksdb 的。


2、状态一致性

2.1、一致性的概念和级别

       在分布式系统中,一致性(consistency)是一个非常重要的概念;在事务(transaction)中,一致性也是重要的一个特性。Flink 中一致性的概念,主要用在故障恢复的描述中,所以更加类似于事务中的表述。那到底什么是一致性呢?

       简单来讲,一致性其实就是结果的正确性,一般从数据丢失、数据重复角度来评估。对于分布式系统而言,强调的是不同节点中相同数据的副本应该总是“一致的”,也就是从不同节点读取时总能得到相同的值;而对于事务而言,是要求提交更新操作后,能够读取到新的数据。对于 Flink 来说,多个节点并行处理不同的任务,我们要保证计算结果是正确的,就必须不漏掉任何一个数据,而且也不会重复处理同一个数据。流式计算本身就是一个一个来的,所以正常处理的过程中结果肯定是正确的;在发生故障、需要恢复状态进行回滚时就需要更多的保障机制了。我们通过检查点的保存来保证状态恢复后结果的正确,所以主要讨论的就是“状态的一致性”。

一般说来,状态一致性有三种级别:

⚫ 最多一次(At-Most-Once)

       就是说数据最多只处理一次,不管之后故没故障,丢没丢掉,数据只来一遍。对于 Flink 而言,不开启 checkpoint 就是最多一次。

       当任务发生故障时,最简单的做法就是直接重启,别的什么都不干;既不恢复丢失的状态,也不重放丢失的数据。每个数据在正常情况下会被处理一次,遇到故障时就会丢掉,所以就是“最多处理一次”。我们发现,如果数据可以直接被丢掉,那其实就是没有任何操作来保证结果的准确性;所以这种类型的保证也叫“没有保证”。尽管看起来比较糟糕,不过如果我们的主要诉求是“快”,而对近似正确的结果也能接受,那这也不失为一种很好的解决方案。

⚫ 至少一次(AT-LEAST-ONCE)

       数据至少处理一次,甚至是多次,所以数据很可能重复处理。对于 Flink 而言,当第一个 Barrier 到达,而其他 Barrier 没有到达时,第一个 Barrier 后面的数据不会等待,而是直接越过 Barrier ,当出现故障需要恢复检查点的时候,会把一些 Barrier 之外的数据(也就是不该恢复的数据)重复处理,这就是至少一次。

       在实际应用中,我们一般会希望至少不要丢掉数据。这种一致性级别就叫作“至少一次”(at-least-once),就是说是所有数据都不会丢,肯定被处理了;不过不能保证只处理一次,有些数据会被重复处理。

       在有些场景下,重复处理数据是不影响结果的正确性的,这种操作具有“幂等性”。比如,如果我们统计电商网站的 UV,需要对每个用户的访问数据进行去重处理,所以即使同一个数据被处理多次,也不会影响最终的结果,这时使用 at-least-once 语义是完全没问题的。当然,如果重复数据对结果有影响,比如统计的是 PV,或者之前的统计词频 word count,使用at-least-once 语义就可能会导致结果的不一致了。为了保证达到 at-least-once 的状态一致性,我们需要在发生故障时能够重放数据。最常见的做法是,可以用持久化的事件日志系统,把所有的事件写入到持久化存储中。这时只要记录一个偏移量,当任务发生故障重启后,重置偏移量就可以重放检查点之后的数据了。Kafka 就是这种架构的一个典型实现。

⚫ 精确一次(EXACTLY-ONCE)

       第一个 Barrier 到达后,Barrier 后面的数据必须老老实实等着,等到所有 Barrier 都对齐之后才进行持久化,持久化完其他数据才能继续处理。或者非 Barrier 对齐情况下,第一个 Barrier 到达后直接跳到输出缓冲区继续往下游传递,把第一个Barrier 和其他Barrier 之间的数据都进行标记。这两种都是精确一次。

       最严格的一致性保证,就是所谓的“精确一次”(exactly-once,有时也译作“恰好一次”)。这也是最难实现的状态一致性语义。exactly-once 意味着所有数据不仅不会丢失,而且只被处理一次,不会重复处理。也就是说对于每一个数据,最终体现在状态和输出结果上,只能有一次统计。exactly-once 可以真正意义上保证结果的绝对正确,在发生故障恢复后,就好像从未发生过故障一样。很明显,要做的 exactly-once,首先必须能达到 at-least-once 的要求,就是数据不丢。所以同样需要有数据重放机制来保证这一点。另外,还需要有专门的设计保证每个数据只被处理一次。Flink 中使用的是一种轻量级快照机制——检查点(checkpoint)来保证 exactly-once 语义。

2.2 端到端的状态一致性

       我们已经知道检查点可以保证 Flink 内部状态的一致性,而且可以做到精确一次(exactly-once)。那是不是说,只要开启了检查点,发生故障进行恢复,结果就不会有任何问题呢?

       没那么简单。在实际应用中,一般要保证从用户的角度看来,最终消费的数据是正确的。而用户或者外部应用不会直接从 Flink 内部的状态读取数据,往往需要我们将处理结果写入外部存储中。这就要求我们不仅要考虑 Flink 内部数据的处理转换,还涉及从外部数据源读取,以及写入外部持久化系统,整个应用处理流程从头到尾都应该是正确的。

       所以完整的流处理应用,应该包括了数据源、流处理器和外部存储系统三个部分。这个完整应用的一致性,就叫作“端到端(end-to-end)的状态一致性”,它取决于三个组件中最弱的那一环。一般来说,能否达到 at-least-once 一致性级别,主要看数据源能够重放数据;而能否达到 exactly-once 级别,流处理器内部、数据源、外部存储都要有相应的保证机制。

       状态一致性实现难度:At-most-once < At-least-once < Exactly-once 。

Flink(十二)【容错机制】(4)https://developer.aliyun.com/article/1532262

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
99 3
|
4月前
|
存储 数据处理 Apache
超越传统数据库:揭秘Flink状态机制,让你的数据处理效率飞升!
【8月更文挑战第26天】Apache Flink 在流处理领域以其高效实时的数据处理能力脱颖而出,其核心特色之一便是状态管理机制。不同于传统数据库依靠持久化存储及 ACID 事务确保数据一致性和可靠性,Flink 利用内存中的状态管理和分布式数据流模型实现了低延迟处理。Flink 的状态分为键控状态与非键控状态,前者依据数据键值进行状态维护,适用于键值对数据处理;后者与算子实例关联,用于所有输入数据共享的状态场景。通过 checkpointing 机制,Flink 在保障状态一致性的同时,提供了更适合流处理场景的轻量级解决方案。
71 0
|
6月前
|
消息中间件 存储 NoSQL
Flink(十二)【容错机制】(4)
Flink(十二)【容错机制】
|
4月前
|
调度 流计算
Flink 新一代流计算和容错问题之Flink 中的数据可以分为什么类型
Flink 新一代流计算和容错问题之Flink 中的数据可以分为什么类型
|
4月前
|
Cloud Native 安全 调度
Flink 新一代流计算和容错问题之Flink 通过云原生技术改进容错设计要如何操作
Flink 新一代流计算和容错问题之Flink 通过云原生技术改进容错设计要如何操作
|
4月前
|
运维 Cloud Native 数据库
Flink 新一代流计算和容错问题之将 Flink 的容错与云原生的弹性扩缩容相结合要怎么操作
Flink 新一代流计算和容错问题之将 Flink 的容错与云原生的弹性扩缩容相结合要怎么操作
|
4月前
|
存储 流计算
Flink 新一代流计算和容错问题之Flink 通过 Key Group 管理状态是怎么操作的
Flink 新一代流计算和容错问题之Flink 通过 Key Group 管理状态是怎么操作的
|
4月前
|
存储 调度 流计算
Flink 新一代流计算和容错问题之如何实现 Generalized Log-Based Incremental Checkpoint
Flink 新一代流计算和容错问题之如何实现 Generalized Log-Based Incremental Checkpoint
|
4月前
|
存储 缓存 流计算
Flink 新一代流计算和容错问题之在有状态的算子中,状态更新是怎么记录的
Flink 新一代流计算和容错问题之在有状态的算子中,状态更新是怎么记录的
|
4月前
|
缓存 流计算
Flink 新一代流计算和容错问题之 Flink 作业的 local buffer pool 的 size 要如何估算
Flink 新一代流计算和容错问题之 Flink 作业的 local buffer pool 的 size 要如何估算