Flink之Checkpoint机制

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)Checkpoint的背景


State场景:


flink中有状态函数和运算符在各个元素(element)/事件(event)的处理过程中存储的数据,这些数据可以修改和查询,可以自己维护,根据自己的业务场景,保存历史数据或者中间结果到状态(state)中);


使用状态计算的例子:


当应用程序搜索某些事件模式时,状态将存储到目前为止遇到的事件序列。

在每分钟/小时/天聚合事件时,状态保存待处理的聚合。

当在数据点流上训练机器学习模型时,状态保持模型参数的当前版本。

当需要管理历史数据时,状态允许有效访问过去发生的事件。

比如:以wordcount中计算pv/uv为例:

输出的结果跟之前的状态有关系,不符合幂等性,访问多次,pv会增加;


为什么需要state管理


流式作业的特点是7*24小时运行,数据不重复消费,不丢失,保证只计算一次,数据实时产出不延迟,但是当状态很大,内存容量限制,或者实例运行奔溃,或需要扩展并发度等情况下,如何保证状态正确的管理,在任务重新执行的时候能正确执行,状态管理就显得尤为重要。


理想中的state管理


易用, flink提供了丰富的数据结构,简洁易用的接口;

高效,flink对状态的处理读写快,可以横向扩展,保存状态不影响计算性能;

可靠,flink对状态可以做持久化,而且可以保证exactly一once语义;


(2)Flink 中Checkpoint


checkpoint 机制是 Flink 可靠性的基石,可以保证 Flink 集群在某个算子因为 某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某 一状态,保证应用流图状态的一致性.


快照的实现算法:


简单算法–暂停应用, 然后开始做检查点, 再重新恢复应用


Flink 的改进 Checkpoint 算法. Flink 的 checkpoint 机制原理来自 "Chandy-Lamport algorithm"算法(分布式快照算)的一种变体: 异步 barrier 快照 (asynchronous barrier snapshotting)


每个需要 checkpoint 的应用在启动时,Flink 的 JobManager 为其创 建一个 CheckpointCoordinator ,CheckpointCoordinator 全权负责本应用 的快照制作。

35.png36.png37.png





(3)理解 Barrier


流的 barrier 是 Flink 的 Checkpoint 中的一个核心概念. 多个 barrier 被插入到数据 流中, 然后作为数据流的一部分随着数据流动(有点类似于 Watermark).这些 barrier 不会 跨越流中的数据.


每个 barrier 会把数据流分成两部分: 一部分数据进入 当前的快照 , 另一部分数据进入下一个快照每个 barrier 携带着快照的 id. barrier 不会暂停数据的流动, 所以非 常轻量级. 在流中, 同一时间可以有来源于多个不同快照的多个 barrier, 这个意味着可 以并发的出现不同的快照.


38.png

(4)Flink 中Checkpoint 执行过程


checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。Flink的checkpoint机制原理来自"Chandy一Lamport

algorithm”算法。(分布式快照算)

每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。

CheckpointCoordinator周期性的向该流应用的所有source算子发送barrier。

当某个source算子收到一个barrier时,便暂停数据处理过程,然后将自己的当前状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自己快照制作情况,同时向自身所有下游算子广播该barrier,恢复数据处理

下游算子收到barrier之后,会暂停自己的数据处理过程,然后将自身的相关状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自身快照情况,同时向自身所有下游算子广播该barrier,恢复数据处理。

每个算子按照步骤3不断制作快照并向下游广播,直到最后barrier传递到sink算子,快照制作完成。

当CheckpointCoordinator收到所有算子的报告之后,认为该周期的快照制作成功;否则,如果在规定的时间内没有收到所有算子的报告,则认为本周期快照制作失败;

第一步: Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint. 然后 Source Task 会在数据流中安插 CheckPoint barrier

39.png

第二步: source 节点向下游广播 barrier,这个 barrier 就是实现 ChandyLamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才 会执行相应的 Checkpoint

40.png

第三步: 当 task 完成 state 备份后,会将备份数据的地址(state handle) 通知给 Checkpoint coordinator。

41.png

第四步: 下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行 本地快照,这里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会从中选择没 有上传的文件进行持久化备份(紫色小三角)。

42.png

第五步: 同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator。

43.png

第六步: 最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件。

44.png


Checkpoint总体过程

45.png


(5)严格一次语义: barrier 对齐


在多并行度下, 如果要实现严格一次, 则要执行barrier 对齐。


当 job graph 中的每个 operator 接收到 barriers 时,它就会记录下其状态。拥有 两个输入流的 Operators(例如 CoProcessFunction)会执行 barrier 对齐 (barrier alignment) 以便当前快照能够包含消费两个输入流 barrier 之前(但不超过)的所有 events 而产生的状态。


46.png

当 operator 收到数字流的 barrier n 时, 它就 不能处理(但是可以接收) 来自该流的任 何数据记录,直到它从字母流所有输入接收到 barrier n 为止。否则,它会混合属于快 照 n 的记录和属于快照 n + 1 的记录。


接收到 barrier n 的流(数字流)暂时被搁置。从这些流接收的记录入输入缓冲区, 不会被处理。


