Fault-tolerance in Flink | 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习 Fault-tolerance in Flink

开发者学堂课程【Apache Flink 2021 最新入门课程:Fault-tolerance in Flink】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/58/detail/1071


Fault-tolerance in Flink


内容介绍:

一、有状态的流计算

二、全局一致性快照

三、Flink的容错机制

四、Flink的状态管理


一、有状态的流计算


1.流计算

image.png


流计算是指有一个数据源可以持续不断地发送消息,同时有一个常驻程序运行代码,从数据源拿到一个消息后会进行处理,然后把结果输出到下游。


2.分布式流计算



image.png


分布式流计算是指把输入流以某种方式进行一个划分,再使用多个分布式实例对流进行处理。


3.流计算中的状态

3.png

计算可以分成有状态和无状态两种,无状态的计算只需要处理单一事件,有状态的计算需要记录并处理多个事件。


举个简单的例子:

例如一个事件由事件ID和事件值两部分组成,如果处理逻辑是每拿到一个事件,都解析并输出它的事件值,那么这就是一个无状态的计算;相反,如果每拿到一个状态,解析它的值出来后,需要和前一个事件值进行比较,比前一个事件值大的时候才把它进行输出,这就是一个有状态的计算。

4.png流计算中的状态有很多种。比如在去重的场景下,会记录所有的主键;又或者在窗口计算里,已经进入窗口还没触发的数据,这也是流计算的状态。

在机器学习/深度学习场景里,训练的模型及参数数据都是流计算的状态。



二、全局一致性快照

全局一致性快照是可以用来给分布式系统做备份和故障恢复的机制。

1.全局快照

什么是全局快照

5.png


全局快照首先是一个分布式应用,它有多个进程分布在多个服务器上;

其次,它在应用内部有自己的处理逻辑和状态;

第三,应用间是可以互相通信的;

第四,在这种分布式的应用,有内部状态,硬件可以通信的情况下,某一时刻的全局状态,就叫做全局的快照。


为什么需要全局快照

6.png

第一,用它来做检查点,可以定期对全局状态做备份,当应用程序故障时,就可以拿来恢复;

第二,做死锁检测,进行快照后当前的程序继续运行,然后可以对快照进行分析,看应用程序是不是存在死锁状态,如果是就可以进行相应的处理。


全局快照举例

下图为分布式系统中全局快照的示例。

7.png


P1和P2是两个进程,它们之间有消息发送的管道,分别是C12和C21。

对于 P1进程来说, C12是它发送消息的管道,称作output channel; C21是它接收消息的管道,称作 input channel。

除了管道,每个进程都有一个本地的状态。

比如说P1和P2每个进程的内存里都有XYZ三个变量和相应的值。

那么 P1和P2进程的本地状态和它们之间发送消息的管道状态,就是一个初始的全局状态,也可称为全局快照。

8.png


假设P1给P2发了一条消息,让P2把x的状态值从4改为7,但是这个消息在管道中,还没到达P2。

这个状态也是一个全局快照。

9.png

再接下来,P2收到了P1的消息,但是还没有处理,这个状态也是一个全局快照。

10.png

最后接到消息的P2把本地的X的值从4改为7,这也是一个全局快照。

所以当有事件发生的时候,全局的状态就会发生改变。事件包括进程发送消息、进程接收消息和进程修改自己的状态。


2.全局一致性快照

当事件发生时,全局的状态会发生改变,这里的事件包括:

-进程发送消息

-进程接收到消息

-进程修改状态

a->b代表在绝对时钟(real time)下a happened before b,则当一个全局快照满足下述条件时,我们称其为一个全局一致性快照︰

-如果A->B且B被包含在该快照中,则A也被包含在这个快照中

假如说有两个事件,a和b,在绝对时间下,如果a发生在b之前,且b被包含在快照当中,那么则a也被包含在快照当中。

满足这个条件的全局快照,就称为全局一致性快照。


3.全局一致性快照的实现方法

11.png


时钟同步并不能实现全局一致性快照;全局同步虽然可以实现,但是它的缺点也非常明显,它会让所有应用程序都停下来,会影响全局的性能。


4.异步全局一致性快照算法 – Chandy-Lamport

异步全局一致性快照算法Chandy-Lamport可以在不影响应用程序运行的前提下,实现全局一致性快照。

Chandy-Lamport的系统要求有以下几点:

第一,不影响应用运行,也就是不影响收发消息,不需要停止应用程序;

第二,每个进程都可以记录本地状态;

第三,可以分布式地对已记录的状态进行收集;

第四,任意进程都可以发起快照


同时,Chandy-Lamport算法可以执行还有一个前提条件:消息有序且不重复,并且消息可靠性可保障。

Chandy-Lamport算法流程

12.png

