Apache Flink fault tolerance源码剖析(二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 继续Flink Fault Tolerance机制剖析。上一篇文章我们结合代码讲解了Flink中检查点是如何应用的(如何根据快照做失败恢复,以及检查点被应用的场景),这篇我们来谈谈检查点的触发机制以及基于Actor的消息驱动的协同机制。

继续Flink Fault Tolerance机制剖析。上一篇文章我们结合代码讲解了Flink中检查点是如何应用的(如何根据快照做失败恢复,以及检查点被应用的场景),这篇我们来谈谈检查点的触发机制以及基于Actor的消息驱动的协同机制。这篇涉及到一个非常关键的类——CheckpointCoordinator

org.apache.flink.runtime.checkpoint.CheckpointCoordinator

该类可以理解为检查点的协调器,用来协调operatorstate的分布式快照。

周期性的检查点触发机制

检查点的触发机制是基于定时器的周期性触发。这涉及到一个定时器的实现类ScheduledTrigger

ScheduledTrigger

触发检查点的定时任务类。其实现就是调用triggerCheckpoint方法。这个方法后面会具体介绍。

public void run() {
    try {
        triggerCheckpoint(System.currentTimeMillis());
    }
    catch (Exception e) {
        LOG.error("Exception while triggering checkpoint", e);
    }
}

startCheckpointScheduler

启动触发检查点的定时任务的方法实现:

    public void startCheckpointScheduler() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            // make sure all prior timers are cancelled
            stopCheckpointScheduler();

            try {
                // Multiple start calls are OK
                checkpointIdCounter.start();
            } catch (Exception e) {
                String msg = "Failed to start checkpoint ID counter: " + e.getMessage();
                throw new RuntimeException(msg, e);
            }

            periodicScheduling = true;
            currentPeriodicTrigger = new ScheduledTrigger();
            timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
        }
    }

方法的实现包含两个主要动作:

  • 启动检查点ID计数器checkpointIdCounter
  • 启动触发检查点的定时任务

stopCheckpointScheduler

关闭定时任务的方法,用来释放资源,重置一些标记变量。

triggerCheckpoint

该方法是触发一个新的检查点的核心逻辑。

首先,方法中会去判断一个flag:triggerRequestQueued。该标识表示是否一个检查点的触发请求不能被立即执行。

// sanity check: there should never be more than one trigger request queued
if (triggerRequestQueued) {
    LOG.warn("Trying to trigger another checkpoint while one was queued already");
    return false;
}

如果不能被立即执行,则直接返回。

不能被立即执行的原因是:还有其他处理没有完成。

接着检查正在并发处理的未完成的检查点:

            // if too many checkpoints are currently in progress, we need to mark that a request is queued
            if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                triggerRequestQueued = true;
                if (currentPeriodicTrigger != null) {
                    currentPeriodicTrigger.cancel();
                    currentPeriodicTrigger = null;
                }
                return false;
            }

如果未完成的检查点过多,大于允许的并发处理的检查点数目的阈值,则将当前检查点的触发请求设置为不能立即执行,如果定时任务已经启动,则取消定时任务的执行,并返回。

以上这些检查处于基于锁机制实现的同步代码块中。

接着检查需要被触发检查点的task是否都处于运行状态:

        ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length];
        for (int i = 0; i < tasksToTrigger.length; i++) {
            Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
            if (ee != null && ee.getState() == ExecutionState.RUNNING) {
                triggerIDs[i] = ee.getAttemptId();
            } else {
                LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
                        tasksToTrigger[i].getSimpleName());
                return false;
            }
        }

只要有一个task不满足条件,则不会触发检查点,并立即返回。

然后检查是否所有需要ack检查点的task都处于运行状态:

        Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

        for (ExecutionVertex ev : tasksToWaitFor) {
            Execution ee = ev.getCurrentExecutionAttempt();
            if (ee != null) {
                ackTasks.put(ee.getAttemptId(), ev);
            } else {
                LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
                        ev.getSimpleName());
                return false;
            }
        }

