Flink(十二)【容错机制】(1)https://developer.aliyun.com/article/1532252
2.状态快照保存完成,分界线向下游传递
状态存入持久化存储之后,会返回通知给 Source 任务;Source 任务就会向 JobManager 确认检查点完成,然后像数据一样把 barrier 向下游任务传递。
由于 Source 和 Map 之间是一对一(forward)的传输关系(这里没有考虑算子链 operator chain),所以 barrier 可以直接传递给对应的 Map 任务。之后 Source 任务就可以继续读取新的数据了。与此同时,Sum 1 已经将第二条流传来的(hello,1)处理完毕,更新了状态。
3.向下游多个并行子任务广播分界线,执行分界线对齐
Map 任务没有状态,所以直接将 barrier 继续向下游传递。这时由于进行了 keyBy 分区,所以需要将 barrier 广播到下游并行的两个 Sum 任务。同时,Sum 任务可能收到来自上游两个并行 Map 任务的 barrier,所以需要执行“分界线对齐”操作。
所谓分界线对齐,意思就是当前任务要保存状态前,需要等待上游任务(多个上游任务才需要对齐)的 barrier 都到齐以后才能保存。
此时的 Sum 2 收到了来自上游两个 Map 任务的 barrier,说明第一条流第三个数据、第二条流第一个数据都已经处理完,可以进行状态的保存了;而 Sum 1 只收到了来自 Map 2 的barrier,所以这时需要等待分界线对齐。在等待的过程中,如果分界线尚未到达的分区任务Map 1 又传来了数据(hello, 1),说明这是需要保存到检查点的,Sum 任务应该正常继续处理数据,状态更新为 3;而如果分界线已经到达的分区任务 Map 2 又传来数据,这已经是下一个检查点要保存的内容了,就不应立即处理,而是要缓存起来、等到状态保存之后再做处理。
4.分界线对齐后,保存状态到持久化存储
各个分区的分界线都对齐后,就可以对当前状态做快照,保存到持久化存储了。存储完成之后,同样将 barrier 向下游继续传递,并通知 JobManager 保存完毕。
这个过程中,每个任务保存自己的状态都是相对独立的,互不影响。我们可以看到,当Sum 将当前状态保存完毕时,Source 1 任务已经读取到第一条流的第五个数据了。
5. 先处理缓存数据,然后正常继续处理
完成检查点保存之后,任务就可以继续正常处理数据了。这时如果有等待分界线对齐时缓存的数据,需要先做处理;然后再按照顺序依次处理新到的数据。当 JobManager 收到所有任务成功保存状态的信息,就可以确认当前检查点成功保存。之后遇到故障就可以从这里恢复了。由于分界线对齐要求先到达的分区做缓存等待,一定程度上会影响处理的速度;当出现背压(backpressure)时,下游任务会堆积大量的缓冲数据,检查点可能需要很久才可以保存完毕。为了应对这种场景,Flink 1.11 之后提供了不对齐的检查点保存方式,可以将未处理的缓冲数据(in-flight data)也保存进检查点。这样,当我们遇到一个分区 barrier 时就不需等待对齐,而是可以直接启动状态的保存了。
背压机制:背压机制是一种在异步编程中处理数据流的机制,特别是在响应式编程中。当生产者产生的数据流速度超过消费者处理的速度时,背压机制可以用来调整生产者的生产速率,以适应消费者的处理能力,从而避免数据积压和资源耗尽的问题。
(3)分布式快照算法(Barrier 对齐的至少一次)
在 Barrier 对齐精准一次的方式下,对于 Barrier 之后的数据,不能进行计算,只能等到 Barrier 对齐并持久化保存之后才能进入下游算子进行计算。
而在 Barrier 对齐至少一次的语义下,如果在 Barrier 对齐的过程中,Barrier 后面的数据越过了 Barrier 并进行了计算持久化保存到状态当中。所以缺点就是如果应用出现了故障需要重启,那么这部分在 Barrier 之后但是被持久化保存到状态中的数据就会被重复恢复计算,就会造成结果的不准确。但是优点也很明显,至少一次情况下,它不需要等待,就是说不用等到Barrier 对齐才进行计算,Barrier 后的数据就不需要缓存起来,也就不用担心出现背压时可能出现的一些其他问题,对我们的程序的压力不会那么大。
(4)分布式快照算法(非 Barrier 对齐的精准一次)
非 Barrier 对齐的精准一次语义是在 Flink1.11 之后提出来的,由于分界线对齐要求先叨叨的分区做缓存等待,一定程度上会影响处理的速度;当出现背压时下游任务会堆积大量的数据,检查点也可能需要很久才能保存完毕。所以我们的解决方案就是要么使用 Barrier 对齐的至少一次语义,要么就使用非 Barrier 对齐的精准一次语义。
非Barrier对齐并不是说不用Barrier,它的意思只是说不需要对齐了,仅此而已。
在非Barrier对齐算法中,一个任务在收到第一个Barrier时就开始执行备份,可以保证精准一次。
- 收到第一个 Barrier 后,直接把它放到输出缓冲区末端,向下游传递
- 标记数据:把被第一个 Barrier 越过的数据和其它 Barrier 之前的所有数据标记
- 把标记数据和当前任务的状态保存到当前任务的状态当中,当进行检查点恢复时这些数据都会恢复到对应位置
优点:
非Barrier对齐算法可以避免数据阻塞等待的问题,并且可以更精确地控制数据处理的语义。同时,非Barrier对齐算法可以更好地利用系统资源,提高数据处理的效率和吞吐量。
缺点:
需要占用更多的备份磁盘开销。
这种算法更加符合Chandy-Lamport 算法的思想。
总结
1. Barrier 对齐:一个 Task 收到所有上游的 barrier 之后,才会对自己的本地状态进行备份。
1.1 精准一次:在对齐过程中,barrier 后面的数据 阻塞等待(不会越过 barrier)
1.2 至少一次:在对齐的过程中,先到的 barrier 其后面的数据不阻塞,将会被计算并备份到状态当中
2. 非 Barrier 对齐:一个 Task 收到第一个 barrier 就开始执行备份。
能保证精准一次,先到的 barrier 会将本地状态备份,后面的数据接着计算输出
未到的 barrier,其前面的数据接着计算输出,同时也保存到备份当中
最后一个 barrier 到达该 task 时,这个task的备份结束
1.4、检查点配置
检查点的作用是为了故障恢复,我们不能因为保存检查点占据了大量时间、导致数据处理性能明显降低。为了兼顾容错性和处理性能,我们可以在代码中对检查点进行各种配置。
1. 启用检查点
默认情况下,Flink 程序是禁用检查点的。如果想要为 Flink 应用开启自动保存快照的功能,需要在代码中显式地调用执行环境的.enableCheckpointing()方法:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每隔 1 秒启动一次检查点保存 env.enableCheckpointing(1000);
这里需要传入一个长整型的毫秒数,表示周期性保存检查点的间隔时间。如果不传参数直接启用检查点,默认的间隔周期为 500 毫秒,这种方式已经被弃用。检查点的间隔时间是对处理性能和故障恢复速度的一个权衡。如果我们希望对性能的影响更小,可以调大间隔时间;而如果希望故障重启后迅速赶上实时的数据处理,就需要将间隔时间设小一些。
2. 检查点存储
(1)检查点模式(CheckpointingMode)
设置检查点一致性的保证级别,有“精确一次”(exactly-once)和“至少一次”(at-least-once)两个选项。默认级别为 exactly-once,而对于大多数低延迟的流处理程序,at-least-once 就够用了,而且处理效率会更高。关于一致性级别,我们会在 10.2 节继续展开。
(2)超时时间(checkpointTimeout)
用于指定检查点保存的超时时间,超时没完成(比如等待其他barrier的时间过长)就视为失败。传入一个长整型毫秒数作为参数,表示超时时间。
(3)最小间隔时间(minPauseBetweenCheckpoints)
用于指定在上一个检查点完成之后,检查点协调器(checkpoint coordinator)最快等多久可以出发保存下一个检查点的指令。这就意味着即使已经达到了周期触发的时间点,只要距离上一个检查点完成的间隔不够,就依然不能开启下一次检查点的保存。这就为正常处理数据留下了充足的间隙。当指定这个参数时,maxConcurrentCheckpoints 的值强制为 1。(控制一个流作用当中最多存在几次不同的检查点,barrier为1的检查点一直从source到sink,最后JobManager上传元数据到hdfs算一轮完整的checkpoint)
(4)最大并发检查点数量(maxConcurrentCheckpoints)
用于指定运行中的检查点最多可以有多少个。由于每个任务的处理进度不同,完全可能出现后面的任务还没完成前一个检查点的保存、前面任务已经开始保存下一个检查点了。这个参数就是限制同时进行的最大数量。如果前面设置了 minPauseBetweenCheckpoints,则maxConcurrentCheckpoints 这个参数就不起作用了。(一个流作业当中同时最多可以存在的检查点个数,比如一个流计算当中同时存在 barrier1、barrier2、barrier3...)
(5)开启外部持久化存储(enableExternalizedCheckpoints)
用于开启检查点的外部持久化,而且默认在作业失败的时候不会自动清理,如果想释放空间需要自己手工清理。里面传入的参数 ExternalizedCheckpointCleanup 指定了当作业取消的时候外部的检查点该如何清理。
- DELETE_ON_CANCELLATION:在作业取消的时候会自动删除外部检查点,但是如果是作业失败退出,则会保留检查点。
- RETAIN_ON_CANCELLATION:作业取消的时候也会保留外部检查点。
(6)检查点异常时是否让整个任务失败(failOnCheckpointingErrors)
用于指定在检查点发生异常的时候,是否应该让任务直接失败退出。默认为 true,如果设置为 false,则任务会丢弃掉检查点然后继续运行。
(7)不对齐检查点(enableUnalignedCheckpoints)
不再执行检查点的分界线对齐操作,启用之后可以大大减少产生背压时的检查点保存时间。这个设置要求检查点模式(CheckpointingMode)必须为 exctly-once,并且并发的检查点个数为 1。
// 创建一个本地执行环境,并启用 Web 用户界面。本地执行环境意味着 Flink 任务将在本地机器上运行,而不是在集群上。Web 用户界面允许你监视和调试正在运行的 Flink 任务。 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
导入hadoop依赖:
<!-- 引入hadoop依赖 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-common</artifactId> <version>3.3.0</version> <!--防止把hadoop的依赖打进项目造成和 flink 依赖冲突--> <scope>provided</scope> </dependency>
// 代码中指定管理检查点路径为hdfs,就存储到hdfs 导入hadoop依赖,指定访问hdfs的用户名 System.setProperty("HADOOP_USER_NAME","lyh"); // TODO 检查点配置 // 1. 周期为 5s 默认就是barrier对齐的精准一次 env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); // 2. 指定检查点的存储位置 CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8080/chk");// 一般我们会存到云端 // 3. 超时时间 默认10分钟 checkpointConfig.setCheckpointTimeout(60000); // 4. 同时运行中的checkpoint的最大数量 checkpointConfig.setMaxConcurrentCheckpoints(2); // 5. 最小等待间隔 上一轮checkpoint结束 到 下一轮checkpoint开始 之间的间隔 checkpointConfig.setMinPauseBetweenCheckpoints(1000); // 6. 取消作业时,checkpoint的数据是否保留在外部系统 这里设置成如果作业结束就把检查点内容删除 checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); // 7. 允许 checkpoint 连续失败的次数 默认为0 checkpointConfig.setTolerableCheckpointFailureNumber(10);
IDEA 调试查看Flink UI(不需要启动虚拟机里的Flink集群):
导入依赖( scope 作用于不可以是 provide,否则打不开 localhost:8081)
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web</artifactId> <version>${flink.version}</version> </dependency>
访问 localhost:8081
完整代码:
public class CheckpointConfigDemo { public static void main(String[] args) throws Exception { // 1. 创建一个流式的执行环境 // 注意:用 getExecutionEnvironment 而不是 createLocalEnvironment 否则提交到flink无法完成作业 // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 创建一个本地执行环境,并启用 Web 用户界面。本地执行环境意味着 Flink 任务将在本地机器上运行,而不是在集群上。Web 用户界面允许你监视和调试正在运行的 Flink 任务。 Configuration conf = new Configuration(); conf.setInteger(RestOptions.PORT,8081); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.setParallelism(1); // 代码中指定管理检查点路径为hdfs,就存储到hdfs 导入hadoop依赖,指定访问hdfs的用户名 System.setProperty("HADOOP_USER_NAME","lyh"); // TODO 检查点配置 // 1. 周期为 5s 默认就是barrier对齐的精准一次 env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); // 2. 指定检查点的存储位置 CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkpointConfig.setCheckpointStorage("file:///D://Desktop/FlinkStudy/chk");// 一般我们会存到云端 // 3. 超时时间 默认10分钟 checkpointConfig.setCheckpointTimeout(60000); // 4. 同时运行中的checkpoint的最大数量 checkpointConfig.setMaxConcurrentCheckpoints(2); // 5. 最小等待间隔 上一轮checkpoint结束 到 下一轮checkpoint开始 之间的间隔 checkpointConfig.setMinPauseBetweenCheckpoints(1000); // 6. 取消作业时,checkpoint的数据是否保留在外部系统 这里设置成如果作业结束就把检查点内容删除 checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); // 7. 允许 checkpoint 连续失败的次数 默认为0 checkpointConfig.setTolerableCheckpointFailureNumber(10); // 2. 流式数据处理环境得到的 DataSource 继承自 DataStream env .socketTextStream("hadoop102",9999) .flatMap((String line, Collector<Tuple2<String, Long>> out) -> { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } }).returns(Types.TUPLE(Types.STRING, Types.LONG)) .keyBy(t -> t.f0) .sum(1) .print(); // 7. 执行 env.execute(); // 这里我们的数据是有界的,但是真正开发环境是无界的,这里需要用execute方法等待新数据的到来 } }
说明:
// 取消作业时,checkpoint的数据是否保留在外部系统 这里设置成如果作业正常结束就把检查点内容删除(如果是突然挂掉 还会保存检查点) checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); // 取消作业时 会将检查点保留下来 checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
非Barrier对齐的精准一次配置
除此之外还可以设置非Barrier 对齐的精准一次,同样必须在启用检查点的时候设置精准一次且设置最大并发为1(如果是至少一次语义的话虽然不报错但是非对齐不生效,如果最大并发不是1将报错),然后设置:
// 设置精确一次模式 checkpointConfig.setCheckpointingMode(5000,CheckpointingMode.EXACTLY_ONCE); // 同时运行中的checkpoint的最大数量 checkpointConfig.setMaxConcurrentCheckpoints(1); // 启用不对齐的检查点保存方式 checkpointConfig.enableUnalignedCheckpoints();
我们可以查看源码中的说明:
启用未对齐的检查点,将大大减少背压下的检查点设置时间。
未对齐的检查点包含作为检查点状态的一部分存储在缓冲区中的数据,这允许检查点屏障超越这些缓冲区。因此,检查点持续时间变得与当前吞吐量无关,因为检查点屏障不再有效地嵌入到数据流中。
只有在ExecutionCheckpointingOptions的情况下才能启用未对齐的检查点。
新特性-设置对齐超时时间
Flink 16/17+才有的:
开启检查点才能生效:默认为0 表示一开始就用非对齐的检查点 如果>0 程序一开始先使用对齐的检查点(也就是Barrier对齐) 对齐时间超过这个参数自动切换成非对齐(非Barrier对齐)
// 开启检查点才生效:默认为0 表示一开始就用非对齐的检查点 如果>0 程序一开始先使用对齐的检查点(也就是Barrier对齐) 对齐时间超过这个参数自动切换成非对齐(非Barrier对齐) checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1L));
Flink(十二)【容错机制】(3)https://developer.aliyun.com/article/1532258