那个男人竟然不会Flink的CheckPoint机制(一)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 这篇来讲Flink另一个比较重要的知识,就是它的容错机制checkpoint原理。

开胃菜(复习)


作为用户,我们写好Flink的程序,上管理平台提交,Flink就跑起来了(只要程序代码没有问题),细节对用户都是屏蔽的。

1.jpg

实际上大致的流程是这样的:

  1. Flink会根据我们所写代码,会生成一个StreamGraph的图出来,来代表我们所写程序的拓扑结构。
  2. 然后在提交的之前会将StreamGraph这个图优化一把(可以合并的任务进行合并),变成JobGraph
  3. JobGraph提交给JobManager
  4. JobManager收到之后JobGraph之后会根据JobGraph生成ExecutionGraphExecutionGraphJobGraph 的并行化版本)
  5. TaskManager接收到任务之后会将ExecutionGraph生成为真正的物理执行图

2.jpg

可以看到物理执行图真正运行在TaskManagerTransformSink之间都会有ResultPartitionInputGate这俩个组件,ResultPartition用来发送数据,而InputGate用来接收数据。

3.jpg

屏蔽掉这些Graph,可以发现Flink的架构是:Client->JobManager->TaskManager

4.jpg

从名字就可以看出,JobManager是干「管理」,而TaskManager是真正干活的。回到我们今天的主题,checkpoint就是由JobManager发出。

5.jpg

Flink本身就是有状态的,Flink可以让你选择执行过程中的数据保存在哪里,目前有三个地方,在Flink的角度称作State Backends

  • MemoryStateBackend(内存)
  • FsStateBackend(文件系统,一般是HSFS)
  • RocksDBStateBackend(RocksDB数据库)

同样地,checkpoint信息就是保存在State Backends

6.jpg

先来简单描述一下checkpoint的实现流程:

checkpoint的实现大致就是插入barrier,每个operator收到barrier就上报给JobManager,等到所有的operator都上报了barrier,那JobManager 就去完成一次checkpointi

7.jpg

因为checkpoint机制是Flink实现容错机制的关键,我们在实际使用中,往往都要配置checkpoint相关的配置,例如有以下的配置:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

简单铺垫过后,我们就来撸源码了咯?


Checkpoint(原理)


JobManager发送checkpoint


从上面的图我们可以发现 checkpoint是由JobManager发出的,并且JobManager收到的是JobGraph,会将JobGraph转换成ExecutionGraph

这块在JobMaster的构造器就能体现出来:

public JobMaster(...) throws Exception {
  // 创建ExecutionGraph
  this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
 }

我们点击进去createAndRestoreExecutionGraph看下:

8.jpg

CheckpointCoordinator这个名字,就觉得他很重要,有木有?它从ExecutionGraph来,我们就进去createExecutionGraph里边看看呗。

点了两层buildGraph()方法,可以看到在方法的末尾处有checkpoint相关的信息:

executionGraph.enableCheckpointing(
    chkConfig.getCheckpointInterval(),
    chkConfig.getCheckpointTimeout(),
    chkConfig.getMinPauseBetweenCheckpoints(),
    chkConfig.getMaxConcurrentCheckpoints(),
    chkConfig.getCheckpointRetentionPolicy(),
    triggerVertices,
    ackVertices,
    confirmVertices,
    hooks,
    checkpointIdCounter,
    completedCheckpoints,
    rootBackend,
    checkpointStatsTracker);

前面的几个参数就是我们在配置checkpoint参数的时候指定的,而triggerVertices/confirmVertices/ackVertices我们溯源看了一下,在源码中注释也写得清清楚楚的。

// collect the vertices that receive "trigger checkpoint" messages.
// currently, these are all the sources 
List<JobVertexID> triggerVertices = new ArrayList<>();
// collect the vertices that need to acknowledge the checkpoint
// currently, these are all vertices
List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
// collect the vertices that receive "commit checkpoint" messages
// currently, these are all vertices
List<JobVertexID> commitVertices = new ArrayList<>(jobVertices.size());

下面还是进去enableCheckpointing()看看大致做了些什么吧:

// 将上面的入参分别封装成ExecutionVertex数组
ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
// 创建触发器
checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
// 创建checkpoint协调器
checkpointCoordinator = new CheckpointCoordinator(
  jobInformation.getJobId(),
  interval,
  checkpointTimeout,
  minPauseBetweenCheckpoints,
  maxConcurrentCheckpoints,
  retentionPolicy,
  tasksToTrigger,
  tasksToWaitFor,
  tasksToCommitTo,
  checkpointIDCounter,
  checkpointStore,
  checkpointStateBackend,
  ioExecutor,
  SharedStateRegistry.DEFAULT_FACTORY);
// 设置触发器
checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
// 状态变更监听器
// job status changes (running -> on, all other states -> off)
if (interval != Long.MAX_VALUE) {
  registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
}

值得一提的是,点进去CheckpointCoordinator()构造方法可以发现有状态后端StateBackend的身影(因为checkpoint就是保存在所配置的状态后端)

9.jpg

如果Job的状态变更了,CheckpointCoordinatorDeActivator是能监听到的。

10.jpg