如果有一个task不满足条件,则不会触发检查点,并立即返回。

以上条件都满足(即没有return false;),才具备触发一个检查点的基本条件。

下一步,获得checkpointId

        final long checkpointID;
        if (nextCheckpointId < 0) {
            try {
                // this must happen outside the locked scope, because it communicates
                // with external services (in HA mode) and may block for a while.
                checkpointID = checkpointIdCounter.getAndIncrement();
            }
            catch (Throwable t) {
                int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
                LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
                return false;
            }
        }
        else {
            checkpointID = nextCheckpointId;
        }

这依赖于该方法的另一个参数nextCheckpointId,如果其值为-1,则起到标识的作用,指示checkpointId将从外部获取(比如Zookeeper,后续文章会谈及检查点ID的生成机制)。

接着创建一个PendingCheckpoint对象:

final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);

该类表示一个待处理的检查点。

与此同时,会定义一个针对当前检查点超时进行资源清理的取消器canceller。该取消器主要是针对检查点没有释放资源的情况进行资源释放操作,同时还会调用triggerQueuedRequests方法启动一个触发检查点的定时任务,如果有的话(取决于triggerRequestQueued是否为true)。

然后会再次进入同步代码段,对上面的是否新建检查点的判断条件做二次检查,防止产生竞态条件。这里做二次检查的原因是,中间有一段关于获得checkpointId的代码,不在同步块中。

检查后,如果触发检查点的条件仍然是满足的,那么将上面创建的PendingCheckpoint对象加入集合中:

pendingCheckpoints.put(checkpointID, checkpoint);

同时会启动针对当前检查点的超时取消器:

timer.schedule(canceller, checkpointTimeout);

接下来会发送消息给task以真正触发检查点(基于消息驱动的协同机制):

for (int i = 0; i < tasksToTrigger.length; i++) {
    ExecutionAttemptID id = triggerIDs[i];
    TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
    tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
}

基于Actor的消息驱动的协同机制

上面已经谈到了检查点的触发机制是基于定时任务的周期性触发,那么定时任务的启停机制又是什么?Flink使用的是基于AKKA的Actor模型的消息驱动机制。

CheckpointCoordinatorDeActivator是一个Actor的实现,它用于基于消息来驱动检查点的定时任务的启停:

    public void handleMessage(Object message) {
        if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
            JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();

            if (status == JobStatus.RUNNING) {
                // start the checkpoint scheduler
                coordinator.startCheckpointScheduler();
            } else {
                // anything else should stop the trigger for now
                coordinator.stopCheckpointScheduler();
            }
        }

        // we ignore all other messages
    }

Actor会收到Job状态的变化通知:JobStatusChanged。一旦变成RUNNING,那么检查点的定时任务会被立即启动;否则会被立即关闭。

Actor被创建的代码是CheckpointCoordinator中的createActivatorDeactivator方法:

    public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            if (jobStatusListener == null) {
                Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, leaderSessionID);

                // wrap the ActorRef in a AkkaActorGateway to support message decoration
                jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID);
            }

            return jobStatusListener;
        }
    }

既然,是基于消息驱动机制,那么就需要各种类型的消息对应不同的业务逻辑。这些消息在Flink中被定义在package:org.apache.flink.runtime.messages.checkpoint中。

类图如下:

flink-fault-tolerance-2_message-class-diagram

AbstractCheckpointMessage

检查点消息的基础抽象类,提供了三个公共属性(从构造器注入):

  • job:JobID的实例,表示当前这条消息实例的归属;
  • taskExecutionId:ExecutionAttemptID的实例,表示检查点的源/目的任务
  • checkpointId:当前消息协调的检查点ID

除此之外,该实现仅仅override了hashCodeequals方法。

TriggerCheckpoint

该消息由JobManager发送给TaskManager,用于告诉一个task触发它的检查点。

触发消息

位于CheckpointCoordinator类的triggerCheckpoint中,上面已经提及过。

for (int i = 0; i < tasksToTrigger.length; i++) {
    ExecutionAttemptID id = triggerIDs[i];
    TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
    tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
}