Chandy-Lamport的算法流程主要分为三个部分:

发起快照、分布式的执行快照和终止快照   。


发起快照

任意进程都可以发起快照。

如下图所示,当由P1发起快照的时候,第一步需要记录本地的状态,也就是对本地进行快照,然后立刻向它所有 output channel发送一个marker消息,这中间是没有时间间隙的。

marker消息是一个特殊的消息,它不同于应用之间传递的消息。


image.png


发出Marker消息后,P1就会开始记录所有input channel的消息,也就是图示C21管道的消息。


分布式的执行快照

如下图,先假定当 Pi接收到来自Cki的marker消息。

也就是Pk发给Pi的marker消息。可以分两种情况来看:14.png

第一种情况:

这个是Pi收到的第一个来自其它管道的marker消息,它会先记录一下本地的状态,再把 C12管道记为空,也就是说后续再从 P1发消息,就不包含在此次快照里了,与此同时立刻向它所有output channel发送marker消息。

最后开始记录来自除Cki之外的所有input channel的消息。

15.png

 

上面提到Cki消息不包含在实时快照里,但是实时消息还是会发生,所以第二种情况是,如果此前Pi已经接收过marker消息,它会停止记录 Cki消息,同时会将此前记录的所有Cki消息作为Cki在本次快照中的最终状态来保存。


终止快照

终止快照的条件有两个:

第一,所有进程都已经接收到marker消息,并记录在本地快照;

第二,所有进程都从它的n-1个input channel里收到了marker 消息,并记录了管道状态。

16.png

当快照终止,快照收集器 (Central Server) 就开始收集每一个部分的快照去形成全局一致性快照了。

示例展示

在下图的例子里,一些状态是在内部发生的,比如A,它跟其它进程没有交互。内部状态就是 P1发给自己消息,可以将A认为是C11=[A->]。


17.png

Chandy-Lamport全局一致性快照的算法是怎么执行的呢?


18.png

假设从p1来发起快照,它发起快照时,首先对本地的状态进行快照,称之为S1,然后立刻向它所有的output channel,即P2和P3,分别发marker消息,然后再去记录它所有input channel的消息,即来自P2和P3及自身的消息。19.png

图例所示,纵轴是绝对时间,按照绝对时间来看,为什么P3和P2收到marker消息会有时间差呢?

因为假如这是一个真实的物理环境里的分布式进程,不同节点之间的网络状况是不一样的,这种情况会导致消息送达时间存在差异。

P3先收到marker消息,且是它接收到的第一个marker消息。

接收到消息后,它首先会对本地状态进行快照,然后把 C13管道的标记成 close,与此同时开始向它所有的output channel发送 marker消息。

最后它会把来自除了C13之外的所有input channel的消息开始进行记录。

20.png

接收到P3发出的marker信息的是P1,但这不是它接收的第一个marker,它会把来自C31 channel的管道立刻关闭,并且把当前的记录消息做这个channel的快照,后续再接收到来自P3的消息,就不会更新在此次的快照状态里了。21.png

接下来P2接收到来自P3的消息,这是它接到的第一个marker消息。

接收到消息后,它首先对本地状态进行快照,然后把 C32管道的标记成 close,与此同时开始向它所有的output channel发送 marker消息,最后它会把来自除了C32之外的所有input channel的消息开始进行记录。

22.png


再来看P2接收到来自P1的消息,这不是P2接收到的第一个marker消息,所以它会把所有的 input channel全部关闭,并且记录channel的状态。

23.png

接下来看P1接收到来自P2的消息,这也不是它接收的第一个消息。那么它就会把所有的input channel关闭,并把记录的消息作为状态。那么这里面有两个状态,一个是C11,即自己发给自己的消息;一个是C21,是P2里H发给P1D的。

24.png

最后一个时间点,P3接收到来自P2的消息,这也不是它收到的第一个消息,操作跟上面介绍的一样。

在这期间P3本地有一个事件J,它也会把J作为它的状态。

25.png

当所有进程都记录了本地状态,而且每一个进程的所有输入管道都已经关闭了,那么全局一致性快照就结束了,也就是对过去时间点的全局性的状态记录完成了。

Chandy-Lamport与 Flink之间的关系

Flink 是分布式系统,所以 Flink 会采用全局一致性快照的方式形成检查点,来支持故障恢复。

Flink的异步全局一致性快照算法跟Chandy-Lamport算法的区别主要有以下几点:

第一,Chandy-Lamput支持强连通图,而 Flink支持弱连通图;

第二,Flink采用的是裁剪的(Tailored)Chandy-Lamput异步快照算法;

第三,Flink的异步快照算法在DAG场景下不需要存储Channel state,从而极大减少快照的存储空间。


三、Flink的容错机制


26.png

