Fault-tolerance in Flink | 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习 Fault-tolerance in Flink

开发者学堂课程【Apache Flink 2021 最新入门课程:Fault-tolerance in Flink】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/58/detail/1071


Fault-tolerance in Flink


内容介绍:


一、有状态的流计算

二、全局一致性快照

三、Flink 的容错机制

四、Flink 的状态管理


一、有状态的流计算


1.流计算

image.png


流计算是指有一个数据源可以持续不断地发送消息,同时有一个常驻程序运行代码,从数据源拿到一个消息后会进行处理,然后把结果输出到下游。


2.分布式流计算



image.png


分布式流计算是指把输入流以某种方式进行一个划分,再使用多个分布式实例对流进行处理。


3.流计算中的状态

3.png

计算可以分成有状态和无状态两种,无状态的计算只需要处理单一事件,有状态的计算需要记录并处理多个事件。


举个简单的例子:

例如一个事件由事件 ID 和事件值两部分组成,如果处理逻辑是每拿到一个事件,都解析并输出它的事件值,那么这就是一个无状态的计算;相反,如果每拿到一个状态,解析它的值出来后,需要和前一个事件值进行比较,比前一个事件值大的时候才把它进行输出,这就是一个有状态的计算。

4.png流计算中的状态有很多种。比如在去重的场景下,会记录所有的主键;又或者在窗口计算里,已经进入窗口还没触发的数据,这也是流计算的状态。

在机器学习/深度学习场景里,训练的模型及参数数据都是流计算的状态。



二、全局一致性快照

全局一致性快照是可以用来给分布式系统做备份和故障恢复的机制。

1.全局快照

什么是全局快照

5.png


全局快照首先是一个分布式应用,它有多个进程分布在多个服务器上;

其次,它在应用内部有自己的处理逻辑和状态;

第三,应用间是可以互相通信的;

第四,在这种分布式的应用,有内部状态,硬件可以通信的情况下,某一时刻的全局状态,就叫做全局的快照。


为什么需要全局快照

6.png

第一,用它来做检查点,可以定期对全局状态做备份,当应用程序故障时,就可以拿来恢复;

第二,做死锁检测,进行快照后当前的程序继续运行,然后可以对快照进行分析,看应用程序是不是存在死锁状态,如果是就可以进行相应的处理。


全局快照举例

下图为分布式系统中全局快照的示例。

7.png


P1 和 P2 是两个进程,它们之间有消息发送的管道,分别是 C12 和 C21。

对于  P1 进程来说, C12 是它发送消息的管道,称作 output channel; C21 是它接收消息的管道,称作  input channel。

除了管道,每个进程都有一个本地的状态。

比如说 P1 和 P2 每个进程的内存里都有XYZ三个变量和相应的值。

那么  P1 和 P2 进程的本地状态和它们之间发送消息的管道状态,就是一个初始的全局状态,也可称为全局快照。

8.png


假设 P1 给 P2 发了一条消息,让 P2 把 x 的状态值从 4 改为 7,但是这个消息在管道中,还没到达 P2。

这个状态也是一个全局快照。

9.png

再接下来,P2 收到了 P1 的消息,但是还没有处理,这个状态也是一个全局快照。

10.png

最后接到消息的 P2 把本地的 X 的值从 4 改为 7 ,这也是一个全局快照。

所以当有事件发生的时候,全局的状态就会发生改变。事件包括进程发送消息、进程接收消息和进程修改自己的状态。


2.全局一致性快照

当事件发生时,全局的状态会发生改变,这里的事件包括:

-进程发送消息

-进程接收到消息

-进程修改状态

a->b 代表在绝对时钟 (real time) 下 a happened before b, 则当一个全局快照满足下述条件时,我们称其为一个全局一致性快照︰

-如果 A->B 且 B 被包含在该快照中,则A也被包含在这个快照中

假如说有两个事件,a 和 b,在绝对时间下,如果 a 发生在 b 之前,且 b 被包含在快照当中,那么则 a 也被包含在快照当中。

满足这个条件的全局快照,就称为全局一致性快照。


3.全局一致性快照的实现方法

11.png


时钟同步并不能实现全局一致性快照;全局同步虽然可以实现,但是它的缺点也非常明显,它会让所有应用程序都停下来,会影响全局的性能。


4.异步全局一致性快照算法  – Chandy-Lamport

异步全局一致性快照算法 Chandy-Lamport 可以在不影响应用程序运行的前提下,实现全局一致性快照。