消息处理

TaskManagerhandleCheckpointingMessage实现:

      case message: TriggerCheckpoint =>
        val taskExecutionId = message.getTaskExecutionId
        val checkpointId = message.getCheckpointId
        val timestamp = message.getTimestamp

        log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")

        val task = runningTasks.get(taskExecutionId)
        if (task != null) {
          task.triggerCheckpointBarrier(checkpointId, timestamp)
        } else {
          log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
        }

主要是触发检查点屏障Barrier

DeclineCheckpoint

该消息由TaskManager发送给JobManager,用于告诉检查点协调器:检查点的请求还没有能够被处理。这种情况通常发生于:某task已处于RUNNING状态,但在内部可能还没有准备好执行检查点。

它除了AbstractCheckpointMessage需要的三个属性外,还需要用于关联检查点的timestamp

触发消息

位于Task类的triggerCheckpointBarrier方法中:

                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
                            if (!success) {
                                DeclineCheckpoint decline = new DeclineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp);
                                jobManager.tell(decline);
                            }
                        }
                        catch (Throwable t) {
                            if (getExecutionState() == ExecutionState.RUNNING) {
                                failExternally(new RuntimeException(
                                    "Error while triggering checkpoint for " + taskName,
                                    t));
                            }
                        }
                    }
                };

消息处理

位于JobManagerhandleCheckpointMessage

具体的实现在CheckpointCoordinatorreceiveDeclineMessage中:

首先从接收的消息中(DeclineCheckpoint)获得检查点编号:

final long checkpointId = message.getCheckpointId();

接下来的逻辑是判断当前检查点是否是未完成的检查点:isPendingCheckpoint

接下来分为三种情况对待:

  • 如果是未完成的检查点,并且相关资源没有被释放(检查点没有被discarded
isPendingCheckpoint = true;
pendingCheckpoints.remove(checkpointId);
checkpoint.discard(userClassLoader);
rememberRecentCheckpointId(checkpointId);

isPendingCheckpointtrue,根据检查点编号,将检查点从未完成的检查点集合中移除,discard检查点,记住最近的检查点(将其保持到到一个最近的检查点列表中)。

接下来查找是否还有待处理的检查点,根据检查点时间戳来判断:

boolean haveMoreRecentPending = false;
Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
while (entries.hasNext()) {
    PendingCheckpoint p = entries.next().getValue();
    if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
        haveMoreRecentPending = true;
        break;
    }
}

根据标识haveMoreRecentPending来进入不同的处理逻辑:

if (!haveMoreRecentPending && !triggerRequestQueued) {
    LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
    triggerCheckpoint(System.currentTimeMillis());
} else if (!haveMoreRecentPending) {
    LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
    triggerQueuedRequests();
}

如果有需要处理的检查点,并且当前能立即处理,则立即触发检查点定时任务;如果有需要处理的检查点,但不能立即处理,则触发入队的定时任务。

  • 如果是未完成的检查点,并且检查点已经被discarded

抛出IllegalStateException异常

  • 如果不是未完成的检查点

如果在最近未完成的检查点列表中找到,则有可能表示消息来迟了,将isPendingCheckpoint置为true,否则将isPendingCheckpoint置为false.

最后返回isPendingCheckpoint

AcknowledgeCheckpoint

该消息是一个应答信号,表示某个独立的task的检查点已经完成。也是由TaskManager发送给JobManager。该消息会携带task的状态:

  • state
  • stateSize

触发消息

RuntimeEnvironment类的acknowledgeCheckpoint方法。

消息处理

