美团二面:详细说说Kafka拉消息的过程?

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: AbstractFetcherThread:拉取消息的步骤副本机制是Kafka实现数据高可靠性的基础:同一个分区下的多个副本分散在不同的Broker机器上,它们保存相同的消息数据以实现高可靠性。那如何确保所有副本上的数据一致性呢?最常见方案当属Leader/Follower备份机制(Leader/Follower Replication)。

AbstractFetcherThread:拉取消息的步骤


副本机制是Kafka实现数据高可靠性的基础:同一个分区下的多个副本分散在不同的Broker机器上,它们保存相同的消息数据以实现高可靠性。那如何确保所有副本上的数据一致性呢?最常见方案当属Leader/Follower备份机制(Leader/Follower Replication)。Kafka分区的:


某个副本会被指定为Leader,负责响应客户端的读、写请求


其他副本自动成为Follower,被动同步Leader副本中的数据


被动同步:Follower副本不断向Leader副本发送读取请求,以获取Leader处写入的最新消息数据


本文就研究Follower副本如何通过拉取线程实现这一目标。Follower副本在副本同步过程中,还可能发生截断(Truncation),其原理又是为何?


案例


这部分源码贴近底层设计架构原理。阅读它对我实际有啥用?


生产环境曾发现,一旦Broker上副本数过多,Broker内存占用就会很高。HeapDump后,发现在于ReplicaFetcherThread#buildFetch有这么一行代码:


val builder = fetchSessionHandler.newBuilder()


内部会实例化一个LinkedHashMap。若分区数很多,该Map会被扩容数次,带来大量不必要的数据拷贝,既增加内存Footprint,又浪费CPU。后续通过将负载转移到其他Broker解决该问题。


Kafka社区也发现了这个Bug,所以现在变成:


修改后语句直接传入FETCH请求中总的分区数,并直接将其传给LinkedHashMap,避免再执行扩容。


说回Follower副本从Leader副本拉取数据。Kafka就是通过ReplicaFetcherThread,副本获取线程实现的消息拉取及处理。


本文先从抽象基类AbstractFetcherThread研究,最终彻底搞明白Follower端同步Leader端消息的原理。


AbstractFetcherThread


抽象类,从Broker获取多个分区的消息数据,至于获取之后如何对这些数据进行处理,则交由子类来实现。


类定义及字段

1.png


除了构造器的这几个字段,AbstractFetcherThread还定义了两个type类型。关键字type定义一个类型,可当做一个快捷方式,如FetchData:


type FetchData = FetchResponse.PartitionData[Records]


类似快捷方式:凡源码用到FetchResponse.PartitionData[Records],都可使用FetchData替换,EpochData同理。


FetchData定义里的PartitionData类型,是客户端clients工程中FetchResponse类的嵌套类。FetchResponse类封装的是FETCH请求的Response对象,其内PartitionData是个POJO,保存Response中单个分区数据拉取的各项数据:


从该分区的Leader副本拉取回来的消息


该分区的高水位值


日志起始位移值


2.png3.png





在PartitionData中,最需关注的是recordSet,保存了实际的消息集合。


注意到EpochData定义位置,它也是PartitionData类型,但EpochData的PartitionData是OffsetsForLeaderEpochRequest的PartitionData类型


Kafka源码有很多名为PartitionData的嵌套类。很多请求类型中的数据都是按分区层级分组,因此源码很自然地在这些请求类中创建同名嵌套类。所以,注意区分PartitionData嵌套类是定义在哪类请求中的!


分区读取状态类


AbstractFetcherThread构造器中,还有个**PartitionStates[PartitionFetchState]**类型的字段:


泛型参数类型PartitionFetchState类,表征分区读取状态,保存分区的已读取位移值和对应副本状态。

这里的状态有二:


副本读取状态

副本读取状态由ReplicaState接口表示:

4.png

分区读取状态:

可获取,表明副本获取线程当前能够读取数据。

截断中,表明分区副本正在执行截断操作(比如该副本刚刚成为Follower副本)。

被推迟,表明副本获取线程获取数据时出现错误,需要等待一段时间后重试。

分区读取状态中的【可获取、截断中】与副本读取状态的【获取中、截断中】并非严格对应。副本读取状态处获取中,并不一定表示分区读取状态就是可获取状态。对于分区,它是否能被获取的条件要比副本严格。

5.png



副本获取线程做的事情,日志截断和消息获取:


isReplicaInSync,副本限流,出镜率不高


isDelayed,判断是否需要推迟获取对应分区的消息


源码会不断调整那些不需要推迟的分区的读取顺序,以保证读取公平性。公平性实现在partitionStates字段的PartitionStates类,定义在clients工程。会接收一组要读取的主题分区,然后轮询读取这些分区以确保公平性。


clients端源码自行查阅。


public class PartitionStates<S> {

   private final LinkedHashMap<TopicPartition, S> map = new LinkedHashMap<>();

   ......

   public void updateAndMoveToEnd(TopicPartition topicPartition, S state) {

     map.remove(topicPartition);

     map.put(topicPartition, state);

     updateSize();

   }

   ......

}


PartitionStates轮询处理要读取的多个分区,依靠LinkedHashMap保存所有主题分区,其元素有明确迭代顺序,默认为元素插入的顺序。


