学习方法|一把解开Kafka背后机制的“钥匙”

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 学习方法|一把解开Kafka背后机制的“钥匙”

在Kafka中有一个非常重要的角色:控制器,KafkaController,承担着Kafka核心运作机制,从本文开始将逐步深入Kafka内核,揭晓Kafka内部的运作机制,为更好的运维Kafka储备充足的弹药。


深入探究Kafka内核,我们将如何展开呢?经过笔者初略的浏览了KafkaController类的源码,发现Kafka的控制器严重依赖Zookeeper集群,而基于Zookeeper编程的常规套路:基于事件监听模型:


  • Zookeeper创建相应的节点
  • 创建Watch,监听节点的新增/修改/删除事件,并执行对应的事件回调函数。


Kafka Zookeeper事件监听如下图所示:

f8872ca41a2fc06bde41ad3a7eb2bf81.png

基于此种编程模型,事件回调方法就成为我们探究Kafka的突破口,而kafka也为控制类事件定义了统一的接口:ControllerEvent,简单声明如下图所示:

bff029970f62b86424951b851acce99b.png

关键信息如下:


  • long enqueueTimeMs 进入请求队列的时间戳
  • ControllerState 事件状态(应该用类型更加准确),例如ISR变更、Broker变更等。
  • process() 事件的处理函数,由各个子类实现。


ControllerEvent的所有子类如下图所示:

6d7f0e87cb8044abab445ec75f7a5ad6.png

上面每一种事件都代表Kafka内核蕴含的一种机制,这些也是接下来探究的重点,本文将重点关注Startup事件,包含了Kafka控制器选举,揭晓控制器的职权范围。


1、Kafka控制选举机制


通过查看Kafka启动流程发现,Broker启动时首先会向/broker/ids节点下注册自己,然后会将Startup事件放入到任务队列中,从而将触发Controller的选举,Startup事件的定义如下图所示:

b78fc969f3723905a40e4083e6e12191.png

主要完成两件事情:


  • 注册节点状态变更事件处理器,Kafka Controller会在zookeeper中创建节点/controller。
  • 基于Zookeeper进行Kafka Controller选举。


接下来将详细探究其实现细节,将其运作机制掌握于心。


1.1 Kafka Controller选举机制


Kafka Controller的选举由具体由KafkaController的elect()方法实现,接下来将对该方法进行详细剖析。

426b0253a169dab7312b3f7fd3ae6b73.png

核心实现步骤如下:;


  • 尝试从zookeeper中查看 /controller 节点中的内容,如果返回的值不等与-1,表示集群已经选举出KafkaController,则无需再次选举,直接返回。
  • 发起KafkaController的选举,具体做法是尝试向zookeeper创建 /controller 节点,该行为会原子的返回如下两种结果:
  • 节点已经存在 如果节点已经存在,则说明其他Broker提前创建了该节点,也就是获得Controller选举的胜利,如果节点存在,则出发异常处理(即当前节点没有成功选举后的善后操作,稍后会详细解读)
  • 节点成功创建 如果节点创建成功,则自己在本轮Controller选举中取得胜利,成功成为新的kafka Controller。
  • 如果节点成功被选举为新的Controller,则触发Controller故障转移的相关逻辑。
  • 如果选举失败,则进行必要的善后工作,接下来探究。


1.1.1 Broker选举为Controller


当一个新节点被选举成Kafka Controller后,也就是“新的秩序”重新建立,需要做哪些事情呢?由KafkaController的onControllerFailover方法实现,该方法代码有点多,将分步骤介绍。


Step1:重新注册ChildChange事件,这类事件主要是关注其子节点的新增/删除/修改时间,代码如下图所示:

ced289ea1f09b62c68cc37c5e47303c8.png

详情说明如下:


  • /brokers/ids 每一个子节点代表一个Broker,监听broker的加入与下线并做出对应的响应
  • /brokers/topics 记录所有主题的元信息,主要是分区信息(分区数量,各个分区的leader节点、ISR集合等)。
  • /admin/delete_topics 待删除的topic。
  • /log_dir_event_notification 如果当某一个Broker节点的日志读写出现异常,例如磁盘损坏,会向zookeeper,从而触发副本下线机制,触发相应的分区Leader选举
  • /isr_change_notification ISR变更通知


上述节点的事件监听,背后都蕴含了Kafka相关的机制,后续会一一揭晓。


Step2:重新注册Node节点改变事件,代码如下图所示:

125b45dd3452040212e9fd46671a9752.png

