Flink(十二)【容错机制】(1)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十二)【容错机制】

前言

       最近已经放假了,但是一直在忙一个很重要的自己的一个项目,用 JavaFX 和一个大数据组件联合开发一个功能,也算不枉我学了一次 JavaFX,收获很大,JavaFX 它作为一个 GUI 开发语言,本质还是 Java,所以很好的锻炼了我的 Java 水平、抽象能力 ... 平常看似简单的一些概念用到实际应用当中才发现了其中的坑点,比如怎么封装、什么时候用 static 关键字、静态资源怎么放、哪些要反复利用的东西需要抽象成一个 pojo、什么情况下需要定义接口 ... 总之收获很大。

       今天赶紧继续开始大数据组件的学习,Flink 已经停了好长一段时间了,开干开干。

容错机制

       流式数据连续不断地到来,无休无止;所以流处理程序也是持续运行的,并没有一个明确的结束退出时间。机器运行程序,996 起来当然比人要容易得多,不过希望“永远运行”也是不切实际的。因为各种硬件软件的原因,运行一段时间后程序可能异常退出、机器可能宕机,如果我们只依赖一台机器来运行,就会使得任务的处理被迫中断。

       一个解决方案就是多台机器组成集群,以“分布式架构”来运行程序。这样不仅扩展了系统的并行处理能力,而且可以解决单点故障的问题,从而大大提高系统的稳定性和可用性。在分布式架构中,当某个节点出现故障,其他节点基本不受影响。这时只需要重启应用,恢复之前某个时间点的状态继续处理就可以了。这一切看似简单,可是在实时流处理中,我们不仅需要保证故障后能够重启继续运行,还要保证结果的正确性、故障恢复的速度、对处理性能的影响,这就需要在架构上做出更加精巧的设计。

       在 Flink 中,有一套完整的容错机制(fault tolerance)来保证故障后的恢复,其中最重要的就是检查点(checkpoint),类似与我们之前学习的 Spark ,它也有检查点来提供容错,学完我们对比一下它们究竟有啥不同。

1、检查点(checkpoint)

       在流处理中,我们可以用存档的思路,将之前某个时间点的状态保存下来,这份存档就是我们所谓的“检查点”。就像我们学大数据专业时候安装虚拟机的过程,虚拟机的快照功能可以帮助我们恢复到我们机器之前的状态。

       我们知道在有状态的流处理中,任务继续处理新数据,并不需要“之前的计算结果”,而是需要任务“之前的状态”。比如假设我们有一个长度为 10 的滑动窗口,它的滑动步长是 5 ,任务是求sum。当机器故障时,我们重启应用,这时候我们的任务会创建新的窗口处理新的数据,我们知道滑动窗口会在每个步长处触发一次计算,所以当我们的窗口到达一个步长时,它的窗口范围是 [-5,5),而[-5,0) 的数据在历史状态(检查点)中保存着,而且上一个窗口的计算结果我们无法利用,因为它计算的是 [-10,0) 内的数据,所以说我们并不需要“之前的计算结果”,而是需要任务“之前的状态”。

       遇到故障重启的时候,我们可以从检查点中“读档”,恢复出之前的状态,这样就可以回到当时保存的一刻接着处理数据了。

       检查点是 Flink 容错机制的核心。这里所谓的“检查”,其实是针对故障恢复的结果而言的:故障恢复之后继续处理的结果,应该与发生故障前完全一致,我们需要“检查”结果的正确性。所以,有时又会把 checkpoint 叫作“一致性检查点”。

1.1、检查点的保存

       什么时候进行检查点的保存呢?最理想的情况下,我们应该“随时”保存,也就是每处理完一个数据就保存一下当前的状态;这样如果在处理某条数据时出现故障,我们只要回到上一个数据处理完之后的状态,然后重新处理一遍这条数据就可以。这样重复处理的数据最少,完全没有多余操作,可以做到最低的延迟。然而实际情况不会这么完美。