假设Kafka要读5个分区的消息:A、B、C、D和E。若插入顺序:ABCDE,则首先读分区A。一旦A被读取后,为确保各分区都有同等机会被读取,代码需将A插入到分区列表的最后一位,这就是updateAndMoveToEnd:把A从map中移除,再插回去,这样A自然就处于列表的最后一位了。这便是PartitionStates的作用。


core API


processPartitionData、truncate、buildFetch和doWork,涵盖拉取线程所做的最重要的3件事:


构建FETCH请求


执行截断操作


处理拉取后的结果


doWork串联起前面的这3方法。


最重要的processPartitionData,用于处理读取回来的消息集合。它是个抽象方法,因此需子类实现它的逻辑。具体到Follower副本而言, 由ReplicaFetcherThread类实现:


protected def processPartitionData(

 topicPartition: TopicPartition,  // 读取哪个分区的数据

 fetchOffset: Long,               // 读取到的最新位移值

 partitionData: FetchData         // 读取到的分区消息数据

): Option[LogAppendInfo]           // 写入已读取消息数据前的元数据


返回值Option[LogAppendInfo]:


对Follower副本读消息写入日志,可忽略Option,因为肯定会返回具体LogAppendInfo实例,而不是None

LogAppendInfo类封装了很多消息数据被写入到日志前的重要元数据信息,如首条消息的位移值、最后一条消息位移值、最大时间戳等


truncate


protected def truncate(

 topicPartition: TopicPartition, // 要对哪个分区下副本执行截断操作

 truncationState: OffsetTruncationState  // Offset + 截断状态

): Unit


OffsetTruncationState类告诉Kafka要把指定分区下副本截断到哪个位移值,封装了:


一个位移值


一个截断完成与否的布尔值状态


buildFetch


protected def buildFetch(

 // 一组要读取的分区列表

 // 分区是否可读取取决于PartitionFetchState中的状态

 partitionMap: Map[TopicPartition, PartitionFetchState]):

// 封装FetchRequest.Builder对象

ResultWithPartitions[Option[ReplicaFetch]]


本质为指定分区构建对应FetchRequest.Builder对象,而该对象是构建FetchRequest的核心组件。Kafka中任何类型的消息读取,都是通过给指定Broker发送FetchRequest请求来完成的。


doWork


串联前面3个方法的主要入口方法。


总结


本文研究Kafka的副本同步机制和副本管理器组件。Kafka副本间的消息同步依赖ReplicaFetcherThread线程。AbstractFetcherThread作为拉取线程的公共基类,AbstractFetcherThread类定义了很多重要方法。


AbstractFetcherThread类:拉取线程的抽象基类。它定义了公共方法处理所有拉取线程的共同逻辑,如执行截断操作,获取消息。


拉取线程逻辑:循环执行截断操作和获取数据操作。


分区读取状态:当前,源码定义了3类分区读取状态。拉取线程只能拉取处于可读取状态的分区的数据

6.png

目录
相关文章
|
1月前
|
消息中间件 存储 缓存
美团面试: Kafka为啥能实现 10Wtps 到100Wtps ?kafka 如何实现零复制 Zero-copy?
40岁老架构师尼恩分享了Kafka如何实现高性能的秘诀,包括零拷贝技术和顺序写。Kafka采用mmap和sendfile两种零拷贝技术,前者用于读写索引文件,后者用于向消费者发送消息,减少数据在用户空间和内核空间间的拷贝次数,提高数据传输效率。此外,Kafka通过顺序写日志文件,避免了磁盘寻道和旋转延迟,进一步提升了写入性能。尼恩还提供了系列技术文章和PDF资料,帮助读者深入理解这些技术,提升面试竞争力。
美团面试: Kafka为啥能实现 10Wtps 到100Wtps ?kafka 如何实现零复制 Zero-copy?
|
6月前
|
消息中间件 存储 监控
美团面试:Kafka如何处理百万级消息队列?
美团面试:Kafka如何处理百万级消息队列?
181 1
|
6月前
|
消息中间件 缓存 算法
美团面试官让我聊聊kafka的副本同步机制,我忍不住哭了
美团面试官让我聊聊kafka的副本同步机制,我忍不住哭了
|
消息中间件 Kafka 程序员
美团面试真题,如何保证Kafka消息不丢失?
一位工作了5年的小伙伴去美团面试以后,跟我反馈说,被问到一个“如何保证Kafka消息不丢失?”的问题,不知道如何回答。其实,这道题真的很基础。 很多小伙伴可能只会回答说,消息要持久化,添加消息确认机制。如果,你只是这样回答,那就和普通的程序员没什么区别。要想让面试官感觉你确实有不一样的理解,就应该从多个方面更全面地来分析和回答这个问题。今天,我来给大家讲明白。
132 0
|
消息中间件 缓存 算法
美团面试官让我聊聊kafka的副本同步机制,我忍不住哭了
美团面试官让我聊聊kafka的副本同步机制,我忍不住哭了
美团面试官让我聊聊kafka的副本同步机制,我忍不住哭了
|
SQL 存储 NoSQL
最新鲜的美团现场面试41题(三面技术+HR面):Redis+Kafka+分布式
  一面 hashmap与concurrenthashmap的区别 垃圾回收算法以及垃圾回收器 CMS的回收步骤 G1和CMS的区别 CMS哪个阶段是并发的哪个阶段是串行的? G1内部是如何分区的(re...
1498 0
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
46 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
268 9
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
67 3