具体订阅的节点如下:


  • /admin/preferred_replica_election 副本倾向性选择
  • /admin/reassign_partitions 分区在Broker直接重新分配机制


上述节点的事件监听,同样背后都蕴含了Kafka的相关机制,这些后续都会一一介绍,从而揭晓Kafka核心机制。


**Step3:**删除/log_dir_event_notification、/isr_change_notification相关的子节点,新的Controller节点选举成功后,分区故障转移,ISR变更都将忽略,这些事项后续会重新触发,代码如下图所示:


eade719ce73e44431e7e397dece5e7df.png

Step4:初始化控制器上下文,核心代码如下图所示:

8aae32cd15009c36e5766faff3f0230e.png

Kafka Controller的上下文主要包括如下信息:


  • 获取当前所有活跃的Broker节点信息(获取 /brokers/ids 节点下的子节点)
  • 获取集群中所有的Topic元数据,后续会将这些信息传播到各个Broker节点,也正是基于如此,生产者、消费者无需再连接Zookeeper。
  • 为每一个主题创建Watch,监听主题的元信息变更事件。
  • 获取每一个主题的分区分布情况,即每一个分区Leader所在的brokerId,副本在哪些Broker中,/broker/topics/{topicName}节点中存储的数据如下图所示:

8f4288997b5393702d69c5dad65ed26d.png

为每一个/broker/ids/{id} broker注册节点内容变更事件,即可以监听broker元数据的变更,其实主要是监听endpoints的变化,如果发生变化,KafkaController会向所有注册(包括被认为正在关闭中的broker)发送UpdateMetadata命令。

0f883bcdc1dc0fb50a58f8df2bf2503b.png

从zk中加载所有分区的ISR与Leader Epoch,我们可以看看一个分区的状态:

f537e9d89c23954022e29805ec531ee3.png

  • 启动控制器的通道管理器ControllerChannelManager,管理着控制器与集群内其他Broker中的控制通道,并且包含QueueSize、RequestRateAndQueueTimeMs等重要监控指标,这部分后续会深入展开。
  • 读取/admin/reassign_partitions中的内容(Broker端分区重新分布计划),触发PartitionReassignmentIsrChangeHandler事件,关于分区重新分布的运维实战可以查阅笔者:https://mp.weixin.qq.com/s/X4zmnIg0zP3XFnogC45k4w


**Step5:**获取需要删除与不需要删除的topic,代码如下图所示:

5db6c711e30741196a5361f289b76eb4.png

具体的处理逻辑:


  • 首先从zookeeper中读取到 /kafka_cluster_01/admin/delete_topics节点下的所有topic,执行topic删除命令时首先会往该节点上添加一个节点。
  • 这些节点中如果topic的分区的状态为online(在线)或者分区在执行分区移动,则这些topic将不会立即被删除,存入topicsIneligibleForDeletion集合中。


**Step6:**启动副本状态机与分区状态机,代码如下图所示:

30edd01ef1cef44fb186284dc6e01833.png

Kafka中分区、副本是存储的核心,分区、副本各自有多种状态,在Kafka内部实现中使用状态机来实现,其中副本的状态机状态驱动示意图如下所示:


f41c73478cea14eff16f09d5376605d0.png

分区的状态比较简单,其状态驱动如下图所示:

2b8af547df662381fd871bee163d9793.png

Step7:处理分区重分配、topic删除、触发倾向性副本的选举,具体代码如下图所示:

759519f2e109c9328b03e2c195d864d9.png

核心要点:


  • 如果/admin/reassign_partitions中节点存储了topic的分区的重分配计划,则控制器选举成功后需要触发该部分逻辑的处理。
  • 如果Broker端允许删除topic,配置项(delete.topic.enable),默认为true,如果/admin/delete_topics节点存在子节点,并且需要执行删除动作,则触发主题的删除动作。
  • 如果/admin/preferred_replica_election节点存储了需要进行倾向性选举,则触发倾向性选举。

温馨提示:本文将不会深入探讨这些机制的详细实现,但这里送大家一个**“彩蛋”**,也是笔者的学习方法:大家可以去判断这些节点值是在什么情况下写入的、写入后是如何执行对应逻辑的,从而可以探知对应的实现机制。


1.1.2 Broker选举Controller失败


在Kafka集群中,只有一个Broker能成功选举成Controller,当Broker选举Controller失败后,执行的操作为KafkaController的maybeResign方法, 接下来我们看看其实现细节。

maybeResign方法的代码截图如下图所示:

