开胃菜(复习)
作为用户,我们写好Flink
的程序,上管理平台提交,Flink
就跑起来了(只要程序代码没有问题),细节对用户都是屏蔽的。

实际上大致的流程是这样的:
Flink
会根据我们所写代码,会生成一个StreamGraph
的图出来,来代表我们所写程序的拓扑结构。- 然后在提交的之前会将
StreamGraph
这个图优化一把(可以合并的任务进行合并),变成JobGraph
- 将
JobGraph
提交给JobManager
JobManager
收到之后JobGraph
之后会根据JobGraph
生成ExecutionGraph
(ExecutionGraph
是 JobGraph
的并行化版本)TaskManager
接收到任务之后会将ExecutionGraph
生成为真正的物理执行图

可以看到物理执行图
真正运行在TaskManager
上Transform
和Sink
之间都会有ResultPartition
和InputGate
这俩个组件,ResultPartition
用来发送数据,而InputGate
用来接收数据。

屏蔽掉这些Graph
,可以发现Flink
的架构是:Client
->JobManager
->TaskManager

从名字就可以看出,JobManager
是干「管理」,而TaskManager
是真正干活的。回到我们今天的主题,checkpoint
就是由JobManager
发出。

Flink
本身就是有状态的,Flink
可以让你选择执行过程中的数据保存在哪里,目前有三个地方,在Flink
的角度称作State Backends
:
- MemoryStateBackend(内存)
- FsStateBackend(文件系统,一般是HSFS)
- RocksDBStateBackend(RocksDB数据库)
同样地,checkpoint
信息就是保存在State Backends
上

先来简单描述一下checkpoint
的实现流程:
checkpoint
的实现大致就是插入barrier
,每个operator
收到barrier
就上报给JobManager
,等到所有的operator
都上报了barrier
,那JobManager
就去完成一次checkpointi

因为checkpoint
机制是Flink
实现容错机制的关键,我们在实际使用中,往往都要配置checkpoint
相关的配置,例如有以下的配置:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
简单铺垫过后,我们就来撸源码了咯?
Checkpoint(原理)
JobManager发送checkpoint
从上面的图我们可以发现 checkpoint
是由JobManager
发出的,并且JobManager
收到的是JobGraph
,会将JobGraph
转换成ExecutionGraph
。
这块在JobMaster
的构造器就能体现出来:
public JobMaster(...) throws Exception {
// 创建ExecutionGraph
this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
}
我们点击进去createAndRestoreExecutionGraph
看下:

看CheckpointCoordinator
这个名字,就觉得他很重要,有木有?它从ExecutionGraph
来,我们就进去createExecutionGraph
里边看看呗。
点了两层buildGraph()
方法,可以看到在方法的末尾处有checkpoint
相关的信息:
executionGraph.enableCheckpointing(
chkConfig.getCheckpointInterval(),
chkConfig.getCheckpointTimeout(),
chkConfig.getMinPauseBetweenCheckpoints(),
chkConfig.getMaxConcurrentCheckpoints(),
chkConfig.getCheckpointRetentionPolicy(),
triggerVertices,
ackVertices,
confirmVertices,
hooks,
checkpointIdCounter,
completedCheckpoints,
rootBackend,
checkpointStatsTracker);
前面的几个参数就是我们在配置checkpoint
参数的时候指定的,而triggerVertices/confirmVertices/ackVertices
我们溯源看了一下,在源码中注释也写得清清楚楚的。
// collect the vertices that receive "trigger checkpoint" messages.
// currently, these are all the sources
List<JobVertexID> triggerVertices = new ArrayList<>();
// collect the vertices that need to acknowledge the checkpoint
// currently, these are all vertices
List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
// collect the vertices that receive "commit checkpoint" messages
// currently, these are all vertices
List<JobVertexID> commitVertices = new ArrayList<>(jobVertices.size());
下面还是进去enableCheckpointing()
看看大致做了些什么吧:
// 将上面的入参分别封装成ExecutionVertex数组
ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
// 创建触发器
checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
// 创建checkpoint协调器
checkpointCoordinator = new CheckpointCoordinator(
jobInformation.getJobId(),
interval,
checkpointTimeout,
minPauseBetweenCheckpoints,
maxConcurrentCheckpoints,
retentionPolicy,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
checkpointIDCounter,
checkpointStore,
checkpointStateBackend,
ioExecutor,
SharedStateRegistry.DEFAULT_FACTORY);
// 设置触发器
checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
// 状态变更监听器
// job status changes (running -> on, all other states -> off)
if (interval != Long.MAX_VALUE) {
registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
}
值得一提的是,点进去CheckpointCoordinator()
构造方法可以发现有状态后端StateBackend
的身影(因为checkpoint
就是保存在所配置的状态后端)

如果Job
的状态变更了,CheckpointCoordinatorDeActivator
是能监听到的。

当我们的Job
启动的时候,又简单看看startCheckpointScheduler()
里边究竟做了些什么操作:

它会启动一个定时任务,我们具体看看定时任务具体做了些什么ScheduledTrigger
,然后看到比较重要的方法:triggerCheckpoint()
这块代码的逻辑有点多,我们简单来总结一下
- 前置检查(是否可以触发
checkpoint
,距离上一次checkpoint的间隔时间是否符合...) - 检查是否所有的需要做
checkpoint
的Task都处于running
状态 - 生成
checkpointId
,然后生成PendingCheckpoint
对象来代表待处理的检查点 - 注册一个定时任务,如果
checkpoint
超时后取消checkpoint

注:检查task
的任务状态时,只会把source
的task
封装给进Execution[]
数组




JobManager
侧只会发给source
的task
发送checkpoint

JobManager发送总结
贴的图有点多,最后再来简单总结一波,顺便画个流程图,你就会发现还是比较清晰的。
JobManager
收到client
提交的JobGraph
JobManger
需要通过JobGraph
生成ExecutionGraph
- 在生成
ExcutionGraph
的过程中实际上就会触发checkpoint
的逻辑
- 定时任务会前置检查(其实就是你实际上配置的各种参数是否符合)
- 判断
checkpoint
相关的task
是否都是running
状态,将source
的任务封装到Execution
数组中 - 创建
checkpointID
/checkpointStorageLocation
(checkpoint保存的地方)/PendingCheckpoint
(待处理的checkpoint) - 创建定时任务(如果当
checkpoint
超时,会将相关状态清除,重新触发) - 真正触发
checkPoint
给TaskManager
(只会发给source
的task
) - 找出所有
source
和需要ack
的Task - 创建
checkpointCoordinator
协调器 - 创建
CheckpointCoordinatorDeActivator
监听器,监听Job
状态的变更 - 当
Job
启动时,会触发ScheduledTrigger
定时任务


TaskManager(source Task接收)
前面提到了,JobManager
在生成ExcutionGraph
时,会给所有的source
任务发送checkpoint
,那么source
收到barrier
又是怎么处理的呢?会到TaskExecutor
这里进行处理。
TaskExecutor
有个triggerCheckpoint()
方法对接收到的checkpoint
进行处理:

进入triggerCheckpointBarrier()
看看:

再想点进去triggerCheckpoint()
看实现时,我们会发现走到performCheckpoint()
这个方法上:

从实现的注释我们可以很方便看出方法大概做了什么:

这块我们先在这里放着,知道Source
的任务接收到Checkpoint
会广播到下游,然后会做快照处理就好。
下面看看非Source
的任务接收到checkpoint
是怎么处理的。