Apache Flink fault tolerance源码剖析(四)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 上篇文章我们探讨了Zookeeper在Flink的fault tolerance中发挥的作用(存储/恢复已完成的检查点以及检查点编号生成器)。 这篇文章会谈论一种特殊的检查点,Flink将之命名为——Savepoint(保存点)。

上篇文章我们探讨了Zookeeper在Flink的fault tolerance中发挥的作用(存储/恢复已完成的检查点以及检查点编号生成器)。

这篇文章会谈论一种特殊的检查点,Flink将之命名为——Savepoint(保存点)。

因为保存点只不过是一种特殊的检查点,所以在Flink中并没有太多代码实现。但作为一个特性,值得花费一个篇幅来介绍。

检查点VS保存点

使用数据流API编写的程序可以从保存点来恢复执行。保存点允许你在更新程序的同时还能保证Flink集群不丢失任何状态。

保存点是人工触发的检查点,它会对应用程序做快照并将快照存入持久化存储(state backend)。保存点依赖于常规的检查点机制,在程序执行期间,Flink会周期性得在工作节点上执行快照并产生检查点。恢复机制仅仅需要最新的已完成的检查点,一旦有新的检查点完成,老的检查点就可以被安全地丢弃。

保存点跟那些周期性的检查点是相似的。不同点有两个:

  • 它们是由用户触发的
  • 当有新的已完成的检查点产生的时候,不会自动失效

flink-fault-tolerance-4_savepoint-and-checkpoint

上图是两者区别的一个图示。在上面的例子中,job 0xA312Bc产生了检查点c1c2c3c4。周期性的检查点c1c2已经被丢弃了,c4是最新的检查点。而c2有些特别,它的状态关联着保存点s1,它已被用户触发了并且不会自动过期(图中可见c1c3在新的检查点产生之后,已经自动过期了)。

需要注意的是,s1仅仅是一个指向检查点c2指针。这意味着,真实的状态不会被拷贝给保存点,但是关联的检查点的状态会得到保存。

保存点的触发机制

上面我们说保存点跟检查点其中一个显著的区别是保存点是用户自行触发的。那么用户是通过什么手段触发的?答案是Flink提供的命令行客户端

Flink有个独立的client模块flink-clients。触发代码所在的类位于该模块下的CliFrontend

org.apache.flink.client.CliFrontend

代码位于方法triggerSavepoint中:

ActorGateway jobManager = getJobManagerGateway(options);

logAndSysout("Triggering savepoint for job " + jobId + ".");
Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId),new FiniteDuration(1, TimeUnit.HOURS));

基于Akka的actor的消息驱动机制,client会向jobManager发送一个TriggerSavepoint消息。来驱动jobManager响应触发保存点请求。

Flink定义了一系列跟client交互的消息:

org.apache.flink.runtime.messages.JobManagerMessages

  /**
    * Triggers a savepoint for the specified job.
    *
    * This is not a subtype of [[AbstractCheckpointMessage]], because it is a
    * control-flow message, which is *not* part of the checkpointing mechanism
    * of triggering and acknowledging checkpoints.
    *
    * @param jobId The JobID of the job to trigger the savepoint for.
    */
  case class TriggerSavepoint(jobId: JobID) extends RequiresLeaderSessionID

  /**
    * Response after a successful savepoint trigger containing the savepoint path.
    *
    * @param jobId The job ID for which the savepoint was triggered.
    * @param savepointPath The path of the savepoint.
    */
  case class TriggerSavepointSuccess(jobId: JobID, savepointPath: String)

  /**
    * Response after a failed savepoint trigger containing the failure cause.
    *
    * @param jobId The job ID for which the savepoint was triggered.
    * @param cause The cause of the failure.
    */
  case class TriggerSavepointFailure(jobId: JobID, cause: Throwable)

  /**
    * Disposes a savepoint.
    *
    * @param savepointPath The path of the savepoint to dispose.
    */
  case class DisposeSavepoint(savepointPath: String) extends RequiresLeaderSessionID

  /** Response after a successful savepoint dispose. */
  case object DisposeSavepointSuccess

  /**
    * Response after a failed savepoint dispose containing the failure cause.
    *
    * @param cause The cause of the failure.
    */
  case class DisposeSavepointFailure(cause: Throwable)

那么JobManager是如何响应TriggerSavepoint消息的呢?

            future {
              try {
                // Do this async, because checkpoint coordinator operations can
                // contain blocking calls to the state backend or ZooKeeper.
                val savepointFuture = savepointCoordinator.triggerSavepoint(
                  System.currentTimeMillis())

                savepointFuture.onComplete {
                  // Success, respond with the savepoint path
                  case scala.util.Success(savepointPath) =>
                    senderRef ! TriggerSavepointSuccess(jobId, savepointPath)

                  // Failure, respond with the cause
                  case scala.util.Failure(t) =>
                    senderRef ! TriggerSavepointFailure(
                      jobId,
                      new Exception("Failed to complete savepoint", t))
                }(context.dispatcher)
              } catch {
                case e: Exception =>
                  senderRef ! TriggerSavepointFailure(jobId, new Exception(
                    "Failed to trigger savepoint", e))
              }
            }(context.dispatcher)

从代码中可见,它调用了SavepointCoordinator#triggerSavepoint方法来完成触发保存点的逻辑,并返回一个Future对象,然后为其注册了一个callback。在触发的检查点转变为已完成的检查点之后,该callback将会被触发调用,如果成功将给client回复TriggerSavepointSuccess消息。

具体的触发保存点的逻辑是在类SavepointCoordinator中实现的。我们在分析检查点触发机制时,谈论了CheckpointCoordinatorSavepointCoordinatorCheckpointCoordinator的子类。

