环境准备
- 配置有ssh服务,可以进行免密登录
- Kafka运行在JVM上,需要安装JDK
- kafka依赖zookeeper,需要安装zookeeper
Kafka 安装
解压缩
tar -zxvf kafka_2.12-3.2.3.tgz mv kafka_2.12-3.2.3 kafka
创建kafka消息存放目录
cd kafka mkdir kafka-logs
修改配置文件
vim /opt/kafka/config/server.properties
- 修改参数
broker.id=0 # 集群内全局唯一标识,每个节点上需要设置不同的值 listeners=PLAINTEXT://bigdata1:9092 # 每个节点上设置为自己的IP地址 log.dirs=/opt/kafka/kafka-logs # 存放kafka消息的路径 zookeeper.connect=bigdata1:2181,bigdata2:2181,bigdata3:2181
分发安装目录给其他集群节点
scp -r /opt/kafka/ bigdata2:/opt/ scp -r /opt/kafka/ bigdata3:/opt/
注意每台服务器都需要修改配置文件server.properties中的 broker.id 和listeners 参数
编写kafka集群操作脚本
vim /opt/kafka/bin/kafka-cluster.sh
#!/bin/bash case $1 in "start"){ for i in bigdata1 bigdata2 bigdata3 do echo -------------------------------- $i kafka 启动 --------------------------- ssh $i "source /etc/profile;/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties" done } ;; "stop"){ for i in bigdata1 bigdata2 bigdata3 do echo -------------------------------- $i kafka 停止 --------------------------- ssh $i "/opt/kafka/bin/kafka-server-stop.sh" done } ;; esac
启动
/opt/kafka/bin/kafka-cluster.sh start
停止
/opt/kafka/bin/kafka-cluster.sh stop
测试
- 创建topic
cd /opt/kafka ./bin/kafka-topics.sh --create --bootstrap-server bigdata1:9092 --replication-factor 3 --partitions 1 --topic test
- 查看topic列表
./bin/kafka-topics.sh --list --bootstrap-server bigdata1:9092
- 启动生产者
./bin/kafka-console-producer.sh --broker-list bigdata1:9092 --topic test
- 启动消费者
./bin/kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --topic test --from-beginning
在生产端输入消息,消费端消费到生产者的消息,即可