Flink checkpoint(二)| 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习 Flink checkpoint。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 Flink checkpoint(二)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10044


Flink checkpoint(二)


四、Checkpoint 的执行机制

(1)State 的存储

在讲 Checkpoint 执行机制之前先对 State 的存储进行一个认知checkpoint 的主要数据就是 State那么 State 存储在哪里?就存储在每个Operator state-backend 里面,

如下有三个state-backend 类型每种 state- backend 会分别创建出自己的KeyedStateBackend  OperatorStateBackend

这三种state- backend 均会创建同一种 OperatorState Backend我们称之为 DefaultOperatorStateBackend就是一个存内存的

这三类会创建 keyed state- backend  Memory  Fs 来说它都会创建一个 HeapKeyedStateBackend也就是储存在内存中的,

RocksDBStateBackend 会创建一个

RocksDBKeyedStateBackendkeyed state- backend 顾名思义会使用 RocksDBKeyedStateBackend存储框内是需要使用 state- backend 的一个声明

比如这里写了要使用 FsStateBackend要创建一个 node 进行一个配置如果不配置,配置默认会创建一个 

MemoryStateBackend但需要 keyed state-backend会有 MemoryStateBackend 专门创建相关的

HeapKeyedStateBackend  RocksDBKeyedStateBackend

从这张图上看 MemoryStateBackend  FsStateBackend 完全没有任何区别创建 keyed state-backend 一般都是创建 HeapKeyedBackend ,它们的区别在于他们在做 Checkpoint 的时候机制不一样,即对Memory 来说数据一般是直接返回给 master 节点而FsStateBackend 它将文件路径返回给 master而RocksDBStateBackend  是将数据写入文件中将文件路径传递给master

下图中绿颜色的数据都存储在内存中黄颜色的数据存储在 RocksDB 数据是混合的

image.png

(2)HeapKeyedStateBackend 存储格式

支持异步 checkpoint (默认):

CopyOnWriteStateTable[],整体相当于一个 map

仅支持同步 checkpoint : Map>[],由嵌套 map 的数组构成

在 MemoryStateBackend 内使用时, checkpoint序列化数据阶段默认有最大 5MB 数据的限制。

(3)RocksDBKeyedStateBackend 存储格式

每个 column family 互相之间是独立的如下图可以看到 State1、State2 ,它们是不同的 column familycolumn family 可以区分开不同的文件可以看一下整个文件是怎么存储的可以看到RocksDB  key 部分分别是 keyGroupKey  Namespace keyGroupkeyGroup Key 是表示说这个 key 属于哪个 group是个 int key 就是用户所定义的  process-key

Namespace其实默认情况下是 void-Namespace它主要是使用在 Window 里面表示 key 属于哪个 Window

image.png

(4)Checkpoint 执行流程

①接下来看 Checkpoint 执行流程checkpoint 是存储在 JM 中CheckpointCoordinator 中的首先它会所有的 source 去切割 checkpoint

可以看到如图所示:

image.png

②当 task 收到所有的 barrier 之后它会执行一次快照它在执行快照的时候会把 barrier 往下游进行广播然后会将自己的状态异步的写入到持久化存储中也就是下图中红色的三角形

如下图:

image.png

③当 source 节点异步的写下来之后它会产生一个 state handle返回给 checkpoint coordinator这个 handle 的表针告诉 checkpoint 说”我应该做完了我自己这部分 checkpoint ,然后我通过源数据告诉你”同时可以看到 barrier 在整个拐弯里面聚集往下流

如下图:

image.png

④最后一个节点是 sink 节点涉及到 barrier 之后它同样会执行checkpoint这里假设 sink 节点是 RocksDBStateBackend那么有个增量 checkpoint首先它会在收集齐 barrier 之后它会执行一次 checkpoint也就是红色的大三角所谓这样的 checkpoint 其实对于 Flink 而言数据是增量的Flink  task 里面记录了一些之前上传成功的 checkpoint 的信息那么它可以做一次过滤就是出来之前还没有上传过得文件也就是图示中紫色的小三角将这些部分文件上传到持久化存储中

如下图:

image.png

⑤同样的当存储完之后,同样将需要的 state handle 告诉coordinator ,这时已经收集齐了所有三个 task  handle ,在整个Flink 中这次的 checkpoint 是完成的,

如下图:

image.png

⑥它会这些 handle 再转换成 checkpoint 对象这个对象是包含了整个所有 map 的文化再将这些数据传入目录下面去这样整个checkpoint 就完成了

如下图:

image.png

(5)EXACTLY ONCE 和 AT LEAST ONCE

之前也提到了说会有一个 barrier 对于若干 check 来说即EXACT ONCE 是说当收集齐 barrier 之前所有数据是先会 buffer不往下流传 barrier 收集齐之后再往下传那就意味着说数据是不会重复处理的而对于 AT LAEAST ONCE 来说它没有一个 buffer 的过程只要收集到 barrier 就直接往下传最后再强调一下所谓的真正的 EXACTLY ONCE Flink所说的 EXACTLY ONCE 是它的计算工程可以做到EXACTLY ONCE end-to-end 的EXACTLY ONCE 是需要 source  sink 的支持也就是说 source可以 replayed比如可以 replay 回一分钟之前的状态

同样 sink也是需要支持的目前例如卡不卡是两阶段提交了那么它就可以实现 sink  EXACTLY ONCE所以说对于用户来说比如把数据,会发现数据有可能是重复写入那其实是因为 sink 并没有支持EXACTLY ONCE所以要想真正做到 end-to-end 需要将 sink 升级成支持 EXACTLY ONCE 否则对普通的文件效果数据可能会被重复写出去会发现你的输出结果就冗余了