(1) 周期性的触发保存

       “随时存档”确实恢复起来方便,但是需要我们不停地做存档操作,那不是闲得蛋疼嘛。如果每处理一条数据就进行检查点的保存,当大量数据同时到来时,就会耗费很多资源来频繁做检查点,影响应用处理数据的性能,数据处理的速度就会受到影响。所以更好的方式是,每隔一段时间去做一次存档,这样既不会影响数据的正常处理,也不会有太大的延迟——毕竟故障恢复的情况不是随时发生的。在 Flink 中,检查点的保存是周期性触发的,间隔时间可以进行设置。

       所以检查点作为应用状态的一份“存档”,其实就是所有任务状态在同一时间点的一个“快照”(snapshot),它的触发是周期性的。具体来说,当每隔一段时间检查点保存操作被触发时,就把每个任务当前的状态复制一份,按照一定的逻辑结构放在一起持久化保存起来,就构成了检查点。

(2) 保存的时间点

       这里有一个关键问题:当检查点的保存被触发时,任务有可能正在处理某个数据,这时该怎么办呢?最简单的想法是,可以在某个时刻“按下暂停键”,让所有任务停止处理数据。这样状态就不再更改,大家可以一起复制保存;保存完毕之后,再同时恢复数据处理就可以了。然而仔细思考就会发现这有很多问题。这种想法其实是粗暴地“停止一切来进行快照”,在保存检查点的过程中,任务完全中断了,这会造成很大的延迟;我们之前为了实时性做出的所有设计就毁在了做快照上。

       另一方面,我们做快照的目的是为了故障恢复;现在的快照中,有些任务正在处理数据,那它保存的到底是处理到什么程度的状态呢?举个例子,我们在程序中某一步操作中自定义了一个 ValueState,处理的逻辑是:当遇到一个数据时,状态先加 1;而后经过一些其他步骤后再加 1。现在停止处理数据,状态到底是被加了 1 还是加了 2 呢?这很重要,因为状态恢复之后,我们需要知道当前数据从哪里开始继续处理。要满足这个要求,就必须将暂停时的所有环境信息都保存下来——而这显然是很麻烦的。为了解决这个问题,我们不应该“一刀切”把所有任务同时停掉,而是至少得先把手头正在处理的数据弄完。这样的话,我们在检查点中就不需要保存所有上下文信息,只要知道当前处理到哪个数据就可以了。

       但这样依然会有问题:分布式系统的节点之间需要通过网络通信来传递数据,如果我们保存检查点的时候刚好有数据在网络传输的路上,那么下游任务是没法将数据保存起来的;故障重启之后,我们只能期待上游任务重新发送这个数据。然而上游任务是无法知道下游任务是否收到数据的,只能盲目地重发,这可能导致下游将数据处理两次,结果就会出现错误。

       所以我们最终的选择是:当所有任务都恰好处理完一个相同的输入数据的时候(这里指的是上游算子和下游算子处理完一个相同的数据),将它们的状态保存下来。首先,这样避免了除状态之外其他额外信息的存储,提高了检查点保存的效率。其次,一个数据要么就是被所有任务完整地处理完,状态得到了保存;要么就是没处理完,状态全部没保存:这就相当于构建了一个“事务”(transaction)。

       如果出现故障,我们恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理;所以我们只需要让源(source)任务向数据源重新提交偏移量、请求重放数据就可以了。这需要源任务可以把偏移量作为算子状态保存下来,而且外部数据源能够重置偏移量,Kafka 就能满足这个要求,我们只需要重置Kafka输出的偏移量就行,毕竟Kafka是持久保存我们的数据的,并不是发送完数据就立马删除。

(3)保存的具体流程

       检查点的保存,最关键的就是要等所有任务将“同一个数据”处理完毕。这里举一个统计词频的例子—WordCount。这里为了方便,我们直接从数据源读入已经分开的一个个单词,例如这里输入的就是:

“hello”,“world”,“hello”,“flink”,“hello”,“world”,“hello”,“flink”……

       这里的比如我们的所有的有状态算子(source、sum、sink)都处理完第三个单词 “hello” 后就更新自己的状态。

       当我们的所有任务处理完同一条数据后,对状态做个快照保存下来。例如上图中,已经处理了 3 条数据:“hello”“world”“hello”,所以我们会看到 Source 算子的偏移量为 3;后面的 Sum 算子处理完第三条数据“hello”之后,此时已经有 2 个“hello”和 1 个“world”,所以对应的状态为“hello”-> 2,“world”-> 1(这里 KeyedState底层会以 key-value 形式存储)。此时所有任务都已经处理完了前三个数据,所以我们可以把当前的状态保存成一个检查点,写入外部存储中。至于具体保存到哪里,这是由状态后端的配置项 “ 检 查 点 存 储 ”( CheckpointStorage )来决定的,可以有作业管理器的堆内存(JobManagerCheckpointStorage)和文件系统(FileSystemCheckpointStorage)两种选择。一般情况下,我们会将检查点写入持久化的分布式文件系统。

