Kafka放弃Zookeeper后如何持存储主题与消费组呢?

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Kafka放弃Zookeeper后如何持存储主题与消费组呢?

1、主题元数据存储在Zookeeper中


进入到Kafka Broker连接的Zookeeper集群,我们不难发现在 /{namespace}/brokers/topics节点下存在该集群中所有的主题信息,展开某一个具体的主题,如下图所示:

8353ba9a5110986563c4cd1b7a24d025.png

关于主题的元信息,其实主要包括如下信息:


  • 分区数量 每一个具体topic下会有一个partitions节点,该节点下的每一个子节点代表一个分区。
  • 分区状态信息 每一个分区的的状态由叶子节点 /{namespace}/brokers/topics/{topicName}/parttions/{partNO}/state表示,存储的内容如下:
  • controller_epoch 控制器当前的选举版本。
  • leader 该分区的Leader所在的Broker节点ID。
  • version 当前的存储格式版本,默认为1。
  • leader_epoch 分区Leader的选举版本。
  • isr 分区的ISR集合。


主题的路由信息是存储在Zookeeper中,那为什么客户端只需要Broker的地址,就可以获取到主题的路由信息呢?


1.1 主题路由寻址


查找路由信息在Kafka2.1版本中是发送ApiKeys.METADATA请求,该请求的响应逻辑定义在Broker中,那客户端是如何对Broker进行路由,Broker中的路由信息又是从何而来呢?


消息发送者首次发送METADATA定位Broker机制:首次发送请求会从KafkaProducer的bootstrap.servers中设置的broker列表中选择当前最空闲的Broker,后续能感知所有的Broker。


消息消费者发送METADATA定位Broker机制:发送到当前消费组的组协调所在的Broker。


根据查阅KafkaApis的handleTopicMetadataRequest方法,进行一些ACL校验后进入其核心方法:


577cf6ad91464824ce651bfab1d425b8.png

关键点:


  • MetadataCache中获取topic到路由信息。
  • 如果MetadataCache中不存在指定topic的路由信息,如果Broker允许自动创建主题(auto.create.topics.enable),默认为true,则自动创建该主题的信息,并将主题信息写入到zookeeper,具体操作:
  • 在/brokers/topics节点下创建子节点,子节点名称为topic的名称。
  • 根据当前kafka分区的机架信息,分区数、副本数,broker节点数,进行分配,主要尽量将主分区不放在同一个机架、存储在主题的节点信息中,例如{"version":1,"partitions":{"4":[2,0,1],"5":[0,1,2],"1":[2,1,0],"0":[1,0,2],"2":[0,2,1],"3":[1,2,0]}},其中key为分区名称,值为副本所在的brokerId,其中排在第一位是倾向性Leader,主题中存储的值是静态数据,具体还会触发选举,选举算法会参考这个分配。
  • 控制器还会注册调用registerPartitionModificationsHandlers方法,监听主题信息的变化,从而触发后续流程,启动分区的真正创建(各个分区的Leader选举等)。

⚠️温馨提示:Kafka开启自动创建主题,分区数量取自kafka broker中的num.partitions参数,默认为1,副本因子则取决于default.replication.factor参数,默认为1。


1.2 路由信息同步机制


MetadataCache,元信息缓存,那这里的数据又是从何而来呢?MetadataCache中路由信息的更新调用链如下图所示:

6353324bb1d47ccc90a118c11ead8d81.png


Kafka的KafkaController(后续统称控制器)首先会听/brokers/topics/{topicName}节点内容的变化,一旦有新主题创建或主题信息变更,topic变更事件就会触发,此时TopicChange的process方法会调用,最终调用updatePartitionReplicaAssignment,也就是一旦主题的信息发生变更,控制器会向所有Broker节点发送ApiKeys.UPDATE_METADATA,各个Broker在到该请求后,会更新各个Broker中的内存缓存,供消息发送者查找topic路由信息。


即Kafka2.2版本中,topic的元信息存储在Zookeeper中,同时Kafka Controller会监听zookeeper中相关节点,从而感知信息变更,从而将路由信息通过RPC发送到集群内所有的Broker中,故每一个Broker的内存中都存储一份相同的路由信息。


⚠️Kafka2.8版本开始尝试去Zookeeper化。


思考题:⚠️为什么各个Broker不都监听zookeeper,从而感知topic变化,更新本地内存呢?欢迎各位留言讨论或私信dingwpmz,共同交流。


2、消费组存储在位点主题中


在较低版本中,启动Kafka消费组需要指定zookeeper集群的地址,因为在低版本中消费组的元信息存储在zookeeper中,具体路径为/consumers,但后续版本中消费端的启动已经不需指定zookeeper,而是指定broker的地址列表即可,那这个时候,消费组的信息是存储在哪呢?


在前面介绍Kafka故障解决相关的文章中我们常常看到消费组组协调器,内部持有一个消费组元数据管理器GroupMetadataManager,相关的代码截图如下所示:


36b524498a8c635cc4b8cf0f7b7ae08e.jpg

在GroupMetadataManager对象中持有一个Map结构的缓存,其键为消费组的名称,值为GroupMetadata对象,内部记录消费组的状态,消费组的成员列表,位点信息。


