Flink1.4 Fault Tolerance源码解析-1

简介: 前言:本篇关注Flink,对Fault Tolerance的源码实现进行阐述,主要介绍Api层及Flink现有实现。 本篇文章重点关注以下问题: 1. 具备Fault Tolerance能力的两种对象:Function和Operator 2.

前言:本篇关注Flink,对Fault Tolerance的源码实现进行阐述,主要介绍Api层及Flink现有实现


本篇文章重点关注以下问题:

1. 具备Fault Tolerance能力的两种对象:Function和Operator

2. 分析两个接口,列举典型实现,并做简要分析


1. 具备Fault Tolerance能力的两种对象

1.1 Function对象

org.apache.flink.api.common.functions.Function

所有用户自定义函数的基本接口,如已经预定义的FlatMapFunction就是基础自Function,Function并未定义任何方法,只是作为标识接口。

所有Function对象的Fault Tolerance都是通过继承CheckpointedFunction接口实现的,换话说,容错能力是Function的可选项,这点与Operator不同

1.2 Operator对象

org.apache.flink.streaming.api.operators.StreamOperator

        所有Operator的基本接口,如已经预定义的StreamFilter、StreamFlatMap就是StreamOperator的实现。

        与Function是标识接口不同,StreamOperator内置了几个和检查点相关的接口方法,因此,在Operator中,容错能力是实现Operator的必选项,这点不难理解,因为Operator处于运行时时,诸如分区信息都是必要要做快照的。

2. CheckpointedFunction

org.apache.flink.streaming.api.checkpoint.CheckpointedFunction


CheckpointedFunction接口是有状态转换函数的核心接口,两个接口方法:

> initializeState:Function初始化的时候调用,一般用作初始化state数据结构。

> snapshotState:请求state快照时被调用,方法签名中的参数FunctionSnapshotContext可以获取此Function中的所有State信息(快照),通过该上下文,可以获取该Function之前变更所产生的最终结果。

2.1 FlinkKafkaProducerBase

org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase


方法签名:

public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {

}
FlinkKafkaConsumerBase是Flink实现基于Kafka的Source的基类,Kafka提供基于offset并且可重复消费的机制,使其非常容易实现Fault Tolerance机制。

        关键代码:        

/** Consumer从各topic partitions读取的初始offsets. */
private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;

/** 保存已消费的、但是Offset未提交至Broken或Zk的数据. */
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();

/**
 * 如果程序从Checkpoint启动,此变量保存此Consumer上次消费的offset</br>
 * 
 * <p>此变量主要由 {@link #initializeState(FunctionInitializationContext)} 进行赋值.
 *
 */
private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;

/** 在state backend上保存的State信息(Offset信息) . */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

@Override
public final void initializeState(FunctionInitializationContext context) throws Exception {

	OperatorStateStore stateStore = context.getOperatorStateStore();
	
	// 兼容1.2.0版本的State,可无视
	ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
		stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

	// 各Partition的offset信息
	this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
			OFFSETS_STATE_NAME,
			TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));

	if (context.isRestored() && !restoredFromOldState) {
		restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

		// 兼容1.2.0版本的State,可无视
		for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
			restoredFromOldState = true;
			unionOffsetStates.add(kafkaOffset);
		}
		oldRoundRobinListState.clear();

		if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
			throw new IllegalArgumentException(
				"Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
		}

		// 将待恢复的State信息保存进‘restoredState’变量中,以便程序异常时用于恢复
		for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
			restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
		}

		LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
	} else {
		LOG.info("No restore state for FlinkKafkaConsumer.");
	}
}