1.2、从检查点恢复状态

       在运行流处理程序时,Flink 会周期性地保存检查点。当发生故障时,就需要找到最近一次成功保存的检查点来恢复状态。

       比如我们上面处理完第三个数据(“hello”)后保存了一个检查点。之后继续运行,又正常处理了一个数据“flink”,在处理第五个数据“hello”时发生了故障:

       这时 Source 任务已经处理完毕,所以偏移量为 5;Map 任务也处理完成了。而其中一个 Sum 任务在处理中发生了故障,此时状态并未保存(“flink” 和 “hello” 的状态都未保存)。接下来就需要从检查点来恢复状态了。具体的步骤为:

(1)重启应用

遇到故障之后,第一步当然就是重启。我们将应用重新启动后,所有任务的状态会清空:

(2)读取检查点,重置状态

       找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态中。这样,Flink 内部所有任务的状态,就恢复到了保存检查点的那一时刻,也就是刚好处理完第三个数据的时候,这里第四条数据 “flink” 并没有数据到来,所以初始为 0。

(3)重放数据

       从检查点恢复状态后还有一个问题:如果直接继续处理数据,那么保存检查点之后、到发生故障这段时间内的数据,也就是第 4、5 个数据(“flink”“hello”)就相当于丢掉了;这会造成计算结果的错误。为了不丢数据,我们应该从保存检查点后开始重新读取数据,这可以通过 Source 任务向外部数据源重新提交偏移量(offset)来实现:

这样,整个系统的状态已经完全回退到了检查点保存完成的那一时刻。

(4)继续处理数据

接下来,我们就可以正常处理数据了。首先是重放第 4、5 个数据,然后继续读取后面的数据:

       当处理到第 5 个数据时,就已经追上了发生故障时的系统状态。之后继续处理,就好像没有发生过故障一样;我们既没有丢掉数据也没有重复计算数据,这就保证了计算结果的正确性。在分布式系统中,这叫作实现了“精确一次”(exactly-once)的状态一致性保证。

       这里我们也可以发现,想要正确地从检查点中读取并恢复状态,必须知道每个算子任务状态的类型和它们的先后顺序(拓扑结构);因此为了可以从之前的检查点中恢复状态,我们在改动程序、修复 bug 时要保证状态的拓扑顺序和类型不变。状态的拓扑结构在 JobManager 上可以由 JobGraph 分析得到,而检查点保存的定期触发也是由 JobManager 控制的;所以故障恢复的过程需要 JobManager 的参与。

1.3、检查点算法

       我们已经知道,Flink 保存检查点的时间点,是所有任务都处理完同一个输入数据的时候。但是不同的任务处理数据的速度不同,当第一个 Source 任务处理到某个数据时,后面的 Sum任务可能还在处理之前的数据;而且数据经过任务处理之后类型和值都会发生变化,面对着“面目全非”的数据,不同的任务怎么知道处理的是“同一个”呢?

       一个简单的想法是,当接到 JobManager 发出的保存检查点的指令后,Source 算子任务处理完当前数据就暂停等待,不再读取新的数据了。也就是留一个空档期,这样我们就可以保证在流中只有需要保存到检查点的数据,只要把它们全部处理完,就可以保证所有任务刚好处理完最后一个数据;这时把所有状态保存起来,合并之后就是一个检查点了。就相当于当要进行检查点保存时,Source任务先停下来,这样就只需要等待最后一个数据被所有任务处理之后再进行保存 ,而且这样可以保证所有任务保存的都是统一个数据。

       但这样做最大的问题,就是每个任务的进度可能不同;为了保证状态一致前面的任务不能进行其他工作,只能等待后面的任务处理到相同的数据再进行检查点的保存。当先保存完状态的任务需要等待其他任务时,就导致了资源的闲置和性能的降低。所以更好的做法是,在不暂停整体流处理的前提下,将状态备份保存到检查点。在 Flink中,采用了基于 Chandy-Lamport 算法的分布式快照。

