Apache Flink fault tolerance源码剖析(一)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 因某些童鞋的建议,从这篇文章开始结合源码谈谈Flink Fault Tolerance相关的话题。上篇官方介绍的翻译是理解这个话题的前提,所以如果你想更深入得了解Flink Fault Tolerance的机制,推荐先读一下前篇文章理解它的实现原理。

因某些童鞋的建议,从这篇文章开始结合源码谈谈Flink Fault Tolerance相关的话题。上篇官方介绍的翻译是理解这个话题的前提,所以如果你想更深入得了解Flink Fault Tolerance的机制,推荐先读一下前篇文章理解它的实现原理。当然原理归原理,原理体现在代码实现里并不是想象中的那么直观。这里的源码剖析也是我学习以及理解的过程。

作为源码解析Flink Fault Tolerance的首篇文章,我们先暂且不谈太有深度的东西,先来了解一下:Flink哪里涉及到检查点/快照机制来保存/恢复状态?这就是本篇要谈的主要内容。

跟Flink官方文档的说明一样,在文章中会混杂着检查点快照这两个术语,不要太过于纠结它们,某种程度它们是一致的。

从这篇文章开始,当我谈及一个类,我会给出它的全限定名,以方便大家对照。

在Flink中,需要具备Fault Tolerance能力的通常是两类对象:function以及operator

其中function通常通过实现Checkpointed来达到这个目的,而operator通过实现StreamOpeator(该接口中包含了快照、恢复状态的接口方法)。

我们会分别来分析这两个接口,然后列举一些典型的需要具备Fault Tolerance功能的对象,并分析它们的实现。

Checkpointed

org.apache.flink.streaming.api.checkpoint.Checkpointed

该接口提供给那些需要持久化状态的functionoperator使用。该接口是同步模式的快照机制。

两个接口方法:

  • snapshotState:获得function或者operator的当前状态(快照),这个状态必须反映该function之前的变更所产生的最终结果

该方法接收两个参数,第一个参数是checkpointId,表示该检查点的ID,第二个参数checkpointTimestamp,检查点的时间戳,被JobManagerSystem.currentTimeMillis()驱动

  • restoreState:用于从之前的检查点中恢复functionoperator的状态,需要注意的是该方法的调用会早于open方法

CheckpointedAsynchronously

org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously

该接口继承自Checkpointed,属于标记型接口,用于跟Checkpointed同步快照机制进行区分。

MessageAcknowledgingSourceBase

org.apache.flink.streaming.api.functions.source

该类是Flink内置的众多SouceFunction之一,也是基于具备ack(确认机制)的Message Queue的实现模板之一。

该类的完整签名:

public abstract class MessageAcknowledgingSourceBase<Type, UId>
    extends RichSourceFunction<Type>
    implements Checkpointed<SerializedCheckpointData[]>, CheckpointListener

首先我们从该类的实现接口中可以看到它希望保存的状态是:SerializedCheckpointData[]:

SerializedCheckpointData表示作为快照数据的被序列化元素的集合

有两个非常重要的属性:

/** The list gathering the IDs of messages emitted during the current checkpoint */
private transient List<UId> idsForCurrentCheckpoint;

/** The list with IDs from checkpoints that were triggered, but not yet completed or notified of completion */
private transient ArrayDeque<Tuple2<Long, List<UId>>> pendingCheckpoints;


/**
 * Set which contain all processed ids. Ids are acknowledged after checkpoints. When restoring
 * a checkpoint, ids may be processed again. This happens when the checkpoint completed but the
 * ids for a checkpoint haven't been acknowledged yet.
 */
private transient Set<UId> idsProcessedButNotAcknowledged;
  • idsForCurrentCheckpoint:该集合存储着当前检查点覆盖范围内,消费掉的消息的ID集合
  • pendingCheckpoints:该集合收集的是:检查点以及该检查点中那些已经被触发处理的但没有完成的(或没有收到完成通知的)消息的ID集合对(Tuple2)
  • idsProcessedButNotAcknowledged:它用于存储那些已经被处理过的消息的ID,这些消息的ack在检查点完成之后。也就是说,如果从该检查点开始恢复,那么这些id的消息可能会被重放。

看到上面的定义,虽然都是用来存储跟消息ID相关的“集合”,但却是三种不同的数据结构,而且前两个是有序的,最后一个是无序的

后面的文章我们会谈到检查点分:PendingCheckpoint(未完成的)和CompletedCheckpoint(已完成的)

来看Checkpointed的两个接口方法的实现:

    public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        LOG.debug("Snapshotting state. Messages: {}, checkpoint id: {}, timestamp: {}",
                    idsForCurrentCheckpoint, checkpointId, checkpointTimestamp);

        pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint));

        idsForCurrentCheckpoint = new ArrayList<>(64);

        return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
    }