@Override
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
	if (!running) {
		LOG.debug("snapshotState() called on closed source");
	} else {
		// 首先清空state backend对应offset的全局存储(State信息)
		unionOffsetStates.clear();

		// KafkaServer的连接器,根据Kafka版本由子类实现
		final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
		if (fetcher == null) {
			// 连接器还未初始化,unionOffsetStates的值从 restored offsets 或是 subscribedPartition上读取
			for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
				unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
			}

			if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
				// 如果启用快照时同步提交Offset,则在初始化时,用restoredState给pendingOffsetsToCommit赋值
				pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
			}
		} else {
			// 通过连接器获取当前消费的Offsets
			HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

			if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
				// 保存当前消费的Offset
				pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
			}

			// 给state backend对应offset的全局存储(State信息)赋值
			for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
				unionOffsetStates.add(
						Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
			}
		}

		if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
			// pendingOffsetsToCommit的保护机制,最多存储100个元素,正也是此Map需要有序的原因
			while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
				pendingOffsetsToCommit.remove(0);
			}
		}
	}
}

快照总结:
  • initializeState方法从state backend中恢复State,并将相关信息保存入restoredState;
  • snapshotState方法将当前准备放入state backend的state信息保存至unionOffsetStates,如果应用需要在快照的同时提交Offset,则将消费的Offset信息保存至pendingOffsetsToCommit。
FlinkKafkaConsumerBase继承了CheckpointListener接口,此接口是一个监听接口,以便当快照完成时通知Function进行一些必要处理;FlinkKafkaConsumerBase借用此接口来提交Offset,代码如下:

Java代码

收藏代码
  1. @Override
  2. public final void notifyCheckpointComplete(long checkpointId) throws Exception {
  3. if (!running) {
  4. LOG.debug("notifyCheckpointComplete() called on closed source");
  5. return;
  6. }
  7. final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
  8. if (fetcher == null) {
  9. LOG.debug("notifyCheckpointComplete() called on uninitialized source");
  10. return;
  11. }
  12. if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
  13. try {
  14. // 在pendingOffsetsToCommit中找出checkpointId对应的offset信息
  15. final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
  16. if (posInMap == -1) {
  17. LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
  18. return;
  19. }
  20. @SuppressWarnings("unchecked")
  21. // 取出checkpointId对应的Offset信息
  22. Map<KafkaTopicPartition, Long> offsets =
  23. (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
  24. // 将该checkpointId之前的Offset信息移除(pendingOffsetsToCommit有序的原因)
  25. for (int i = 0; i < posInMap; i++) {
  26. pendingOffsetsToCommit.remove(0);
  27. }
  28. if (offsets == null || offsets.size() == 0) {
  29. LOG.debug("Checkpoint state was empty.");
  30. return;
  31. }
  32. // 通过连接器向Broken或Zk提交Offset信息
  33. fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
  34. } catch (Exception e) {
  35. if (running) {
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
12月前
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
1124 29
|
12月前
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
470 4
|
12月前
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
12月前
|
移动开发 前端开发 JavaScript
从入门到精通:H5游戏源码开发技术全解析与未来趋势洞察
H5游戏凭借其跨平台、易传播和开发成本低的优势,近年来发展迅猛。接下来,让我们深入了解 H5 游戏源码开发的技术教程以及未来的发展趋势。
|
12月前
|
存储 前端开发 JavaScript
在线教育网课系统源码开发指南:功能设计与技术实现深度解析
在线教育网课系统是近年来发展迅猛的教育形式的核心载体,具备用户管理、课程管理、教学互动、学习评估等功能。本文从功能和技术两方面解析其源码开发,涵盖前端(HTML5、CSS3、JavaScript等)、后端(Java、Python等)、流媒体及云计算技术,并强调安全性、稳定性和用户体验的重要性。
|
机器学习/深度学习 自然语言处理 算法
生成式 AI 大语言模型(LLMs)核心算法及源码解析:预训练篇
生成式 AI 大语言模型(LLMs)核心算法及源码解析:预训练篇
3355 1
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
1271 1
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
12月前
|
负载均衡 JavaScript 前端开发
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
533 16
|
自然语言处理 数据处理 索引
mindspeed-llm源码解析(一)preprocess_data
mindspeed-llm是昇腾模型套件代码仓,原来叫"modelLink"。这篇文章带大家阅读一下数据处理脚本preprocess_data.py(基于1.0.0分支),数据处理是模型训练的第一步,经常会用到。
426 0

热门文章

最新文章

推荐镜像

更多
  • DNS