# 一. 日常Topic操作
这里的命令以kafka2.2之后版本进行说明,社区推荐命令指定 --bootstrap-server参数,受kafka安全认证体系的约束,如果使用 --zookeeper 会绕过 Kafka 的安全体系。
1. 创建topic
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1
2. 查看所有topic列表
bin/kafka-topics.sh --bootstrap-server broker_host:port --list
3. 查看某个特定topic
bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>
4. 增加topic分区数
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数>
5. 动态修改主题参数
以 max.message.bytes为例
5.1 增加指定broker的配置
bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
eg:bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name testInfoTopic --alter --add-config max.message.bytes=128000
查看topic修改情况
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic testInfoTopic
Topic: testInfoTopic TopicId: KzPy24fVSsCR03ZOYRzq8g PartitionCount: 3 ReplicationFactor: 1 Configs: max.message.bytes=128000,unclean.leader.election.enable=false
Topic: testInfoTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: testInfoTopic Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: testInfoTopic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
zookeeper 查看修改后内容
./zookeeper-shell.sh localhost:2181
> get /config/topics/testInfoTopic
{"version":1,"config":{"max.message.bytes":"128000"}}
5.2 删除指定broker的配置
bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config max.message.bytes
6. 修改主题限速
限制某个主题副本在执行副本同步机制时,带宽消耗不要过多(不得占用超过 100MBps)
--entity-name 就是 Broker ID。倘若该主题的副本分别在 0、1、2 多个 Broker 上,那么你还要依次为 Broker 1、2、3 执行这条命令。
for i in {0..2}
do
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name $i
done
7. 删除topic
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>
二. 数据生产消费
测试数据
broker_host:port ==> localhost:9092
Topic ==> testInfoTopic
Consumer Group ==> G1
1 生产数据
./kafka-console-producer.sh --broker-list localhost:9092 --topic testInfoTopic
举例:
# 指定生产者参数 acks 为 -1,同时启用了 LZ4 的压缩算法
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testInfoTopic --request-required-acks -1 --producer-property compression.type=lz4
1.1 生产者性能测试
向topic 发送10w条消息,每条消息1KB,在producer-props 后面指定要设置的生产者参数,比如本例中的压缩算法、延时时间等
bin/kafka-producer-perf-test.sh --topic testInfoTopic --num-records 100000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=localhost:9092 acks=-1 linger.ms=2000 compression.type=lz4
100000 records sent, 24764.735017 records/sec (24.18 MB/sec), 93.07 ms avg latency, 672.00 ms max latency, 56 ms 50th, 301 ms 95th, 325 ms 99th, 335 ms 99.9th.
生产吞吐量,消息发送延迟都可以看到
2 消费数据
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testInfoTopic
举例:
# 注意,这里消费最好指定一个消费组G1,如果没有指定的话,每次运行 Console Consumer,它都会自动生成一个新的消费者组来消费。时间长久后,就会产生大量的以 console-consumer的消费者组
# --from-beginning 等同于Consumer 端参数 auto.offset.reset 设置成 earliest;如果不指定,会默认从最新位移消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testInfoTopic --group G1 --from-beginning --consumer-property enable.auto.commit=false
2.1消费者性能测试
bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --messages 100000 --topic testInfoTopic
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-11-05 10:35:46:535, 2022-11-05 10:35:48:699, 97.5835, 45.0940, 100013, 46216.7283, 665, 1499, 65.0990, 66719.8132
消费吞吐量的指标
2.2 查看消费进度
bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>
./bin/kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --group G1 --describe
三. 内部topic操作
1. __consumer_offsets
该主题保存了消费者组的位移数据,默认有50个分区
1.1 变更主题副本数
如果该主题的副本值已经是 1 了,我们如何增加该主题的副本到3
第一步:创建一个 json 文件,显式提供 50 个分区对应的副本数,注意要将replicas 中的 3 台 Broker 排列顺序不同,使 Leader 副本均匀地分散在 Broker上
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]},
{"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
...
{"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}`
第二步:执行kafka-reassign-partitions.sh
bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute
1.2 查看__consumer_offsets消费者组提交的位移数据
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
1.3 读取__consumer_offsets消息,查看消费者组状态信息
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
2. __transaction_state
该主题为了支持事务引入的,默认有50个分区,操作方法参考__consumer_offsets
四. 常见错误处理
1. topic删除失败
原因1:副本所在的broker宕机
解决办法:重启broker后,会自动恢复
原因2:待删除的全部或者部分分区在迁移中
解决办法:
第 1 步,手动删除 ZooKeeper 节点 /admin/delete_topics 下待删除topic的 znode。
第 2 步,手动删除该主题在磁盘上的分区目录。
第 3 步,在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。(会导致大量的leader重选举)
2. __consumer_offsets占用太多磁盘
原因:kafka-log-cleaner-thread线程挂了
可以用 jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题。
解决办法
重启对应的broker节点
五. kafka 推荐配置
#### kafka推荐配置
auto.create.topics.enable=false # 是否允许自动创建Topic
unclean.leader.election.enable=false # 是否允许 Unclean Leader 选举
auto.leader.rebalance.enable=false # 是否允许定期进行 Leader 选举。