那个男人竟然不会Flink的CheckPoint机制(一)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 这篇来讲Flink另一个比较重要的知识,就是它的容错机制checkpoint原理。

开胃菜(复习)


作为用户,我们写好Flink的程序,上管理平台提交,Flink就跑起来了(只要程序代码没有问题),细节对用户都是屏蔽的。

1.jpg

实际上大致的流程是这样的:

  1. Flink会根据我们所写代码,会生成一个StreamGraph的图出来,来代表我们所写程序的拓扑结构。
  2. 然后在提交的之前会将StreamGraph这个图优化一把(可以合并的任务进行合并),变成JobGraph
  3. JobGraph提交给JobManager
  4. JobManager收到之后JobGraph之后会根据JobGraph生成ExecutionGraphExecutionGraphJobGraph 的并行化版本)
  5. TaskManager接收到任务之后会将ExecutionGraph生成为真正的物理执行图

2.jpg

可以看到物理执行图真正运行在TaskManagerTransformSink之间都会有ResultPartitionInputGate这俩个组件,ResultPartition用来发送数据,而InputGate用来接收数据。

3.jpg

屏蔽掉这些Graph,可以发现Flink的架构是:Client->JobManager->TaskManager

4.jpg

从名字就可以看出,JobManager是干「管理」,而TaskManager是真正干活的。回到我们今天的主题,checkpoint就是由JobManager发出。

5.jpg

Flink本身就是有状态的,Flink可以让你选择执行过程中的数据保存在哪里,目前有三个地方,在Flink的角度称作State Backends

  • MemoryStateBackend(内存)
  • FsStateBackend(文件系统,一般是HSFS)
  • RocksDBStateBackend(RocksDB数据库)

同样地,checkpoint信息就是保存在State Backends

6.jpg

先来简单描述一下checkpoint的实现流程:

checkpoint的实现大致就是插入barrier,每个operator收到barrier就上报给JobManager,等到所有的operator都上报了barrier,那JobManager 就去完成一次checkpointi

7.jpg

因为checkpoint机制是Flink实现容错机制的关键,我们在实际使用中,往往都要配置checkpoint相关的配置,例如有以下的配置:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

简单铺垫过后,我们就来撸源码了咯?


Checkpoint(原理)


JobManager发送checkpoint


从上面的图我们可以发现 checkpoint是由JobManager发出的,并且JobManager收到的是JobGraph,会将JobGraph转换成ExecutionGraph

这块在JobMaster的构造器就能体现出来:

public JobMaster(...) throws Exception {
  // 创建ExecutionGraph
  this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
 }

我们点击进去createAndRestoreExecutionGraph看下:

8.jpg

CheckpointCoordinator这个名字,就觉得他很重要,有木有?它从ExecutionGraph来,我们就进去createExecutionGraph里边看看呗。

点了两层buildGraph()方法,可以看到在方法的末尾处有checkpoint相关的信息:

executionGraph.enableCheckpointing(
    chkConfig.getCheckpointInterval(),
    chkConfig.getCheckpointTimeout(),
    chkConfig.getMinPauseBetweenCheckpoints(),
    chkConfig.getMaxConcurrentCheckpoints(),
    chkConfig.getCheckpointRetentionPolicy(),
    triggerVertices,
    ackVertices,
    confirmVertices,
    hooks,
    checkpointIdCounter,
    completedCheckpoints,
    rootBackend,
    checkpointStatsTracker);

前面的几个参数就是我们在配置checkpoint参数的时候指定的,而triggerVertices/confirmVertices/ackVertices我们溯源看了一下,在源码中注释也写得清清楚楚的。

// collect the vertices that receive "trigger checkpoint" messages.
// currently, these are all the sources 
List<JobVertexID> triggerVertices = new ArrayList<>();
// collect the vertices that need to acknowledge the checkpoint
// currently, these are all vertices
List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
// collect the vertices that receive "commit checkpoint" messages
// currently, these are all vertices
List<JobVertexID> commitVertices = new ArrayList<>(jobVertices.size());

下面还是进去enableCheckpointing()看看大致做了些什么吧:

// 将上面的入参分别封装成ExecutionVertex数组
ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
// 创建触发器
checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
// 创建checkpoint协调器
checkpointCoordinator = new CheckpointCoordinator(
  jobInformation.getJobId(),
  interval,
  checkpointTimeout,
  minPauseBetweenCheckpoints,
  maxConcurrentCheckpoints,
  retentionPolicy,
  tasksToTrigger,
  tasksToWaitFor,
  tasksToCommitTo,
  checkpointIDCounter,
  checkpointStore,
  checkpointStateBackend,
  ioExecutor,
  SharedStateRegistry.DEFAULT_FACTORY);
// 设置触发器
checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
// 状态变更监听器
// job status changes (running -> on, all other states -> off)
if (interval != Long.MAX_VALUE) {
  registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
}

值得一提的是,点进去CheckpointCoordinator()构造方法可以发现有状态后端StateBackend的身影(因为checkpoint就是保存在所配置的状态后端)

9.jpg

如果Job的状态变更了,CheckpointCoordinatorDeActivator是能监听到的。

10.jpg

当我们的Job启动的时候,又简单看看startCheckpointScheduler()里边究竟做了些什么操作:

