1TopicCommand
1.Topic创建
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
相关可选参数
参数 | 描述 | 例子 |
--bootstrap-server 指定kafka服务 |
指定连接到的kafka服务; 如果有这个参数,则 --zookeeper 可以不需要 |
--bootstrap-server localhost:9092 |
--zookeeper |
弃用, 通过zk的连接方式连接到kafka集群; | --zookeeper localhost:2181 或者localhost:2181/kafka |
--replication-factor |
副本数量,注意不能大于broker数量;如果不提供,则会用集群中默认配置 | --replication-factor 3 |
--partitions |
分区数量,当创建或者修改topic的时候,用这个来指定分区数;如果创建的时候没有提供参数,则用集群中默认值; 注意如果是修改的时候,分区比之前小会有问题 | --partitions 3 |
--replica-assignment |
副本分区分配方式;创建topic的时候可以自己指定副本分配情况; | --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 这个意思是有三个分区和三个副本,对应分配的Broker; 逗号隔开标识分区;冒号隔开表示副本 |
--config <String: name=value> |
用来设置topic级别的配置以覆盖默认配置;只在--create 和--bootstrap-server 同时使用时候生效; 可以配置的参数列表请看文末附件 | 例如覆盖两个配置 --config retention.bytes=123455 --config retention.ms=600001 |
--command-config <String: command 文件路径> |
用来配置客户端Admin Client启动配置,只在--bootstrap-server 同时使用时候生效; | 例如:设置请求的超时时间 --command-config config/producer.proterties ; 然后在文件中配置 request.timeout.ms=300000 |
2.删除Topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
支持正则表达式匹配Topic来进行删除,只需要将topic 用双引号包裹起来例如: 删除以create_topic_byhand_zk
为开头的topic;
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic "create_topic_byhand_zk.*"
.
表示任意匹配除换行符 \n 之外的任何单字符。要匹配 . ,请使用 . 。·*·
:匹配前面的子表达式零次或多次。要匹配 * 字符,请使用 *。.*
: 任意字符
删除任意Topic (慎用)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic ".*?"
更多的用法请参考正则表达式
相关配置
配置 | 描述 | 默认 |
file.delete.delay.ms | topic删除被标记为--delete文件之后延迟多长时间删除正在的Log文件 | 60000 |
delete.topic.enable | 是否能够删除topic | true |
3.Topic分区扩容
zk方式(不推荐)
>bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
kafka版本 >= 2.2 支持下面方式(推荐)
单个Topic扩容
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4
批量扩容 (将所有正则表达式匹配到的Topic分区扩容到4个)
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4
".*?"
正则表达式的意思是匹配所有; 您可按需匹配
PS: 当某个Topic的分区少于指定的分区数时候,他会抛出异常;但是不会影响其他Topic正常进行;
相关可选参数
参数 | 描述 | 例子 |
--replica-assignment |
副本分区分配方式;创建topic的时候可以自己指定副本分配情况; | --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 这个意思是有三个分区和三个副本,对应分配的Broker; 逗号隔开标识分区;冒号隔开表示副本 |
PS: 虽然这里配置的是全部的分区副本分配配置,但是正在生效的是新增的分区;比如: 以前3分区1副本是这样的
Broker-1 | Broker-2 | Broker-3 | Broker-4 |
0 | 1 | 2 |
现在新增一个分区,--replica-assignment
2,1,3,4 ; 看这个意思好像是把0,1号分区互相换个Broker
Broker-1 | Broker-2 | Broker-3 | Broker-4 |
1 | 0 | 2 | 3 |
但是实际上不会这样做,Controller在处理的时候会把前面3个截掉; 只取新增的分区分配方式,原来的还是不会变
Broker-1 | Broker-2 | Broker-3 | Broker-4 |
0 | 1 | 2 | 3 |
4.查询Topic描述
1.查询单个Topic
sh bin/kafka-topics.sh --topic test --bootstrap-server xxxx:9092 --describe --exclude-internal
2.批量查询Topic(正则表达式匹配,下面是查询所有Topic)
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server xxxx:9092 --describe --exclude-internal
支持正则表达式匹配Topic,只需要将topic 用双引号包裹起来
相关可选参数
参数 | 描述 | 例子 |
--bootstrap-server 指定kafka服务 |
指定连接到的kafka服务; 如果有这个参数,则 --zookeeper 可以不需要 |
--bootstrap-server localhost:9092 |
--at-min-isr-partitions |
查询的时候省略一些计数和配置信息 | --at-min-isr-partitions |
--exclude-internal |
排除kafka内部topic,比如__consumer_offsets-* |
--exclude-internal |
--topics-with-overrides |
仅显示已覆盖配置的主题,也就是单独针对Topic设置的配置覆盖默认配置;不展示分区信息 | --topics-with-overrides |
5.查询Topic列表
1.查询所有Topic列表
sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal
2.查询匹配Topic列表(正则表达式)
查询
test_create_
开头的所有Topic列表sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal --topic "test_create_.*"
相关可选参数
参数 | 描述 | 例子 |
--exclude-internal |
排除kafka内部topic,比如__consumer_offsets-* |
--exclude-internal |
--topic |
可以正则表达式进行匹配,展示topic名称 | --topic |
关于作者:石臻臻的杂货铺, 专注于 Java领域、大数据领域 等知识分享, 内容多为 原理 、源码、实战 等等, 坚持输出干货,所写内容必定经过验证,并深入源码分析,保证内容准确性, 长期在CSDN、和公众号【石臻臻的杂货铺】发布原创文章,欢迎关注! 如果有相关技术领域问题,欢迎进群交流,各个领域都有专人解答,你所问的,都会得到回应!
欢迎Star和共建由滴滴开源的kafka的管理平台 满足所有开发运维日常需求
滴滴开源Logi-KafkaManager 一站式Kafka监控与管控平台
2More
Kafka专栏持续更新中...(源码、原理、实战、运维、视频、面试视频)
【kafka运维】Kafka全网最全最详细运维命令合集(精品强烈建议收藏!!!)_石臻臻的杂货铺-CSDN博客
【kafka实战】分区重分配可能出现的问题和排查问题思路(生产环境实战,干货!!!非常干!!!建议收藏)
【kafka异常】kafka 常见异常处理方案(持续更新! 建议收藏)
【kafka运维】分区从分配、数据迁移、副本扩缩容 (附教学视频)
【kafka源码】ReassignPartitionsCommand源码分析(副本扩缩、数据迁移、副本重分配、副本跨路径迁移