容错,就是恢复到出错前的状态。流计算容错一致性保证有三种,分别是:

Exactly once,At least once,At most once。

1.Exactly once,是指每条event会且只会对state产生一次影响,这里的“一次”并非端到端的严格一次,而是指在 Flink内部只处理一次,不包括source和sink的处理。

2.At least once,是指每条event会对state产生最少一次影响,也就是存在重复处理的可能。

3.At most once,是指每条event会对state产生最多一次影响,就是状态可能会在出错时丢失。

端到端的Exactly once

Exactly once的意思是,作业结果总是正确的,但是很可能产出多次;所以它的要求是需要有可重放的source。

端到端的Exactly once,是指作业结果正确且只会被产出一次,它的要求除了有可重放的source外,还要求有事务型的sink和可以接收幂等的产出结果。


Flink的状态容错

简单场景的 Exactly Once 容错方法

简单场景的做法如下图,方法就是,记录本地状态并且把 source的offset,即 Event log的位置记录下来就好了。

27.png28.png29.png

分布式场景的状态容错

如果是分布式场景,我们需要在不中断运算的前提下对多个拥有本地状态的算子产生全局一致性快照。

Flink 分布式场景的作业拓扑比较特殊,它是有向无环并且是弱联通图,可以采用裁剪的Chandy-Lamport,也就是只记录所有输入的offset和各个算子状态,并依赖rewindable source(可回溯的source,即可以通过offset读取比较早一点时间点),从而不需要存储channel的状态,这在存在聚合 (aggregation)逻辑的情况下可以节省大量的存储空间。


30.png

最后做恢复,恢复就是把数据源的位置重新设定,然后每一个算子都从检查点恢复状态。


3.Flink 的分布式快照方法

31.png

首先在源数据流里插入Checkpoint barrier,也就是上文提到的Chandy-Lamport算法里的marker message,不同的Checkpoint barrier会把流自然地切分多个段,每个段都包含了Checkpoint的数据;

32.png

Flink 里有一个全局的 Coordinator,它不像 Chandy-Lamport 对任意一个进程都可以发起快照,这个集中式的 Coordinator会把 Checkpoint barrier 注入到每个 source 里,然后启动快照。

当每个节点收到 barrier 后,因为 Flink 里面它不存储 Channel state,所以它只需存储本地的状态就好。

33.png在做完了Checkpoint 后,每个算子的每个并发都会向Coordinator发送一个确认消息,当所有任务的确认消息都被Checkpoint Coordinator接收,快照就结束了。


4.流程演示

见下图示,假设Checkpoint N 被注入到 source里,这时source会先把它正在处理分区的offset记录下来。

34.png

随着时间的流逝,它会把Checkpoint barrier发送到两个并发的下游,当barrier分别到达两个并发,这两个并发会分别把它们本地的状态都记录在Checkpoint 的里:

最后barrier到达最终的subtask,快照就完成了。

35.png


这是比较简单的场景演示,每个算子只有单流的输入,再来看下图比较复杂的场景,算子有多流输入的情况。

36.png

当算子有多个输入,需要把Barrier 对齐。怎么把Barrier对齐呢?

如下图所示,在左侧原本的状态下,当其中一条barrier到达,另一条barrier命令上有的barrier还在管道中没有到达,这时会在保证Exactly once的情况下,把先到达的流直接阻塞掉,然后等待另一条流的数据处理。等到另外一条流也到达了,会把之前的流unblock,同时把barrier发送到算子。

37.png

在这个过程中,阻塞掉其中一条流的作用是,会让它产生反压。Barrier 对齐会导致反压和暂停operator的数据处理。

如果不在对齐过程中阻塞已收到barrier的数据管道,数据持续不断流进来,那么属于下个Checkpoint的数据被包含在当前的Checkpoint里,如果一旦发生故障恢复后,由于source会被rewind,部分数据会有重复处理,这就是at-least-once。 如果能接收at-least-once,那么可以选择其他可以避免barrier对齐带来的副作用。

另外也可以通过异步快照来尽量减少任务停顿并支持多个Checkpoint同时进行。


5.快照触发

38.png

本地快照同步上传到系统需要state Copy-on-write的机制。

假如对元数据信息做了快照之后数据处理恢复了,在上传数据的过程中如何保证恢复的应用程序逻辑不会修改正在上传的数据呢?

实际上不同状态存储后端的处理是不一样的,Heap backend会触发数据的copy-on-write,而对于RocksDB backend来说LSM的特性可以保证已经快照的数据不会被修改。


四、Flink 的状态管理


1.Flink 状态管理

39.png

首先需要去定义一个状态,在下图的例子里,先定义一个Value state。

在定义的状态的时候,需要给出以下的几个信息:

1.状态识别ID

2.状态数据类型

