那个男人竟然不会Flink的CheckPoint机制(二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 这篇来讲Flink另一个比较重要的知识,就是它的容错机制checkpoint原理。

TaskManager(非source Task接收)


在上一篇《背压原理》又或是这篇的基础铺垫上,其实我们可以看到在Flink接收数据用的是InputGate,所以我们还是回到org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput这个方法上

随后定位到处理数据的逻辑:

final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();

想点击进去,发现有两个实现类:

  • BarrierBuffer
  • BarrierTracker

24.jpg

这两个实现类其实就是对应着AT_LEAST_ONCEEXACTLY_ONCE这两种模式。

/**
 * The BarrierTracker keeps track of what checkpoint barriers have been received from
 * which input channels. Once it has observed all checkpoint barriers for a checkpoint ID,
 * it notifies its listener of a completed checkpoint.
 *
 * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input
 * channels that have sent barriers, so it cannot be used to gain "exactly-once" processing
 * guarantees. It can, however, be used to gain "at least once" processing guarantees.
 *
 * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.
 */
/**
 * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
 * all inputs have received the barrier for a given checkpoint.
 *
 * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
 * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until
 * the blocks are released.
 */

简单翻译下就是:

  • BarrierTrackerat least once模式,只要inputChannel接收到barrier,就直接通知完成处理checkpoint
  • BarrierBufferexactly-once模式,当所有的inputChannel接收到barrier才通知完成处理checkpoint,如果有的inputChannel还没接收到barrier,那已接收到barrierinputChannel会读数据到缓存中,直到所有的inputChannel都接收到barrier,这有可能会造成反压。

说白了,就是BarrierBuffer会有对齐barrier的处理。

这里又提到exactly-onceat least once了。在文章开头也说过Flink是可以实现exactly-once的,含义就是:状态只持久化一次最终的存储介质中(本地数据库/HDFS)。

在这里我还是画个图和举个例子配合BarrierBuffer/BarrierTracker来解释一下。

现在我有一个Topic,假定这个Topic有两个分区partition(又或者你可以理解我设置消费的并行度是2)。现在要拉取Kafka这两个分区的数据,由算子Map进行消费转换,期间在转化的时候可能会存储些信息到State(Flink给我们提供的存储,你就当做是会存到HDFS上就好了),最终输出到Sink

25.jpg

从上面的知识点我们应该可以知道, 在Flinkcheckpoint的时候JobManager往每个Source任务(简单对应图上的两个paritiion) 发送checkpointId,然后做快照存储。

显然,source任务存储最主要的内容就是消费分区的offset嘛。比如现在source 1offerset100,而source2offset105

26.jpg

目前看来source2的数据会比source1的数据先到达Map

假定我们用的是BarrierBufferexactly-once模式,那么source2barrier到达Map算子的后,source2之后的数据只能停下来,放到buffer上,不做处理。等source1barrier来了以后,再真正处理source2放在buffer的数据。

这就是所谓的barrier对齐

27.jpg

假定我们用的是BarrierTrackerat least once模式,那么source2barrier到达Map算子的后,source2之后的数据不会停下来等待source1,后面的数据会继续处理。

28.jpg

现在问题就来了,那对不对齐的区别是什么呢?

依照上面图的的运行状态(无论是BarrierTrackerat least once模式还是BarrierBufferexactly-once模式),现在我们的checkpoint都没做,因为source1barrier还没到sink端呢。现在Flink挂了,那显然会重新拉取source 1offerset小于100,而source2offset小于105的数据,State的最终信息也不会保存。

29.jpg

checkpoint从没做过的时候,对数据不会产生任何的影响(所以这里在Flink的内部是没啥区别的)

而假设我们现在是BarrierTrackerat least once模式,没有任何问题,程序继续执行。现在source1barrier也走到了slink,最后完成了一次checkpoint

30.jpg

由于source2barriersource1barrier要快,那么source1所处理的State的数据实际是包括offset>105的数据的,自然Flink保存的时候也会把这部分保存进去。

程序继续运行,刚好保存完checkpoint后,此时系统出了问题,挂了。因为checkpoint已经做完了,所以Flink会从source 1offerset100,而source2offset105重新消费。

但是,由于我们是BarrierTrackerat least once模式,所以State里边的保存状态实际上有过source2offset 大于105 的记录了。那source2重新从offset105开始消费,那就是会重复消费!

31.jpg

理解了上面所讲的话,那再看BarrierBufferexactly-once模式应该就不难理解了(各位大哥大嫂你也要经过这个operator处理保存吗?我们一起吧?有问题,我们一起重来,没问题我们一起保存

32.jpg

无论是BarrierTracker还是BarrierBuffer也好,在处理checkpoint的时候都需要调用notifyCheckpoint() 方法,而notifyCheckpoint()方法最终调用的是triggerCheckpointOnBarrier

33.jpg

triggerCheckpointOnBarrier()最终还是会调用performCheckpoint()方法,所以无论是source接收到checkpoint还是operator接收到checkpoint,最终还是会调用performCheckpoint()方法。

34.jpg

大家有兴趣可以进去checkpointState()方法里边详细看看,里边会对State状态信息进行写入,完成后上报给TaskManager

35.jpg


TaskManager总结


36.jpg

  • TaskExecutor接收到JobManager下发的checkpoint,由triggerCheckpoint方法进行处理
  • triggerCheckpoint方法最终会调用org.apache.flink.streaming.runtime.tasks.StreamTask#triggerCheckpoint,而最主要的就是performCheckpoint方法
  • performCheckpoint方法会对checkpoint做前置处理,barrier广播到下游,处理State状态做快照,最后回到成功消息给JobManager
  • 普通算子由org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput这个方法读取数据,具体处理逻辑在getNextNonBlocked方法上。
  • 该方法有两个实例,分别是BarrierBufferBarrierTracker,这两个实例对应着checkpoint不同的模式(至少一次和精确一次)。精确一次需要对barrier对齐,有可能导致反压的情况
  • 最后处理完,会调用notifyCheckpoint方法,实际上还是会调performCheckpoint方法

所以说,最终处理checkpoint的逻辑是一致的,只是会source会直接通过TaskExecutor处理,而普通算子会根据不同的配置在接收到后有不同的实例处理:BarrierTracker/BarrierBuffer


JobManager接收回应


前面提到了,无论是source还是普通算子,都会调用performCheckpoint方法进行处理。

performCheckpoint方法里边处理完State快照的逻辑,会调用reportCompletedSnapshotStates告诉JobManager快照已经处理完了。

reportCompletedSnapshotStates方法里边又会调用acknowledgeCheckpoint方法通过RPC去通知JobManager

37.jpg

兜兜转转,最后还是会回到checkpointCoordinator上,调用receiveAcknowledgeMessage进行处理

38.jpg

进入到receiveAcknowledgeMessage上,主要就是下面图的逻辑:处理完返回不同的状态,根据不同的状态进行处理

39.jpg

主要我们看的其实就是acknowledgeTask方法里边做了些什么。

PendingCheckpoint维护了两个Map:

//  已经接收到 Ack 的算子的状态句柄
private final Map<OperatorID, OperatorState> operatorStates;
// 需要 Ack 但还没有接收到的 Task
private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;

然后我们进去acknowledgeTask简单了解一下可以发现就是在处理operatorStatesnotYetAcknowledgedTasks

synchronized (lock) {
   if (discarded) {
    return TaskAcknowledgeResult.DISCARDED;
   }
    // 接收到Task了,从notYetAcknowledgedTasks移除
   final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);
   if (vertex == null) {
    if (acknowledgedTasks.contains(executionAttemptId)) {
     return TaskAcknowledgeResult.DUPLICATE;
    } else {
     return TaskAcknowledgeResult.UNKNOWN;
    }
   } else {
    acknowledgedTasks.add(executionAttemptId);
   }
    // ...
   if (operatorSubtaskStates != null) {
    for (OperatorID operatorID : operatorIDs) {
     // ...
     OperatorState operatorState = operatorStates.get(operatorID);
     // 新来的operatorID,添加到operatorStates
     if (operatorState == null) {
      operatorState = new OperatorState(
       operatorID,
       vertex.getTotalNumberOfParallelSubtasks(),
       vertex.getMaxParallelism());
      operatorStates.put(operatorID, operatorState);
     }
          //....
    }
   }

等到所有的Task都到齐以后,就会调用isFullyAcknowledged进行处理。

40.jpg

最后调用completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();来实现最终的存储,所有完毕以后会通知所有的Task 现在checkpoint已经完成了。

41.jpg


最后


总的来说,这篇文章带着大家走马观花撸了下Checkpoint,很多细节我也没去深入,但我认为这篇文章可以让你大概了解到Checkpoint的实现过程。

最后再来看看官网的图,看完应该大概就能看得懂啦:

42.jpg

相信我,或许你现在还没用到Flink,但等你真正去用Flink的时候,checkpoint是肯定得搞搞的

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
Java 关系型数据库 数据库
实时计算 Flink版操作报错合集之拉取全量数据时,如何解决Checkpoint失败并且报错为 "java.lang.OutOfMemoryError: Java heap space"
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
存储 缓存 监控
Flink内存管理机制及其参数调优
Flink内存管理机制及其参数调优
|
1月前
|
SQL 消息中间件 对象存储
实时计算 Flink版产品使用问题之checkpoint从几百毫秒突然变成10分钟失败,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
12天前
|
消息中间件 安全 Kafka
Flink与Kafka的终极联盟:揭秘如何在一瞬间切换SASL机制,保护您的数据不受黑客侵袭!
【8月更文挑战第7天】Apache Flink作为高性能流处理框架,在与Kafka集成时确保数据安全至关重要。通过配置`KafkaConsumer`使用SASL机制如SCRAM-SHA-256或PLAIN,可有效防止未授权访问。SCRAM-SHA-256采用强化的身份验证流程提高安全性,而PLAIN机制则相对简单。配置涉及设置`properties`参数,包括指定`sasl.mechanism`、`security.protocol`及JAAS认证信息。合理选择和配置这些参数对于保护Flink应用与Kafka间的数据通信安全至关重要。
23 0
|
1月前
|
NoSQL API 数据处理
实时计算 Flink版产品使用问题之不开启checkPoint会导致什么情况发生
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 资源调度 Kubernetes
实时计算 Flink版产品使用问题之从上一个checkpoint点位重新启动任务,该如何操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
消息中间件 NoSQL Kafka
实时计算 Flink版产品使用问题之在处理大数据量时,checkpoint超时,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之通过设置什么可以自动清理旧的checkpoint数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 缓存 Oracle
实时计算 Flink版产品使用问题之如何实现重启后直接跑最新的任务而不是根据checkpoint跑历史数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到检查点(checkpoint)状态不单调递增,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。