一文读懂kafka消息拉取机制|线程拉取模型

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 一文读懂kafka消息拉取机制|线程拉取模型

在详细介绍Kafka拉取之前,我们再来回顾一下消息拉取的整体流程:

e021d262e9318b4f4b24c4e11dddfbac.png

在消费者加入到消费组后,消费者Leader会根据当前在线消费者个数与分区的数量进行队列负载,每一个消费者获得一部分分区,接下来就是要从Broker服务端将数据拉取下来,提交给消费端进行消费,对应流程中的pollForFetches方法。


要正确写出优秀的Kafka端消费代码,详细了解其拉取模型是非常重要的一步。


1、消息拉取详解


1.1 消费端拉取流程详解


消息拉取的实现入口为:KafkaConsumer的pollForFetches,接下来我们将详细剖析其流程,探讨kafka消息拉取模型,其实现如下所示:


3f80d284071f9290a7e266ff5331884d.png

整个消息拉取的核心步骤如下:


  • 获取本次拉取的超时时间,会取自用户设置的超时时间与一个心跳包的间隔之中的最小值。
  • 拉取缓存区中解析已异步拉取的消息。
  • 向Broker发送拉取请求,该请求是一个异步请求
  • 通过ConsumerNetworkClient触发底层NIO通信。
  • 再次尝试从缓存区中解析已拉起的消息。


1.1 Fetch的sendFetches详解


经过队列负载算法分配到部分分区后,消费者接下来需要向Broker发送消息拉起请求,具体由sendFetches方法实现。

2b93396af292bdac6b50cc9d0ff49de4.jpg

Step1:通过调用preparefetchRequest,构建请求对象,其实现的核心要点如下:


  • 构建一个请求列表,这里采用了Build设计模式,最终生成的请求对象:Node为Key,FetchSessionHandler.FetchRequestData为Value的请求,我觉得这里有必要看一下FetchRequestData的数据结构:

e275c54be223d3d9fd47c1e002c0ea19.png

  • 其中ParitionData汇总包含了本次消息拉取的开始位点。
  • 通过fetchablePartitions方法获取本次可拉取的队列,其核心实现要点如下:
  • 从队列负载结果中获取可拉取的分区信息,主要的判断标准:未被暂停与有正确位点信息
  • nextInLineRecords?
  • 去除掉拉取缓存区中的存在队列信息(completedFetches),即如果缓存区中的数据未被消费端消费则不会继续拉取新的内容
  • 获取待拉取分区所在的leader信息,如果未找到,本次拉取将忽略该分区,但是会设置需要更新topic路由信息,在下次拉取之前会从Broker拉取最新的路由信息。
  • 如果客户端与待拉取消息的broker节点有待发送的网络请求(见代码@4),则本次拉取任务将不会再发起新的拉取请求,待已有的请求处理完毕后才会拉取新的消息。
  • 拉取消息时需要指定拉取消息偏移量,来自队列负载算法时指定,主要消费组的最新消费位点。


c91cf9d6f24b5c164b5a72010b496a66.png

Step2:按Node依次构建请求节点,并通过client的send方法将请求异步发送,当收到请求结果后会调用对应的事件监听器,这里主要的是一次拉取最大的字节数50M。


值得注意的是在Kafka中调用client的send方法并不会真正触发网络请求,而是将请求放到发送缓冲区中,Client的poll方法才会真正触发底层网络请求。


Step3:当客户端收到服务端请求后会将原始结果放入到completedFetches中,等待客户端从中解析。


本篇文章暂时不关注服务端对fetch请求的处理,等到详细剖析了Kafka的存储相关细节后再回过来看Fetch请求的响应。


1.2 Fetcher的fetchedRecords方法详解


向服务端发送拉取请求异步返回后会将结果返回到一个completedFetches中,也可以理解为接收缓存区,接下来将从缓存区中将结果解析并返回给消费者消费。从接收缓存区中解析数据的具体实现见Fetcher的fetchedRecords方法。

a108d58f5d3703ce7986016b67b574e1.png

核心实现要点如下:


  • 首先说明一下nextInLineRecords的含义,接下来的fetchedRecords方法将从这里获取值,该参数主要是因为引入了maxPollRecords(默认为500),一次拉取的消息条数,一次Fetch操作一次每一个分区最多返回50M数据,可能包含的消息条数大于maxPollRecords。
    如果nextInLineRecords为空或者所有内容已被拉取,则从completedFetch中解析。
  • 从completedFetch中解析解析成nextInlineRecords。
  • 从nextInlineRecords中继续解析数据。


关于将CompletedFetch中解析成PartitionRecords以及从PartitionRecords提取数据成Map< TopicPartition, List< ConsumerRecord< K, V>>>的最终供应用程序消费的数据结构,代码实现非常简单,这里就不再介绍。


有关服务端响应SEND_FETCH的相关分析,将在详细分析Kafka存储相关机制时再介绍。在深入存储细节时,从消息拉取,消息写入为切入点是一个非常不错的选择。


2、消息消费端模型


阅读源码是手段而不是目的,通过阅读源码,我们应该总结提炼一下Kafka消息拉取模型(特点),以便更好的指导实践。