3.本地状态后端注册状态

4.本地状态后端读写状态


2.Flink 状态后端

也称作state backend,Flink状态后端有两种;40.png

第一种,JVM Heap,它里面的数据是以Java对象形式存在的,读写也是以对象形式去完成的,所以速度很快。

但是也存在两个弊端:第一个弊端,以对象方式存储所需的空间是磁盘上序列化压缩后的数据大小的很多倍,所以占用的内存空间很大;第二个弊端,虽然读写不用做序列化,但是在形成snapshot时需要做序列化,所以它的异步snapshot过程会比较慢。

41.png


第二种,RocksDB,这个类型在读写时就需要做序列化,所以它读写的速度比较慢。

但是它有一个好处,基于 LSM 的数据结构在快照之后会形成sst文件,它的异步 checkpoint 过程就是文件拷贝的过程,CPU 消耗会比较低。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
存储 消息中间件 Kafka
Apache Flink 漫谈系列(05) - Fault Tolerance
实际问题 在流计算场景中,数据会源源不断的流入Apache Flink系统,每条数据进入Apache Flink系统都会触发计算。那么在计算过程中如果网络、机器等原因导致Task运行失败了,Apache Flink会如何处理呢?在 《Apache Flink 漫谈系列 - State》一篇中我们介绍了 Apache Flink 会利用State记录计算的状态,在Failover时候Task会根据State进行恢复。
7370 0
|
消息中间件 存储 Kafka
Flink1.4 Fault Tolerance源码解析-1
前言:本篇关注Flink,对Fault Tolerance的源码实现进行阐述,主要介绍Api层及Flink现有实现。 本篇文章重点关注以下问题: 1. 具备Fault Tolerance能力的两种对象:Function和Operator 2.
2095 0
|
消息中间件 Kafka Apache
Apache Flink fault tolerance源码剖析(一)
因某些童鞋的建议,从这篇文章开始结合源码谈谈Flink Fault Tolerance相关的话题。上篇官方介绍的翻译是理解这个话题的前提,所以如果你想更深入得了解Flink Fault Tolerance的机制,推荐先读一下前篇文章理解它的实现原理。
2471 0
|
Apache 流计算
Apache Flink fault tolerance源码剖析(二)
继续Flink Fault Tolerance机制剖析。上一篇文章我们结合代码讲解了Flink中检查点是如何应用的(如何根据快照做失败恢复,以及检查点被应用的场景),这篇我们来谈谈检查点的触发机制以及基于Actor的消息驱动的协同机制。
1589 0
|
存储 Apache 流计算
Apache Flink fault tolerance源码剖析(三)
上一篇文章我们探讨了基于定时任务的周期性检查点触发机制以及基于Akka的actor模型的消息驱动协同机制。这篇文章我们将探讨Zookeeper在Flink的Fault Tolerance所起到的作用。
1766 0
|
存储 Apache 流计算
Apache Flink fault tolerance源码剖析(四)
上篇文章我们探讨了Zookeeper在Flink的fault tolerance中发挥的作用(存储/恢复已完成的检查点以及检查点编号生成器)。 这篇文章会谈论一种特殊的检查点,Flink将之命名为——Savepoint(保存点)。
1734 0
|
存储 Apache 流计算
Apache Flink fault tolerance源码剖析(五)
上一篇文章我们谈论了保存点的相关内容,其中就谈到了保存点状态的存储。这篇文章我们来探讨用户程序状态的存储,也是在之前的文章中多次提及的state backend(中文暂译为状态终端)。 基于数据流API而编写的程序经常以各种各样的形式保存着状态: 窗口收集/聚合元素(这里的元素可以看作是窗口的状态)直到它们被触发 转换函数可能会使用key/value状态接口来存储数据 转换函数可能实现Checkpointed接口来让它们的本地变量受益于fault tolerant机制 当检查点机制工作时,上面谈到的这些状态将能够基于检查点一同持久化来保证数据不丢失并且得到可持续的恢复。
2243 0
|
存储 缓存 Apache
Apache Flink fault tolerance源码剖析(六)
上篇文章我们分析了基于检查点的用户状态的保存机制——状态终端。这篇文章我们来分析barrier(中文常译为栅栏或者屏障,为了避免引入名称争议,此处仍用英文表示)。检查点的barrier是提供exactly once一致性保证的主要保证机制。
1723 0
|
存储 Apache 流计算
Apache Flink fault tolerance源码剖析完结篇
这篇文章是对Flinkfault tolerance的一个总结。虽然还有些细节没有涉及到,但是基本的实现要点在这个系列中都已提及。 回顾这个系列,每篇文章都至少涉及一个知识点。我们来挨个总结一下。 恢复机制实现 Flink中通常需要进行状态恢复的对象是operator以及function。
2296 0