Kafka 删除主题流程分析

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 之前有个 Kafka 集群的每个节点的挂载磁盘多达 20+ 个,平均每个磁盘约 1T,每个节点的分区日志被平均分配到这些磁盘中,但由于每个分区的数据不一致,而集群节点 log.retention.bytes 这个参数的默认值是 -1,也就是没有任何限制,因此 Kafka 的日志删除日志依赖 log.retention.hours 参数来删除,因此会出现日志未过期,磁盘写满的情况。针对该集群双十一会遇到某些挂载磁盘被写满的情况,需要手动对主题进行删除以清空磁盘的操作,现在分析删除主题对集群以及客户端会有什么影响,以及 Kafka 都做了哪些动作。

之前有个 Kafka 集群的每个节点的挂载磁盘多达 20+ 个,平均每个磁盘约 1T,每个节点的分区日志被平均分配到这些磁盘中,但由于每个分区的数据不一致,而集群节点 log.retention.bytes 这个参数的默认值是 -1,也就是没有任何限制,因此 Kafka 的日志删除日志依赖 log.retention.hours 参数来删除,因此会出现日志未过期,磁盘写满的情况。


针对该集群双十一会遇到某些挂载磁盘被写满的情况,需要手动对主题进行删除以清空磁盘的操作,现在分析删除主题对集群以及客户端会有什么影响,以及 Kafka 都做了哪些动作。


图解删除过程



1. 删除主题


删除主题有多种方法,可通过 kafka-topic.sh 脚本并执行 --delete 命令,或者用暴力方式直接在 zk 删除对应主题节点,其实删除主题无非就是令 zk 节点删除,以触发 controller 对应监听器,然后再通过监听器通知到所有 broker,具体流程如下:

640.jpg


删除主题执行后,controller 监听到 zk 主题节点被删除,通知到所有 broker 删除主题对应的副本,这里会分成两个步骤,第一个步骤先将下线主题对应的副本,最后才执行真正的删除操作,注意,这里也并为真正的将主题从磁盘中删除,此时仅仅只会将要删除的副本所在的目录重命名,以免之后创建主题时目录有冲突,每个 broker 都会有一个定时线程,定时清除已重命名为删除状态的日志文件,具体如下:


640.jpg


2. 自动创建主题


自动创建主题的前提是 broker 配置参数 auto.create.topic.enble=true,删除主题后,当 Producer 发送时会对发送进行重试,期间会发送 MetadataRquest 命令到 broker 请求获取最新的元数据,在获取元数据的同时,会判断是否需要自动创建主题,如果需要,则调用 zk 客户端创建主题节点,controller 监听到有新主题创建,就会触发 controller 相关状态机工作创建主题。


640.jpg


刚刚也说过,kafka 重命名要删除的主题后,并不会立马就会删除,而是等待异步线程去删除,如下图所示,重命名后与重新创建的分区不冲突,可以证明删除是异步执行的了,且不影响生产发送,但是被重命名后的日志就不能消费了,即丢失了。


640.jpg


如下图可看出,在一分钟后,重命名后的副本被删除。

640.jpg


相关日志分析



1、controller.log


触发删除主题监听器:

[2019-11-07 19:24:11,121] DEBUG [Controller id=0] Delete topics listener fired for topics test-topic to be deleted (kafka.controller.KafkaController)


开始删除主题操作:

[2019-11-07 19:24:11,121] INFO [Topic Deletion Manager 0] Handling deletion for topics test-topic (kafka.controller.TopicDeletionManager)


开始停止主题,但此时并未删除:

[2019-11-07 19:24:11,143] DEBUG The stop replica request (delete = false) sent to broker 2 is StopReplicaRequestInfo([Topic=test-topic,Partition=1,Replica=2],false),StopReplicaRequestInfo([Topic=test-topic,Partition=0,Replica=2],false),StopReplicaRequestInfo([Topic=test-topic,Partition=2,Replica=2],false) (kafka.controller.ControllerBrokerRequestBatch)


开始执行真正的删除动作:

[2019-11-07 19:24:11,145] DEBUG [Topic Deletion Manager 0] Deletion started for replicas
[2019-11-07 19:24:11,147] DEBUG The stop replica request (delete = true) sent to broker 2 is StopReplicaRequestInfo([Topic=test-topic,Partition=1,Replica=2],true),StopReplicaRequestInfo([Topic=test-topic,Partition=0,Replica=2],true),StopReplicaRequestInfo([Topic=test-topic,Partition=2,Replica=2],true) (kafka.controller.ControllerBrokerRequestBatch)


收到 broker 删除的回调:

[2019-11-07 19:24:11,170] DEBUG [Controller id=0] Delete topic callback invoked on StopReplica response received from broker 2: request error = NONE, partition errors = Map(test-topic-2 -> NONE, test-topic-0 -> NONE, test-topic-1 -> NONE) (kafka.controller.KafkaController)
[2019-11-07 19:24:11,170] DEBUG [Topic Deletion Manager 0] Deletion successfully completed for replicas


已经成功全部删除:

