开发者社区> java3y> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

那个男人竟然不会Flink的CheckPoint机制(一)

简介: 这篇来讲Flink另一个比较重要的知识,就是它的容错机制checkpoint原理。
+关注继续查看

开胃菜(复习)


作为用户,我们写好Flink的程序,上管理平台提交,Flink就跑起来了(只要程序代码没有问题),细节对用户都是屏蔽的。

1.jpg

实际上大致的流程是这样的:

  1. Flink会根据我们所写代码,会生成一个StreamGraph的图出来,来代表我们所写程序的拓扑结构。
  2. 然后在提交的之前会将StreamGraph这个图优化一把(可以合并的任务进行合并),变成JobGraph
  3. JobGraph提交给JobManager
  4. JobManager收到之后JobGraph之后会根据JobGraph生成ExecutionGraphExecutionGraphJobGraph 的并行化版本)
  5. TaskManager接收到任务之后会将ExecutionGraph生成为真正的物理执行图

2.jpg

可以看到物理执行图真正运行在TaskManagerTransformSink之间都会有ResultPartitionInputGate这俩个组件,ResultPartition用来发送数据,而InputGate用来接收数据。

3.jpg

屏蔽掉这些Graph,可以发现Flink的架构是:Client->JobManager->TaskManager

4.jpg

从名字就可以看出,JobManager是干「管理」,而TaskManager是真正干活的。回到我们今天的主题,checkpoint就是由JobManager发出。

5.jpg

Flink本身就是有状态的,Flink可以让你选择执行过程中的数据保存在哪里,目前有三个地方,在Flink的角度称作State Backends

  • MemoryStateBackend(内存)
  • FsStateBackend(文件系统,一般是HSFS)
  • RocksDBStateBackend(RocksDB数据库)

同样地,checkpoint信息就是保存在State Backends

6.jpg

先来简单描述一下checkpoint的实现流程:

checkpoint的实现大致就是插入barrier,每个operator收到barrier就上报给JobManager,等到所有的operator都上报了barrier,那JobManager 就去完成一次checkpointi

7.jpg

因为checkpoint机制是Flink实现容错机制的关键,我们在实际使用中,往往都要配置checkpoint相关的配置,例如有以下的配置:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

简单铺垫过后,我们就来撸源码了咯?


Checkpoint(原理)


JobManager发送checkpoint


从上面的图我们可以发现 checkpoint是由JobManager发出的,并且JobManager收到的是JobGraph,会将JobGraph转换成ExecutionGraph

这块在JobMaster的构造器就能体现出来:

public JobMaster(...) throws Exception {
  // 创建ExecutionGraph
  this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
 }

我们点击进去createAndRestoreExecutionGraph看下:

8.jpg

CheckpointCoordinator这个名字,就觉得他很重要,有木有?它从ExecutionGraph来,我们就进去createExecutionGraph里边看看呗。

点了两层buildGraph()方法,可以看到在方法的末尾处有checkpoint相关的信息:

executionGraph.enableCheckpointing(
    chkConfig.getCheckpointInterval(),
    chkConfig.getCheckpointTimeout(),
    chkConfig.getMinPauseBetweenCheckpoints(),
    chkConfig.getMaxConcurrentCheckpoints(),
    chkConfig.getCheckpointRetentionPolicy(),
    triggerVertices,
    ackVertices,
    confirmVertices,
    hooks,
    checkpointIdCounter,
    completedCheckpoints,
    rootBackend,
    checkpointStatsTracker);

前面的几个参数就是我们在配置checkpoint参数的时候指定的,而triggerVertices/confirmVertices/ackVertices我们溯源看了一下,在源码中注释也写得清清楚楚的。

// collect the vertices that receive "trigger checkpoint" messages.
// currently, these are all the sources 
List<JobVertexID> triggerVertices = new ArrayList<>();
// collect the vertices that need to acknowledge the checkpoint
// currently, these are all vertices
List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
// collect the vertices that receive "commit checkpoint" messages
// currently, these are all vertices
List<JobVertexID> commitVertices = new ArrayList<>(jobVertices.size());

下面还是进去enableCheckpointing()看看大致做了些什么吧:

// 将上面的入参分别封装成ExecutionVertex数组
ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
// 创建触发器
checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
// 创建checkpoint协调器
checkpointCoordinator = new CheckpointCoordinator(
  jobInformation.getJobId(),
  interval,
  checkpointTimeout,
  minPauseBetweenCheckpoints,
  maxConcurrentCheckpoints,
  retentionPolicy,
  tasksToTrigger,
  tasksToWaitFor,
  tasksToCommitTo,
  checkpointIDCounter,
  checkpointStore,
  checkpointStateBackend,
  ioExecutor,
  SharedStateRegistry.DEFAULT_FACTORY);
// 设置触发器
checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
// 状态变更监听器
// job status changes (running -> on, all other states -> off)
if (interval != Long.MAX_VALUE) {
  registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
}

值得一提的是,点进去CheckpointCoordinator()构造方法可以发现有状态后端StateBackend的身影(因为checkpoint就是保存在所配置的状态后端)

9.jpg

如果Job的状态变更了,CheckpointCoordinatorDeActivator是能监听到的。

10.jpg

当我们的Job启动的时候,又简单看看startCheckpointScheduler()里边究竟做了些什么操作:

11.jpg