图一中的 Checkpoint barrier n 之后的数据 123 已结到达了算子, 存入到输入缓冲 区没有被处理, 只有等到字母流的 Checkpoint barrier n 到达之后才会开始处理.


一旦最后所有输入流都接收到 barrier n,Operator 就会把缓冲区中 pending 的输出数 据发出去,然后把 CheckPoint barrier n 接着往下游发送。这里还会对自身进行快照。


(6)至少一次语义: barrier 不对齐


前面介绍了 barrier 对齐, 如果 barrier 不对齐会怎么样?会重复消费, 就是 至少一次 语义.

47.png



假设不对齐, 在字母流的 Checkpoint barrier n 到达前, 已经处理了 1 2 3. 等字母 流 Checkpoint barrier n 到达之后, 会做 Checkpoint n. 假设这个时候程序异常错误了, 则重新启动的时候会 Checkpoint n 之后的数据重新计算. 1 2 3 会被再次被计算, 所以 123 出现了重复计算.


(7)checkpoint 和 savepoint 的区别


48.png49.pngimage.png

网络异常,图片无法展示
|
image.png


(8)基于State Backend的CheckPoint开发


package com.aikfk.flink.datastream.state;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.ArrayList;
import java.util.List;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/31 4:04 下午
 */
public class CheckPoint {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // start a checkpoint every 1000ms
        env.enableCheckpointing(1000);
        // advanced options
        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        // 同一时间只允许进行一个检查点
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 如果有更近的保存点时,是否将作业回退到该检查点
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
        env.getCheckpointConfig().enableUnalignedCheckpoints();
        env.setStateBackend(new FsStateBackend(
                "file:Users/caizhengjie/Desktop/test ",true));
        DataStreamSink<Tuple2<String,Long>> dataStream = env.addSource(new MySource())
                .map(new MapFunction<String, Tuple2<String,Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String line) throws Exception {
                        String[] words = line.split(",");
                        return new Tuple2<>(words[0],Long.parseLong(words[1]));
                    }
                })
                .keyBy(value -> value.f0)
                .addSink(new BufferingSink());
        env.execute("KeyedState");
    }
    static class BufferingSink implements SinkFunction<Tuple2<String,Long>>, CheckpointedFunction {
        private ListState<Tuple2<String,Long>> listState;
        private List<Tuple2<String,Long>> bufferedElements = new ArrayList<>();
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Tuple2<String, Long>> descriptor =
                    new ListStateDescriptor<Tuple2<String, Long>>("bufferedSinkState",
                            TypeInformation.of(new TypeHint<Tuple2<String,Long>>() {}));
            listState = context.getOperatorStateStore().getListState(descriptor);
            if (context.isRestored()){
                for (Tuple2<String, Long> element : listState.get()){
                    bufferedElements.add(element);
                }
            }
        }
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            for (Tuple2<String, Long> element : bufferedElements){
                listState.add(element);
            }
        }
        @Override
        public void invoke(Tuple2<String,Long> value, Context context) throws Exception {
            bufferedElements.add(value);
            System.out.println("invoke>>> " + value);
            for (Tuple2<String,Long> element : bufferedElements){
                System.out.println(Thread.currentThread().getId() + " >> " + element.f0 + " : " + element.f1);
            }
        }
    }
    public static class MySource implements SourceFunction<String> {
        @Override
        public void cancel() {
        }
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            String data = "s,4";
            while (true) {
                ctx.collect(data);
            }
        }
    }
}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
63 3
|
3月前
|
容灾 流计算
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
|
3月前
|
存储 数据处理 Apache
超越传统数据库:揭秘Flink状态机制,让你的数据处理效率飞升!
【8月更文挑战第26天】Apache Flink 在流处理领域以其高效实时的数据处理能力脱颖而出,其核心特色之一便是状态管理机制。不同于传统数据库依靠持久化存储及 ACID 事务确保数据一致性和可靠性,Flink 利用内存中的状态管理和分布式数据流模型实现了低延迟处理。Flink 的状态分为键控状态与非键控状态,前者依据数据键值进行状态维护,适用于键值对数据处理;后者与算子实例关联,用于所有输入数据共享的状态场景。通过 checkpointing 机制,Flink 在保障状态一致性的同时,提供了更适合流处理场景的轻量级解决方案。
57 0
|
3月前
|
消息中间件 监控 Java
实时计算 Flink版产品使用问题之该如何解决checkpoint频繁失败
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
容灾 流计算
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
|
3月前
|
存储 调度 流计算
Flink 新一代流计算和容错问题之如何实现 Generalized Log-Based Incremental Checkpoint
Flink 新一代流计算和容错问题之如何实现 Generalized Log-Based Incremental Checkpoint
|
3月前
|
存储 缓存 数据处理
Flink 新一代流计算和容错问题之中间数据流动缓慢导致 Checkpoint 慢的问题要如何解决
Flink 新一代流计算和容错问题之中间数据流动缓慢导致 Checkpoint 慢的问题要如何解决
|
3月前
|
存储 分布式计算 算法
Flink四大基石——4.Checkpoint容错机制
Flink四大基石——4.Checkpoint容错机制
72 1
|
3月前
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用问题之mini-cluster模式下,怎么指定checkpoint的时间间隔
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
存储 监控 Serverless
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决