Flink checkpoint| 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习 Flink checkpoint。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 Flink checkpoint】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10044


Flink checkpoint

 

内容简介:

一、Checkpoint 与 state 的关系

二、什么是 state

三、如何在 Flink 中使用 state

四、Checkpoint 的执行机制

五、回答部分问题

 

一、Checkpoint 与 state 的关系

Checkpoint 是一个动词,它最终如果顺利会产生一个 complete Checkpoint ,它是一个动词,也就是在下图中会看到红框里面一共有 Trigger 了569027,没有 Failed ,如下图:

image.png

State 其实就是 Checkpoint 所必备的主要的数据,下图中可以看到官方中 state 的 size 是9.17 KB ,是很小的,如下图:

image.png

 

二、什么是 state

(1)state 的含义

先看一段代码:

env.socketTextStream(localhost,9000)

//split up the lines in pairs (2-tuples)

.flatMap(new Tokenizer())

//group by the tuple field 0 and sum up tuple field 1

.keyBy(0).sum(1)

.print();

这是一段非常直白的 Wellcome 的代码,如果执行上述代码,它会去监控本地的9000端口的数据,然后本地启动 netcat ,如果输入 hello world ,执行程序会自动输出什么?

之前提到它是 Wellcome ,便会输出 hello 1 、 world 1 ,那么如果再次输入 hello world  执行程序会输出什么?

hello a 、 world a ,为什么第二次流式计算,

第一次 hello world 来的时候就已经把结果输入进去了,

为什么第二次 hello world 来的时候 Flink 知道它已经看到过一次 hello world ,这就是 state 的作用, 

因为它把这个存储在 state 里面,它知道 hello 和 world 分别出现过一次, State 我们可以认为是流式计算中持久化的状态,可以看到数据输入进来,最后在每个地方会存储到 state backend 里面,  state backend 就是存储

state 的,如下图:

image.png

(2)state的分类

①Keyed State

在 Flink 中如果对 state 进行分类,

有这样两类:一个是 Keyed State ,它只能应用于 KeyedStream 的函数与操作中,它是已经划分好的,换言之每个 Key 只能属于某一个 Keyed State ,刚才实例中提到的的 hello world 的中的 hello 在作业没有发生变化的情况下,只要它一直在正常的运行中,那么 hello 永远只会出现在某一个State 的并发上面,不会去其他地方,再来看这段

word count 的代码:

env.socketTextStream(localhost,9000)

//split up the lines in pairs (2-tuples)

.flatMap(new Tokenizer())

//group by the tuple field 0 and sum up tuple field 1

.keyBy(0).sum(1)

.print();

其中的 keyBy 便是之前提到的使用 KeyedStream ,要创建KeyedStream ,并要对 key 进行划分,不同的 task 上不会出现相同的 key ,其中 sum 是调用内置的 StreamGroupedReduce UD 进行一个累积的计算。

在进一步讲之前先看一下下图中 keyBy  左边有三个并发,右边也是三个并发,于是左边的词进来之后用 keyBy ,并会根据 key 进行自分发,如下图:

image.png

可以看到 hello world  中的 hello 永远只会去到右下方并发路task 上面去,这个就是 keyBy 语。

②Operator State

另外一个 state 是 Operator State ,这个不需要作用在 KeyStream ,换言之无需 keyBy 也能拿到它,它用每一个 operator state 都仅与一个 operator 的实例绑定, Operator State 中比较常见的是 source state ,例如记录当前 source 的 offset ,

再来看一段 word count 代码 :

env.fromElements(WordCountData.WORDS)

//split up the lines in pairs (2-tuples) containing: (word,1)

.flatMap(new Tokenizer())

//group by the tuple field 0 and sum up tuple field 1

.keyBy(0).sum(1)

.print();

可以看到 env 后面的 Function 换成了 fromElements ,这个源码里面是 Operator State ,

它会调用内置的 romElementsFunction ,以下这段代码就是源码里面的一部分,

如下:

