RequestChannel
实现了Kafka Request队列。传输Request/Response的通道。有了Request和Response的基础,下面我们可以学习RequestChannel类的实现了。
定义
RequestChannel类实现KafkaMetricsGroup trait,后者封装许多实用指标监控方法:
- newGauge
创建数值型监控指标 - newHistogram
创建直方图型监控指标
属性
每个RequestChannel对象实例创建时,会定义队列保存Broker接收到的各类请求,这个队列被称为请求队列或Request队列。
Kafka使用Java提供的阻塞队列ArrayBlockingQueue实现请求队列,并利用它天然提供的线程安全保证多个线程能够并发安全高效地访问请求队列。
代码中该队列由变量requestQueue定义。
queueSize
Request队列的最大长度。当Broker启动时,SocketServer组件会创建RequestChannel对象
并把Broker端参数queued.max.requests
赋值给queueSize。默认情况每个RequestChannel上的队列长度500。
processors
封装RequestChannel的Processor线程池。每个Processor线程负责具体的请求处理逻辑。
Processor管理
刚才的processors即是被创建的Processor线程池,使用Java#ConcurrentHashMap保存:
Key:processor序号
Value:具体的Processor线程对象
因此当前Kafka Broker端所有网络线程都是在RequestChannel中维护的。
管理线程池
- RequestChannel中的addProcessor和removeProcessor。
- 分别实现增加和移除线程。每当Broker启动,都会调用addProcessor方法,向RequestChannel对象添加num.network.threads个Processor线程。
num.network.threads这个参数的更新模式(Update Mode)是Cluster-wide,即Kafka允许你动态修改此参数值。比如,Broker启动时指定num.network.threads为8,之后你通过kafka-configs命令将其修改为3。显然该操作会减少Processor线程池中的线程数量。在这个场景下,removeProcessor方法会被调用。
处理Request和Response
即收发Request和发送Response。
sendRequest和receiveRequest:
- 发送Request仅是将Request对象置于Request队列
- 接收Request则是从队列中取出Request
- 整个流程其实就是“生产者-消费者”模式,依靠ArrayBlockingQueue的线程安全确保整个过程的线程安全
- 没有所谓的接收Response,只有发送Response,即sendResponse方法。sendResponse是啥意思呢?其实就是把Response对象发送出去,也就是将Response添加到Response队列的过程。
- 当Processor处理完某个Request后,会把自己的序号封装进对应的Response对象。
一旦找出之前是由哪个Processor线程处理,代码直接调用该Processor的enqueueResponse方法,将Response放入Response队列中,等待后续发送。
监控指标
RequestChannel类定义封装了与Request队列相关的重要监控指标,以实时动态地监测Request和Response的性能表现。
具体指标项
object RequestMetrics { val consumerFetchMetricName = ApiKeys.FETCH.name + "Consumer" val followFetchMetricName = ApiKeys.FETCH.name + "Follower" val RequestsPerSec = "RequestsPerSec" val RequestQueueTimeMs = "RequestQueueTimeMs" val LocalTimeMs = "LocalTimeMs" val RemoteTimeMs = "RemoteTimeMs" val ThrottleTimeMs = "ThrottleTimeMs" val ResponseQueueTimeMs = "ResponseQueueTimeMs" val ResponseSendTimeMs = "ResponseSendTimeMs" val TotalTimeMs = "TotalTimeMs" val RequestBytes = "RequestBytes" val MessageConversionsTimeMs = "MessageConversionsTimeMs" val TemporaryMemoryBytes = "TemporaryMemoryBytes" val ErrorsPerSec = "ErrorsPerSec" }
RequestsPerSec
每秒处理的Request数,用来评估Broker的繁忙状态。
RequestQueueTimeMs
计算Request在Request队列中的平均等候时间,单位是毫秒。倘若Request在队列的等待时间过长,你通常需要增加后端I/O线程的数量,来加快队列中Request的拿取速度。
LocalTimeMs
计算Request实际被处理的时间,单位是毫秒。一旦定位到这个监控项的值很大,你就需要进一步研究Request被处理的逻辑了,具体分析到底是哪一步消耗了过多的时间。
RemoteTimeMs
Kafka的读写请求(PRODUCE请求和FETCH请求)逻辑涉及等待其他Broker操作的步骤。RemoteTimeMs计算的,就是等待其他Broker完成指定逻辑的时间。因为等待的是其他Broker,因此被称为Remote Time。这个监控项非常重要!Kafka生产环境中设置acks=all的Producer程序发送消息延时高的主要原因,往往就是Remote Time高。因此,如果你也碰到了这样的问题,不妨先定位一下Remote Time是不是瓶颈。
TotalTimeMs
计算Request被处理的完整流程时间。这是最实用的监控指标,没有之一!毕竟,我们通常都是根据TotalTimeMs来判断系统是否出现问题的。一旦发现了问题,我们才会利用前面的几个监控项进一步定位问题的原因。
RequestChannel定义了updateMetrics方法,用于实现监控项的更新