聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR

简介: 聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR

我们上一篇讲的是 聊聊 Kafka:协调者 GroupCoordinator 源码剖析之实例化与启动 GroupCoordinator,这一篇我们来讲讲聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR。

四、ApiKeys.FIND_COORDINATOR

我们前面的文章说过,与消费组相关的两个组件,一个是消费者客户端的 ConsumerCoordinator,一个是 Kafka Broker 服务端的 GroupCoordinator。ConsumerCoordinator 负责与 GroupCoordinator 通信,Broker 启动的时候,都会启动一个 GroupCoordinator 实例,而一个集群中,会有多个 Broker,那么如何确定一个新的 Consumer 加入 Consumer Group 后,到底和哪个 Broker 上的 GroupCoordinator 进行交互呢?

这个问题就就交给服务端的 ApiKeys.FIND_COORDINATOR 命令来处理。

4.1 客户端源码分析

coordinator 即获取到的 group 节点对象,client.isUnavailable(coordinator) 是在与 group 建立连接,每次判断 coordinator 不为空且 client 与 group 连接失败,则将 coordinator 置空,为什么会这样呢?很有可能是请求到 group 的信息之后发现该节点已下线或者不可用,此时服务端很有可能也在进行选举,所以我们需要将 coordinator 清空,待服务端选举完成后再次通信。

如果通信一次发现该 GroupCoordinator 的信息还未获取到则继续重试,直到超时,这里的超时时间即为 poll 时传入的超时时间,这个时间设置贯穿了整个 consume 的运行代码。

我们来看看是如何寻找负载最小节点的,首先就是取随机数,防止每次都从第一个节点连接,如果判断没有在途的 request 则直接返回该节点,否则取在途 request 最小的节点,如果该节点不存在,则依次取连接的节点、需要重试的节点,如果找到不为 null 的节点则返回该节点,否则返回 null。

4.2 FindCoordinatorRequest 请求报文




key_type 有两种枚举,一种是 GROUP,另一种是 TRANSACTION,如果 type 为 GROUP 的话那 key 就是 groupId,反之是 transactionId。

4.3 服务端源码分析

直接看到 FIND_COORDINATOR 命令调用的方法 kafka.server.KafkaApis#handleFindCoordinatorRequest

kafka.coordinator.group.GroupMetadataManager#partitionFor

def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
// 记录 offsets topic 的分区数量,这个字段会调用 getGroupMetadataTopicPartitionCount() 进行初始化,默认 50。
private val groupMetadataTopicPartitionCount = getGroupMetadataTopicPartitionCount
private def getGroupMetadataTopicPartitionCount: Int = {
  zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicNumPartitions)
}
val DefaultOffsetsTopicNumPartitions = 50

__consumer_offsets 这个内部 Topic,专门用来存储 Consumer Group 消费的情况,内置 Topic 初始化时由 offsets.topic.num.partitions 参数来决定分区数,默认值是 50。相同 Consumer Group 的 offset 最终会保存在其中一个分区中,而保存在哪个分区就由上面这段代码来决定,可以看到逻辑很简单,就是取 groupId 的 hashCode,然后对总的分区数取模。

举个例子,假设一个 GroupId 计算出来的 hashCode 是 8,之后取模 50 得到 8。那么 partition-8 的 leader 所在的 broker 就是我们要找的那个节点。这个 Consumer Group 后面都会直接在 partition-8 分区保存位点。

kafka.server.KafkaApis#getOrCreateInternalTopic

  • 首先从当前 node 的元数据缓存中拿到对应 topic 的数据,如果没有,则创建。
  • 注意:kafka 创建 topic 是需要时间的,而这里的实现方式是往 zk 中写入数据触发创建 topic 流程,是一种异步方式,往 zk 中写入数据之后会返回一个 error,LEADER_NOT_AVAILABLE,待创建 topic 的流程走完,并同步各个节点 metaData 之后,最后从 metaData 中取到该节点信息 findCoordinatorRequest 才会成功返回。

4.4 FindCoordinatorResponse 响应报文


4.5 小结

总体分析下来,主要流程如下图所示:

  • 寻找最小负载节点信息
  • 向最小负载节点发送 FindCoordinatorRequest
  • 最小负载节点处理该请求
  • 首先找到该 groupId 对应的分区
  • 通过内存中缓存的 metaData 获取该分区的信息,如果不存在则创建 topic。
  • 返回查找到的分区 leader 信息
  • 最小负载节点向 client 响应 FindCoordinatorResponse



欢迎大家关注我的公众号老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。

相关文章
|
7月前
|
消息中间件 分布式计算 Kafka
亿万级别Kafka演进之路:可靠性+事务+消息中间件+源码+日志
Kafka起初是由LinkedIn公司采用Scala语言开发的-一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
|
7月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
504 4
|
2月前
|
消息中间件 存储 分布式计算
大数据-72 Kafka 高级特性 稳定性-事务 (概念多枯燥) 定义、概览、组、协调器、流程、中止、失败
大数据-72 Kafka 高级特性 稳定性-事务 (概念多枯燥) 定义、概览、组、协调器、流程、中止、失败
43 4
|
6月前
|
消息中间件 运维 数据管理
Kafka 如何基于 KRaft 实现集群最终一致性协调
Kafka 3.3.1 引入了 KRaft 元数据管理组件,替代 Zookeeper,以简化集群一致性维护,支持更大规模集群并减轻运维复杂性。在 Zookeeper 模式下,需同时运维 ZK 和 Broker,而 KRaft 模式仅需 3 个节点即可构成最小生产集群,且通信协调基于 Raft 协议,增强了一致性。KRaft 模式中,Controller 使用单线程处理请求,通过 KRaft 保持内存状态与多节点一致性。此外,Broker 根据 KRaft 记录更新元数据,实现声明式管理,提高集群协调效率。KRaft 的引入是集群协调机制的演进,采用事件驱动模型实现元数据的一致性。
362 1
Kafka 如何基于 KRaft 实现集群最终一致性协调
|
7月前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
137 1
|
7月前
|
消息中间件 存储 负载均衡
[AIGC ~ coze] Kafka 消费者——从源码角度深入理解
[AIGC ~ coze] Kafka 消费者——从源码角度深入理解
|
7月前
|
消息中间件 网络协议 Kafka
Kafka【付诸实践 02】消费者和消费者群组+创建消费者实例+提交偏移量(自动、手动)+监听分区再平衡+独立的消费者+消费者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka消费者】
【2月更文挑战第21天】Kafka【付诸实践 02】消费者和消费者群组+创建消费者实例+提交偏移量(自动、手动)+监听分区再平衡+独立的消费者+消费者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka消费者】
238 3
|
7月前
|
存储 Java 关系型数据库
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
200 1
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
|
7月前
|
消息中间件 Java 关系型数据库
【Spring Boot+Kafka+Mysql+HBase】实现分布式优惠券后台应用系统(附源码)
【Spring Boot+Kafka+Mysql+HBase】实现分布式优惠券后台应用系统(附源码)
313 2
|
7月前
|
消息中间件 缓存 Kafka
kafka源码解析——第一篇:producer
kafka源码解析——第一篇:producer
101 0