Kafka常用命令归纳

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 本文档详细介绍了Kafka 2.2及以上版本中Topic的操作命令,包括创建、查看、修改及删除Topic,以及动态调整主题参数和限速。此外,还涵盖了数据生产和消费的相关命令与性能测试方法,并对内部Topic(如`__consumer_offsets`和`__transaction_state`)的操作进行了说明。最后,提供了常见错误处理方案及Kafka推荐配置,帮助用户更好地管理和优化Kafka集群。

# 一. 日常Topic操作

这里的命令以kafka2.2之后版本进行说明,社区推荐命令指定 --bootstrap-server参数,受kafka安全认证体系的约束,如果使用 --zookeeper 会绕过 Kafka 的安全体系。

1. 创建topic

bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name  --partitions 1 --replication-factor 1

2. 查看所有topic列表

bin/kafka-topics.sh --bootstrap-server broker_host:port --list

3. 查看某个特定topic

bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>

4. 增加topic分区数

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数>

5. 动态修改主题参数

以 max.message.bytes为例

5.1 增加指定broker的配置

bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760

eg:bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name testInfoTopic --alter --add-config max.message.bytes=128000
查看topic修改情况
bin/kafka-topics.sh  --bootstrap-server localhost:9092 --describe --topic testInfoTopic 

Topic: testInfoTopic    TopicId: KzPy24fVSsCR03ZOYRzq8g PartitionCount: 3       ReplicationFactor: 1    Configs: max.message.bytes=128000,unclean.leader.election.enable=false
        Topic: testInfoTopic    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: testInfoTopic    Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: testInfoTopic    Partition: 2    Leader: 1       Replicas: 1     Isr: 1

zookeeper 查看修改后内容

./zookeeper-shell.sh localhost:2181

> get /config/topics/testInfoTopic
{"version":1,"config":{"max.message.bytes":"128000"}}

5.2 删除指定broker的配置

bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config max.message.bytes

6. 修改主题限速

限制某个主题副本在执行副本同步机制时,带宽消耗不要过多(不得占用超过 100MBps)

--entity-name 就是 Broker ID。倘若该主题的副本分别在 0、1、2 多个 Broker 上,那么你还要依次为 Broker 1、2、3 执行这条命令。

for i in {0..2}
do 
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name $i
done

7. 删除topic

bin/kafka-topics.sh --bootstrap-server broker_host:port --delete  --topic <topic_name>

二. 数据生产消费

测试数据

broker_host:port ==> localhost:9092

Topic ==> testInfoTopic

Consumer Group ==> G1

1 生产数据

./kafka-console-producer.sh --broker-list localhost:9092 --topic testInfoTopic

举例:

# 指定生产者参数 acks 为 -1,同时启用了 LZ4 的压缩算法
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testInfoTopic --request-required-acks -1 --producer-property compression.type=lz4

1.1 生产者性能测试

向topic 发送10w条消息,每条消息1KB,在producer-props 后面指定要设置的生产者参数,比如本例中的压缩算法、延时时间等

bin/kafka-producer-perf-test.sh --topic testInfoTopic --num-records 100000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=localhost:9092 acks=-1 linger.ms=2000 compression.type=lz4

100000 records sent, 24764.735017 records/sec (24.18 MB/sec), 93.07 ms avg latency, 672.00 ms max latency, 56 ms 50th, 301 ms 95th, 325 ms 99th, 335 ms 99.9th.

生产吞吐量,消息发送延迟都可以看到

2 消费数据

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testInfoTopic

举例:

# 注意,这里消费最好指定一个消费组G1,如果没有指定的话,每次运行 Console Consumer,它都会自动生成一个新的消费者组来消费。时间长久后,就会产生大量的以 console-consumer的消费者组
# --from-beginning 等同于Consumer 端参数 auto.offset.reset 设置成 earliest;如果不指定,会默认从最新位移消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testInfoTopic --group G1 --from-beginning --consumer-property enable.auto.commit=false

2.1消费者性能测试

 bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --messages 100000 --topic testInfoTopic

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-11-05 10:35:46:535, 2022-11-05 10:35:48:699, 97.5835, 45.0940, 100013, 46216.7283, 665, 1499, 65.0990, 66719.8132

