阿里四面:kafka何时、如何删除Topic?

本文涉及的产品
云原生网关 MSE Higress,422元/月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: Kafka有很多状态机和管理器,如Controller通道管理器ControllerChannelManager、处理Controller事件的ControllerEventManager等。这些管理器和状态机,大多与各自“宿主”联系密切。就如Controller这俩管理器,必须与Controller组件紧耦合,才能实现各自功能。

Topic是怎么被删除的?


Kafka有很多状态机和管理器,如Controller通道管理器ControllerChannelManager、处理Controller事件的ControllerEventManager等。这些管理器和状态机,大多与各自“宿主”联系密切。就如Controller这俩管理器,必须与Controller组件紧耦合,才能实现各自功能。


Kafka还有一些状态机和管理器,具有相对独立的功能框架,不严重依赖使用方,如:


TopicDeletionManager(主题删除管理器)


负责对指定Kafka主题执行删除操作,清除待删除主题在集群上的各类“痕迹”。


ReplicaStateMachine(副本状态机)


负责定义Kafka副本状态、合法的状态转换,以及管理状态之间的转换。


PartitionStateMachine(分区状态机)


负责定义Kafka分区状态、合法的状态转换,以及管理状态之间的转换。


本文看看Kafka是如何删除一个主题的。


前言


以为成功执行kafka-topics.sh --delete命令后,主题就会被删除。这种不正确的认知会导致经常发现主题没被删干净。于是,网传终极“武林秘籍”:手动删除磁盘上的日志文件,手动删除ZooKeeper下关于主题的各节点。但我不推荐这么干:


并不完整


除非你重启Broker,否则,这套“秘籍”无法清理Controller端和各个Broker上元数据缓存中的待删除主题的相关条目


并没有被官方所认证,后果自负


与其琢磨删除主题失败之后怎么自救,还是研究Kafka到底如何执行该操作。TopicDeletionManager.scala包括:


DeletionClient接口:负责实现删除主题以及后续的动作


如更新元数据


ControllerDeletionClient类:实现DeletionClient接口的类,分别实现了刚刚说到的那4个方法。


TopicDeletionManager类:主题删除管理器类


定义方法维护主题删除前后集群状态的正确性。如,何时删除主题、何时主题不能被删除、主题删除过程中要规避哪些操作等


DeletionClient接口及实现


删除主题,并将删除主题的事件同步给其他Broker。


DeletionClient接口目前只有一个实现类ControllerDeletionClient,构造器的两个字段:


KafkaController实例


Controller组件对象


KafkaZkClient实例


Kafka与ZooKeeper交互的客户端对象


API


deleteTopic

删除主题在zk上的所有“痕迹”。分别调用KafkaZkClient的3个方法删除ZooKeeper下/brokers/topics/节点、/config/topics/节点和/admin/delete_topics/节点。


deleteTopicDeletions

删除zk下待删除主题的标记节点。调用KafkaZkClient#deleteTopicDeletions,批量删除一组主题在/admin/delete_topics下的子节点。注意,deleteTopicDeletions这个方法名结尾的Deletions,表示/admin/delete_topics下的子节点。所以:


deleteTopic是删除主题

deleteTopicDeletions是删除/admin/delete_topics下的对应子节点

这两个方法里都有epochZkVersion字段,代表期望的Controller Epoch版本号。若使用一个旧Epoch版本号执行这些方法,zk会拒绝,因为和它自己保存的版本号不匹配。若一个Controller的Epoch<ZooKeeper中保存的,则该Controller很可能是已过期的Controller。这就是Zombie Controller。epochZkVersion字段的作用,就是隔离Zombie Controller发送的操作。


mutePartitionModifications

屏蔽主题分区数据变更监听器:取消/brokers/topics/节点数据变更的监听。


当该主题的分区数据发生变更后,由于对应zk监听器已被取消,因此不会触发Controller相应处理逻辑。


为何取消该监听器?为避免操作相互干扰:假设用户A发起主题删除,同时用户B为这个主题新增分区。此时,这两个操作就会冲突,若允许Controller同时处理这俩操作,势必会造成逻辑混乱及状态不一致。为应对这种情况,在移除主题副本和分区对象前,代码要先执行这个方法,确保不再响应用户对该主题的其它操作。


mutePartitionModifications调用unregisterPartitionModificationsHandlers,并接着调用KafkaZkClient#unregisterZNodeChangeHandler,取消zk上对给定主题的分区节点数据变更的监听。


sendMetadataUpdate

调用KafkaController#sendUpdateMetadataRequest,给集群所有Broker发送更新请求,告诉它们不要再为已删除主题的分区提供服务:

1.png



该方法会给集群中的所有Broker发送更新元数据请求,告知它们要同步给定分区的状态。


TopicDeletionManager定义及初始化


2.png


