Kafka请求队列源码实现-RequestChannel请求通道(中)

简介: Kafka请求队列源码实现-RequestChannel请求通道

ApiVersions请求作用

当Broker接收到ApiVersionsRequest,它会返回Broker当前支持的请求类型列表,包括请求类型名称、支持的最早版本号和最新版本号。查看Kafka的bin目录,能找到kafka-broker-api-versions.sh脚本工具。它就是,构造ApiVersionsRequest对象,然后发送给对应的Broker。

若是ApiVersions类型请求,代码中为什么要判断一下它的版本呢?

和处理其他类型请求不同,Kafka必须保证版本号比最新支持版本还要高的ApiVersions请求也能被处理。这主要是考虑客户端和服务器端版本兼容。客户端发请求给Broker,可能不知道Broker到底支持哪些版本请求,它需使用ApiVersionsRequest去获取完整请求版本支持列表。若不做该判断,Broker可能无法处理客户端发送的ApiVersionsRequest。

metrics

metrics是Request相关的各种监控指标的一个管理类。它构建了一个Map,封装了所有请求JMX指标。

响应(Response)

定义了与Request对应的各类响应。

类设计

  • Response
    定义Response的抽象父类。每个Response对象都包含对应Request对象。该类核心方法onComplete,用来实现每类Response被处理后需要执行的回调逻辑。
  • SendResponse
    大多数Request处理完成后都需执行一段回调,SendResponse即保存返回结果的Response子类。核心字段onCompletionCallback,即指定处理完成之后的回调逻辑。

正常需要发送Response。

  • NoResponse
    有些Request处理完成后无需单独执行额外的回调逻辑。NoResponse就是为这类Response准备的。

无需发送Response。

  • CloseConnectionResponse
    出错后需要关闭TCP连接的场景,此时返回CloseConnectionResponse给Request发送方,显式地通知它关闭连接。

标识关闭连接通道的Response。

StartThrottlingResponse

通知Broker的Socket Server组件(后面几节课我会讲到它)某个TCP连接通信通道开始被限流(throttling)。

EndThrottlingResponse

与StartThrottlingResponse对应,通知Broker的SocketServer组件某个TCP连接通信通道的限流已结束。

后两个Response类不常用,仅在对Socket连接进行限流时,才会使用。

Response代码

abstract class Response(val request: Request) {
  locally {
    val nowNs = Time.SYSTEM.nanoseconds
    request.responseCompleteTimeNanos = nowNs
    if (request.apiLocalCompleteTimeNanos == -1L)
      request.apiLocalCompleteTimeNanos = nowNs
  }
  def processor: Int = request.processor
  def responseString: Option[String] = Some("")
  def onComplete: Option[Send => Unit] = None
  override def toString: String
}

该抽象类只有一个属性字段:request。即每个Response对象都要保存它对应的Request对象。

onComplete方法是调用指定回调逻辑的地方。

SendResponse类就重写了该方法:

class SendResponse(request: Request,
                     val responseSend: Send,
                     val responseAsString: Option[String],
                     val onCompleteCallback: Option[Send => Unit]) 
  extends Response(request) {
    ......
    // 指定输入参数onCompleteCallback
    override def onComplete: Option[Send => Unit] = onCompleteCallback
}

onComplete方法把函数赋值给另一个函数,并作为结果返回。好处在于可以灵活变更onCompleteCallback实现不同回调逻辑。

目录
相关文章
|
7月前
|
消息中间件 分布式计算 Kafka
亿万级别Kafka演进之路:可靠性+事务+消息中间件+源码+日志
Kafka起初是由LinkedIn公司采用Scala语言开发的-一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
|
7月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
504 4
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
103 1
|
2月前
|
消息中间件 大数据 Kafka
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
39 2
|
2月前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
44 1
|
5月前
|
消息中间件 Java Kafka
kafka Linux环境搭建安装及命令创建队列生产消费消息
kafka Linux环境搭建安装及命令创建队列生产消费消息
123 4
|
4月前
|
消息中间件 存储 Kafka
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
|
6月前
|
消息中间件 监控 Java
Java一分钟之-Kafka:分布式消息队列
【6月更文挑战第11天】Apache Kafka是一款高性能的消息队列,适用于大数据处理和实时流处理,以发布/订阅模型和分布式设计处理大规模数据流。本文介绍了Kafka基础,包括生产者、消费者、主题和代理,以及常见问题:分区选择、偏移量管理和监控不足。通过Java代码示例展示了如何创建生产者和消费者。理解并妥善处理这些问题,结合有效的监控和配置优化,是充分发挥Kafka潜力的关键。
112 0
|
7月前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
137 1
|
7月前
|
消息中间件 存储 负载均衡
[AIGC ~ coze] Kafka 消费者——从源码角度深入理解
[AIGC ~ coze] Kafka 消费者——从源码角度深入理解

热门文章

最新文章