具体的实现在CheckpointCoordinatorreceiveAcknowledgeMessage中,开始的实现同receiveDeclineMessage,也是判断当前接收到的消息中包含的检查点是否是待处理的检查点。如果是,并且也没有discard掉,则执行如下逻辑:

                if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize())) {
                    if (checkpoint.isFullyAcknowledged()) {
                        completed = checkpoint.toCompletedCheckpoint();

                        completedCheckpointStore.addCheckpoint(completed);

                        LOG.info("Completed checkpoint " + checkpointId + " (in " +
                                completed.getDuration() + " ms)");
                        LOG.debug(completed.getStates().toString());

                        pendingCheckpoints.remove(checkpointId);
                        rememberRecentCheckpointId(checkpointId);

                        dropSubsumedCheckpoints(completed.getTimestamp());

                        onFullyAcknowledgedCheckpoint(completed);

                        triggerQueuedRequests();
                    }
                }

检查点首先应答相关的task,如果检查点已经完全应答完成,则将检查点转换成CompletedCheckpoint,然后将其加入completedCheckpointStore列表,并从pendingCheckpoints中移除。然后调用dropSubsumedCheckpoints它会从pendingCheckpointsdiacard所有时间戳小于当前检查点的时间戳,并从集合中移除。

最后,如果该检查点被转化为已完成的检查点,则:

        if (completed != null) {
            final long timestamp = completed.getTimestamp();

            for (ExecutionVertex ev : tasksToCommitTo) {
                Execution ee = ev.getCurrentExecutionAttempt();
                if (ee != null) {
                    ExecutionAttemptID attemptId = ee.getAttemptId();
                    NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
                    ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
                }
            }

            statsTracker.onCompletedCheckpoint(completed);
        }

迭代所有待commit的task,发送NotifyCheckpointComplete消息。同时触发状态跟踪器的onCompletedCheckpoint回调方法。

NotifyCheckpointComplete

该消息由JobManager发送给TaskManager,用于告诉一个task它的检查点已经得到完成确认,task可以向第三方提交该检查点。

触发消息

位于CheckpointCoordinator类的receiveAcknowledgeMessage方法中,当检查点acktask完成,转化为CompletedCheckpoint之后

        if (completed != null) {
            final long timestamp = completed.getTimestamp();

            for (ExecutionVertex ev : tasksToCommitTo) {
                Execution ee = ev.getCurrentExecutionAttempt();
                if (ee != null) {
                    ExecutionAttemptID attemptId = ee.getAttemptId();
                    NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
                    ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
                }
            }

            statsTracker.onCompletedCheckpoint(completed);
        }

消息处理

TaskManagerhandleCheckpointingMessage

实现:

      case message: NotifyCheckpointComplete =>
        val taskExecutionId = message.getTaskExecutionId
        val checkpointId = message.getCheckpointId
        val timestamp = message.getTimestamp

        log.debug(s"Receiver ConfirmCheckpoint $checkpointId@$timestamp for $taskExecutionId.")

        val task = runningTasks.get(taskExecutionId)
        if (task != null) {
          task.notifyCheckpointComplete(checkpointId)
        } else {
          log.debug(
            s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
        }

主要是触发tasknotifyCheckpointComplete方法。

小结

这篇文章主要讲解了检查点的基于定时任务的周期性的触发机制,以及基于Akka的Actor模型的消息驱动的协同处理机制。




原文发布时间为:2016-05-29


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
413 33
The Past, Present and Future of Apache Flink
|
4月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
1054 13
Apache Flink 2.0-preview released
|
4月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
182 3
|
5月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
6月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
409 2
|
5月前
|
消息中间件 资源调度 API
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
487 31
Apache Flink 流批融合技术介绍
|
4月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
128 1
|
4月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
415 0
|
4月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
123 0
|
6月前
|
Java 微服务 Spring
驾驭复杂性:Spring Cloud在微服务构建中的决胜法则
【8月更文挑战第31天】Spring Cloud是在Spring Framework基础上打造的微服务解决方案,提供服务发现、配置管理、消息路由等功能,适用于构建复杂的微服务架构。本文介绍如何利用Spring Cloud搭建微服务,包括Eureka服务发现、Config Server配置管理和Zuul API网关等组件的配置与使用。通过Spring Cloud,可实现快速开发、自动化配置,并提升系统的伸缩性和容错性,尽管仍需面对分布式事务等挑战,但其强大的社区支持有助于解决问题。
111 0

推荐镜像

更多