首先是构建快照的方法,它将当前检查点checkpointId及其相关的消费走的消息集合idsForCurrentCheckpoint构建成一个元组Tuple2加入待处理的检查点中pendingCheckpoints。接着重新初始化了idsForCurrentCheckpoint(因为当前这个检查点的快照已经生成了,所以跟当前检查点相关的元素也需要清空掉)。

    public void restoreState(SerializedCheckpointData[] state) throws Exception {
        pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
        // build a set which contains all processed ids. It may be used to check if we have
        // already processed an incoming message.
        for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) {
            idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
        }
    }

接下来是恢复状态的逻辑,它的过程也很简单。首先反序列化状态对象到pendingCheckpoints,然后遍历整个元组集合,针对每个没有完成的检查点元组checkpoint,提取出每个这些checkpoint对应的消息ID集合,将他们全部加入idsProcessedButNotAcknowledged集合中去。

从构建快照的snapshotState方法中,我们看到针对每个checkpointId都将其涵盖范围内的所有的ID集合拼装成一个二元组加入到pendingCheckpoints。而随着消息被消费,如果到最后该checkpointId对应的所有消息ID都被完全处理也就是说该检查点变成了CompletedCheckpoint,那么如何将该二元组从pendingCheckpoints移除?Flink提供了一个CheckpointListener它会在某个检查点完成之后给出通知,客户程序可以订阅它然后进行相应的回调处理notifyCheckpointComplete,该类的回调实现如下:

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        LOG.debug("Committing Messages externally for checkpoint {}", checkpointId);

        for (Iterator<Tuple2<Long, List<UId>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
            Tuple2<Long, List<UId>> checkpoint = iter.next();
            long id = checkpoint.f0;

            if (id <= checkpointId) {
                LOG.trace("Committing Messages with following IDs {}", checkpoint.f1);
                acknowledgeIDs(checkpointId, checkpoint.f1);
                // remove deduplication data
                idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
                // remove checkpoint data
                iter.remove();
            }
            else {
                break;
            }
        }
    }

获取到已完成的checkpointId,然后遍历整个pendingCheckpoints集合,找到所有checkpointId小于当前已完成的checkpointId,然后完成三个动作:

  • ack 该checkpointId对应的所有这些消息的ID
  • 将这些消息的ID从idsProcessedButNotAcknowledged中移除
  • 将该二元组从pendingCheckpoints中移除

为什么这里判断条件是<=呢,因为checkpointId是时序递增的,而且Flink保证如果某个检查点完成,那么比该检查点小的检查点肯定也完成了。因为,检查点越小与其有关的消息集合越早被处理。

另外一个需要注意的是,该方法中的acknowledgeIDs是抽象方法,待具体类根据自己的ack机制实现。

RabbitMQ 对接Flink的Source —— RMQSource就是通过继承MessageAcknowledgingSourceBase 实现的

FlinkKafkaConsumerBase

org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase

FlinkKafkaConsumerBase是Flink实现基于Kafka的Source的基类,Kafka提供基于offset并且可重复消费的机制,使其非常容易实现Fault Tolerance机制,只不过这里实现的是异步模式的检查点机制CheckpointedAsynchronously

该类的完整签名如下:

public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
        implements CheckpointListener, CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, ResultTypeQueryable<T>

从签名来看,它快照的数据类型是:

HashMap<KafkaTopicPartition, Long>

该类型描述了kafka消息消费的具体的信息(包含topic,partition,offset)。

然后继续看snapshotState方法:

// the use of clone() is okay here is okay, we just need a new map, the keys are not changed
        //noinspection unchecked
        HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) offsetsState.clone();

        // the map cannot be asynchronously updated, because only one checkpoint call can happen
        // on this function at a time: either snapshotState() or notifyCheckpointComplete()
        pendingCheckpoints.put(checkpointId, currentOffsets);

        while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
            pendingCheckpoints.remove(0);
        }

这是其核心代码。它先将offsetsState克隆一份,作为当前检查点的快照,然后放入pendingCheckpoints作为待处理的检查点集合。

这里有一个安全性检查:如果待处理的安全点集合大于默认设定的阈值(100),则移除集合中第一个检查点,这么做的目的是为了防止集合太大导致内存泄漏

restoreState方法的实现,只是将待恢复的偏移量快照对象赋予当前对象的偏移量而已。

MessageAcknowledgingSourceBase,为了得到检查点完成的通知,FlinkKafkaConsumerBase也实现了CheckpointListener接口,以在检查点完成时进行回调处理。来看看notifyCheckpointComplete方法的实现:

            HashMap<KafkaTopicPartition, Long> checkpointOffsets;

            // the map may be asynchronously updates when snapshotting state, so we synchronize
            synchronized (pendingCheckpoints) {
                final int posInMap = pendingCheckpoints.indexOf(checkpointId);
                if (posInMap == -1) {
                    LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
                    return;
                }

                //noinspection unchecked
                checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);


                // remove older checkpoints in map
                for (int i = 0; i < posInMap; i++) {
                    pendingCheckpoints.remove(0);
                }
            }
            if (checkpointOffsets == null || checkpointOffsets.size() == 0) {
                LOG.debug("Checkpoint state was empty.");
                return;
            }
            commitOffsets(checkpointOffsets);