它会启动一个定时任务,我们具体看看定时任务具体做了些什么ScheduledTrigger,然后看到比较重要的方法:triggerCheckpoint()

这块代码的逻辑有点多,我们简单来总结一下

  1. 前置检查(是否可以触发checkpoint,距离上一次checkpoint的间隔时间是否符合...)
  2. 检查是否所有的需要做checkpoint的Task都处于running状态
  3. 生成checkpointId,然后生成PendingCheckpoint对象来代表待处理的检查点
  4. 注册一个定时任务,如果checkpoint超时后取消checkpoint

12.jpg

注:检查task的任务状态时,只会把sourcetask封装给进Execution[]数组

13.jpg14.jpg15.jpg16.jpg

JobManager侧只会发给sourcetask发送checkpoint

17.jpg


JobManager发送总结


贴的图有点多,最后再来简单总结一波,顺便画个流程图,你就会发现还是比较清晰的。

  1. JobManager 收到client提交的JobGraph
  2. JobManger 需要通过JobGraph生成ExecutionGraph
  3. 在生成ExcutionGraph的过程中实际上就会触发checkpoint的逻辑
    1. 定时任务会前置检查(其实就是你实际上配置的各种参数是否符合)
    2. 判断checkpoint相关的task是否都是running状态,将source的任务封装到Execution数组中
    3. 创建checkpointID/checkpointStorageLocation(checkpoint保存的地方)/PendingCheckpoint(待处理的checkpoint)
    4. 创建定时任务(如果当checkpoint超时,会将相关状态清除,重新触发)
    5. 真正触发checkPointTaskManager(只会发给sourcetask)
    6. 找出所有source和需要ack的Task
    7. 创建checkpointCoordinator 协调器
    8. 创建CheckpointCoordinatorDeActivator监听器,监听Job状态的变更
    9. Job启动时,会触发ScheduledTrigger 定时任务

18.jpg19.jpg


TaskManager(source Task接收)


前面提到了,JobManager 在生成ExcutionGraph时,会给所有的source 任务发送checkpoint,那么source收到barrier又是怎么处理的呢?会到TaskExecutor这里进行处理。

TaskExecutor有个triggerCheckpoint()方法对接收到的checkpoint进行处理:

20.jpg

进入triggerCheckpointBarrier()看看:

21.jpg

再想点进去triggerCheckpoint()看实现时,我们会发现走到performCheckpoint()这个方法上:

22.jpg

从实现的注释我们可以很方便看出方法大概做了什么:

23.jpg

这块我们先在这里放着,知道Source的任务接收到Checkpoint会广播到下游,然后会做快照处理就好。

下面看看非Source 的任务接收到checkpoint是怎么处理的。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
【Flink-容错API】重启策略-CheckPoint/StateBackend
【Flink-容错API】重启策略-CheckPoint/StateBackend
0 0
Flink checkpoint(一)| 学习笔记
快速学习 Flink checkpoint。
0 0
Flink checkpoint| 学习笔记
快速学习 Flink checkpoint。
0 0
Flink可靠性的基石-checkpoint机制详细解析 (二)
Flink可靠性的基石-checkpoint机制详细解析
0 0
Flink可靠性的基石-checkpoint机制详细解析 (一)
Flink可靠性的基石-checkpoint机制详细解析
0 0
那个男人竟然不会Flink的CheckPoint机制(二)
这篇来讲Flink另一个比较重要的知识,就是它的容错机制checkpoint原理。
0 0
Flink 1.11 Unaligned Checkpoint 解析
由于 Checkpoint 与反压的耦合,反压反过来也会作用于 Checkpoint,导致 Checkpoint 的种种问题。针对于此,Flink 在 1.11 引入 Unaligned Checkpint 来解耦 Checkpoint 机制与反压机制,优化高反压情况下的 Checkpoint 表现。
0 0
Flink Checkpoint 问题排查实用指南
本文会统一聊一聊 Flink 中 Checkpoint 异常的情况(包括失败和慢),以及可能的原因和排查思路。
8206 0
Flink在大规模状态数据集下的checkpoint调优
5万人关注的大数据成神之路,不来了解一下吗?5万人关注的大数据成神之路,真的不来了解一下吗?5万人关注的大数据成神之路,确定真的不来了解一下吗? 欢迎您关注《大数据成神之路》 今天接到一个同学的反馈问题,大概是: Flink程序运行一段时间就会报这个错误,定位好多天都没有定位到。
2320 0
[企业云-实时计算]SLS 全新 Connector 实现来看 Flink Connector 的细节(FLIP-27/FLIP-191)
背景根据之前的企业云的实时计算架构可以知道,我们的选型中,SLS 承担的重要的角色:打通弹外向弹内实时数据回流的链路作为接口 Interface,向工程和算法同学提供宽表和服务,满足业务的自定义需求。但是 SLS 的 connector 怎么说呢,几个字:严重不满足需求。目前网上搜索,大体上有以下几个实现aliyun-log-flink-connector:貌似是SLS 官方团队的实现,只提供了 
0 0
+关注
java3y
公众号:Java3y。文章导航:https://github.com/ZhongFuCheng3y
文章
问答
文章排行榜
最热
最新
相关电子书
更多
朱翥、贺小令|更快更稳更易用:Flink 自适应批处理能力演
立即下载
任庆盛|Flink CDC + Kafka 加速业务实时化
立即下载
李劲松|Flink Table Store 典型应用场景
立即下载