b937257e83890fd2355a41f198451b9b.png

重点如图所示:


  • 监听/controller节点,如果该节点被删除,则会触发KafkaController的选举。
  • 如果当前节点从KafkaController角色变为非Controller角色,需要取消相关节点事件的注册,如下图所示:

73a1b3ed74cb0dec86a06e5c090acdbb.png


2、Kafka Conntroller职责总结


从Kafka Broker控制器的启动流程,我们也可以窥探出Kafka众多的机制都需要Kafka Controller主导,主要包含如下:


  • 元信息同步(Topic路由信息)
  • 删除主题
  • 副本故障转移机制
  • ISR变更机制
  • 分区副本倾向性选举
  • 分区重新分配(移动)机制
  • 分区、副本状态管理


要想理解kafka内部的运作机制,Kafka Controller是一个非常不错的入口,从下文开始,将逐步研究Kafka内部运作机制,从而更好运维Kafka集群。

相关文章
|
6月前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
373 0
|
1月前
|
消息中间件 大数据 Kafka
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
26 2
|
1月前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
32 1
|
1月前
|
消息中间件 Java 大数据
Kafka ISR机制详解!
本文详细解析了Kafka的ISR(In-Sync Replicas)机制,阐述其工作原理及如何确保消息的高可靠性和高可用性。ISR动态维护与Leader同步的副本集,通过不同ACK确认机制(如acks=0、acks=1、acks=all),平衡可靠性和性能。此外,ISR机制支持故障转移,当Leader失效时,可从ISR中选取新的Leader。文章还包括实例分析,展示了ISR在不同场景下的变化,并讨论了其优缺点,帮助读者更好地理解和应用ISR机制。
41 0
Kafka ISR机制详解!
|
1月前
|
消息中间件 Java Kafka
Kafka ACK机制详解!
本文深入剖析了Kafka的ACK机制,涵盖其原理、源码分析及应用场景,并探讨了acks=0、acks=1和acks=all三种级别的优缺点。文中还介绍了ISR(同步副本)的工作原理及其维护机制,帮助读者理解如何在性能与可靠性之间找到最佳平衡。适合希望深入了解Kafka消息传递机制的开发者阅读。
148 0
|
3月前
|
消息中间件 负载均衡 Java
揭秘Kafka背后的秘密!Kafka 架构设计大曝光:深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理及流传输设计的高效率消息系统。其核心特性包括高吞吐量、低延迟及出色的可扩展性。Kafka采用分布式日志模型,支持数据分区与副本,确保数据可靠性和持久性。系统由Producer(消息生产者)、Consumer(消息消费者)及Broker(消息服务器)组成。Kafka支持消费者组,实现数据并行处理,提升整体性能。通过内置的故障恢复机制,即使部分节点失效,系统仍能保持稳定运行。提供的Java示例代码展示了如何使用Kafka进行消息的生产和消费,并演示了故障转移处理过程。
50 3
|
3月前
|
消息中间件 Java Kafka
如何在Kafka分布式环境中保证消息的顺序消费?深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据管道和流处理设计的分布式平台,以其高效的消息发布与订阅功能著称。在分布式环境中确保消息按序消费颇具挑战。本文首先介绍了Kafka通过Topic分区实现消息排序的基本机制,随后详细阐述了几种保证消息顺序性的策略,包括使用单分区Topic、消费者组搭配单分区消费、幂等性生产者以及事务支持等技术手段。最后,通过一个Java示例演示了如何利用Kafka消费者确保消息按序消费的具体实现过程。
119 3
|
3月前
|
消息中间件 负载均衡 Java
"深入Kafka核心:探索高效灵活的Consumer机制,以Java示例展示数据流的优雅消费之道"
【8月更文挑战第10天】在大数据领域,Apache Kafka凭借其出色的性能成为消息传递与流处理的首选工具。Kafka Consumer作为关键组件,负责优雅地从集群中提取并处理数据。它支持消息的负载均衡与容错,通过Consumer Group实现消息的水平扩展。下面通过一个Java示例展示如何启动Consumer并消费数据,同时体现了Kafka Consumer设计的灵活性与高效性,使其成为复杂消费场景的理想选择。
116 4
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
65 3
|
4月前
|
消息中间件 存储 监控
深入理解Kafka核心设计及原理(六):Controller选举机制,分区副本leader选举机制,再均衡机制
深入理解Kafka核心设计及原理(六):Controller选举机制,分区副本leader选举机制,再均衡机制
91 1