Chandy-Lamport 的系统要求有以下几点:

第一,不影响应用运行,也就是不影响收发消息,不需要停止应用程序;

第二,每个进程都可以记录本地状态;

第三,可以分布式地对已记录的状态进行收集;

第四,任意进程都可以发起快照


同时,Chandy-Lamport 算法可以执行还有一个前提条件:消息有序且不重复,并且消息可靠性可保障。

Chandy-Lamport 算法流程

12.png

Chandy-Lamport 的算法流程主要分为三个部分:

发起快照、分布式的执行快照和终止快照   。


发起快照

任意进程都可以发起快照。

如下图所示,当由 P1 发起快照的时候,第一步需要记录本地的状态,也就是对本地进行快照,然后立刻向它所有 output channel 发送一个 marker 消息,这中间是没有时间间隙的。

marker 消息是一个特殊的消息,它不同于应用之间传递的消息。


image.png


发出 Marker 消息后,P1 就会开始记录所有 input channel 的消息,也就是图示 C21 管道的消息。


分布式的执行快照

如下图,先假定当 Pi 接收到来自 Cki 的 marker 消息。

也就是 Pk 发给 Pi 的 marker 消息。可以分两种情况来看:14.png

第一种情况:

这个是 Pi 收到的第一个来自其它管道的 marker 消息,它会先记录一下本地的状态,再把 C12管道记为空,也就是说后续再从 P1 发消息,就不包含在此次快照里了,与此同时立刻向它所有 output channel 发送 marker 消息。

最后开始记录来自除 Cki 之外的所有 input channel 的消息。

15.png

 

上面提到 Cki 消息不包含在实时快照里,但是实时消息还是会发生,所以第二种情况是,如果此前Pi已经接收过 marker 消息,它会停止记录 Cki 消息,同时会将此前记录的所有 Cki 消息作为 Cki 在本次快照中的最终状态来保存。


终止快照

终止快照的条件有两个:

第一,所有进程都已经接收到 marker 消息,并记录在本地快照;

第二,所有进程都从它的 n-1 个 input channel 里收到了 marker 消息,并记录了管道状态。

16.png

当快照终止,快照收集器 (Central Server) 就开始收集每一个部分的快照去形成全局一致性快照了。

示例展示

在下图的例子里,一些状态是在内部发生的,比如 A,它跟其它进程没有交互。内部状态就是 P1 发给自己消息,可以将 A 认为是 C11=[A->]。


17.png

Chandy-Lamport 全局一致性快照的算法是怎么执行的呢?


18.png

假设从 p1 来发起快照,它发起快照时,首先对本地的状态进行快照,称之为 S1,然后立刻向它所有的 output channel,即P2 和 P3,分别发 marker 消息,然后再去记录它所有 input channel 的消息,即来自 P2 和 P3 及自身的消息。19.png

图例所示,纵轴是绝对时间,按照绝对时间来看,为什么 P3 和 P2 收到 marker 消息会有时间差呢?

因为假如这是一个真实的物理环境里的分布式进程,不同节点之间的网络状况是不一样的,这种情况会导致消息送达时间存在差异。

P3 先收到 marker 消息,且是它接收到的第一个 marker消息。

接收到消息后,它首先会对本地状态进行快照,然后把 C13管道的标记成 close,与此同时开始向它所有的 output channel 发送  marker 消息。

最后它会把来自除了 C13 之外的所有 input channel 的消息开始进行记录。

20.png

接收到 P3 发出的 marker 信息的是 P1,但这不是它接收的第一个 marker,它会把来自C31 channel 的管道立刻关闭,并且把当前的记录消息做这个 channel 的快照,后续再接收到来自 P3 的消息,就不会更新在此次的快照状态里了。21.png

接下来 P2 接收到来自 P3 的消息,这是它接到的第一个 marker 消息。

接收到消息后,它首先对本地状态进行快照,然后把  C32 管道的标记成 close,与此同时开始向它所有的 output channel 发送  marker 消息,最后它会把来自除了 C32 之外的所有input channel 的消息开始进行记录。

22.png


再来看 P2 接收到来自 P1 的消息,这不是 P2 接收到的第一个 marker 消息,所以它会把所有的 input channel 全部关闭,并且记录 channel 的状态。

23.png

接下来看 P1 接收到来自 P2 的消息,这也不是它接收的第一个消息。那么它就会把所有的input channel 关闭,并把记录的消息作为状态。那么这里面有两个状态,一个是 C11,即自己发给自己的消息;一个是 C21,是 P2 里 H 发给 P1D 的。

