Kafka 删除主题流程分析

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 之前有个 Kafka 集群的每个节点的挂载磁盘多达 20+ 个,平均每个磁盘约 1T,每个节点的分区日志被平均分配到这些磁盘中,但由于每个分区的数据不一致,而集群节点 log.retention.bytes 这个参数的默认值是 -1,也就是没有任何限制,因此 Kafka 的日志删除日志依赖 log.retention.hours 参数来删除,因此会出现日志未过期,磁盘写满的情况。针对该集群双十一会遇到某些挂载磁盘被写满的情况,需要手动对主题进行删除以清空磁盘的操作,现在分析删除主题对集群以及客户端会有什么影响,以及 Kafka 都做了哪些动作。

之前有个 Kafka 集群的每个节点的挂载磁盘多达 20+ 个,平均每个磁盘约 1T,每个节点的分区日志被平均分配到这些磁盘中,但由于每个分区的数据不一致,而集群节点 log.retention.bytes 这个参数的默认值是 -1,也就是没有任何限制,因此 Kafka 的日志删除日志依赖 log.retention.hours 参数来删除,因此会出现日志未过期,磁盘写满的情况。


针对该集群双十一会遇到某些挂载磁盘被写满的情况,需要手动对主题进行删除以清空磁盘的操作,现在分析删除主题对集群以及客户端会有什么影响,以及 Kafka 都做了哪些动作。


图解删除过程



1. 删除主题


删除主题有多种方法,可通过 kafka-topic.sh 脚本并执行 --delete 命令,或者用暴力方式直接在 zk 删除对应主题节点,其实删除主题无非就是令 zk 节点删除,以触发 controller 对应监听器,然后再通过监听器通知到所有 broker,具体流程如下:

640.jpg


删除主题执行后,controller 监听到 zk 主题节点被删除,通知到所有 broker 删除主题对应的副本,这里会分成两个步骤,第一个步骤先将下线主题对应的副本,最后才执行真正的删除操作,注意,这里也并为真正的将主题从磁盘中删除,此时仅仅只会将要删除的副本所在的目录重命名,以免之后创建主题时目录有冲突,每个 broker 都会有一个定时线程,定时清除已重命名为删除状态的日志文件,具体如下:


640.jpg


2. 自动创建主题


自动创建主题的前提是 broker 配置参数 auto.create.topic.enble=true,删除主题后,当 Producer 发送时会对发送进行重试,期间会发送 MetadataRquest 命令到 broker 请求获取最新的元数据,在获取元数据的同时,会判断是否需要自动创建主题,如果需要,则调用 zk 客户端创建主题节点,controller 监听到有新主题创建,就会触发 controller 相关状态机工作创建主题。


640.jpg


刚刚也说过,kafka 重命名要删除的主题后,并不会立马就会删除,而是等待异步线程去删除,如下图所示,重命名后与重新创建的分区不冲突,可以证明删除是异步执行的了,且不影响生产发送,但是被重命名后的日志就不能消费了,即丢失了。


640.jpg


如下图可看出,在一分钟后,重命名后的副本被删除。

640.jpg


相关日志分析



1、controller.log


触发删除主题监听器:

[2019-11-07 19:24:11,121] DEBUG [Controller id=0] Delete topics listener fired for topics test-topic to be deleted (kafka.controller.KafkaController)


开始删除主题操作:

[2019-11-07 19:24:11,121] INFO [Topic Deletion Manager 0] Handling deletion for topics test-topic (kafka.controller.TopicDeletionManager)


开始停止主题,但此时并未删除:

[2019-11-07 19:24:11,143] DEBUG The stop replica request (delete = false) sent to broker 2 is StopReplicaRequestInfo([Topic=test-topic,Partition=1,Replica=2],false),StopReplicaRequestInfo([Topic=test-topic,Partition=0,Replica=2],false),StopReplicaRequestInfo([Topic=test-topic,Partition=2,Replica=2],false) (kafka.controller.ControllerBrokerRequestBatch)


开始执行真正的删除动作:

[2019-11-07 19:24:11,145] DEBUG [Topic Deletion Manager 0] Deletion started for replicas
[2019-11-07 19:24:11,147] DEBUG The stop replica request (delete = true) sent to broker 2 is StopReplicaRequestInfo([Topic=test-topic,Partition=1,Replica=2],true),StopReplicaRequestInfo([Topic=test-topic,Partition=0,Replica=2],true),StopReplicaRequestInfo([Topic=test-topic,Partition=2,Replica=2],true) (kafka.controller.ControllerBrokerRequestBatch)


收到 broker 删除的回调:

[2019-11-07 19:24:11,170] DEBUG [Controller id=0] Delete topic callback invoked on StopReplica response received from broker 2: request error = NONE, partition errors = Map(test-topic-2 -> NONE, test-topic-0 -> NONE, test-topic-1 -> NONE) (kafka.controller.KafkaController)
[2019-11-07 19:24:11,170] DEBUG [Topic Deletion Manager 0] Deletion successfully completed for replicas