可以看到核心代码都进行了同步保护,因为pendingCheckpoints很可能会被异步更新。它先根据完成了的检查点,获得其在pendingCheckpoints中的索引。如果判断索引不存在,则直接退出。否则,移除该索引对应的快照信息,然后将小于当前索引(较旧的)的快照信息也一并移除(这一点我之前解释过,因为所有的检查点都是按时间递增有序的)。最后将当前完成的检查点对应的消息的偏移量进行commit,也即commitOffsets。只不过这里该方法被定义为抽象方法,因为Kafka不同版本的API差别的原因,由适配不同版本的consumer各自实现。

Flink以Kafka作为Source的具体实现机制,不是本文的重点,后续可以另开文章进行讲解

StatefulSequenceSource

org.apache.flink.streaming.api.functions.source

这是一个有状态的、给定起始和截止元素的并行序列发射器,由于它需要提供exactly once保证,所以它实现了Checkpointed接口。
它主要是用来维护一个称之为collected的发射进度状态,对其进行快照以便于实现fault tolerance

StreamOperator

org.apache.flink.streaming.api.operators. StreamOperator

StreamOperator内置了我们上面谈到的几个跟检查点相关的接口方法:

  • snapshotOperatorState
  • restoreState
  • notifyOfCompletedCheckpoint

这三个方法来自于我们之前谈到的Checkpointed以及CheckpointListener。这也由此可见,在operator中快照机制由可选项变成了必选项。

这是不难理解的,因为operator处于运行时,诸如分区信息都是必须要做快照的。

这里需要注意的是snapshotOperatorState方法,它返回值为StreamTaskState。它是表示task所有状态的一个容器对象,它包含了三类状态:

  • operatorState
  • functionState
  • kvStates

这不是本文的重点,后面的文章再谈

AbstractStreamOperator

org.apache.flink.streaming.api.operators.AbstractStreamOperator

AbstractStreamOperatorStreamOperator的抽闲类,为operator的实现提供模板,当然也为以上的三个跟快照相关的接口方法的实现提供了模板。

来看snapshotOperatorState方法:

    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        // here, we deal with key/value state snapshots

        StreamTaskState state = new StreamTaskState();

        if (stateBackend != null) {
            HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
                stateBackend.snapshotPartitionedState(checkpointId, timestamp);
            if (partitionedSnapshots != null) {
                state.setKvStates(partitionedSnapshots);
            }
        }


        return state;
    }

可以看到它依赖于一个叫stateBackend的东西,在之前一篇文章中我们有谈及过,它是state最终的持久化机制的实现。并且从注释可以看到这里只提供了针对key/value状态的快照模板。

AbstractUdfStreamOperator

org.apache.flink.streaming.api.operators

该抽象类继承自AbstractStreamOperator,用于进一步为operator的实现提供模板,不过从类名可以看出来,它主要是为用户定义函数(udf)的operator提供模板。

snapshotOperatorState方法:

    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        StreamTaskState state = super.snapshotOperatorState(checkpointId, timestamp);

        if (userFunction instanceof Checkpointed) {
            @SuppressWarnings("unchecked")
            Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;

            Serializable udfState;
            try {
                udfState = chkFunction.snapshotState(checkpointId, timestamp);
            } 
            catch (Exception e) {
                throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
            }

            if (udfState != null) {
                try {
                    AbstractStateBackend stateBackend = getStateBackend();
                    StateHandle<Serializable> handle = 
                            stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp);
                    state.setFunctionState(handle);
                }
                catch (Exception e) {
                    throw new Exception("Failed to add the state snapshot of the function to the checkpoint: "
                            + e.getMessage(), e);
                }
            }
        }

        return state;
    }

这里我们终于再次看到了Checkpointed接口。这是因为function只是静态的函数,它的运行还必须借助于operator,因此其状态也必须借助于operator来帮助其与Flink的运行时交互以达到最终的持久化的目的。

函数状态的持久化代码:

AbstractStateBackend stateBackend = getStateBackend();
StateHandle<Serializable> handle = 
    stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp);
state.setFunctionState(handle);

小结

本篇剖析了Flink针对Function以及Operator如何做快照以及如何恢复的实现。虽然,还没有涉及到fault tolerance的最终实现机制,但是这是我们的入口。




原文发布时间为:2016-05-26


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
413 33
The Past, Present and Future of Apache Flink
|
4月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
1054 13
Apache Flink 2.0-preview released
|
4月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
182 3
|
4月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
414 0
|
6月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
64 1
|
5月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
6月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
409 2
|
6月前
|
消息中间件 分布式计算 Hadoop
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
81 3
|
6月前
|
消息中间件 运维 Kafka
Apache Flink 实践问题之达到网卡的最大速度如何解决
Apache Flink 实践问题之达到网卡的最大速度如何解决
74 2
|
6月前
|
消息中间件 前端开发 Kafka
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
105 2

热门文章

最新文章

推荐镜像

更多