我们前面说过:
聊聊 Kafka:协调者 GroupCoordinator 源码剖析之实例化与启动 GroupCoordinator
聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR
这一篇我们继续说下:
《聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 GROUP、OFFSET、HEARTBEAT 相关命令》
五、GROUP 相关命令
- ApiKeys.JOIN_GROUP
- ApiKeys.LEAVE_GROUP
- ApiKeys.SYNC_GROUP
- ApiKeys.DESCRIBE_GROUPS
- ApiKeys.LIST_GROUPS
- ApiKeys.DELETE_GROUPS
JOIN_GROUP 和 SYNC_GROUP请求处理在
聊聊 Kafka:Consumer 源码解析之 Consumer 如何加入 Consumer Group 已经说过了。
接下来主要说说另外两个 GROUP 的命令:
5.1 DESCRIBE_GROUPS
主要是返回 group 中各个 member 的详细信息,比如 memberId、groupInstanceId、clientId、clientHost、memberMetadata、memberAssignment。
5.2 LEAVE_GROUP
不难看出,LEAVE_GROUP 请求最重要的逻辑是在 removeMemberAndUpdateGroup 方法,从 group 移除失败的 member,并且将进行相应的状态转换。
- 如果 group 原来是在 Dead 或 Empty 时,那么由于 group 本来就没有 member,就不再进行任何操作。
- 如果 group 原来是在 Stable 或 CompletingRebalance 时,那么将会执行 maybePrepareRebalance() 方法,进行 rebalance 操作。
- 如果 group 已经在 PreparingRebalance 状态了,那么这里将检查一下 join-group 的延迟操作是否完成了,如果操作完成了,那么 GroupCoordinator 就会向 group 的 member 发送 join-group response,然后将状态更新为 CompletingRebalance。
六、OFFSET 相关命令
- ApiKeys.OFFSET_COMMIT
- ApiKeys.OFFSET_FETCH
- ApiKeys.OFFSET_FOR_LEADER_EPOCH
- ApiKeys.OFFSET_DELETE
这里我们来说下 OFFSET_FETCH 与 OFFSET_COMMIT 两个重要的请求命令,
6.1 OFFSET_FETCH
关于 OFFSET_FETCH 的请求,Server 端的处理如下,新版本>= 1从Kafka读取偏移量,我们这里直接看新版,fetch commit 分两种情况:
- 获取 group 所消费的所有 topic-partition 的 offset
- 获取指定 topic-partition 的 offset
在 groupCoordinator.handleFetchOffsets() 的实现中,主要是调用了 groupManager.getOffsets() 获取相应的 offset 信息,在查询时加锁的原因应该是为了避免在查询的过程中 offset 不断更新。
6.2 OFFSET_COMMIT
同样,我们也直接看高版本的移交 offset 的方式:
这里主要介绍下 groupManager.storeOffsets() 方法,主要逻辑如下:
- 首先过滤掉 offset 信息超过范围的 metadata
- 将 offset 信息追加到 replicated log 中
- 调用 prepareOffsetCommit() 方法,先将 offset 信息更新到 group 的 pendingOffsetCommits 中(这时还没有真正提交,后面如果失败的话,是可以撤回的)
- 在 putCacheCallback 回调函数中,如果 offset 信息追加到 replicated log 成功,那么就更新缓存(将 group 的 pendingOffsetCommits 中的信息更新到 offset 变量中)
七、HEARTBEAT 相关命令
- ApiKeys.HEARTBEAT
心跳请求是很重要的请求之一,为啥这么说呢?
- 对于 Client 端而言,心跳请求是 client 感应 group 状态变化的一个重要中介,比如:此时有一个新的 consumer 加入到 consumer group 中了,这时候会进行 rebalance 操作,group 端的状态会发送变化,当 group 其他 member 发送心跳请求,GroupCoordinator 就会通知 client 此时这个 group 正处于 rebalance 阶段,让它们 rejoin group。
- 对于 Server 端来说,它是 GroupCoordinator 判断一个 consumer member 是否存活的重要条件,如果其中一个 consumer 在给定的时间没有发送心跳请求,那么就会将这个 consumer 从这个 group 中移除,并执行 rebalance 操作。
八、Group 的状态机
通过上面的分析发现,GroupCoordinator 针对 GROUP、OFFSET、HEARTBEAT 相关命令的请求,Group 的状态机的维护是非常重要的,状态机中里的 rebalance 操作是重中之重,我们来回顾下 rebalance 操作的流程:
- 当消费者小跳丢失时将 group 从 Stable 状态变为 PreparingRebalance;
- 然后就是等待 group 中的所有 consumer member 发送 join-group 请求加入 group,如果都已经发送 join-group 请求,此时 GroupCoordinator 会向所有 member 发送 join-group response,那么 group 的状态变为 CompletingRebalance;
- leader consumer 会收到各个 member 订阅的 topic 详细信息,等待其分配好 partition 后,通过 sync-group 请求将结果发给 GroupCoordinator(非 leader consumer 发送的 sync-group 请求的 data 是为空的);
- 如果 GroupCoordinator 收到了 leader consumer 发送的 response,获取到了这个 group 各个 member 所分配的 topic-partition 列表,group 的状态就会变成 Stable。
欢迎大家关注我的公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。