已经成功全部删除:

[2019-11-07 19:24:11,202] INFO [Topic Deletion Manager 0] Deletion of topic test-topic successfully completed (kafka.controller.TopicDeletionManager)


如果此时有新的消息写入,会自动创建主题:

[2019-11-07 19:24:11,203] INFO [Controller id=0] New topics: [Set()], deleted topics: [Set()], new partition replica assignment [Map()] (kafka.controller.KafkaController)
[2019-11-07 19:24:11,267] INFO [Controller id=0] New topics: [Set(test-topic)], deleted topics: [Set()], new partition replica assignment [Map(test-topic-2 -> Vector(1, 2, 0), test-topic-1 -> Vector(0, 1, 2), test-topic-0 -> Vector(2, 0, 1))] (kafka.controller.KafkaController)
[2019-11-07 19:24:11,267] INFO [Controller id=0] New partition creation callback for test-topic-2,test-topic-1,test-topic-0 (kafka.controller.KafkaController)


2、server.log


broker 收到删除主题通通知(此时并没有删除):

[2019-11-07 19:24:11,144] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions Set(test-topic-2, test-topic-0, test-topic-1) (kafka.server.ReplicaFetcherManager)


停止分区 fetch 线程:

[2019-11-07 19:24:11,145] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2019-11-07 19:24:11,146] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=293639440, epoch=1824) to node 1: java.io.IOException: Client was shutdown before response was read. (org.apache.kafka.clients.FetchSessionHandler)
[2019-11-07 19:24:11,146] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2019-11-07 19:24:11,147] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)


接收到真正删除主题指令后,会重命名分区日志目录,此时还未删除,会等待异步线程执行:

[2019-11-07 19:24:11,157] INFO Log for partition test-topic-2 is renamed to /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete and is scheduled for deletion (kafka.log.LogManager)


如果此时有新的消息写入,会自动创建主题:

[2019-11-08 15:39:39,343] INFO Creating topic test-topic with configuration {} and initial partition assignment Map(2 -> ArrayBuffer(1, 0, 2), 1 -> ArrayBuffer(0, 2, 1), 0 -> ArrayBuffer(2, 1, 0)) (kafka.zk.AdminZkClient)
[2019-11-08 15:39:39,369] INFO [KafkaApi-1] Auto creation of topic test-topic with 3 partitions and replication factor 3 is successful (kafka.server.KafkaApis)
[2019-11-07 19:24:11,286] INFO Created log for partition test-topic-0 in /tmp/kafka-logs/kafka_3 with properties {...}


异步线程删除重命名后的主题:

[2019-11-07 19:25:11,161] INFO Deleted log /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete/00000000000000000000.log. (kafka.log.LogSegment)
[2019-11-07 19:25:11,163] INFO Deleted offset index /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete/00000000000000000000.index. (kafka.log.LogSegment)
[2019-11-07 19:25:11,164] INFO Deleted time index /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete/00000000000000000000.timeindex. (kafka.log.LogSegment)
[2019-11-07 19:25:11,165] INFO Deleted log for partition test-topic-2 in /tmp/kaf




相关文章
|
2月前
|
消息中间件 存储 Kafka
【Kafka】Kafka 概述分析
【4月更文挑战第5天】【Kafka】Kafka 概述分析
【Kafka】Kafka 概述分析
|
2月前
|
消息中间件 大数据 Kafka
【Kafka】Kafka 中生产者运行流程
【4月更文挑战第10天】【Kafka】Kafka 中生产者运行流程
|
2月前
|
消息中间件 存储 Java
【Kafka】Kafka 组件分析
【4月更文挑战第5天】【Kafka】Kafka 组件分析
|
2月前
|
消息中间件 存储 网络协议
【Kafka】Kafka 性能高的原因分析
【4月更文挑战第5天】【Kafka】Kafka 性能高的原因分析
|
2月前
|
消息中间件 存储 Kafka
【Kafka】Kafka 架构设计分析
【4月更文挑战第5天】【Kafka】kafka 架构设计分析
|
2月前
|
消息中间件 大数据 Kafka
【Kafka】kafka 发展历史分析
【4月更文挑战第5天】【Kafka】kafka 发展历史分析
|
2月前
|
消息中间件 监控 安全
探究Kafka主题删除失败的根本原因
探究Kafka主题删除失败的根本原因
43 0
|
2月前
|
消息中间件 存储 缓存
玩转Kafka—Kafka高性能原因分析
玩转Kafka—Kafka高性能原因分析
37 0
|
2月前
|
消息中间件 存储 负载均衡
kafka底层原理分析
kafka底层原理分析
50 2
|
2天前
|
消息中间件 NoSQL Kafka
基于Kafka的nginx日志收集分析与监控平台(3)
基于Kafka的nginx日志收集分析与监控平台(3)