Apache Flink fault tolerance源码剖析(四)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 上篇文章我们探讨了Zookeeper在Flink的fault tolerance中发挥的作用(存储/恢复已完成的检查点以及检查点编号生成器)。 这篇文章会谈论一种特殊的检查点,Flink将之命名为——Savepoint(保存点)。

上篇文章我们探讨了Zookeeper在Flink的fault tolerance中发挥的作用(存储/恢复已完成的检查点以及检查点编号生成器)。

这篇文章会谈论一种特殊的检查点,Flink将之命名为——Savepoint(保存点)。

因为保存点只不过是一种特殊的检查点,所以在Flink中并没有太多代码实现。但作为一个特性,值得花费一个篇幅来介绍。

检查点VS保存点

使用数据流API编写的程序可以从保存点来恢复执行。保存点允许你在更新程序的同时还能保证Flink集群不丢失任何状态。

保存点是人工触发的检查点,它会对应用程序做快照并将快照存入持久化存储(state backend)。保存点依赖于常规的检查点机制,在程序执行期间,Flink会周期性得在工作节点上执行快照并产生检查点。恢复机制仅仅需要最新的已完成的检查点,一旦有新的检查点完成,老的检查点就可以被安全地丢弃。

保存点跟那些周期性的检查点是相似的。不同点有两个:

  • 它们是由用户触发的
  • 当有新的已完成的检查点产生的时候,不会自动失效

flink-fault-tolerance-4_savepoint-and-checkpoint

上图是两者区别的一个图示。在上面的例子中,job 0xA312Bc产生了检查点c1c2c3c4。周期性的检查点c1c2已经被丢弃了,c4是最新的检查点。而c2有些特别,它的状态关联着保存点s1,它已被用户触发了并且不会自动过期(图中可见c1c3在新的检查点产生之后,已经自动过期了)。

需要注意的是,s1仅仅是一个指向检查点c2指针。这意味着,真实的状态不会被拷贝给保存点,但是关联的检查点的状态会得到保存。

保存点的触发机制

上面我们说保存点跟检查点其中一个显著的区别是保存点是用户自行触发的。那么用户是通过什么手段触发的?答案是Flink提供的命令行客户端

Flink有个独立的client模块flink-clients。触发代码所在的类位于该模块下的CliFrontend

org.apache.flink.client.CliFrontend

代码位于方法triggerSavepoint中:

ActorGateway jobManager = getJobManagerGateway(options);

logAndSysout("Triggering savepoint for job " + jobId + ".");
Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId),new FiniteDuration(1, TimeUnit.HOURS));

基于Akka的actor的消息驱动机制,client会向jobManager发送一个TriggerSavepoint消息。来驱动jobManager响应触发保存点请求。

Flink定义了一系列跟client交互的消息:

org.apache.flink.runtime.messages.JobManagerMessages

  /**
    * Triggers a savepoint for the specified job.
    *
    * This is not a subtype of [[AbstractCheckpointMessage]], because it is a
    * control-flow message, which is *not* part of the checkpointing mechanism
    * of triggering and acknowledging checkpoints.
    *
    * @param jobId The JobID of the job to trigger the savepoint for.
    */
  case class TriggerSavepoint(jobId: JobID) extends RequiresLeaderSessionID

  /**
    * Response after a successful savepoint trigger containing the savepoint path.
    *
    * @param jobId The job ID for which the savepoint was triggered.
    * @param savepointPath The path of the savepoint.
    */
  case class TriggerSavepointSuccess(jobId: JobID, savepointPath: String)

  /**
    * Response after a failed savepoint trigger containing the failure cause.
    *
    * @param jobId The job ID for which the savepoint was triggered.
    * @param cause The cause of the failure.
    */
  case class TriggerSavepointFailure(jobId: JobID, cause: Throwable)

  /**
    * Disposes a savepoint.
    *
    * @param savepointPath The path of the savepoint to dispose.
    */
  case class DisposeSavepoint(savepointPath: String) extends RequiresLeaderSessionID

  /** Response after a successful savepoint dispose. */
  case object DisposeSavepointSuccess

  /**
    * Response after a failed savepoint dispose containing the failure cause.
    *
    * @param cause The cause of the failure.
    */
  case class DisposeSavepointFailure(cause: Throwable)