public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedFunction { 

private transient ListState<Integer> checkpointedState;

public FromElementsFunction(TypeSerializer<T> serializer,T...elments) throws I0Exception {

this(serializer,Arrays.asList(elements));

}

可以看到它用到了 ListState 和 checkpointState ,这个就存储了相关的 Operator State 信息。

下面这个图是整个 State 负类子类的关系,这个比较黄的词是平常中经常用到的 State ,那么有个问题来了 这些 State 是类的定义,它们哪些属于 Key State ?

哪些属于 Operator State ?

比如是像 ValueState 代表说是 key 所对应的value 是单值 value , MapState 的 key 所对应的是 value 是map, ListState 的 key 所对应的 value 是 list ,

如下图:

image.png

看看其中哪些是 key 的 state ?即 ValueState 、 MapState 、ListState ,说明一下AggregatingState 、 ReducingState、 FoldingState 其实都是对 ValueState 的一个封装,上面加了一个function,那么哪些是 Operator State ?

即 ListState BroadcastState 。

③Managed 和 Raw

除了 key 和 Operator 一个维度来区分 State 还有一个维度,称之为 Managed 和 Raw  ,其中 Managed State 是由 Flink 管理的state ,刚才举例的所有 state 均是 managed ;

而 Raw State是Flink 仅提供 stream 可以进行存储数据,从它的角度来看这些可能只是一些 bytes , 比如下面的这段方法,就是相关的 Raw State的东西,

如下:

public interface StateSnapshotContext extends FunctionSnapshotContext {

/**

* Returns an output stream for keyed state

*/

KeyedStateCheckpointOutputStream getRawKeyedOperatorStateOutput() throws Exception;

/**

* Returns an output stream for operator state

*/

OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Exception;

 

二、如何在 Flink 中使用 State

先来看这段 word count 代码:

env.fromElements(WordCountData.WORDS)

//split up the lines in pairs (2-tuples) containing: (word,1)

.flatMap(new Tokenizer())

//group by the tuple field 0 and sum up tuple field1

.keyBy(0).sum(1)

.print();

之前sum 里面调用了内置的 StreamGroupedReduce

再来看看这段源码:

public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN,ReduceFunction<IN>>

implements OneInputStreamOperator<IN,IN> {

private transient ValueState<IN> values;

@Override

public void open() throws Exception {

Super.open():

ValueStateDescriptor<IN> stated = new valueStateDescriptor<>(STATE_NAME, serializer);

values = getRuntimeContext().getState(stateId);

}

@Override

public void processElement(StreamRecord<IN> element) throws Exception {

IN value = element.getValue():

IN currentValue = values.value():

if (currentValue != null) {

IN reduce = userFunction.reduce(currentValue, value);

values.update(reduceed);

output.collect(element.replace(reduced));

} else {

value.update(value):

output.collect(element.replace(value));

} 

}

可以看到里面有个 ValueState ,它就是 StreamGroupedReduce 使用 state 的地方 open 方法里面可以对它进行初始化

通过 RuntimeContext 去访问 state如果你想使用它就在processElement 中使用包括 valueupdate 则是对 state 进行更新

另外一方面如果在 state 中使用 Operator State同样使用内置的 FromElementsFunction我们以源码的方式去展示一下

如下:

public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedFunction {

private transient ListState<Integer> checkpointedState

@Override

public void initializeState(FunctionInitializationContext context) throws Exception {

Precondition.checkState(this.checkpointedState == null,

The + getClass().gesSimpleName() + has already been initialized.);

this.checkpointedState = context.getOperatorStateStore().getLiState(

new ListStateDescriptor<>(from-elements-state,IntSerializer,INSTANCE)

);

if (context.isRestored()) {

List<Integer> retrievedStates = new ArrayList<>():

for (Integer entry :this.checkpointedState.get()){

retrievedStates.add(entry);

//given that the parallelism of the function is1,we can only have 1 state

Preconditions.checkArgument(retrievedStates,sze() == 1,getClass().getSimpleName() +retrieved invalid state.);

this.numElementsToSkip = retrievedStates.get(0);

}

}

@Override

public void snapshotState(FunctionSnapshotContextcontext) throws Exception {

Preconditions.checkState(this.checkpointedState != null,

The + getClass().getSimpleName() + has not been properly initialized.);

this.checkpointedState.clear();

this.checkpointedState.add(this.numElementsEmitted);

} 

要访问它对 Operator State 进行显示的初始化,即这个 list 里面需要去存储需要去初始化是需要显示声明的换言之需要继承接口来显示的对进行初始化和存储checkpointedState就是这里使用的 Operator State可以在另外的方法里面

比如在 process 里就可以对此进行读写更新的这个逻辑实际上就是每次在snapshot 时把内存里面的 numElementsEmitted 加载到 ListState里面相当于就是说下次如果发生 failed会直接从 numElementsEmitted 进行恢复

 

四、Checkpoint 的执行机制

(1)State 的存储

在讲 Checkpoint 执行机制之前先对 State 的存储进行一个认知checkpoint 的主要数据就是 State那么 State 存储在哪里?就存储在每个Operator state-backend 里面,

如下有三个state-backend 类型每种 state- backend 会分别创建出自己的KeyedStateBackend  OperatorStateBackend

这三种state- backend 均会创建同一种 OperatorState Backend我们称之为 DefaultOperatorStateBackend就是一个存内存的

这三类会创建 keyed state- backend  Memory  Fs 来说它都会创建一个 HeapKeyedStateBackend也就是储存在内存中的,

RocksDBStateBackend 会创建一个

RocksDBKeyedStateBackendkeyed state- backend 顾名思义会使用 RocksDBKeyedStateBackend存储框内是需要使用 state- backend 的一个声明

比如这里写了要使用 FsStateBackend要创建一个 node 进行一个配置如果不配置,配置默认会创建一个 

MemoryStateBackend但需要 keyed state-backend会有 MemoryStateBackend 专门创建相关的

HeapKeyedStateBackend  RocksDBKeyedStateBackend

从这张图上看 MemoryStateBackend  FsStateBackend 完全没有任何区别创建 keyed state-backend 一般都是创建 HeapKeyedBackend ,它们的区别在于他们在做 Checkpoint 的时候机制不一样,即对Memory 来说数据一般是直接返回给 master 节点而FsStateBackend 它将文件路径返回给 master而RocksDBStateBackend  是将数据写入文件中将文件路径传递给master

下图中绿颜色的数据都存储在内存中黄颜色的数据存储在 RocksDB 数据是混合的

image.png

(2)HeapKeyedStateBackend 存储格式

支持异步 checkpoint (默认):

CopyOnWriteStateTable<k,N,S>[],整体相当于一个 map

仅支持同步 checkpoint : Map<N,Map<K,S>>[],由嵌套 map 的数组构成

在 MemoryStateBackend 内使用时, checkpoint序列化数据阶段默认有最大 5MB 数据的限制。

(3)RocksDBKeyedStateBackend 存储格式

每个 column family 互相之间是独立的如下图可以看到 State1、State2 ,它们是不同的 column familycolumn family 可以区分开不同的文件可以看一下整个文件是怎么存储的可以看到RocksDB  key 部分分别是 keyGroupKey  Namespace keyGroupkeyGroup Key 是表示说这个 key 属于哪个 group是个 int key 就是用户所定义的  process-key

Namespace其实默认情况下是 void-Namespace它主要是使用在 Window 里面表示 key 属于哪个 Window

image.png

(4)Checkpoint 执行流程

①接下来看 Checkpoint 执行流程checkpoint 是存储在 JM 中CheckpointCoordinator 中的首先它会所有的 source 去切割 checkpoint

可以看到如图所示:

image.png

②当 task 收到所有的 barrier 之后它会执行一次快照它在执行快照的时候会把 barrier 往下游进行广播然后会将自己的状态异步的写入到持久化存储中也就是下图中红色的三角形

如下图:

image.png

③当 source 节点异步的写下来之后它会产生一个 state handle返回给 checkpoint coordinator这个 handle 的表针告诉 checkpoint 说”我应该做完了我自己这部分 checkpoint ,然后我通过源数据告诉你”同时可以看到 barrier 在整个拐弯里面聚集往下流

如下图:

image.png

④最后一个节点是 sink 节点涉及到 barrier 之后它同样会执行checkpoint这里假设 sink 节点是 RocksDBStateBackend那么有个增量 checkpoint首先它会在收集齐 barrier 之后它会执行一次 checkpoint也就是红色的大三角所谓这样的 checkpoint 其实对于 Flink 而言数据是增量的Flink  task 里面记录了一些之前上传成功的 checkpoint 的信息那么它可以做一次过滤就是出来之前还没有上传过得文件也就是图示中紫色的小三角将这些部分文件上传到持久化存储中

如下图:

image.png

⑤同样的当存储完之后,同样将需要的 state handle 告诉coordinator ,这时已经收集齐了所有三个 task  handle ,在整个Flink 中这次的 checkpoint 是完成的,

如下图:

image.png

⑥它会这些 handle 再转换成 checkpoint 对象这个对象是包含了整个所有 map 的文化再将这些数据传入目录下面去这样整个checkpoint 就完成了

如下图:

image.png

(5)EXACTLY ONCE 和 AT LEAST ONCE

之前也提到了说会有一个 barrier 对于若干 check 来说即EXACT ONCE 是说当收集齐 barrier 之前所有数据是先会 buffer不往下流传 barrier 收集齐之后再往下传那就意味着说数据是不会重复处理的而对于 AT LAEAST ONCE 来说它没有一个 buffer 的过程只要收集到 barrier 就直接往下传最后再强调一下所谓的真正的 EXACTLY ONCE Flink所说的 EXACTLY ONCE 是它的计算工程可以做到EXACTLY ONCE end-to-end 的EXACTLY ONCE 是需要 source  sink 的支持也就是说 source可以 replayed比如可以 replay 回一分钟之前的状态

同样 sink也是需要支持的目前例如卡不卡是两阶段提交了那么它就可以实现 sink  EXACTLY ONCE所以说对于用户来说比如把数据,会发现数据有可能是重复写入那其实是因为 sink 并没有支持EXACTLY ONCE所以要想真正做到 end-to-end 需要将 sink 升级成支持 EXACTLY ONCE 否则对普通的文件效果数据可能会被重复写出去会发现你的输出结果就冗余了

可参考下图:

image.png

(6) 增量 checkpoint

之前提到了所谓的增量 checkpoint其实是增量之前没有上传过的数据可以看看下图的实例RocksDB 本身做了三四个 checkpoint

第一次的时候它的文件是 123sst  MANIFEST  sst 文件这里涉及到数据库的概念sst 文件生产之后是不可变得不管执行多少次 checkpoint哪怕一万次只要 checkpoint 中有sst 文件那么文件其实都是一模一样的正是利用了这点才实现了所谓的增量的 checkpoint然后可以在下图看到在第二个checkpoint 的时候是 124sst  MANIFEST如果第一次成功只需要上传4点 sst 和一个可变文件 MANIFEST因为 MANIFEST  是源数据文件是可变的

所以不管之前有没有上传成功它都要上传 checkpoint 三的时候可以在下图中看到有 45  sst 和MANIFEST那么这时候 5.sst 也是个新文件需要将它上传 MANIFEST 文件同样也需要上传

如果整个 checkpoint 失败了,不是说 task 完成的 checkpoint ,而是因为其他 task 导致整个checkpoint 失败被认为是不可用的那么 Flink 机制会保证说checkpoint 数据是不可信的当为 checkpoint 三的时候说明4. sst 文件并没有上传上去那么这时候会把4和5 sst 文件都上传一遍

如下图: 

image.png

(7)如何从已停止的作业进行状态恢复

 Flink 里面有两个概念分别是 Savepoint  ExternalizedCheckpoint Savepoint 就是由用户管理触发的数据它的格式是标准化的允许作业升级或者配置变更,比较慢,用户也可以从Externalized Checkpoint 机制继续 Flink 的恢复之后可能Savepoint 对于非必要场景大部分可以被 Externalized Checkpoint 所替代

(8)已停止的作业进行状态恢复

 Keyed State 的改并发

可以看到下图上面的并发路是3下面的并发路是4改并发有个签订是说 KeyGroup 总数是不变的可以看到对于 Subtask0 来说它之前的 KeyGroup 是0-3 在它改并发后减少了变成了0-2也就是需要将之前自己的0-3抽出0-8的部分给自己使用对于新的Subtask 来说它需要继承老 Subtask 一部分,并进行改并发

如下图:

image.png

 Operator State 改并发

一共有三种划分分别是如果是使用 ListState 就是均匀划分如果是使用 UnionListState 就是 Union 划分Union 划分是说每个并发上面的数据都会拿到之前所有的数据的总和每个地方都可以拿到之前所有 state 的数据然后进行过滤筛选而不是用之前所有的 state 进行下一次的 checkpoint所以要正确的使用unionstate

Boradcast 就比较明确了因为每个数据都一样还是获得之前那个备份

 

五、回答部分问题

(1) state 大小的推荐什么时候用 stateBackend ?其实在生产环境中可能只能用 RocksDBStateBackend ,虽然用FsStateBackend 也不是不可以但是需要对 State 有个非常深刻的了解也就是说 checkpoint 数据不会突然间的增大或者减少,使用FsStateBackend 可能会导致作业挂掉 

(2) checkpoint 的时候sink 如果挂了 Flink 会有什么处理?

之前在 Checkpoint 执行流程图中讲到过比如 sink 挂了它没有把 state handle 返回回去这个时候会有两种情况:一种是直接告诉 JM  ,告诉这种情况废掉了没有用那么这时 JM 会把它所收到的所有的 handle逐一异步的将 handle 路径的文件全部给清理掉

第二种情况是做失败了那么 checkpoint 这边有一个超时的时间限制 默认10分钟也就是从出发到10分钟这个时间仍然没有收集齐所有的 handle  会认为 checkpoint 超时了也是不可用的也会将所有的 checkpoint handle 对应的文件异步的进行删除

(3)RocksDB 文件对应一个或几个 Operator State 结构吗?如果Operator State 内容没有改变,它们使用的文件应该是一样的吗?可能很多人觉得一开始不太理解什么是增量 checkpoint其实增量checkpoint 它所依赖的基础是 LSM当文件生成完备之后这个文件就不会再更新了,它是基于这样一种概念

(4) RocksDBState 开销很大吗?

,因为 RocksDB 写的时候要进行序列化,读的时候要进行一些反序列化,那么当KB比较大时,开销确实很大

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
JSON Java API
Flink CDC 2.0 支持全量故障恢复,可以从 checkpoint 点恢复。
【2月更文挑战第17天】Flink CDC 2.0 支持全量故障恢复,可以从 checkpoint 点恢复。
40 3
|
2月前
|
消息中间件 Java Kafka
Flink背压问题之checkpoint超时如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
存储 SQL canal
Flink CDC数据同步问题之同步数据到checkpoint失败如何解决
Flink CDC数据同步是指利用Flink CDC实现不同数据源之间的实时数据同步任务;本合集旨在提供Flink CDC数据同步的操作指南、性能优化建议和常见问题处理,助力用户高效实施数据同步。
|
4月前
|
分布式计算 资源调度 Hadoop
Hadoop学习笔记(HDP)-Part.18 安装Flink
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
121 2
Hadoop学习笔记(HDP)-Part.18 安装Flink
|
9月前
|
Web App开发 消息中间件 固态存储
Flink Unaligned Checkpoint 在 Shopee 的优化和实践
Tech Lead of Shopee Flink Runtime Team 范瑞,在 Flink Forward Asia 2022 核心技术的分享。
533 0
Flink Unaligned Checkpoint 在 Shopee 的优化和实践
|
Web App开发 消息中间件 机器学习/深度学习
Flink Unaligned Checkpoint 在 Shopee 的优化和实践
介绍 Shopee 对 Unaligned Checkpoint 的改进、对 Flink 社区的贡献以及内部的实践和落地。
Flink Unaligned Checkpoint 在 Shopee 的优化和实践
|
前端开发 数据可视化 关系型数据库
用 PolarDB - X + Flink 搭建实时数据大屏|学习笔记(三)
快速学习用 PolarDB - X + Flink 搭建实时数据大屏
344 0
用 PolarDB - X + Flink 搭建实时数据大屏|学习笔记(三)
|
存储 运维 监控
如何开通实时计算 Flink 版|学习笔记(三)
快速学习如何开通实时计算 Flink 版
290 0
如何开通实时计算 Flink 版|学习笔记(三)
|
机器学习/深度学习 SQL 人工智能
实时计算 Flink 训练营场景与应用|学习笔记(三)
快速学习实时计算 Flink 训练营场景与应用
269 0
实时计算 Flink 训练营场景与应用|学习笔记(三)
|
SQL 存储 搜索推荐
实时计算 Flink 训练营场景与应用|学习笔记(二)
快速学习实时计算 Flink 训练营场景与应用
166 0
实时计算 Flink 训练营场景与应用|学习笔记(二)