3 Broker如何处理客户端的更新元数据请求?
Broker处理所有RPC请求的入口方法
KafkaApis#handleTopicMetadataRequest
- 处理更新元数据的方法
- handleTopicMetadataRequest(RequestChannel.Request):
def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = { ... // 需要获取哪些topic的元数据 val topics = if (metadataRequest.isAllTopics) metadataCache.getAllTopics() // 不会披露未经Describe授权的主题的存在,因此甚至都没有检查它们是否存在 val unauthorizedForDescribeTopicMetadata = // 对于所有主题,请勿包括未经授权的主题 // 在旧版本的协议中,每次都获取所有主题的元数据 if ((requestVersion == 0 && (metadataRequest.topics == null || metadataRequest.topics.isEmpty)) || metadataRequest.isAllTopics) Set.empty[MetadataResponseTopic] else unauthorizedForDescribeTopics.map(topic => metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, util.Collections.emptyList())) // 从元数据缓存过滤出相关主题的元数据 getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners) var clusterAuthorizedOperations = Int.MinValue if (request.header.apiVersion >= 8) { // 获取集群授权的操作 if (metadataRequest.data.includeClusterAuthorizedOperations) { ... // 获取主题授权操作 if (metadataRequest.data.includeTopicAuthorizedOperations) { ... val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata // 获取所有Broker列表 val brokers = metadataCache.getAliveBrokers // 构建Response并发送 sendResponseMaybeThrottle(request, requestThrottleMs => ... }
先根据请求中的topic列表
去本地元数据缓存MetadataCache中过滤出相应主题的元数据,即 topics 子树的子集
然后再去本地元数据缓存中获取所有Broker的集合, 即 ids 子树
最后把这两部分合在一起,作为响应返回给客户端。
Kafka在每个Broker中都维护了一份和zk中一样的元数据缓存,并非每次client请求元数据就去读一次zk。由于zk的Watcher机制,Kafka可感知到zk中的元数据变化,从而及时更新Broker的元数据缓存。
4 最佳实践
目前Kafka集群的可用性高度耦合zk,若zk集群不能提供服务,整个Kafka集群就无法服务了,Kafka的开发者也意识到了这个问题,目前正在讨论开发一个元数据服务来替代 zk。
若需部署大规模Kafka集群,推荐拆分成多个互相独立的小集群部署,每个小集群都使用一组独立的zk提供服务。这样,每个zk中存储的数据相对较少,且若某zk集群异常,只会影响一个小Kafka集群,尽量减小了影响范围。
参考