那么JobManager是如何响应TriggerSavepoint消息的呢?

            future {
              try {
                // Do this async, because checkpoint coordinator operations can
                // contain blocking calls to the state backend or ZooKeeper.
                val savepointFuture = savepointCoordinator.triggerSavepoint(
                  System.currentTimeMillis())

                savepointFuture.onComplete {
                  // Success, respond with the savepoint path
                  case scala.util.Success(savepointPath) =>
                    senderRef ! TriggerSavepointSuccess(jobId, savepointPath)

                  // Failure, respond with the cause
                  case scala.util.Failure(t) =>
                    senderRef ! TriggerSavepointFailure(
                      jobId,
                      new Exception("Failed to complete savepoint", t))
                }(context.dispatcher)
              } catch {
                case e: Exception =>
                  senderRef ! TriggerSavepointFailure(jobId, new Exception(
                    "Failed to trigger savepoint", e))
              }
            }(context.dispatcher)

从代码中可见,它调用了SavepointCoordinator#triggerSavepoint方法来完成触发保存点的逻辑,并返回一个Future对象,然后为其注册了一个callback。在触发的检查点转变为已完成的检查点之后,该callback将会被触发调用,如果成功将给client回复TriggerSavepointSuccess消息。

具体的触发保存点的逻辑是在类SavepointCoordinator中实现的。我们在分析检查点触发机制时,谈论了CheckpointCoordinatorSavepointCoordinatorCheckpointCoordinator的子类。

SavepointCoordinatortriggerSavepoint中,其具体的触发逻辑又间接调用了父类CheckpointCoordinator的实例方法triggerCheckpoint

try {
    // All good. The future will be completed as soon as the
    // triggered checkpoint is done.
    success = triggerCheckpoint(timestamp, checkpointId);
}
finally {
    if (!success) {
        savepointPromises.remove(checkpointId);
        promise.failure(new Exception("Failed to trigger savepoint"));
    }
}

这里需要注意的是,CheckpointCoordinatortriggerCheckpoint产生的只是PendingCheckpoint,即并未完成的检查点。这时,保存点并未建立跟当前检查点的关系(因为PendingCheckpoint并不一定会成功地转化成CompletedCheckpoint,这个时候建立对应关系没有意义),直到该检查点变成已完成的检查点。

在一个检查点变成已完成的检查点CompletedCheckpoint后会触发一个回调onFullyAcknowledgedCheckpoint,此时才是保存点跟该检查点建立关系的时机:

    protected void onFullyAcknowledgedCheckpoint(CompletedCheckpoint checkpoint) {
        // Sanity check
        Promise<String> promise = checkNotNull(savepointPromises
                .remove(checkpoint.getCheckpointID()));

        // Sanity check
        if (promise.isCompleted()) {
            throw new IllegalStateException("Savepoint promise completed");
        }

        try {
            // Save the checkpoint
            String savepointPath = savepointStore.putState(checkpoint);
            promise.success(savepointPath);
        }
        catch (Exception e) {
            LOG.warn("Failed to store savepoint.", e);
            promise.failure(e);
        }
    }

也正是在调用了

promise.success(savepointPath);

之后,JobManager才会真正回复client消息(即触发savepointFuture.onComplete回调)。

与此同时,从上面的代码段中我们也看到了保存点跟检查点是如何建立关系的。它就是savepointStore,也就是之前提到的指针savepointStore类型是StateStore,这是我们下面要分析的内容——保存点状态的存取。

保存点状态