24.png

最后一个时间点,P3 接收到来自 P2 的消息,这也不是它收到的第一个消息,操作跟上面介绍的一样。

在这期间 P3 本地有一个事件J,它也会把J作为它的状态。

25.png

当所有进程都记录了本地状态,而且每一个进程的所有输入管道都已经关闭了,那么全局一致性快照就结束了,也就是对过去时间点的全局性的状态记录完成了。

Chandy-Lamport 与 Flink 之间的关系

Flink 是分布式系统,所以 Flink 会采用全局一致性快照的方式形成检查点,来支持故障恢复。

Flink 的异步全局一致性快照算法跟 Chandy-Lamport 算法的区别主要有以下几点:

第一,Chandy-Lamput 支持强连通图,而  Flink 支持弱连通图;

第二,Flink 采用的是裁剪的(Tailored)Chandy-Lamput 异步快照算法;

第三,Flink 的异步快照算法在 DAG 场景下不需要存储 Channel state,从而极大减少快照的存储空间。


三、Flink 的容错机制


26.png

容错,就是恢复到出错前的状态。流计算容错一致性保证有三种,分别是:

Exactly once,At least once,At most once。

1.Exactly once,是指每条 event 会且只会对 state 产生一次影响,这里的“一次”并非端到端的严格一次,而是指在 Flink内部只处理一次,不包括 source 和 sink 的处理。

2.At least once,是指每条 event 会对 state 产生最少一次影响,也就是存在重复处理的可能。

3.At most once,是指每条 event 会对 state 产生最多一次影响,就是状态可能会在出错时丢失。

端到端的 Exactly once

Exactly once 的意思是,作业结果总是正确的,但是很可能产出多次;所以它的要求是需要有可重放的 source。

端到端的 Exactly once,是指作业结果正确且只会被产出一次,它的要求除了有可重放的source 外,还要求有事务型的 sink 和可以接收幂等的产出结果。


Flink 的状态容错

简单场景的 Exactly Once 容错方法

简单场景的做法如下图,方法就是,记录本地状态并且把 source 的 offset,即 Event log 的位置记录下来就好了。

27.png28.png29.png

分布式场景的状态容错

如果是分布式场景,我们需要在不中断运算的前提下对多个拥有本地状态的算子产生全局一致性快照。

Flink  分布式场景的作业拓扑比较特殊,它是有向无环并且是弱联通图,可以采用裁剪的 Chandy-Lamport,也就是只记录所有输入的 offset 和各个算子状态,并依赖rewindable source(可回溯的 source,即可以通过 offset 读取比较早一点时间点),从而不需要存储 channel 的状态,这在存在聚合 (aggregation)逻辑的情况下可以节省大量的存储空间。


30.png

最后做恢复,恢复就是把数据源的位置重新设定,然后每一个算子都从检查点恢复状态。


3.Flink 的分布式快照方法

31.png

首先在源数据流里插入 Checkpoint barrier,也就是上文提到的 Chandy-Lamport 算法里的 marker message,不同的 Checkpoint barrier 会把流自然地切分多个段,每个段都包含了 Checkpoint 的数据;

32.png

Flink 里有一个全局的  Coordinator,它不像 Chandy-Lamport 对任意一个进程都可以发起快照,这个集中式的  Coordinator 会把  Checkpoint barrier 注入到每个 source 里,然后启动快照。

当每个节点收到 barrier 后,因为 Flink 里面它不存储 Channel state,所以它只需存储本地的状态就好。

33.png在做完了 Checkpoint  后,每个算子的每个并发都会向 Coordinator 发送一个确认消息,当所有任务的确认消息都被 Checkpoint Coordinator 接收,快照就结束了。


4.流程演示

见下图示,假设 Checkpoint N  被注入到 source 里,这时 source 会先把它正在处理分区的 offset 记录下来。

34.png

随着时间的流逝,它会把 Checkpoint barrier 发送到两个并发的下游,当 barrier 分别到达两个并发,这两个并发会分别把它们本地的状态都记录在 Checkpoint  的里:

最后 barrier 到达最终的 subtask,快照就完成了。

35.png


这是比较简单的场景演示,每个算子只有单流的输入,再来看下图比较复杂的场景,算子有多流输入的情况。

36.png

当算子有多个输入,需要把 Barrier 对齐。怎么把 Barrier 对齐呢?