可参考下图:

image.png

(6) 增量 checkpoint

之前提到了所谓的增量 checkpoint其实是增量之前没有上传过的数据可以看看下图的实例RocksDB 本身做了三四个 checkpoint

第一次的时候它的文件是 123sst  MANIFEST  sst 文件这里涉及到数据库的概念sst 文件生产之后是不可变得不管执行多少次 checkpoint哪怕一万次只要 checkpoint 中有sst 文件那么文件其实都是一模一样的正是利用了这点才实现了所谓的增量的 checkpoint然后可以在下图看到在第二个checkpoint 的时候是 124sst  MANIFEST如果第一次成功只需要上传4点 sst 和一个可变文件 MANIFEST因为 MANIFEST  是源数据文件是可变的

所以不管之前有没有上传成功它都要上传 checkpoint 三的时候可以在下图中看到有 45  sst 和MANIFEST那么这时候 5.sst 也是个新文件需要将它上传 MANIFEST 文件同样也需要上传

如果整个 checkpoint 失败了,不是说 task 完成的 checkpoint ,而是因为其他 task 导致整个checkpoint 失败被认为是不可用的那么 Flink 机制会保证说checkpoint 数据是不可信的当为 checkpoint 三的时候说明4. sst 文件并没有上传上去那么这时候会把4和5 sst 文件都上传一遍

如下图: 

image.png

(7)如何从已停止的作业进行状态恢复

 Flink 里面有两个概念分别是 Savepoint  ExternalizedCheckpoint Savepoint 就是由用户管理触发的数据它的格式是标准化的允许作业升级或者配置变更,比较慢,用户也可以从Externalized Checkpoint 机制继续 Flink 的恢复之后可能Savepoint 对于非必要场景大部分可以被 Externalized Checkpoint 所替代

(8)已停止的作业进行状态恢复

 Keyed State 的改并发

可以看到下图上面的并发路是3下面的并发路是4改并发有个签订是说 KeyGroup 总数是不变的可以看到对于 Subtask0 来说它之前的 KeyGroup 是0-3 在它改并发后减少了变成了0-2也就是需要将之前自己的0-3抽出0-8的部分给自己使用对于新的Subtask 来说它需要继承老 Subtask 一部分,并进行改并发

如下图:

image.png

 Operator State 改并发

一共有三种划分分别是如果是使用 ListState 就是均匀划分如果是使用 UnionListState 就是 Union 划分Union 划分是说每个并发上面的数据都会拿到之前所有的数据的总和每个地方都可以拿到之前所有 state 的数据然后进行过滤筛选而不是用之前所有的 state 进行下一次的 checkpoint所以要正确的使用unionstate

Boradcast 就比较明确了因为每个数据都一样还是获得之前那个备份

 

五、回答部分问题

(1) state 大小的推荐什么时候用 stateBackend ?其实在生产环境中可能只能用 RocksDBStateBackend ,虽然用FsStateBackend 也不是不可以但是需要对 State 有个非常深刻的了解也就是说 checkpoint 数据不会突然间的增大或者减少,使用FsStateBackend 可能会导致作业挂掉 

(2) checkpoint 的时候sink 如果挂了 Flink 会有什么处理?

之前在 Checkpoint 执行流程图中讲到过比如 sink 挂了它没有把 state handle 返回回去这个时候会有两种情况:一种是直接告诉 JM  ,告诉这种情况废掉了没有用那么这时 JM 会把它所收到的所有的 handle逐一异步的将 handle 路径的文件全部给清理掉

第二种情况是做失败了那么 checkpoint 这边有一个超时的时间限制 默认10分钟也就是从出发到10分钟这个时间仍然没有收集齐所有的 handle  会认为 checkpoint 超时了也是不可用的也会将所有的 checkpoint handle 对应的文件异步的进行删除

(3)RocksDB 文件对应一个或几个 Operator State 结构吗?如果Operator State 内容没有改变,它们使用的文件应该是一样的吗?可能很多人觉得一开始不太理解什么是增量 checkpoint其实增量checkpoint 它所依赖的基础是 LSM当文件生成完备之后这个文件就不会再更新了,它是基于这样一种概念

(4) RocksDBState 开销很大吗?

,因为 RocksDB 写的时候要进行序列化,读的时候要进行一些反序列化,那么当KB比较大时,开销确实很大

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用问题之mini-cluster模式下,怎么指定checkpoint的时间间隔
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之如何通过savepoint恢复Flink CDC任务
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之实时计算 Flink版操作报错合集之使用两个并行度时会报错,一个并行度不会报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到检查点(checkpoint)状态不单调递增,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
存储 分布式数据库 Apache
【Flink】Flink的Checkpoint 存在哪里?
【4月更文挑战第19天】【Flink】Flink的Checkpoint 存在哪里?
|
6月前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错合集之在尝试触发checkpoint时遇到了报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
63 0
|
6月前
|
SQL 消息中间件 关系型数据库
实时计算 Flink版产品使用合集之flink-cdc-oracle 可以并行读取吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
存储 流计算
Flink Checkpoint所有配置解读
Flink Checkpoint所有配置解读
205 0
|
存储 机器学习/深度学习 算法
|
缓存 运维 监控
Flink 1.11 Unaligned Checkpoint 解析
由于 Checkpoint 与反压的耦合,反压反过来也会作用于 Checkpoint,导致 Checkpoint 的种种问题。针对于此,Flink 在 1.11 引入 Unaligned Checkpint 来解耦 Checkpoint 机制与反压机制,优化高反压情况下的 Checkpoint 表现。
Flink 1.11 Unaligned Checkpoint 解析
下一篇
无影云桌面