消费吞吐量的指标

2.2 查看消费进度

bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>

./bin/kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --group G1 --describe

三. 内部topic操作

1. __consumer_offsets

该主题保存了消费者组的位移数据,默认有50个分区

1.1 变更主题副本数

如果该主题的副本值已经是 1 了,我们如何增加该主题的副本到3

第一步:创建一个 json 文件,显式提供 50 个分区对应的副本数,注意要将replicas 中的 3 台 Broker 排列顺序不同,使 Leader 副本均匀地分散在 Broker上

{"version":1, "partitions":[
 {"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]}, 
  {"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
  {"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
  ...
  {"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}`

第二步:执行kafka-reassign-partitions.sh

bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute

1.2 查看__consumer_offsets消费者组提交的位移数据

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

1.3 读取__consumer_offsets消息,查看消费者组状态信息

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning

2. __transaction_state

该主题为了支持事务引入的,默认有50个分区,操作方法参考__consumer_offsets

四. 常见错误处理

1. topic删除失败

原因1:副本所在的broker宕机

解决办法:重启broker后,会自动恢复

原因2:待删除的全部或者部分分区在迁移中

解决办法:

第 1 步,手动删除 ZooKeeper 节点 /admin/delete_topics 下待删除topic的 znode。

第 2 步,手动删除该主题在磁盘上的分区目录。

第 3 步,在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。(会导致大量的leader重选举)

2. __consumer_offsets占用太多磁盘

原因:kafka-log-cleaner-thread线程挂了

​ 可以用 jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题。

解决办法

重启对应的broker节点

五. kafka 推荐配置

#### kafka推荐配置

auto.create.topics.enable=false # 是否允许自动创建Topic
unclean.leader.election.enable=false    # 是否允许 Unclean Leader 选举
auto.leader.rebalance.enable=false  # 是否允许定期进行 Leader 选举。

六.参考资料

https://kafka.apache.org/documentation/#operations

相关文章
|
3月前
|
消息中间件 Kafka 测试技术
Kafka常用命令大全及kafka-console-consumer.sh及参数说明
该文章汇总了Kafka常用命令,包括集群管理、Topic操作、生产者与消费者的命令行工具使用方法等,适用于Kafka的日常运维和开发需求。
680 2
|
5月前
|
消息中间件 Java Kafka
kafka Linux环境搭建安装及命令创建队列生产消费消息
kafka Linux环境搭建安装及命令创建队列生产消费消息
123 4
|
7月前
|
消息中间件 存储 关系型数据库
探究Kafka原理-2.Kafka基本命令实操(下)
探究Kafka原理-2.Kafka基本命令实操
86 0
|
7月前
|
消息中间件 存储 运维
探究Kafka原理-2.Kafka基本命令实操(上)
探究Kafka原理-2.Kafka基本命令实操
96 0
|
7月前
|
消息中间件 存储 Kafka
kafka常用命令
kafka常用命令
|
7月前
|
消息中间件 Java Kafka
Apache Kafka-初体验Kafka(02)-Centos7下搭建单节点kafka_配置参数详解_基本命令实操
Apache Kafka-初体验Kafka(02)-Centos7下搭建单节点kafka_配置参数详解_基本命令实操
122 0
|
消息中间件 Kafka Apache
kafka2.x常用命令笔记(一)创建topic,查看topic列表、分区、副本详情,删除topic,测试topic发送与消费
kafka2.x常用命令笔记(一)创建topic,查看topic列表、分区、副本详情,删除topic,测试topic发送与消费
547 0
|
消息中间件 Kafka Shell
116 Kafka常用操作命令
116 Kafka常用操作命令
38 0
|
消息中间件 缓存 Java
聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 GROUP、OFFSET、HEARTBEAT 相关命令
聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 GROUP、OFFSET、HEARTBEAT 相关命令
201 0
|
消息中间件 存储 Kafka
【Kafka从入门到放弃系列 二】Kafka集群搭建及基本命令
【Kafka从入门到放弃系列 二】Kafka集群搭建及基本命令
158 0

热门文章

最新文章