如下图所示,在左侧原本的状态下,当其中一条 barrier 到达,另一条 barrier 命令上有的barrier 还在管道中没有到达,这时会在保证 Exactly once 的情况下,把先到达的流直接阻塞掉,然后等待另一条流的数据处理。等到另外一条流也到达了,会把之前的流unblock,同时把 barrier 发送到算子。

37.png

在这个过程中,阻塞掉其中一条流的作用是,会让它产生反压。Barrier 对齐会导致反压和暂停 operator 的数据处理。

如果不在对齐过程中阻塞已收到 barrier 的数据管道,数据持续不断流进来,那么属于下个 Checkpoint 的数据被包含在当前的 Checkpoint 里,如果一旦发生故障恢复后,由于source 会被 rewind,部分数据会有重复处理,这就是 at-least-once。 如果能接收at-least-once,那么可以选择其他可以避免barrier对齐带来的副作用。

另外也可以通过异步快照来尽量减少任务停顿并支持多个 Checkpoint 同时进行。


5.快照触发

38.png

本地快照同步上传到系统需要 state Copy-on-write 的机制。

假如对元数据信息做了快照之后数据处理恢复了,在上传数据的过程中如何保证恢复的应用程序逻辑不会修改正在上传的数据呢?

实际上不同状态存储后端的处理是不一样的,Heap backend 会触发数据的 copy-on-write,而对于 RocksDB backend 来说 LSM 的特性可以保证已经快照的数据不会被修改。


四、Flink 的状态管理


1.Flink 状态管理

39.png

首先需要去定义一个状态,在下图的例子里,先定义一个 Value state。

在定义的状态的时候,需要给出以下的几个信息:

1.状态识别 ID

2.状态数据类型

3.本地状态后端注册状态

4.本地状态后端读写状态


2.Flink 状态后端

也称作 state backend,Flink 状态后端有两种;40.png

第一种,JVM Heap,它里面的数据是以 Java 对象形式存在的,读写也是以对象形式去完成的,所以速度很快。

但是也存在两个弊端:第一个弊端,以对象方式存储所需的空间是磁盘上序列化压缩后的数据大小的很多倍,所以占用的内存空间很大;第二个弊端,虽然读写不用做序列化,但是在形成 snapshot 时需要做序列化,所以它的异步 snapshot 过程会比较慢。

41.png


第二种,RocksDB,这个类型在读写时就需要做序列化,所以它读写的速度比较慢。

但是它有一个好处,基于 LSM  的数据结构在快照之后会形成 sst 文件,它的异步 checkpoint  过程就是文件拷贝的过程,CPU 消耗会比较低。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6月前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即"Top N"问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
178 1
|
6月前
|
分布式计算 资源调度 Hadoop
Hadoop学习笔记(HDP)-Part.18 安装Flink
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
228 2
Hadoop学习笔记(HDP)-Part.18 安装Flink
|
前端开发 数据可视化 关系型数据库
用 PolarDB - X + Flink 搭建实时数据大屏|学习笔记(三)
快速学习用 PolarDB - X + Flink 搭建实时数据大屏
用 PolarDB - X + Flink 搭建实时数据大屏|学习笔记(三)
|
存储 运维 监控
如何开通实时计算 Flink 版|学习笔记(三)
快速学习如何开通实时计算 Flink 版
如何开通实时计算 Flink 版|学习笔记(三)
|
机器学习/深度学习 SQL 人工智能
实时计算 Flink 训练营场景与应用|学习笔记(三)
快速学习实时计算 Flink 训练营场景与应用
实时计算 Flink 训练营场景与应用|学习笔记(三)
|
SQL 存储 搜索推荐
实时计算 Flink 训练营场景与应用|学习笔记(二)
快速学习实时计算 Flink 训练营场景与应用
实时计算 Flink 训练营场景与应用|学习笔记(二)
|
SQL 存储 弹性计算
实时计算 Flink 与你相约阿里云|学习笔记(二)
快速学习实时计算 Flink 与你相约阿里云
实时计算 Flink 与你相约阿里云|学习笔记(二)
|
传感器 存储 Shell
走进 Apache Flink(二)|学习笔记
快速学习走进 Apache Flink
218 0
走进 Apache  Flink(二)|学习笔记
|
SQL 消息中间件 存储
Flink SQL_Table 介绍与实战(二)|学习笔记
快速学习 Flink SQL_Table 介绍与实战
251 0
Flink SQL_Table 介绍与实战(二)|学习笔记
|
存储 分布式计算 API
Stream Processing with Apache Flink(二)|学习笔记
快速学习 Stream Processing with Apache Flink
158 0