11.jpg

它会启动一个定时任务,我们具体看看定时任务具体做了些什么ScheduledTrigger,然后看到比较重要的方法:triggerCheckpoint()

这块代码的逻辑有点多,我们简单来总结一下

  1. 前置检查(是否可以触发checkpoint,距离上一次checkpoint的间隔时间是否符合...)
  2. 检查是否所有的需要做checkpoint的Task都处于running状态
  3. 生成checkpointId,然后生成PendingCheckpoint对象来代表待处理的检查点
  4. 注册一个定时任务,如果checkpoint超时后取消checkpoint

12.jpg

注:检查task的任务状态时,只会把sourcetask封装给进Execution[]数组

13.jpg14.jpg15.jpg16.jpg

JobManager侧只会发给sourcetask发送checkpoint

17.jpg


JobManager发送总结


贴的图有点多,最后再来简单总结一波,顺便画个流程图,你就会发现还是比较清晰的。

  1. JobManager 收到client提交的JobGraph
  2. JobManger 需要通过JobGraph生成ExecutionGraph
  3. 在生成ExcutionGraph的过程中实际上就会触发checkpoint的逻辑
  1. 定时任务会前置检查(其实就是你实际上配置的各种参数是否符合)
  2. 判断checkpoint相关的task是否都是running状态,将source的任务封装到Execution数组中
  3. 创建checkpointID/checkpointStorageLocation(checkpoint保存的地方)/PendingCheckpoint(待处理的checkpoint)
  4. 创建定时任务(如果当checkpoint超时,会将相关状态清除,重新触发)
  5. 真正触发checkPointTaskManager(只会发给sourcetask)
  6. 找出所有source和需要ack的Task
  7. 创建checkpointCoordinator 协调器
  8. 创建CheckpointCoordinatorDeActivator监听器,监听Job状态的变更
  9. Job启动时,会触发ScheduledTrigger 定时任务

18.jpg19.jpg


TaskManager(source Task接收)


前面提到了,JobManager 在生成ExcutionGraph时,会给所有的source 任务发送checkpoint,那么source收到barrier又是怎么处理的呢?会到TaskExecutor这里进行处理。

TaskExecutor有个triggerCheckpoint()方法对接收到的checkpoint进行处理:

20.jpg

进入triggerCheckpointBarrier()看看:

21.jpg

再想点进去triggerCheckpoint()看实现时,我们会发现走到performCheckpoint()这个方法上:

22.jpg

从实现的注释我们可以很方便看出方法大概做了什么:

23.jpg

这块我们先在这里放着,知道Source的任务接收到Checkpoint会广播到下游,然后会做快照处理就好。

下面看看非Source 的任务接收到checkpoint是怎么处理的。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
JSON Java API
Flink CDC 2.0 支持全量故障恢复,可以从 checkpoint 点恢复。
【2月更文挑战第17天】Flink CDC 2.0 支持全量故障恢复,可以从 checkpoint 点恢复。
39 3
|
2月前
|
消息中间件 Java Kafka
Flink背压问题之checkpoint超时如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
存储 SQL canal
Flink CDC数据同步问题之同步数据到checkpoint失败如何解决
Flink CDC数据同步是指利用Flink CDC实现不同数据源之间的实时数据同步任务;本合集旨在提供Flink CDC数据同步的操作指南、性能优化建议和常见问题处理,助力用户高效实施数据同步。
|
3月前
|
消息中间件 存储 Kafka
在Flink中,可以通过配置`KafkaConsumer`的`properties`参数来设置两个不同的SASL机制
【1月更文挑战第19天】【1月更文挑战第91篇】在Flink中,可以通过配置`KafkaConsumer`的`properties`参数来设置两个不同的SASL机制
78 3
|
8月前
|
分布式计算 数据处理 流计算
【原理】Flink如何巧用WaterMark机制解决乱序问题
【原理】Flink如何巧用WaterMark机制解决乱序问题
|
8月前
|
存储 关系型数据库 MySQL
Flink的Checkpoints机制详解
Flink的Checkpoints机制详解
|
9月前
|
Web App开发 消息中间件 固态存储
Flink Unaligned Checkpoint 在 Shopee 的优化和实践
Tech Lead of Shopee Flink Runtime Team 范瑞,在 Flink Forward Asia 2022 核心技术的分享。
528 0
Flink Unaligned Checkpoint 在 Shopee 的优化和实践
|
消息中间件 存储 Kafka
带你理解并使用flink中的WaterMark机制
提问:你了解事件的乱序吗?乱序是怎么产生的呢?在flink流处理中是以什么事件类型判定乱序的呢? 当一条一条的数据从产生到经过消息队列传输,然后Flink接受后处理,这个流程中数据都是按照数据产生的先后顺序在flink中处理的,这时候就是有序的数据流。
1041 0
带你理解并使用flink中的WaterMark机制
|
SQL 缓存 Java
【Flink】超详细Window机制……(下)
【Flink】超详细Window机制……
169 0
【Flink】超详细Window机制……(下)
|
流计算
【Flink】超详细Window机制……(上)
【Flink】超详细Window机制……
138 0
【Flink】超详细Window机制……(上)

热门文章

最新文章