TaskManager(非source Task接收)
在上一篇《背压原理》又或是这篇的基础铺垫上,其实我们可以看到在Flink
接收数据用的是InputGate
,所以我们还是回到org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput
这个方法上
随后定位到处理数据的逻辑:
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
想点击进去,发现有两个实现类:
BarrierBuffer
BarrierTracker
这两个实现类其实就是对应着AT_LEAST_ONCE
和EXACTLY_ONCE
这两种模式。
/** * The BarrierTracker keeps track of what checkpoint barriers have been received from * which input channels. Once it has observed all checkpoint barriers for a checkpoint ID, * it notifies its listener of a completed checkpoint. * * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input * channels that have sent barriers, so it cannot be used to gain "exactly-once" processing * guarantees. It can, however, be used to gain "at least once" processing guarantees. * * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs. */ /** * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until * all inputs have received the barrier for a given checkpoint. * * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until * the blocks are released. */
简单翻译下就是:
BarrierTracker
是at least once
模式,只要inputChannel
接收到barrier
,就直接通知完成处理checkpoint
BarrierBuffer
是exactly-once
模式,当所有的inputChannel
接收到barrier
才通知完成处理checkpoint
,如果有的inputChannel
还没接收到barrier
,那已接收到barrier
的inputChannel
会读数据到缓存中,直到所有的inputChannel
都接收到barrier
,这有可能会造成反压。
说白了,就是BarrierBuffer
会有对齐barrier
的处理。
这里又提到exactly-once
和at least once
了。在文章开头也说过Flink
是可以实现exactly-once
的,含义就是:状态只持久化一次到最终的存储介质中(本地数据库/HDFS)。
在这里我还是画个图和举个例子配合BarrierBuffer
/BarrierTracker
来解释一下。
现在我有一个Topic
,假定这个Topic
有两个分区partition
(又或者你可以理解我设置消费的并行度是2)。现在要拉取Kafka
这两个分区的数据,由算子Map
进行消费转换,期间在转化的时候可能会存储些信息到State
(Flink
给我们提供的存储,你就当做是会存到HDFS
上就好了),最终输出到Sink
。
从上面的知识点我们应该可以知道, 在Flink
做checkpoint
的时候JobManager
往每个Source
任务(简单对应图上的两个paritiion
) 发送checkpointId
,然后做快照存储。
显然,source
任务存储最主要的内容就是消费分区的offset
嘛。比如现在source 1
的offerset
是100
,而source2
的offset
是105
。
目前看来source2
的数据会比source1
的数据先到达Map
假定我们用的是BarrierBuffer
exactly-once
模式,那么source2
的barrier
到达Map
算子的后,source2
之后的数据只能停下来,放到buffer
上,不做处理。等source1
的barrier
来了以后,再真正处理source2
放在buffer
的数据。
这就是所谓的barrier
对齐
假定我们用的是BarrierTracker
at least once
模式,那么source2
的barrier
到达Map
算子的后,source2
之后的数据不会停下来等待source1
,后面的数据会继续处理。
现在问题就来了,那对不对齐的区别是什么呢?
依照上面图的的运行状态(无论是BarrierTracker
at least once
模式还是BarrierBuffer
exactly-once
模式),现在我们的checkpoint
都没做,因为source1
的barrier
还没到sink
端呢。现在Flink
挂了,那显然会重新拉取source 1
的offerset
是小于100
,而source2
的offset
是小于105
的数据,State
的最终信息也不会保存。
checkpoint
从没做过的时候,对数据不会产生任何的影响(所以这里在Flink
的内部是没啥区别的)
而假设我们现在是BarrierTracker
at least once
模式,没有任何问题,程序继续执行。现在source1
的barrier
也走到了slink
,最后完成了一次checkpoint
。
由于source2
的barrier
比source1
的barrier
要快,那么source1
所处理的State
的数据实际是包括offset>105
的数据的,自然Flink
保存的时候也会把这部分保存进去。
程序继续运行,刚好保存完checkpoint
后,此时系统出了问题,挂了。因为checkpoint
已经做完了,所以Flink
会从source 1
的offerset
是100
,而source2
的offset
是105
重新消费。
但是,由于我们是BarrierTracker
at least once
模式,所以State
里边的保存状态实际上有过source2
的offset
大于105
的记录了。那source2
重新从offset
是105
开始消费,那就是会重复消费!
理解了上面所讲的话,那再看BarrierBuffer
exactly-once
模式应该就不难理解了(各位大哥大嫂你也要经过这个operator
处理保存吗?我们一起吧?有问题,我们一起重来,没问题我们一起保存)
无论是BarrierTracker
还是BarrierBuffer
也好,在处理checkpoint
的时候都需要调用notifyCheckpoint()
方法,而notifyCheckpoint()
方法最终调用的是triggerCheckpointOnBarrier
triggerCheckpointOnBarrier()
最终还是会调用performCheckpoint()
方法,所以无论是source
接收到checkpoint
还是operator
接收到checkpoint
,最终还是会调用performCheckpoint()
方法。
大家有兴趣可以进去checkpointState()
方法里边详细看看,里边会对State
状态信息进行写入,完成后上报给TaskManager
TaskManager总结
TaskExecutor
接收到JobManager
下发的checkpoint
,由triggerCheckpoint
方法进行处理
triggerCheckpoint
方法最终会调用org.apache.flink.streaming.runtime.tasks.StreamTask#triggerCheckpoint
,而最主要的就是performCheckpoint
方法performCheckpoint
方法会对checkpoint
做前置处理,barrier
广播到下游,处理State
状态做快照,最后回到成功消息给JobManager
- 普通算子由
org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput
这个方法读取数据,具体处理逻辑在getNextNonBlocked
方法上。
- 该方法有两个实例,分别是
BarrierBuffer
和BarrierTracker
,这两个实例对应着checkpoint
不同的模式(至少一次和精确一次)。精确一次需要对barrier
对齐,有可能导致反压的情况 - 最后处理完,会调用
notifyCheckpoint
方法,实际上还是会调performCheckpoint
方法
所以说,最终处理checkpoint
的逻辑是一致的,只是会source
会直接通过TaskExecutor
处理,而普通算子会根据不同的配置在接收到后有不同的实例处理:BarrierTracker
/BarrierBuffer
。
JobManager接收回应
前面提到了,无论是source
还是普通算子,都会调用performCheckpoint
方法进行处理。
performCheckpoint
方法里边处理完State
快照的逻辑,会调用reportCompletedSnapshotStates
告诉JobManager
快照已经处理完了。
reportCompletedSnapshotStates
方法里边又会调用acknowledgeCheckpoint
方法通过RPC
去通知JobManager
兜兜转转,最后还是会回到checkpointCoordinator
上,调用receiveAcknowledgeMessage
进行处理
进入到receiveAcknowledgeMessage
上,主要就是下面图的逻辑:处理完返回不同的状态,根据不同的状态进行处理
主要我们看的其实就是acknowledgeTask
方法里边做了些什么。
在 PendingCheckpoint
维护了两个Map:
// 已经接收到 Ack 的算子的状态句柄 private final Map<OperatorID, OperatorState> operatorStates; // 需要 Ack 但还没有接收到的 Task private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
然后我们进去acknowledgeTask
简单了解一下可以发现就是在处理operatorStates
和notYetAcknowledgedTasks
synchronized (lock) { if (discarded) { return TaskAcknowledgeResult.DISCARDED; } // 接收到Task了,从notYetAcknowledgedTasks移除 final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId); if (vertex == null) { if (acknowledgedTasks.contains(executionAttemptId)) { return TaskAcknowledgeResult.DUPLICATE; } else { return TaskAcknowledgeResult.UNKNOWN; } } else { acknowledgedTasks.add(executionAttemptId); } // ... if (operatorSubtaskStates != null) { for (OperatorID operatorID : operatorIDs) { // ... OperatorState operatorState = operatorStates.get(operatorID); // 新来的operatorID,添加到operatorStates if (operatorState == null) { operatorState = new OperatorState( operatorID, vertex.getTotalNumberOfParallelSubtasks(), vertex.getMaxParallelism()); operatorStates.put(operatorID, operatorState); } //.... } }
等到所有的Task
都到齐以后,就会调用isFullyAcknowledged
进行处理。
最后调用completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
来实现最终的存储,所有完毕以后会通知所有的Task
现在checkpoint
已经完成了。
最后
总的来说,这篇文章带着大家走马观花撸了下Checkpoint
,很多细节我也没去深入,但我认为这篇文章可以让你大概了解到Checkpoint
的实现过程。
最后再来看看官网的图,看完应该大概就能看得懂啦:
相信我,或许你现在还没用到Flink
,但等你真正去用Flink
的时候,checkpoint
是肯定得搞搞的