十分钟,了解Kafka的Sender线程

简介: 十分钟,了解Kafka的Sender线程

〇、前言

在上两篇文章《连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka》和《一文了解Kafka的消息收集器RecordAccumulate》中,我们介绍了Main ThreadRecordAccumulate的工作原理,那么在本篇文章中,我们继续介绍第三部分内容:Sender线程

在介绍原理之前,大家再重温一下Producer端的整体架构,图示如下所示:

这个图看不懂没有关系,我们会在介绍Producer端原理时一一介绍每个部分的含义及其所复杂的功能。

一、Sender线程

除了我们前面曾经介绍过的Main Thread主线程之外,在KafkaProducer中还启动了一个Sender线程,那么,本节我们就来针对Sender线程进行解析,Send线程启动代码如下所示:

Sender线程负责从RecordAccumulate中获取缓存消息,在获取了以Map<TopicPartition,Deque<ProducerBatch>> 的对应关系存储的消息缓存之后,会通过主题信息分区信息创建TopicPartition实例对象tp,然后再以此为key,获取ProducerBatch的双向队列,如下所示:

然后,会进一步将映射中key的类型从TopicPartition转换为NodeId,即:Map<NodeId,Deque<ProducerBatch>> 的对应关系。这是由于当Producer端最终发送消息的时候,关注的是向哪个Broker节点发送消息,而并不是关心哪个主题分区,所以此处需要做一个从应用逻辑层面向网络I/O层面的转换。如下所示:

当最后要进行消息发送的时候,还要再次进行封装,封装出用于消息发送的ProduceRequest,此时的对应关系就变成了NodeId和ProduceRequest了,代码如下所示:

请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求。代码如下所示:

如果我们开启幂等配置,则会创建TransactionManager实例对象,如下所示:

当TransactionManager实例对象不为null的时候,Sender现成则会执行下图红框内的代码逻辑,那么在黄框中,我们会看到调用了maybeSendAndPollTransactionalRequest()方法,代码如下所示:

在选择目标节点的时候,如果coordinatorType为空,则会调用client的leastLoadeNode(...)方法,通过该方法可以获得所有Node中负载最小的那一个。那怎么来判断Node节点的负载呢? 在上面的内容中,我们其实提到了,发出去的消息也会保存到InFlightRequests中,它其实是一个缓存的作用,主要用来缓存已经发出去但是还没有接收到响应确认的消息请求。因此,我们可以通过它来判断那些Node节点未接到响应的请求数量最少,则就是负载最小的。通过这种方式,可以保证负载尽可能的平均,而不会造成某一个节点的重度阻塞从而影响整个消息的发送性能。代码如下所示:

除了上面之外,我们在元数据更新的时候,也是通过leastLoadeNode(...)方法来获得负载最低的节点的,那么,在Kafka中什么是元数据呢? 当我们发送消息的时候,消息发送到哪个分区,这个分区对应的Broker的地址和端口,已经这个是否配置了Kafka集群,集群中都包含哪些节点等等,都是保存在元数据信息中的。那么,在什么步骤触发了元数据更新呢? 我们可以把视野转向Sender的runOnce()方法上,在下图红框处,我们调用了client的poll方法,如下是该方法的源码:

此处的client对应的是NetworkClient的实例对象,在该类的poll(...)方法中,执行了更新元数据的逻辑,即下图红框所示:

maybeUpdate方法中,我们看到了熟悉的一段代码Node node = leastLoadedNode(now); 此处就是获得负载最低节点的地方。那么获得到了这个node之后,就可以调用maybeUpdate(now, node)来尝试更新元数据信息了:

maybeUpdate(now, node)方法中我们可以看到,更新元数据也是采用发送消息的方式,即:向这个负载最低的Node发送MetadataRequest请求来获取具体的元数据信息。在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的处理逻辑一样的:

今天的文章内容就这些了:

写作不易,笔者几个小时甚至数天完成的一篇文章,只愿换来您几秒钟的 点赞 & 分享

更多技术干货,欢迎大家关注公众号“爪哇缪斯” ~ \(^o^)/ ~ 「干货分享,每天更新」

相关文章
|
消息中间件 监控 Java
图解Kafka线程模型及其设计缺陷
图解Kafka线程模型及其设计缺陷
图解Kafka线程模型及其设计缺陷
|
4月前
|
消息中间件 安全 大数据
Kafka多线程Consumer是实现高并发数据处理的有效手段之一
【9月更文挑战第2天】Kafka多线程Consumer是实现高并发数据处理的有效手段之一
395 4
|
5月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
120 7
|
5月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
214 4
|
8月前
|
消息中间件 存储 网络协议
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
Apache Kafka的单分区写入性能在某些严格保序场景中至关重要,但其现有线程模型限制了性能发挥。本文分析了Kafka的串行处理模型,包括SocketServer、KafkaChannel、RequestChannel等组件,指出其通过KafkaChannel状态机确保请求顺序处理,导致处理效率低下。AutoMQ提出流水线处理模型,简化KafkaChannel状态机,实现网络解析、校验定序和持久化的阶段间并行化,提高处理效率。测试结果显示,AutoMQ的极限吞吐是Kafka的2倍,P99延迟降低至11ms。
157 3
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
|
消息中间件 网络协议 安全
【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接
【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接
【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接
|
监控 Java 程序员
十分钟掌握java多线程进阶
十分钟掌握java多线程进阶
144 0
十分钟掌握java多线程进阶
|
消息中间件 Kafka
Kafka单线程Consumer及参数详解
Kafka单线程Consumer及参数详解
519 57
|
消息中间件 存储 设计模式
一文读懂kafka消息拉取机制|线程拉取模型
一文读懂kafka消息拉取机制|线程拉取模型
一文读懂kafka消息拉取机制|线程拉取模型
|
消息中间件 存储 缓存
KafkaProducer Sender 线程详解(含详细的执行流程图)
KafkaProducer Sender 线程详解(含详细的执行流程图)
KafkaProducer Sender 线程详解(含详细的执行流程图)