创建TopicDeletionManager类实例


在KafkaController类初始化时被创建:

3.png



实例化了一个全新的ControllerDeletionClient对象,然后利用该对象实例和replicaStateMachine、partitionStateMachine,一起创建TopicDeletionManager实例。


KafkaServerStartable.startup()=》KafkaServer.startup()=》KafkaController.init=》TopicDeletionManager


TopicDeletionManager重要API


除了类定义和初始化,还有resumeDeletions:重启主题删除操作过程。


主题因为某些事件可能一时无法完成删除,如主题分区正在进行副本重分配等。一旦这些事件完成,主题重新具备可删除资格。就需调用resumeDeletions重启删除操作。

4.png



从元数据缓存中获取要删除主题列表,之后定义了两个空的主题列表,分别保存待重试删除主题和待删除主题

遍历每个要删除的主题,去看它所有副本的状态。如果副本状态都是ReplicaDeletionSuccessful,就表明该主题已经被成功删除,此时,再调用completeDeleteTopic方法,完成后续的操作就可以了。对于那些删除操作尚未开始,并且暂时无法执行删除的主题,源码会把这类主题加到待重试主题列表中,用于后续重试;如果主题是能够被删除的,就将其加入到待删除列表中。

最后,调用retryDeletionForIneligibleReplicas重试待重试主题列表中的主题删除操作。对待删除主题列表中的主题则调用onTopicDeletion删除。

retryDeletionForIneligibleReplicas重试主题删除:将对应主题副本的状态,从ReplicaDeletionIneligible变更到OfflineReplica。这样,后续再次调用resumeDeletions,就会尝试重新删除主题。


下面,我再用一张图来解释下resumeDeletions方法的执行流程:

5.png



resumeDeletions串联起了TopicDeletionManger中的很多方法,较关键的:


completeDeleteTopic:


7.png


onTopicDeletion:


8.png


onTopicDeletion会多次使用分区状态机,调整待删除主题的分区状态。最后调用onPartitionDeletion执行真正的底层物理磁盘文件删除。这是通过副本状态机状态转换操作完成的。


总结


在主题删除过程中,Kafka会调整集群中三个地方的数据:


ZooKeeper


删除主题时,zk上与该主题相关的所有ZNode节点必须被清除


元数据缓存


Controller端元数据缓存中的相关项,也必须要被处理,并且要被同步到集群的其他Broker上


磁盘日志文件


要清理的首要目标


这三个地方须统一处理,就好似原子操作。回想“秘籍”,它无法清除Controller端的元数据缓存项。因此,避免使用这“大招”。


DeletionClient接口主要是操作ZooKeeper,实现ZooKeeper节点的删除等操作。


TopicDeletionManager,是在KafkaController创建过程中被初始化的,主要通过与元数据缓存进行交互的方式,来更新各类数据。


9.png

9.png

目录
相关文章
|
9月前
|
消息中间件 存储 负载均衡
深入了解Kafka中Topic的神奇之处
深入了解Kafka中Topic的神奇之处
608 0
|
9月前
|
消息中间件 Kafka 流计算
Flink的分区表订阅功能是通过Kafka的topic分区来实现的
Flink的分区表订阅功能是通过Kafka的topic分区来实现的【1月更文挑战第6天】【1月更文挑战第26篇】
151 1
|
9月前
|
消息中间件 存储 Kafka
阿里 P7 三面凉凉,kafka Borker 日志持久化没答上来
阿里 P7 三面凉凉,kafka Borker 日志持久化没答上来
|
4月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
121 4
|
5月前
|
消息中间件 Kafka Apache
kafka: invalid configuration (That topic/partition is already being consumed)
kafka: invalid configuration (That topic/partition is already being consumed)
|
9月前
|
消息中间件 大数据 Kafka
记录一下Kafka报错:timeout expired while fetching topic metadata
记录一下Kafka报错:timeout expired while fetching topic metadata
701 0
|
7月前
|
消息中间件 监控 Kafka
查询Kafka集群中消费组(group)信息和对应topic的消费情况
查询Kafka集群中消费组(group)信息和对应topic的消费情况
3331 0
|
消息中间件 JSON 负载均衡
kafka 动态扩容现有 topic 的分区数和副本数
kafka 动态扩容现有 topic 的分区数和副本数
2109 0
|
9月前
|
消息中间件 负载均衡 监控
【Kafka】Kafka 创建Topic后如何将分区放置到不同的 Broker 中?
【4月更文挑战第13天】【Kafka】Kafka 创建Topic后如何将分区放置到不同的 Broker 中?
|
9月前
|
消息中间件 存储 缓存
【Kakfa】Kafka 的Topic中 Partition 数据是怎么存储到磁盘的?
【4月更文挑战第13天】【Kakfa】Kafka 的Topic中 Partition 数据是怎么存储到磁盘的?