[2019-11-07 19:24:11,202] INFO [Topic Deletion Manager 0] Deletion of topic test-topic successfully completed (kafka.controller.TopicDeletionManager)


如果此时有新的消息写入,会自动创建主题:

[2019-11-07 19:24:11,203] INFO [Controller id=0] New topics: [Set()], deleted topics: [Set()], new partition replica assignment [Map()] (kafka.controller.KafkaController)
[2019-11-07 19:24:11,267] INFO [Controller id=0] New topics: [Set(test-topic)], deleted topics: [Set()], new partition replica assignment [Map(test-topic-2 -> Vector(1, 2, 0), test-topic-1 -> Vector(0, 1, 2), test-topic-0 -> Vector(2, 0, 1))] (kafka.controller.KafkaController)
[2019-11-07 19:24:11,267] INFO [Controller id=0] New partition creation callback for test-topic-2,test-topic-1,test-topic-0 (kafka.controller.KafkaController)


2、server.log


broker 收到删除主题通通知(此时并没有删除):

[2019-11-07 19:24:11,144] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions Set(test-topic-2, test-topic-0, test-topic-1) (kafka.server.ReplicaFetcherManager)


停止分区 fetch 线程:

[2019-11-07 19:24:11,145] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2019-11-07 19:24:11,146] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=293639440, epoch=1824) to node 1: java.io.IOException: Client was shutdown before response was read. (org.apache.kafka.clients.FetchSessionHandler)
[2019-11-07 19:24:11,146] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2019-11-07 19:24:11,147] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)


接收到真正删除主题指令后,会重命名分区日志目录,此时还未删除,会等待异步线程执行:

[2019-11-07 19:24:11,157] INFO Log for partition test-topic-2 is renamed to /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete and is scheduled for deletion (kafka.log.LogManager)


如果此时有新的消息写入,会自动创建主题:

[2019-11-08 15:39:39,343] INFO Creating topic test-topic with configuration {} and initial partition assignment Map(2 -> ArrayBuffer(1, 0, 2), 1 -> ArrayBuffer(0, 2, 1), 0 -> ArrayBuffer(2, 1, 0)) (kafka.zk.AdminZkClient)
[2019-11-08 15:39:39,369] INFO [KafkaApi-1] Auto creation of topic test-topic with 3 partitions and replication factor 3 is successful (kafka.server.KafkaApis)
[2019-11-07 19:24:11,286] INFO Created log for partition test-topic-0 in /tmp/kafka-logs/kafka_3 with properties {...}


异步线程删除重命名后的主题:

[2019-11-07 19:25:11,161] INFO Deleted log /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete/00000000000000000000.log. (kafka.log.LogSegment)
[2019-11-07 19:25:11,163] INFO Deleted offset index /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete/00000000000000000000.index. (kafka.log.LogSegment)
[2019-11-07 19:25:11,164] INFO Deleted time index /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete/00000000000000000000.timeindex. (kafka.log.LogSegment)
[2019-11-07 19:25:11,165] INFO Deleted log for partition test-topic-2 in /tmp/kaf




相关文章
|
5月前
|
消息中间件 负载均衡 Kafka
Kafka消费组重新平衡流程
Kafka消费组重新平衡流程
|
2月前
|
消息中间件 存储 分布式计算
大数据-72 Kafka 高级特性 稳定性-事务 (概念多枯燥) 定义、概览、组、协调器、流程、中止、失败
大数据-72 Kafka 高级特性 稳定性-事务 (概念多枯燥) 定义、概览、组、协调器、流程、中止、失败
41 4
|
2月前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
47 4
|
2月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
41 2
|
2月前
|
消息中间件 缓存 大数据
大数据-57 Kafka 高级特性 消息发送相关01-基本流程与原理剖析
大数据-57 Kafka 高级特性 消息发送相关01-基本流程与原理剖析
48 3
|
2月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
60 1
|
2月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
92 0
|
3月前
|
数据采集 消息中间件 存储
实时数据处理的终极武器:Databricks与Confluent联手打造数据采集与分析的全新篇章!
【9月更文挑战第3天】本文介绍如何结合Databricks与Confluent实现高效实时数据处理。Databricks基于Apache Spark提供简便的大数据处理方式,Confluent则以Kafka为核心,助力实时数据传输。文章详细阐述了利用Kafka进行数据采集,通过Delta Lake存储并导入数据,最终在Databricks上完成数据分析的全流程,展示了一套完整的实时数据处理方案。
75 3
|
4月前
|
消息中间件 负载均衡 Kafka
Kafka 实现负载均衡与故障转移:深入分析 Kafka 的架构特点与实践
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理和流传输设计的高性能消息系统。其核心设计注重高吞吐量、低延迟与可扩展性,并具备出色的容错能力。Kafka采用分布式日志概念,通过数据分区及副本机制确保数据可靠性和持久性。系统包含Producer(消息生产者)、Consumer(消息消费者)和Broker(消息服务器)三大组件。Kafka利用独特的分区机制实现负载均衡,每个Topic可以被划分为多个分区,每个分区可以被复制到多个Broker上,确保数据的高可用性和可靠性。
105 2
|
4月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
196 4
下一篇
DataWorks