当我们的Job启动的时候,又简单看看startCheckpointScheduler()里边究竟做了些什么操作:

11.jpg

它会启动一个定时任务,我们具体看看定时任务具体做了些什么ScheduledTrigger,然后看到比较重要的方法:triggerCheckpoint()

这块代码的逻辑有点多,我们简单来总结一下

  1. 前置检查(是否可以触发checkpoint,距离上一次checkpoint的间隔时间是否符合...)
  2. 检查是否所有的需要做checkpoint的Task都处于running状态
  3. 生成checkpointId,然后生成PendingCheckpoint对象来代表待处理的检查点
  4. 注册一个定时任务,如果checkpoint超时后取消checkpoint

12.jpg

注:检查task的任务状态时,只会把sourcetask封装给进Execution[]数组

13.jpg14.jpg15.jpg16.jpg

JobManager侧只会发给sourcetask发送checkpoint

17.jpg


JobManager发送总结


贴的图有点多,最后再来简单总结一波,顺便画个流程图,你就会发现还是比较清晰的。

  1. JobManager 收到client提交的JobGraph
  2. JobManger 需要通过JobGraph生成ExecutionGraph
  3. 在生成ExcutionGraph的过程中实际上就会触发checkpoint的逻辑
  1. 定时任务会前置检查(其实就是你实际上配置的各种参数是否符合)
  2. 判断checkpoint相关的task是否都是running状态,将source的任务封装到Execution数组中
  3. 创建checkpointID/checkpointStorageLocation(checkpoint保存的地方)/PendingCheckpoint(待处理的checkpoint)
  4. 创建定时任务(如果当checkpoint超时,会将相关状态清除,重新触发)
  5. 真正触发checkPointTaskManager(只会发给sourcetask)
  6. 找出所有source和需要ack的Task
  7. 创建checkpointCoordinator 协调器
  8. 创建CheckpointCoordinatorDeActivator监听器,监听Job状态的变更
  9. Job启动时,会触发ScheduledTrigger 定时任务

18.jpg19.jpg


TaskManager(source Task接收)


前面提到了,JobManager 在生成ExcutionGraph时,会给所有的source 任务发送checkpoint,那么source收到barrier又是怎么处理的呢?会到TaskExecutor这里进行处理。

TaskExecutor有个triggerCheckpoint()方法对接收到的checkpoint进行处理:

20.jpg

进入triggerCheckpointBarrier()看看:

21.jpg

再想点进去triggerCheckpoint()看实现时,我们会发现走到performCheckpoint()这个方法上:

22.jpg

从实现的注释我们可以很方便看出方法大概做了什么:

23.jpg

这块我们先在这里放着,知道Source的任务接收到Checkpoint会广播到下游,然后会做快照处理就好。

下面看看非Source 的任务接收到checkpoint是怎么处理的。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
107 3
|
5月前
|
容灾 流计算
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
|
5月前
|
存储 数据处理 Apache
超越传统数据库:揭秘Flink状态机制,让你的数据处理效率飞升!
【8月更文挑战第26天】Apache Flink 在流处理领域以其高效实时的数据处理能力脱颖而出,其核心特色之一便是状态管理机制。不同于传统数据库依靠持久化存储及 ACID 事务确保数据一致性和可靠性,Flink 利用内存中的状态管理和分布式数据流模型实现了低延迟处理。Flink 的状态分为键控状态与非键控状态,前者依据数据键值进行状态维护,适用于键值对数据处理;后者与算子实例关联,用于所有输入数据共享的状态场景。通过 checkpointing 机制,Flink 在保障状态一致性的同时,提供了更适合流处理场景的轻量级解决方案。
72 0
|
5月前
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用问题之mini-cluster模式下,怎么指定checkpoint的时间间隔
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
存储 监控 Serverless
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决
|
5月前
|
缓存 流计算
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
|
5月前
|
分布式计算 流计算
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
|
5月前
|
流计算
美团 Flink 大作业部署问题之新启动作业的 Checkpoint 跨作业文件引用的问题要如何避免
美团 Flink 大作业部署问题之新启动作业的 Checkpoint 跨作业文件引用的问题要如何避免
|
5月前
|
流计算 索引
美团 Flink 大作业部署问题之RocksDBStateBackend 的增量 Checkpoint 要怎么制作
美团 Flink 大作业部署问题之RocksDBStateBackend 的增量 Checkpoint 要怎么制作
|
5月前
|
存储 Java 流计算
Flink 分布式快照,神秘机制背后究竟隐藏着怎样的惊人奥秘?快来一探究竟!
【8月更文挑战第26天】Flink是一款开源框架,支持有状态流处理与批处理任务。其核心功能之一为分布式快照,通过“检查点(Checkpoint)”机制确保系统能在故障发生时从最近的一致性状态恢复,实现可靠容错。Flink通过JobManager触发检查点,各节点暂停接收新数据并保存当前状态至稳定存储(如HDFS)。采用“异步屏障快照(Asynchronous Barrier Snapshotting)”技术,插入特殊标记“屏障(Barrier)”随数据流传播,在不影响整体流程的同时高效完成状态保存。例如可在Flink中设置每1000毫秒进行一次检查点并指定存储位置。
91 0