在Flink中提供了一个接口:StateStore来支持保存点状态的存取。它对外提供了存取保存点状态的方法:

  • putState
  • getState
  • disposeState

不管,最终的存储介质是什么,他们都是基于逻辑路径(logic path)的存取方式。

目前该接口有三个实现:

  • FileSystemStateStore:基于文件系统的状态存储
  • HeapStateStore:基于Java堆内存的状态存储
  • SavepointStore:对保存点的状态存取,是装饰器模式的实现并且将泛型类型具体化为CompletedCheckpoint

这三个接口中前两个接口真正是基于不同存储介质的状态保存机制的实现。并且它们在Flink中也分别对应了两种存储机制:FileSystemStateStore对应filesystemHeapStateStore对应jobmanager

JobManager

这是保存点机制的默认实现。保存点被存储在job manager的堆内存中。它们在job manager关闭后会丢失。这种模式只在同一集群运行中你关闭以及恢复程序时才有用。不推荐在生产环境中使用这种模式。并且这种模式,保存点也不是job manager高可用保证的一部分。

配置如下:

savepoints.state.backend: jobmanager

File system

保存点存储在文件系统基于配置的文件夹中。它们对集群的每个节点的实例都可见,并且允许你的程序在不同的集群之间进行迁移。

配置:

savepoints.state.backend: filesystem 
savepoints.state.backend.fs.dir: hdfs:///flink/savepoints

需要注意的是,一个保存点是一个指向已完成的检查点的指针。那意味着保存点的状态不仅仅指保存点文件本身所存储的内容,而且需要包含检查点数据(可能被存储在另一个文件集合中)。因此,如果你使用filesystem作为保存点的持久化方式而使用jobmanager作为检查点的持久化方式,那么这种情况下Flink将无法实现fault tolerance,因为在job manager重启之后检查点的数据将无法被访问。所以最好保证两个机制的一致性。

Flink通过SavepointStoreFactory#createFromConfig结合配置文件来创建具体的StateStore的实现。

小结

本篇我们主要围绕了Flink的保存点进行展开,分析了保存点跟检查点的联系与区别,结合代码分析了保存点的触发机制以及保存点状态的存储。





原文发布时间为:2016-06-04


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
21天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
505 13
Apache Flink 2.0-preview released
|
25天前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
59 3
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
205 2
|
2月前
|
消息中间件 资源调度 API
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
373 31
Apache Flink 流批融合技术介绍
|
30天前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
52 1
|
29天前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
|
1月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
67 0
|
3月前
|
Java 微服务 Spring
驾驭复杂性:Spring Cloud在微服务构建中的决胜法则
【8月更文挑战第31天】Spring Cloud是在Spring Framework基础上打造的微服务解决方案,提供服务发现、配置管理、消息路由等功能,适用于构建复杂的微服务架构。本文介绍如何利用Spring Cloud搭建微服务,包括Eureka服务发现、Config Server配置管理和Zuul API网关等组件的配置与使用。通过Spring Cloud,可实现快速开发、自动化配置,并提升系统的伸缩性和容错性,尽管仍需面对分布式事务等挑战,但其强大的社区支持有助于解决问题。
73 0
|
3月前
|
消息中间件 Java 数据处理
揭秘Apache Flink的Exactly-Once神技:如何在数据流海中确保每条信息精准无误,不丢不重?
【8月更文挑战第26天】Apache Flink 是一款先进的流处理框架,其核心特性 Exactly-Once 语义保证了数据处理的精准无误。尤其在金融及电商等高要求场景下,该特性极为关键。本文深入解析 Flink 如何实现 Exactly-Once 语义:通过状态管理确保中间结果可靠存储;利用一致的检查点机制定期保存状态快照;以及通过精确的状态恢复避免数据重复处理或丢失。最后,提供一个 Java 示例,展示如何计算用户访问次数,并确保 Exactly-Once 语义的应用。
86 0

推荐镜像

更多