Kafka请求队列源码实现-RequestChannel请求通道(下)

简介: Kafka请求队列源码实现-RequestChannel请求通道

RequestChannel

实现了Kafka Request队列。传输Request/Response的通道。有了Request和Response的基础,下面我们可以学习RequestChannel类的实现了。

定义

image.png

RequestChannel类实现KafkaMetricsGroup trait,后者封装许多实用指标监控方法:

  • newGauge
    创建数值型监控指标
  • newHistogram
    创建直方图型监控指标

属性

每个RequestChannel对象实例创建时,会定义队列保存Broker接收到的各类请求,这个队列被称为请求队列或Request队列。

Kafka使用Java提供的阻塞队列ArrayBlockingQueue实现请求队列,并利用它天然提供的线程安全保证多个线程能够并发安全高效地访问请求队列。


代码中该队列由变量requestQueue定义。

image.png

queueSize

Request队列的最大长度。当Broker启动时,SocketServer组件会创建RequestChannel对象

image.png

并把Broker端参数queued.max.requests赋值给queueSize。默认情况每个RequestChannel上的队列长度500。

image.png

processors

image.png

封装RequestChannel的Processor线程池。每个Processor线程负责具体的请求处理逻辑。


Processor管理

刚才的processors即是被创建的Processor线程池,使用Java#ConcurrentHashMap保存:

Key:processor序号

Value:具体的Processor线程对象

因此当前Kafka Broker端所有网络线程都是在RequestChannel中维护的。

管理线程池

  • RequestChannel中的addProcessor和removeProcessor。
  • image.png
  • 分别实现增加和移除线程。每当Broker启动,都会调用addProcessor方法,向RequestChannel对象添加num.network.threads个Processor线程。


num.network.threads这个参数的更新模式(Update Mode)是Cluster-wide,即Kafka允许你动态修改此参数值。比如,Broker启动时指定num.network.threads为8,之后你通过kafka-configs命令将其修改为3。显然该操作会减少Processor线程池中的线程数量。在这个场景下,removeProcessor方法会被调用。

处理Request和Response

即收发Request和发送Response。

sendRequest和receiveRequest:

  • 发送Request仅是将Request对象置于Request队列
  • image.png
  • 接收Request则是从队列中取出Request
  • image.png
  • 整个流程其实就是“生产者-消费者”模式,依靠ArrayBlockingQueue的线程安全确保整个过程的线程安全
  • image.png
  • 没有所谓的接收Response,只有发送Response,即sendResponse方法。sendResponse是啥意思呢?其实就是把Response对象发送出去,也就是将Response添加到Response队列的过程。
  • image.png
  • 当Processor处理完某个Request后,会把自己的序号封装进对应的Response对象。

一旦找出之前是由哪个Processor线程处理,代码直接调用该Processor的enqueueResponse方法,将Response放入Response队列中,等待后续发送。


监控指标

RequestChannel类定义封装了与Request队列相关的重要监控指标,以实时动态地监测Request和Response的性能表现。

具体指标项

object RequestMetrics {
  val consumerFetchMetricName = ApiKeys.FETCH.name + "Consumer"
  val followFetchMetricName = ApiKeys.FETCH.name + "Follower"
  val RequestsPerSec = "RequestsPerSec"
  val RequestQueueTimeMs = "RequestQueueTimeMs"
  val LocalTimeMs = "LocalTimeMs"
  val RemoteTimeMs = "RemoteTimeMs"
  val ThrottleTimeMs = "ThrottleTimeMs"
  val ResponseQueueTimeMs = "ResponseQueueTimeMs"
  val ResponseSendTimeMs = "ResponseSendTimeMs"
  val TotalTimeMs = "TotalTimeMs"
  val RequestBytes = "RequestBytes"
  val MessageConversionsTimeMs = "MessageConversionsTimeMs"
  val TemporaryMemoryBytes = "TemporaryMemoryBytes"
  val ErrorsPerSec = "ErrorsPerSec"
}

RequestsPerSec

每秒处理的Request数,用来评估Broker的繁忙状态。

RequestQueueTimeMs

计算Request在Request队列中的平均等候时间,单位是毫秒。倘若Request在队列的等待时间过长,你通常需要增加后端I/O线程的数量,来加快队列中Request的拿取速度。

LocalTimeMs

计算Request实际被处理的时间,单位是毫秒。一旦定位到这个监控项的值很大,你就需要进一步研究Request被处理的逻辑了,具体分析到底是哪一步消耗了过多的时间。

RemoteTimeMs

Kafka的读写请求(PRODUCE请求和FETCH请求)逻辑涉及等待其他Broker操作的步骤。RemoteTimeMs计算的,就是等待其他Broker完成指定逻辑的时间。因为等待的是其他Broker,因此被称为Remote Time。这个监控项非常重要!Kafka生产环境中设置acks=all的Producer程序发送消息延时高的主要原因,往往就是Remote Time高。因此,如果你也碰到了这样的问题,不妨先定位一下Remote Time是不是瓶颈。

TotalTimeMs

计算Request被处理的完整流程时间。这是最实用的监控指标,没有之一!毕竟,我们通常都是根据TotalTimeMs来判断系统是否出现问题的。一旦发现了问题,我们才会利用前面的几个监控项进一步定位问题的原因。

RequestChannel定义了updateMetrics方法,用于实现监控项的更新


目录
相关文章
|
7月前
|
消息中间件 分布式计算 Kafka
亿万级别Kafka演进之路:可靠性+事务+消息中间件+源码+日志
Kafka起初是由LinkedIn公司采用Scala语言开发的-一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
|
7月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
504 4
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
103 1
|
2月前
|
消息中间件 大数据 Kafka
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
39 2
|
2月前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
44 1
|
5月前
|
消息中间件 Java Kafka
kafka Linux环境搭建安装及命令创建队列生产消费消息
kafka Linux环境搭建安装及命令创建队列生产消费消息
123 4
|
4月前
|
消息中间件 存储 Kafka
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
|
6月前
|
消息中间件 监控 Java
Java一分钟之-Kafka:分布式消息队列
【6月更文挑战第11天】Apache Kafka是一款高性能的消息队列,适用于大数据处理和实时流处理,以发布/订阅模型和分布式设计处理大规模数据流。本文介绍了Kafka基础,包括生产者、消费者、主题和代理,以及常见问题:分区选择、偏移量管理和监控不足。通过Java代码示例展示了如何创建生产者和消费者。理解并妥善处理这些问题,结合有效的监控和配置优化,是充分发挥Kafka潜力的关键。
112 0
|
7月前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
137 1
|
7月前
|
消息中间件 存储 负载均衡
[AIGC ~ coze] Kafka 消费者——从源码角度深入理解
[AIGC ~ coze] Kafka 消费者——从源码角度深入理解

热门文章

最新文章