(1)Checkpoint的背景
State场景:
flink中有状态函数和运算符在各个元素(element)/事件(event)的处理过程中存储的数据,这些数据可以修改和查询,可以自己维护,根据自己的业务场景,保存历史数据或者中间结果到状态(state)中);
使用状态计算的例子:
当应用程序搜索某些事件模式时,状态将存储到目前为止遇到的事件序列。
在每分钟/小时/天聚合事件时,状态保存待处理的聚合。
当在数据点流上训练机器学习模型时,状态保持模型参数的当前版本。
当需要管理历史数据时,状态允许有效访问过去发生的事件。
比如:以wordcount中计算pv/uv为例:
输出的结果跟之前的状态有关系,不符合幂等性,访问多次,pv会增加;
为什么需要state管理
流式作业的特点是7*24小时运行,数据不重复消费,不丢失,保证只计算一次,数据实时产出不延迟,但是当状态很大,内存容量限制,或者实例运行奔溃,或需要扩展并发度等情况下,如何保证状态正确的管理,在任务重新执行的时候能正确执行,状态管理就显得尤为重要。
理想中的state管理
易用, flink提供了丰富的数据结构,简洁易用的接口;
高效,flink对状态的处理读写快,可以横向扩展,保存状态不影响计算性能;
可靠,flink对状态可以做持久化,而且可以保证exactly一once语义;
(2)Flink 中Checkpoint
checkpoint 机制是 Flink 可靠性的基石,可以保证 Flink 集群在某个算子因为 某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某 一状态,保证应用流图状态的一致性.
快照的实现算法:
简单算法–暂停应用, 然后开始做检查点, 再重新恢复应用
Flink 的改进 Checkpoint 算法. Flink 的 checkpoint 机制原理来自 "Chandy-Lamport algorithm"算法(分布式快照算)的一种变体: 异步 barrier 快照 (asynchronous barrier snapshotting)
每个需要 checkpoint 的应用在启动时,Flink 的 JobManager 为其创 建一个 CheckpointCoordinator ,CheckpointCoordinator 全权负责本应用 的快照制作。
(3)理解 Barrier
流的 barrier 是 Flink 的 Checkpoint 中的一个核心概念. 多个 barrier 被插入到数据 流中, 然后作为数据流的一部分随着数据流动(有点类似于 Watermark).这些 barrier 不会 跨越流中的数据.
每个 barrier 会把数据流分成两部分: 一部分数据进入 当前的快照 , 另一部分数据进入下一个快照每个 barrier 携带着快照的 id. barrier 不会暂停数据的流动, 所以非 常轻量级. 在流中, 同一时间可以有来源于多个不同快照的多个 barrier, 这个意味着可 以并发的出现不同的快照.
(4)Flink 中Checkpoint 执行过程
checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。Flink的checkpoint机制原理来自"Chandy一Lamport
algorithm”算法。(分布式快照算)
每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。
CheckpointCoordinator周期性的向该流应用的所有source算子发送barrier。
当某个source算子收到一个barrier时,便暂停数据处理过程,然后将自己的当前状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自己快照制作情况,同时向自身所有下游算子广播该barrier,恢复数据处理
下游算子收到barrier之后,会暂停自己的数据处理过程,然后将自身的相关状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自身快照情况,同时向自身所有下游算子广播该barrier,恢复数据处理。
每个算子按照步骤3不断制作快照并向下游广播,直到最后barrier传递到sink算子,快照制作完成。
当CheckpointCoordinator收到所有算子的报告之后,认为该周期的快照制作成功;否则,如果在规定的时间内没有收到所有算子的报告,则认为本周期快照制作失败;
第一步: Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint. 然后 Source Task 会在数据流中安插 CheckPoint barrier
第二步: source 节点向下游广播 barrier,这个 barrier 就是实现 ChandyLamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才 会执行相应的 Checkpoint
第三步: 当 task 完成 state 备份后,会将备份数据的地址(state handle) 通知给 Checkpoint coordinator。
第四步: 下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行 本地快照,这里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会从中选择没 有上传的文件进行持久化备份(紫色小三角)。
第五步: 同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator。
第六步: 最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件。
Checkpoint总体过程
(5)严格一次语义: barrier 对齐
在多并行度下, 如果要实现严格一次, 则要执行barrier 对齐。
当 job graph 中的每个 operator 接收到 barriers 时,它就会记录下其状态。拥有 两个输入流的 Operators(例如 CoProcessFunction)会执行 barrier 对齐 (barrier alignment) 以便当前快照能够包含消费两个输入流 barrier 之前(但不超过)的所有 events 而产生的状态。
当 operator 收到数字流的 barrier n 时, 它就 不能处理(但是可以接收) 来自该流的任 何数据记录,直到它从字母流所有输入接收到 barrier n 为止。否则,它会混合属于快 照 n 的记录和属于快照 n + 1 的记录。
接收到 barrier n 的流(数字流)暂时被搁置。从这些流接收的记录入输入缓冲区, 不会被处理。
图一中的 Checkpoint barrier n 之后的数据 123 已结到达了算子, 存入到输入缓冲 区没有被处理, 只有等到字母流的 Checkpoint barrier n 到达之后才会开始处理.
一旦最后所有输入流都接收到 barrier n,Operator 就会把缓冲区中 pending 的输出数 据发出去,然后把 CheckPoint barrier n 接着往下游发送。这里还会对自身进行快照。
(6)至少一次语义: barrier 不对齐
前面介绍了 barrier 对齐, 如果 barrier 不对齐会怎么样?会重复消费, 就是 至少一次 语义.
假设不对齐, 在字母流的 Checkpoint barrier n 到达前, 已经处理了 1 2 3. 等字母 流 Checkpoint barrier n 到达之后, 会做 Checkpoint n. 假设这个时候程序异常错误了, 则重新启动的时候会 Checkpoint n 之后的数据重新计算. 1 2 3 会被再次被计算, 所以 123 出现了重复计算.
(7)checkpoint 和 savepoint 的区别
(8)基于State Backend的CheckPoint开发
package com.aikfk.flink.datastream.state; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.ArrayList; import java.util.List; /** * @author :caizhengjie * @description:TODO * @date :2021/3/31 4:04 下午 */ public class CheckPoint { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // start a checkpoint every 1000ms env.enableCheckpointing(1000); // advanced options // set mode to exactly-once (this is the default) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】 env.getCheckpointConfig().setCheckpointTimeout(10000); // 同一时间只允许进行一个检查点 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 如果有更近的保存点时,是否将作业回退到该检查点 env.getCheckpointConfig().setPreferCheckpointForRecovery(true); env.getCheckpointConfig().enableUnalignedCheckpoints(); env.setStateBackend(new FsStateBackend( "file:Users/caizhengjie/Desktop/test ",true)); DataStreamSink<Tuple2<String,Long>> dataStream = env.addSource(new MySource()) .map(new MapFunction<String, Tuple2<String,Long>>() { @Override public Tuple2<String, Long> map(String line) throws Exception { String[] words = line.split(","); return new Tuple2<>(words[0],Long.parseLong(words[1])); } }) .keyBy(value -> value.f0) .addSink(new BufferingSink()); env.execute("KeyedState"); } static class BufferingSink implements SinkFunction<Tuple2<String,Long>>, CheckpointedFunction { private ListState<Tuple2<String,Long>> listState; private List<Tuple2<String,Long>> bufferedElements = new ArrayList<>(); @Override public void initializeState(FunctionInitializationContext context) throws Exception { ListStateDescriptor<Tuple2<String, Long>> descriptor = new ListStateDescriptor<Tuple2<String, Long>>("bufferedSinkState", TypeInformation.of(new TypeHint<Tuple2<String,Long>>() {})); listState = context.getOperatorStateStore().getListState(descriptor); if (context.isRestored()){ for (Tuple2<String, Long> element : listState.get()){ bufferedElements.add(element); } } } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { for (Tuple2<String, Long> element : bufferedElements){ listState.add(element); } } @Override public void invoke(Tuple2<String,Long> value, Context context) throws Exception { bufferedElements.add(value); System.out.println("invoke>>> " + value); for (Tuple2<String,Long> element : bufferedElements){ System.out.println(Thread.currentThread().getId() + " >> " + element.f0 + " : " + element.f1); } } } public static class MySource implements SourceFunction<String> { @Override public void cancel() { } @Override public void run(SourceContext<String> ctx) throws Exception { String data = "s,4"; while (true) { ctx.collect(data); } } } }