首先再强调一下消费端的三个重要参数:


  • fetch.max.bytes
    客户端单个Fetch请求一次拉取的最大字节数,默认为50M,根据上面的源码分析得知,Kafka会按Broker节点为维度进行拉取, 即按照队列负载算法分配在同一个Broker上的多个队列进行聚合,同时尽量保证各个分区的拉取平衡,通过max.partition.fetch.bytes参数设置。
  • max.partition.fetch.bytes
    一次fetch拉取单个队列最大拉取字节数量,默认为1M。
  • max.poll.records
    调用一次KafkaConsumer的poll方法,返回的消息条数,默认为500条。

实践思考:fetch.max.bytes默认是max.partition.fetch.bytes的50倍,也就是默认情况一下,一个消费者一个Node节点上至少需要分配到50个队列,才能尽量满额拉取。但50个分区(队列)可以来源于这个消费组订阅的所有的topic


2.1Kafka消费线程拉取线程模型


KafkaConsumer并不是线程安全的,即KafkaConsumer的所有方法调用必须在同一个线程中,但消息拉取却是是并发的,线程模型说明如下图所示:

62f42a13c2cbe08562302019f4489c5b.png

其核心设计理念是KafkaConsumer在调用poll方法时,如果本地缓存区中(completedFeches)存在未拉取的消息,则直接从本地缓存区中拉取消息,否则会调用client#send方法进行异步多线程并行发送拉取请求,发往不同的broker节点的请求是并发执行,执行完毕后,再将结果放入到poll方法所在线程中的缓存区,实现多个线程的协同


2.2 poll方法返回给消费端线程特点


pol l方法会从缓存区中依次获取一个CompletedFetch对象,一次只从CompletedFetch对象中获取500条消息,一个CompletedFetch对象包含一个分区的数据,默认最大的消息体大小为1M,可通过max.partition.fetch.bytes改变默认值。


如果一个分区中消息超过500条,则KafkaConsumer的poll方法将只会返回1个分区的数据,这样在顺序消费时基于单分区的顺序性保证时如果采取与RocketMQl类似的机制,对分区加锁,则其并发度非常低,因为此时顺序消费的并发度取决于这500条消息包含的分区个数


Kafka顺序消费最佳实践:单分区中消息可以并发执行,但要保证同一个key的消息必须串行执行。因为在实践应用场景中,通常只需要同一个业务实体的不同消息顺序执行。

好了,本文就介绍到这里了,一键三连(关注、点赞、留言)是对我最大的鼓励


掌握一到两门java主流中间件,是敲开BAT等大厂必备的技能,送给大家一个Java中间件学习路线,助力大家实现职场的蜕变。


相关文章
|
2月前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
25天前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
68 20
剖析 Redis List 消息队列的三种消费线程模型
|
1月前
|
消息中间件 安全 大数据
Kafka多线程Consumer是实现高并发数据处理的有效手段之一
【9月更文挑战第2天】Kafka多线程Consumer是实现高并发数据处理的有效手段之一
97 4
|
2月前
|
消息中间件 负载均衡 Java
揭秘Kafka背后的秘密!Kafka 架构设计大曝光:深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理及流传输设计的高效率消息系统。其核心特性包括高吞吐量、低延迟及出色的可扩展性。Kafka采用分布式日志模型,支持数据分区与副本,确保数据可靠性和持久性。系统由Producer(消息生产者)、Consumer(消息消费者)及Broker(消息服务器)组成。Kafka支持消费者组,实现数据并行处理,提升整体性能。通过内置的故障恢复机制,即使部分节点失效,系统仍能保持稳定运行。提供的Java示例代码展示了如何使用Kafka进行消息的生产和消费,并演示了故障转移处理过程。
40 3
|
2月前
|
消息中间件 Java Kafka
如何在Kafka分布式环境中保证消息的顺序消费?深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据管道和流处理设计的分布式平台,以其高效的消息发布与订阅功能著称。在分布式环境中确保消息按序消费颇具挑战。本文首先介绍了Kafka通过Topic分区实现消息排序的基本机制,随后详细阐述了几种保证消息顺序性的策略,包括使用单分区Topic、消费者组搭配单分区消费、幂等性生产者以及事务支持等技术手段。最后,通过一个Java示例演示了如何利用Kafka消费者确保消息按序消费的具体实现过程。
64 3
|
2月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
70 7
|
2月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
87 4
|
2月前
|
消息中间件 负载均衡 Java
"深入Kafka核心:探索高效灵活的Consumer机制,以Java示例展示数据流的优雅消费之道"
【8月更文挑战第10天】在大数据领域,Apache Kafka凭借其出色的性能成为消息传递与流处理的首选工具。Kafka Consumer作为关键组件,负责优雅地从集群中提取并处理数据。它支持消息的负载均衡与容错,通过Consumer Group实现消息的水平扩展。下面通过一个Java示例展示如何启动Consumer并消费数据,同时体现了Kafka Consumer设计的灵活性与高效性,使其成为复杂消费场景的理想选择。
97 4
|
2月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
59 3
|
1月前
|
安全 Java API
Java线程池原理与锁机制分析
综上所述,Java线程池和锁机制是并发编程中极其重要的两个部分。线程池主要用于管理线程的生命周期和执行并发任务,而锁机制则用于保障线程安全和防止数据的并发错误。它们深入地结合在一起,成为Java高效并发编程实践中的关键要素。
18 0
下一篇
无影云桌面