(1)检查点分界线(Barrier)

       我们现在的目标是,在不暂停流处理的前提下,让每个任务“认出”触发检查点保存的那个数据。

       自然想到,如果给数据添加一个特殊标识,任务就可以准确识别并开始保存状态了。这需要在 Source 任务收到触发检查点保存的指令后,立即在当前处理的数据中插入一个标识字段,然后再向下游任务发出。但是假如 Source 任务此时并没有正在处理的数据,这个操作就无法实现了。所以我们可以借鉴水位线(watermark)的设计,在数据流中插入一个特殊的数据结构,专门用来表示触发检查点保存的时间点。收到保存检查点的指令后,Source 任务可以在当前数据流中插入这个结构;之后的所有任务只要遇到它就开始对状态做持久化快照保存。由于数据流是保持顺序依次处理的,因此遇到这个标识就代表之前的数据都处理完了,可以保存一个检查点;而在它之后的数据,引起的状态改变就不会体现在这个检查点中,而需要保存到下一个检查点。

       这种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫作检查点的“分界线”(Checkpoint Barrier)。与水位线很类似,检查点分界线也是一条特殊的数据,由 Source 算子注入到常规的数据流中,它的位置是限定好的,不能超过其他数据,也不能被后面的数据超过。检查点分界线中带有一个检查点 ID,这是当前要保存的检查点的唯一标识。

       这样,分界线就将一条流逻辑上分成了两部分:分界线之前到来的数据导致的状态更改,都会被包含在当前分界线所表示的检查点中;而基于分界线之后的数据导致的状态更改,则会被包含在之后的检查点中。

       在 JobManager 中有一个“检查点协调器”(checkpoint coordinator),专门用来协调处理检查点的相关工作。检查点协调器会定期向 TaskManager 发出指令,要求保存检查点(带着检查点 ID);TaskManager 会让所有的 Source 任务把自己的偏移量(算子状态)保存起来,并将带有检查点 ID 的分界线(barrier)插入到当前的数据流中,然后像正常的数据一样像下游传递;之后 Source 任务就可以继续读入新的数据了。

       每个算子任务只要处理到这个 barrier,就把当前的状态进行快照;在收到 barrier 之前,还是正常地处理之前的数据,完全不受影响。比如上图中,Source 任务收到 1 号检查点保存指令时,读取完了三个数据,所以将偏移量 3 保存到外部存储中;而后将 ID 为 1 的 barrier 注入数据流;与此同时,Map 任务刚刚收到上一条数据“hello”,而 Sum 任务则还在处理之前的第二条数据(world, 1)。下游任务不会在这时就立刻保存状态,而是等收到 barrier 时才去做快照,这时可以保证前三个数据都已经处理完了。同样地,下游任务做状态快照时,也不会影响上游任务的处理,每个任务的快照保存并行不悖,不会有暂停等待的时间。

(2) 分布式快照算法(Barrier 对齐的精准一次)

       通过在流中插入分界线(barrier),我们可以明确地指示触发检查点保存的时间。在一条单一的流上,数据依次进行处理,顺序保持不变;不过对于分布式流处理来说,想要一直保持数据的顺序就不是那么容易了。我们先回忆一下水位线(watermark)的处理:上游任务向多个并行下游任务传递时,需要广播出去;而多个上游任务向同一个下游任务传递时,则需要下游任务为每个上游并行任务维护一个“分区水位线”,取其中最小的那个作为当前任务的事件时钟。那 barier 在并行数据流中的传递,是不是也有类似的规则呢?watermark 指示的是“之前的数据全部到齐了”,而 barrier 指示的是“之前所有数据的状态更改保存入当前检查点”:它们都是一个“截止时间”的标志。所以在处理多个分区的传递时,也要以是否还会有数据到来作为一个判断标准。

       具体实现上,Flink 使用了 Chandy-Lamport 算法的一种变体,被称为“异步分界线快照”(asynchronous barrier snapshotting)算法。算法的核心就是两个原则:

  1. 当上游任务向多个并行下游任务发送 barrier 时,需要广播出去;
  2. 而当多个上游任务向同一个下游任务传递 barrier 时,需要在下游任务执行“分界线对齐”(barrier alignment)操作,也就是需要等到所有并行分区的 barrier 都到齐,才可以开始状态的保存。

