ApiVersions请求作用
当Broker接收到ApiVersionsRequest,它会返回Broker当前支持的请求类型列表,包括请求类型名称、支持的最早版本号和最新版本号。查看Kafka的bin目录,能找到kafka-broker-api-versions.sh脚本工具。它就是,构造ApiVersionsRequest对象,然后发送给对应的Broker。
若是ApiVersions类型请求,代码中为什么要判断一下它的版本呢?
和处理其他类型请求不同,Kafka必须保证版本号比最新支持版本还要高的ApiVersions请求也能被处理。这主要是考虑客户端和服务器端版本兼容。客户端发请求给Broker,可能不知道Broker到底支持哪些版本请求,它需使用ApiVersionsRequest去获取完整请求版本支持列表。若不做该判断,Broker可能无法处理客户端发送的ApiVersionsRequest。
metrics
metrics是Request相关的各种监控指标的一个管理类。它构建了一个Map,封装了所有请求JMX指标。
响应(Response)
定义了与Request对应的各类响应。
类设计
- Response
定义Response的抽象父类。每个Response对象都包含对应Request对象。该类核心方法onComplete,用来实现每类Response被处理后需要执行的回调逻辑。 - SendResponse
大多数Request处理完成后都需执行一段回调,SendResponse即保存返回结果的Response子类。核心字段onCompletionCallback,即指定处理完成之后的回调逻辑。
正常需要发送Response。
- NoResponse
有些Request处理完成后无需单独执行额外的回调逻辑。NoResponse就是为这类Response准备的。
无需发送Response。
- CloseConnectionResponse
出错后需要关闭TCP连接的场景,此时返回CloseConnectionResponse给Request发送方,显式地通知它关闭连接。
标识关闭连接通道的Response。
StartThrottlingResponse
通知Broker的Socket Server组件(后面几节课我会讲到它)某个TCP连接通信通道开始被限流(throttling)。
EndThrottlingResponse
与StartThrottlingResponse对应,通知Broker的SocketServer组件某个TCP连接通信通道的限流已结束。
后两个Response类不常用,仅在对Socket连接进行限流时,才会使用。
Response代码
abstract class Response(val request: Request) { locally { val nowNs = Time.SYSTEM.nanoseconds request.responseCompleteTimeNanos = nowNs if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = nowNs } def processor: Int = request.processor def responseString: Option[String] = Some("") def onComplete: Option[Send => Unit] = None override def toString: String }
该抽象类只有一个属性字段:request。即每个Response对象都要保存它对应的Request对象。
onComplete方法是调用指定回调逻辑的地方。
SendResponse类就重写了该方法:
class SendResponse(request: Request, val responseSend: Send, val responseAsString: Option[String], val onCompleteCallback: Option[Send => Unit]) extends Response(request) { ...... // 指定输入参数onCompleteCallback override def onComplete: Option[Send => Unit] = onCompleteCallback }
onComplete方法把函数赋值给另一个函数,并作为结果返回。好处在于可以灵活变更onCompleteCallback实现不同回调逻辑。