0.8的官方文档提供了一个删除topic的命令:
kafka-topics.sh --delete 但是在运行时会报错找不到这个方法。
kafka-topics.sh最终是运行了kafka.admin.TopicCommand这个类,在0.8的源码中这个类中没有找到有delete topic相关的代码。
在kafka的admin包下,提供了一个DeleteTopicCommand的类,可以实现删除topic的功能。
kafka.admin.DeleteTopicCommand
其中删除topic的具体实现代码如下:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
import
org.I0Itec.zkclient.ZkClient
import
kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
.......
val topic = options.valueOf(topicOpt)
val zkConnect = options.valueOf(zkConnectOpt)
var zkClient: ZkClient =
null
try
{
zkClient =
new
ZkClient(zkConnect,
30000
,
30000
, ZKStringSerializer)
zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
//其实最终还是通过删除zk里面对应的路径来实现删除topic的功能
println(
"deletion succeeded!"
)
}
catch
{
case
e: Throwable =>
println(
"delection failed because of "
+ e.getMessage)
println(Utils.stackTrace(e))
}
finally
{
if
(zkClient !=
null
)
zkClient.close()
}
|
因为这个命令只会删除zk里面的信息,真实的数据还是没有删除,所以需要登录各个broker,把对应的topic的分区数据目录删除,也可能正因为这一点,delete命令才没有集成到kafka.admin.TopicCommand这个类。
本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1548069,如需转载请自行联系原作者