内存的特点:访问高效,但随着Broker进程的退出而丢失,消费组存储在内存中显然不行,但又不在zookeeper中,那消费组的定义信息存储在什么地方呢?


2.1消费组元信息存储


消费组的定义信息存储在系统主题__consumer_offsets中,什么,这个主题不是用来存储消费位点的吗?


原来__consumer_offsets不仅存储消费组的位点信息,还存储消费组的元信息,具体代码入口:GroupMetadataManager#storeGroup,部分代码截图如下所示:

e8679b0162e0e2eb587d134f27c88717.jpg

即消费组元信息当成一条消息写入到__consumer_offsets,一条消费组元信息存储的value值,由GroupMetadataManager的groupMetadataValue方法定义,具体代码如下:

f836242a73d1f4127a78664a217a0e5b.jpg

随着Kafka的不断演化,存储格式进行了多次修改,对应的版本如下:


  • V0:Kafka 0.10级以下版本
  • V1:大于 0.10,低于等于2.1版本。
  • V2:2.2版本及以后


消费组元信息存储的格式为Json,具体存储的内容:


  • protocol_type 协议版本,取自AbstractCoordinator的抽象方法protocolType(),消费组的固定为:consumer。
  • generation 消费组元信息的版本号,每发生一次消费组重平衡,该值会加一。
  • protocol 协议内容,存储消费组的队列负载算法,在构建消费者时可通过partition.assignment.strategy参数传递,可以传递多个,消费组具体的负载算法会选择每一个消费者都支持的协议进行队列负载,默认的负载算法为RangeAssignor。
  • leader 当前消费组的Leader,通常为第一个加入该消费组的消费者。
  • current_state_timestamp 最新状态变更的时间戳,该值是从V2版本开始引入。
  • members 消费组的成员信息,每一个成员信息存储的信息如下:
  • member_id 成员id,客户端id(clientId) + uuid。
  • client_id 客户端ID。
  • client_host 客户端ip地址。
  • rebalance_timeout 重平衡时间,默认为300000,5分钟。
  • session_timeout 会话超时时间,默认为10s。
  • subscription 元信息,取自AbstractCoordinator的抽象方法metadata(),消费组的实现类为ConsumerCoordinator,主要是遍历负载算法,每一个负载算法根据订阅信息计算元信息。
  • assignment
    各个消费者的队列负载情况。

⚠️温馨提示:GroupMetadataManager的storeGroup方法的调用时间是在消费组进行重平衡时,具体是重平衡第二阶段(SYNC_GROUP)与完成重平衡。


2.2加载消息组元信息


消费组元信息是存储在 __consumer_offsets主题中,在什么时候会从该主题中加载到内存中呢?


在__consumer_offsets的分区发生Leader选举时会触发将对应分区中的数据加载到内存,具体的处理入口在KafkaApis的handleLeaderAndIsrRequest方法,简易调用链如下图所示:

95b453e637a2e82d9f3fc051acc2c685.png


3、总结


本文主要介绍了Kafka 主题与消费组的持久化机制,在Kafka2.8版本开始,官方逐步去除对Zookeeper的依赖,那kafka3.x之后,又会是如何存储消费组、主题的信息呢?大家可以尝试思考后,笔者也将在该专栏的后续文章中加以介绍,敬请期待。



目录
打赏
0
0
0
0
231
分享
相关文章
Kafka 为什么要抛弃 Zookeeper?
本文探讨了Kafka为何逐步淘汰ZooKeeper。长久以来,ZooKeeper作为Kafka的核心组件,负责集群管理和协调任务。然而,随着Kafka的发展,ZooKeeper带来的复杂性增加、性能瓶颈及一致性问题日益凸显。为解决这些问题,Kafka引入了KRaft,这是一种基于Raft算法的内置元数据管理方案,不仅简化了部署流程,还提升了系统的一致性和扩展性。本文详细分析了这一转变背后的原因及其带来的优势,并展望了Kafka未来的发展方向。
363 1
大数据-71 Kafka 高级特性 物理存储 磁盘存储特性 如零拷贝、页缓存、mmp、sendfile
大数据-71 Kafka 高级特性 物理存储 磁盘存储特性 如零拷贝、页缓存、mmp、sendfile
108 3
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
77 1
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
80 4
大数据-68 Kafka 高级特性 物理存储 日志存储概述
大数据-68 Kafka 高级特性 物理存储 日志存储概述
51 1
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
161 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
ZooKeeper助力Kafka:掌握这四大作用,让你的消息队列系统稳如老狗!
【8月更文挑战第24天】Kafka是一款高性能的分布式消息队列系统,其稳定运行很大程度上依赖于ZooKeeper提供的分布式协调服务。ZooKeeper在Kafka中承担了四大关键职责:集群管理(Broker的注册与选举)、主题与分区管理、领导者选举机制以及消费者组管理。通过具体的代码示例展示了这些功能的具体实现方式。
211 2
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
分布式读写锁的奥义:上古世代 ZooKeeper 的进击
本文作者将介绍女娲对社区 ZooKeeper 在分布式读写锁实践细节上的思考,希望帮助大家理解分布式读写锁背后的原理。
121 11
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等