我们上一篇讲的是 聊聊 Kafka:协调者 GroupCoordinator 源码剖析之实例化与启动 GroupCoordinator,这一篇我们来讲讲聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR。
四、ApiKeys.FIND_COORDINATOR
我们前面的文章说过,与消费组相关的两个组件,一个是消费者客户端的 ConsumerCoordinator,一个是 Kafka Broker 服务端的 GroupCoordinator。ConsumerCoordinator 负责与 GroupCoordinator 通信,Broker 启动的时候,都会启动一个 GroupCoordinator 实例,而一个集群中,会有多个 Broker,那么如何确定一个新的 Consumer 加入 Consumer Group 后,到底和哪个 Broker 上的 GroupCoordinator 进行交互呢?
这个问题就就交给服务端的 ApiKeys.FIND_COORDINATOR 命令来处理。
4.1 客户端源码分析
coordinator 即获取到的 group 节点对象,client.isUnavailable(coordinator)
是在与 group 建立连接,每次判断 coordinator 不为空且 client 与 group 连接失败,则将 coordinator 置空,为什么会这样呢?很有可能是请求到 group 的信息之后发现该节点已下线或者不可用,此时服务端很有可能也在进行选举,所以我们需要将 coordinator 清空,待服务端选举完成后再次通信。
如果通信一次发现该 GroupCoordinator 的信息还未获取到则继续重试,直到超时,这里的超时时间即为 poll 时传入的超时时间,这个时间设置贯穿了整个 consume 的运行代码。
我们来看看是如何寻找负载最小节点的,首先就是取随机数,防止每次都从第一个节点连接,如果判断没有在途的 request 则直接返回该节点,否则取在途 request 最小的节点,如果该节点不存在,则依次取连接的节点、需要重试的节点,如果找到不为 null 的节点则返回该节点,否则返回 null。
4.2 FindCoordinatorRequest 请求报文
key_type 有两种枚举,一种是 GROUP,另一种是 TRANSACTION,如果 type 为 GROUP 的话那 key 就是 groupId,反之是 transactionId。
4.3 服务端源码分析
直接看到 FIND_COORDINATOR 命令调用的方法 kafka.server.KafkaApis#handleFindCoordinatorRequest
kafka.coordinator.group.GroupMetadataManager#partitionFor
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount // 记录 offsets topic 的分区数量,这个字段会调用 getGroupMetadataTopicPartitionCount() 进行初始化,默认 50。 private val groupMetadataTopicPartitionCount = getGroupMetadataTopicPartitionCount private def getGroupMetadataTopicPartitionCount: Int = { zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicNumPartitions) } val DefaultOffsetsTopicNumPartitions = 50
__consumer_offsets
这个内部 Topic,专门用来存储 Consumer Group 消费的情况,内置 Topic 初始化时由 offsets.topic.num.partitions
参数来决定分区数,默认值是 50。相同 Consumer Group 的 offset 最终会保存在其中一个分区中,而保存在哪个分区就由上面这段代码来决定,可以看到逻辑很简单,就是取 groupId 的 hashCode,然后对总的分区数取模。
举个例子,假设一个 GroupId 计算出来的 hashCode 是 8,之后取模 50 得到 8。那么 partition-8 的 leader 所在的 broker 就是我们要找的那个节点。这个 Consumer Group 后面都会直接在 partition-8 分区保存位点。
kafka.server.KafkaApis#getOrCreateInternalTopic
- 首先从当前 node 的元数据缓存中拿到对应 topic 的数据,如果没有,则创建。
- 注意:kafka 创建 topic 是需要时间的,而这里的实现方式是往 zk 中写入数据触发创建 topic 流程,是一种异步方式,往 zk 中写入数据之后会返回一个 error,LEADER_NOT_AVAILABLE,待创建 topic 的流程走完,并同步各个节点 metaData 之后,最后从 metaData 中取到该节点信息 findCoordinatorRequest 才会成功返回。
4.4 FindCoordinatorResponse 响应报文
4.5 小结
总体分析下来,主要流程如下图所示:
- 寻找最小负载节点信息
- 向最小负载节点发送 FindCoordinatorRequest
- 最小负载节点处理该请求
- 首先找到该 groupId 对应的分区
- 通过内存中缓存的 metaData 获取该分区的信息,如果不存在则创建 topic。
- 返回查找到的分区 leader 信息
- 最小负载节点向 client 响应 FindCoordinatorResponse
欢迎大家关注我的公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。