为了详细解释检查点算法的原理,我们对之前的 word count 程序进行扩展,考虑所有算子并行度为 2 的场景:

我们有两个并行的 Source 任务,会分别读取两个数据流(或者是一个源的不同分区)。这里每条流中的数据都是一个个的单词:“hello”“world”“hello”“flink”交替出现。此时第一条流的 Source 任务(我们叫它“Source 1”)读取了 3个数据,偏移量为 3;而第二条流的 Source 任务(Source 2)只读取了一个“hello”数据,偏移量为 1。第一条流中的第一个数据“hello”已经完全处理完毕,所以 Sum 任务的状态中 key为 hello 对应着值 1,而且已经发出了结果(hello, 1);第二个数据“world”经过了 Map 任务的转换,还在被 Sum 任务处理;第三个数据“hello”还在被 Map 任务处理。而第二条流的第一个数据“hello”同样已经经过了 Map 转换,正在被 Sum 任务处理。

接下来就是检查点保存的算法。具体过程如下:

1.JobManager 发送指令,触发检查点的保存;

JobManager 会周期性地向每个 TaskManager 发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点。收到指令后,TaskManger 会在所有 Source 任务中插入一个分界线(barrier),并将偏移量保存到远程的持久化存储中。

并行的 Source 任务保存的状态为 3 和 1,表示当前的 1 号检查点应该包含:第一条流中截至第三个数据、第二条流中截至第一个数据的所有状态更改。可以发现 Source 任务做这些的时候并不影响下游任务的处理,Sum 任务已经处理完了第一条流中传来的(world, 1),对应的状态也有了更改。


Flink(十二)【容错机制】(2)https://developer.aliyun.com/article/1532255

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
89 3
|
4月前
|
存储 数据处理 Apache
超越传统数据库:揭秘Flink状态机制,让你的数据处理效率飞升!
【8月更文挑战第26天】Apache Flink 在流处理领域以其高效实时的数据处理能力脱颖而出,其核心特色之一便是状态管理机制。不同于传统数据库依靠持久化存储及 ACID 事务确保数据一致性和可靠性,Flink 利用内存中的状态管理和分布式数据流模型实现了低延迟处理。Flink 的状态分为键控状态与非键控状态,前者依据数据键值进行状态维护,适用于键值对数据处理;后者与算子实例关联,用于所有输入数据共享的状态场景。通过 checkpointing 机制,Flink 在保障状态一致性的同时,提供了更适合流处理场景的轻量级解决方案。
65 0
|
6月前
|
消息中间件 存储 NoSQL
Flink(十二)【容错机制】(4)
Flink(十二)【容错机制】
|
4月前
|
调度 流计算
Flink 新一代流计算和容错问题之Flink 中的数据可以分为什么类型
Flink 新一代流计算和容错问题之Flink 中的数据可以分为什么类型
|
4月前
|
Cloud Native 安全 调度
Flink 新一代流计算和容错问题之Flink 通过云原生技术改进容错设计要如何操作
Flink 新一代流计算和容错问题之Flink 通过云原生技术改进容错设计要如何操作
|
4月前
|
运维 Cloud Native 数据库
Flink 新一代流计算和容错问题之将 Flink 的容错与云原生的弹性扩缩容相结合要怎么操作
Flink 新一代流计算和容错问题之将 Flink 的容错与云原生的弹性扩缩容相结合要怎么操作
|
4月前
|
存储 流计算
Flink 新一代流计算和容错问题之Flink 通过 Key Group 管理状态是怎么操作的
Flink 新一代流计算和容错问题之Flink 通过 Key Group 管理状态是怎么操作的
|
4月前
|
存储 调度 流计算
Flink 新一代流计算和容错问题之如何实现 Generalized Log-Based Incremental Checkpoint
Flink 新一代流计算和容错问题之如何实现 Generalized Log-Based Incremental Checkpoint
|
4月前
|
存储 缓存 流计算
Flink 新一代流计算和容错问题之在有状态的算子中,状态更新是怎么记录的
Flink 新一代流计算和容错问题之在有状态的算子中,状态更新是怎么记录的
|
4月前
|
缓存 流计算
Flink 新一代流计算和容错问题之 Flink 作业的 local buffer pool 的 size 要如何估算
Flink 新一代流计算和容错问题之 Flink 作业的 local buffer pool 的 size 要如何估算
下一篇
DataWorks