开胃菜(复习)
作为用户,我们写好Flink
的程序,上管理平台提交,Flink
就跑起来了(只要程序代码没有问题),细节对用户都是屏蔽的。
实际上大致的流程是这样的:
Flink
会根据我们所写代码,会生成一个StreamGraph
的图出来,来代表我们所写程序的拓扑结构。- 然后在提交的之前会将
StreamGraph
这个图优化一把(可以合并的任务进行合并),变成JobGraph
- 将
JobGraph
提交给JobManager
JobManager
收到之后JobGraph
之后会根据JobGraph
生成ExecutionGraph
(ExecutionGraph
是JobGraph
的并行化版本)TaskManager
接收到任务之后会将ExecutionGraph
生成为真正的物理执行图
可以看到物理执行图
真正运行在TaskManager
上Transform
和Sink
之间都会有ResultPartition
和InputGate
这俩个组件,ResultPartition
用来发送数据,而InputGate
用来接收数据。
屏蔽掉这些Graph
,可以发现Flink
的架构是:Client
->JobManager
->TaskManager
从名字就可以看出,JobManager
是干「管理」,而TaskManager
是真正干活的。回到我们今天的主题,checkpoint
就是由JobManager
发出。
Flink
本身就是有状态的,Flink
可以让你选择执行过程中的数据保存在哪里,目前有三个地方,在Flink
的角度称作State Backends
:
- MemoryStateBackend(内存)
- FsStateBackend(文件系统,一般是HSFS)
- RocksDBStateBackend(RocksDB数据库)
同样地,checkpoint
信息就是保存在State Backends
上
先来简单描述一下checkpoint
的实现流程:
checkpoint
的实现大致就是插入barrier
,每个operator
收到barrier
就上报给JobManager
,等到所有的operator
都上报了barrier
,那JobManager
就去完成一次checkpointi
因为checkpoint
机制是Flink
实现容错机制的关键,我们在实际使用中,往往都要配置checkpoint
相关的配置,例如有以下的配置:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
简单铺垫过后,我们就来撸源码了咯?
Checkpoint(原理)
JobManager发送checkpoint
从上面的图我们可以发现 checkpoint
是由JobManager
发出的,并且JobManager
收到的是JobGraph
,会将JobGraph
转换成ExecutionGraph
。
这块在JobMaster
的构造器就能体现出来:
public JobMaster(...) throws Exception { // 创建ExecutionGraph this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup); }
我们点击进去createAndRestoreExecutionGraph
看下:
看CheckpointCoordinator
这个名字,就觉得他很重要,有木有?它从ExecutionGraph
来,我们就进去createExecutionGraph
里边看看呗。
点了两层buildGraph()
方法,可以看到在方法的末尾处有checkpoint
相关的信息:
executionGraph.enableCheckpointing( chkConfig.getCheckpointInterval(), chkConfig.getCheckpointTimeout(), chkConfig.getMinPauseBetweenCheckpoints(), chkConfig.getMaxConcurrentCheckpoints(), chkConfig.getCheckpointRetentionPolicy(), triggerVertices, ackVertices, confirmVertices, hooks, checkpointIdCounter, completedCheckpoints, rootBackend, checkpointStatsTracker);
前面的几个参数就是我们在配置checkpoint
参数的时候指定的,而triggerVertices/confirmVertices/ackVertices
我们溯源看了一下,在源码中注释也写得清清楚楚的。
// collect the vertices that receive "trigger checkpoint" messages. // currently, these are all the sources List<JobVertexID> triggerVertices = new ArrayList<>(); // collect the vertices that need to acknowledge the checkpoint // currently, these are all vertices List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size()); // collect the vertices that receive "commit checkpoint" messages // currently, these are all vertices List<JobVertexID> commitVertices = new ArrayList<>(jobVertices.size());
下面还是进去enableCheckpointing()
看看大致做了些什么吧:
// 将上面的入参分别封装成ExecutionVertex数组 ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger); ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor); ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo); // 创建触发器 checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker"); // 创建checkpoint协调器 checkpointCoordinator = new CheckpointCoordinator( jobInformation.getJobId(), interval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, retentionPolicy, tasksToTrigger, tasksToWaitFor, tasksToCommitTo, checkpointIDCounter, checkpointStore, checkpointStateBackend, ioExecutor, SharedStateRegistry.DEFAULT_FACTORY); // 设置触发器 checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker); // 状态变更监听器 // job status changes (running -> on, all other states -> off) if (interval != Long.MAX_VALUE) { registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator()); }
值得一提的是,点进去CheckpointCoordinator()
构造方法可以发现有状态后端StateBackend
的身影(因为checkpoint
就是保存在所配置的状态后端)
如果Job
的状态变更了,CheckpointCoordinatorDeActivator
是能监听到的。
当我们的Job
启动的时候,又简单看看startCheckpointScheduler()
里边究竟做了些什么操作:
它会启动一个定时任务,我们具体看看定时任务具体做了些什么ScheduledTrigger
,然后看到比较重要的方法:triggerCheckpoint()
这块代码的逻辑有点多,我们简单来总结一下
- 前置检查(是否可以触发
checkpoint
,距离上一次checkpoint的间隔时间是否符合...) - 检查是否所有的需要做
checkpoint
的Task都处于running
状态 - 生成
checkpointId
,然后生成PendingCheckpoint
对象来代表待处理的检查点 - 注册一个定时任务,如果
checkpoint
超时后取消checkpoint
注:检查task
的任务状态时,只会把source
的task
封装给进Execution[]
数组
JobManager
侧只会发给source
的task
发送checkpoint
JobManager发送总结
贴的图有点多,最后再来简单总结一波,顺便画个流程图,你就会发现还是比较清晰的。
JobManager
收到client
提交的JobGraph
JobManger
需要通过JobGraph
生成ExecutionGraph
- 在生成
ExcutionGraph
的过程中实际上就会触发checkpoint
的逻辑
- 定时任务会前置检查(其实就是你实际上配置的各种参数是否符合)
- 判断
checkpoint
相关的task
是否都是running
状态,将source
的任务封装到Execution
数组中 - 创建
checkpointID
/checkpointStorageLocation
(checkpoint保存的地方)/PendingCheckpoint
(待处理的checkpoint) - 创建定时任务(如果当
checkpoint
超时,会将相关状态清除,重新触发) - 真正触发
checkPoint
给TaskManager
(只会发给source
的task
) - 找出所有
source
和需要ack
的Task - 创建
checkpointCoordinator
协调器 - 创建
CheckpointCoordinatorDeActivator
监听器,监听Job
状态的变更 - 当
Job
启动时,会触发ScheduledTrigger
定时任务
TaskManager(source Task接收)
前面提到了,JobManager
在生成ExcutionGraph
时,会给所有的source
任务发送checkpoint
,那么source
收到barrier
又是怎么处理的呢?会到TaskExecutor
这里进行处理。
TaskExecutor
有个triggerCheckpoint()
方法对接收到的checkpoint
进行处理:
进入triggerCheckpointBarrier()
看看:
再想点进去triggerCheckpoint()
看实现时,我们会发现走到performCheckpoint()
这个方法上:
从实现的注释我们可以很方便看出方法大概做了什么:
这块我们先在这里放着,知道Source
的任务接收到Checkpoint
会广播到下游,然后会做快照处理就好。
下面看看非Source
的任务接收到checkpoint
是怎么处理的。