Kafka 命令行操作
Kafka 主要包括三大部分:生产者、主题分区节点、消费者。
1、Topic 命令行操作
也就是我们 kafka 下的脚本 kafka-topics.sh 的相关操作。
常用命令行操作
参数 |
描述 |
--bootstrap-server <String: server toconnect to> |
连接的Kafka Broker主机名称和端口号。 |
--topic <String: topic> |
操作的topic名称。 |
--create |
创建主题。 |
--delete |
删除主题。 |
--alter |
修改主题。 |
--list |
查看所有主题。 |
--describe |
查看主题详细描述。 |
--partitions <Integer: # of partitions> |
设置分区数。 |
--replication-factor<Integer: replication factor> |
设置分区副本。 |
--config <String: name=value> |
更新系统默认的配置。 |
查看当前一共有多少个 topic:
我们测试环境下一般只通过一台节点连接,但是生产环境下可以在 hadoop102:9092 后面加个逗号,再跟一个 hadoop103:9092 。
kafka-topics.sh --bootstrap-server hadoop102:9092 --list
创建 topic :
创建一个 点赞 的主题,必须指定分区和副本才能创建成功,这里我们指定分区数和副本数分别为 1 和 3 。
kafka-topics.sh --bootstrap-server hadoop102:9092 --topic like --create --partitions 1 --replication-factor 3
查看主题详细信息:
kafka-topics.sh --bootstrap-server hadoop102:9092 --topic like --describe
运行结果:
Topic: like TopicId: 2RDnQulpR_K1FSmLVIKMlw PartitionCount: 1 ReplicationFactor: 3 Configs: Topic: like Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
可以看到副本分别位于 2,1,0, 也就是我们之前安装时指定的 broker.id 这个参数。leader:2 表示 leader 节点的 broker.id = 2 ,也就是我们的 hadoop104 节点。
修改 like 主题的分区数:
修改分区数为 3(原本是 1)。
kafka-topics.sh --bootstrap-server hadoop102:9092 --topic like --alter --partitions 3
修改结果:
Topic: like TopicId: 2RDnQulpR_K1FSmLVIKMlw PartitionCount: 3 ReplicationFactor: 3 Configs: Topic: like Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic: like Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: like Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
注意:
分区数只能增加不能减少!副本也不能通过命令行减少。
删除 topic like
kafka-topics.sh --bootstrap-server hadoop102:9092 --topic like --delete
2、生产者命令行操作
也就是我们 kafka 下的脚本 kafka-console-producer.sh 的相关操作。
常用命令式行
参数 |
描述 |
--bootstrap-server <String: server toconnect to> |
连接的Kafka Broker主机名称和端口号。 |
--topic <String: topic> |
操作的topic名称。 |
生产消息
kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic like > 消息1 > 消息2 > ...
3、消费者命令行操作
常用命令行
参数 |
描述 |
--bootstrap-server <String: server toconnect to> |
连接的Kafka Broker主机名称和端口号。 |
--topic <String: topic> |
操作的topic名称。 |
--from-beginning |
从头开始消费。 |
--group <String: consumer group id> |
指定消费者组名称。 |
消费消息
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic like
把主题中所有数据读取出来(所有历史数据)
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
注意:慎用,可能会把已经处理过的消息又处理一遍。