Kafka消费过程关键源码解析

简介: Kafka消费过程关键源码解析

1 案例引入

  • 官方Consumer最简代码用例:
  • image.png
  • 简短的代码,背后牵涉很多问题,Consumer如何绑定特定分区?如何实现订阅 topic 的?又如何实现拉消息?

2 订阅流程

image.png

订阅主流程主要更新如下关键属性:


订阅状态(SubscriptionState) - subscriptions

主要维护所订阅的topic和patition的消费位置等状态信息

元数据中的topic信息

metadata中维护了Kafka集群元数据的一个子集,包括集群的Broker节点、Topic和Partition在节点上分布,以及我们聚焦的第二个问题:Coordinator给Consumer分配的Partition信息。


注意acquireAndEnsureOpen()和try-finally release()保证该方法的线程安全。


跟进到更新元数据的方法metadata.requestUpdateForNewTopics()

Metadata.requestUpdateForNewTopics()

  • 请求更新元数据。
  • image.png
  • 这里,并未真正发送更新元数据的请求,只是将需要更新元数据的标志位needUpdate置true。Kafka必须确保在第一次拉消息前元数据可用,即必须更新一次元数据,否则Consumer不知道应该去哪个Broker拉哪个Partition的消息。

3 拉消息流程

那元数据何时才真正更新呢?

  • 拉消息时序图
  • image.png
  • KafkaConsumer#poll()方法中主要调用如下方法:

updateAssignmentMetadataIfNeeded()

  • 更新元数据
  • image.png
  • 其内部调用coordinator.poll()poll()里又调用

ConsumerNetworkClient#ensureFreshMetadata()

image.png

ConsumerNetworkClient#awaitMetadataUpdate

image.png

内部调用了client.poll()方法,实现与Cluster通信,在Coordinator注册Consumer并拉取和更新元数据。


这些都是 client 类中方法,ConsumerNetworkClient封装了Consumer和Cluster之间所有网络通信的实现,是个完全的异步实现类。没有维护任何线程


待发送Request都存在unsent域

image.png

Response存放在pendingCompletion域

image.png

每次调用poll()时,在当前线程中发送所有待发送Request,处理所有收到Response。

异步设计

  • 优势:
  • 无需维护用于异步发送的和处理响应的线程,并且能充分发挥批量处理的优势,这也是Kafka的性能非常好的原因之一。

很少的线程实现高吞吐量。劣势:极大增加了代码的复杂度。

好了,下面再看另一关键方法:

image.png

pollForFetches()

image.png

主要由fetcher.sendFetches()实现,由于代码过长,简述其流程如下:


根据元数据的信息,构造所需Broker的拉消息的Request对象

然后调用ConsumerNetworkClient#send异步发送Request

并且注册一个回调类处理返回的Response

所有返回的Response被暂时存放在Fetcher#completedFetches。注意,此时的Request并未被真正发给各Broker,而被暂存在client.unsend等待发送。

然后,在调用ConsumerNetworkClient#poll时,会真正将之前构造的所有Request发送出去,并处理收到的Response

最后,fetcher.fetchedRecords()方法中,将返回的Response反序列化后转换为消息列表,返回给调用者

总结

综上过程讲解,给出整个拉消息流程涉及关键类的类图

image.png

参考

目录
相关文章
|
5天前
|
消息中间件 存储 算法
深度解析Kafka中的消息奥秘
深度解析Kafka中的消息奥秘
28 0
|
14天前
|
XML Java Android开发
Android实现自定义进度条(源码+解析)
Android实现自定义进度条(源码+解析)
47 1
|
28天前
|
Python
区域代理分红商城系统开发源码片段示例规则解析
level = Column(Integer, default=1) # 代理等级,例如:1代表普通用户,2代表初级代理,3代表高级代理等 parent_id = Column(Integer, ForeignKey('user.id')) # 上级代理ID 【更全面的开发源码搭建可V or TG我昵称】 parent = relationship("User", remote_side=[id]) # 上级代理对象
|
17天前
|
存储 NoSQL 算法
【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(字典)(二)
【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(字典)
33 0
|
18天前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
42 1
|
26天前
|
Linux C++ iOS开发
VLC源码解析:视频播放速度控制背后的技术
VLC源码解析:视频播放速度控制背后的技术
68 0
|
27天前
|
存储 编解码 缓存
FFmpeg之旅:深入解析FFplay源码
FFmpeg之旅:深入解析FFplay源码
96 0
|
1月前
|
消息中间件 网络协议 Kafka
Kafka【付诸实践 02】消费者和消费者群组+创建消费者实例+提交偏移量(自动、手动)+监听分区再平衡+独立的消费者+消费者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka消费者】
【2月更文挑战第21天】Kafka【付诸实践 02】消费者和消费者群组+创建消费者实例+提交偏移量(自动、手动)+监听分区再平衡+独立的消费者+消费者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka消费者】
55 3
|
1月前
|
存储 安全 Java
ArrayList源码全面解析
ArrayList源码全面解析
|
2月前
|
C语言
内核源码中遇到不会解析的宏怎么办?
内核源码中遇到不会解析的宏怎么办?
202 1

热门文章

最新文章

推荐镜像

更多