SavepointCoordinatortriggerSavepoint中,其具体的触发逻辑又间接调用了父类CheckpointCoordinator的实例方法triggerCheckpoint

try {
    // All good. The future will be completed as soon as the
    // triggered checkpoint is done.
    success = triggerCheckpoint(timestamp, checkpointId);
}
finally {
    if (!success) {
        savepointPromises.remove(checkpointId);
        promise.failure(new Exception("Failed to trigger savepoint"));
    }
}

这里需要注意的是,CheckpointCoordinatortriggerCheckpoint产生的只是PendingCheckpoint,即并未完成的检查点。这时,保存点并未建立跟当前检查点的关系(因为PendingCheckpoint并不一定会成功地转化成CompletedCheckpoint,这个时候建立对应关系没有意义),直到该检查点变成已完成的检查点。

在一个检查点变成已完成的检查点CompletedCheckpoint后会触发一个回调onFullyAcknowledgedCheckpoint,此时才是保存点跟该检查点建立关系的时机:

    protected void onFullyAcknowledgedCheckpoint(CompletedCheckpoint checkpoint) {
        // Sanity check
        Promise<String> promise = checkNotNull(savepointPromises
                .remove(checkpoint.getCheckpointID()));

        // Sanity check
        if (promise.isCompleted()) {
            throw new IllegalStateException("Savepoint promise completed");
        }

        try {
            // Save the checkpoint
            String savepointPath = savepointStore.putState(checkpoint);
            promise.success(savepointPath);
        }
        catch (Exception e) {
            LOG.warn("Failed to store savepoint.", e);
            promise.failure(e);
        }
    }

也正是在调用了

promise.success(savepointPath);

之后,JobManager才会真正回复client消息(即触发savepointFuture.onComplete回调)。

与此同时,从上面的代码段中我们也看到了保存点跟检查点是如何建立关系的。它就是savepointStore,也就是之前提到的指针savepointStore类型是StateStore,这是我们下面要分析的内容——保存点状态的存取。

保存点状态

在Flink中提供了一个接口:StateStore来支持保存点状态的存取。它对外提供了存取保存点状态的方法:

  • putState
  • getState
  • disposeState

不管,最终的存储介质是什么,他们都是基于逻辑路径(logic path)的存取方式。

目前该接口有三个实现:

  • FileSystemStateStore:基于文件系统的状态存储
  • HeapStateStore:基于Java堆内存的状态存储
  • SavepointStore:对保存点的状态存取,是装饰器模式的实现并且将泛型类型具体化为CompletedCheckpoint

这三个接口中前两个接口真正是基于不同存储介质的状态保存机制的实现。并且它们在Flink中也分别对应了两种存储机制:FileSystemStateStore对应filesystemHeapStateStore对应jobmanager

JobManager

这是保存点机制的默认实现。保存点被存储在job manager的堆内存中。它们在job manager关闭后会丢失。这种模式只在同一集群运行中你关闭以及恢复程序时才有用。不推荐在生产环境中使用这种模式。并且这种模式,保存点也不是job manager高可用保证的一部分。

配置如下:

savepoints.state.backend: jobmanager

File system

保存点存储在文件系统基于配置的文件夹中。它们对集群的每个节点的实例都可见,并且允许你的程序在不同的集群之间进行迁移。

配置:

savepoints.state.backend: filesystem 
savepoints.state.backend.fs.dir: hdfs:///flink/savepoints

需要注意的是,一个保存点是一个指向已完成的检查点的指针。那意味着保存点的状态不仅仅指保存点文件本身所存储的内容,而且需要包含检查点数据(可能被存储在另一个文件集合中)。因此,如果你使用filesystem作为保存点的持久化方式而使用jobmanager作为检查点的持久化方式,那么这种情况下Flink将无法实现fault tolerance,因为在job manager重启之后检查点的数据将无法被访问。所以最好保证两个机制的一致性。

Flink通过SavepointStoreFactory#createFromConfig结合配置文件来创建具体的StateStore的实现。

小结

本篇我们主要围绕了Flink的保存点进行展开,分析了保存点跟检查点的联系与区别,结合代码分析了保存点的触发机制以及保存点状态的存储。





原文发布时间为:2016-06-04


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2天前
|
Oracle 关系型数据库 数据库
实时计算 Flink版操作报错合集之执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
38 0
|
3天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到报错:Apache Kafka Connect错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
25 5
|
3天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之报错:org.apache.flink.table.api.validationexception如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
34 1
|
3天前
|
存储 SQL 关系型数据库
实时计算 Flink版操作报错合集之报错:WARN (org.apache.kafka.clients.consumer.ConsumerConfig:logUnused)这个错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
33 3
|
6天前
|
存储 缓存 负载均衡
【Apache ShenYu源码】如何实现负载均衡模块设计
整个模块为ShenYu提供了什么功能。我们可以看下上文我们提到的工厂对象。/***/核心方法很清晰,我们传入Upsteam列表,通过这个模块的负载均衡算法,负载均衡地返回其中一个对象。这也就是这个模块提供的功能。
28 1
|
6天前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1844 2
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
6天前
|
Java API Apache
【Apache ShenYu源码】看看贡献者如何实现支持提醒通知设计
在阅读中,还发现了有个html文件忘记加了开源协议,我们提下PR修复下,又收获了一次开源贡献!!PR提交戳这。
29 1
【Apache ShenYu源码】看看贡献者如何实现支持提醒通知设计
|
6天前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1862 2
官宣|Apache Flink 1.19 发布公告
|
6天前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
351 3
|
6天前
|
Oracle 关系型数据库 流计算
flink cdc 同步问题之报错org.apache.flink.util.SerializedThrowable:如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
445 0

热门文章

最新文章

推荐镜像

更多