环境准备:
1.使用三台ECS云服务器,内存建议最小2G
2.获取kafka软件包,解压
[root@kafka-1 ~]# tar -zxf kafka_2.11-1.1.1.tgz [root@kafka-1 ~]# ls kafka_2.11-1.1.1 kafka_2.11-1.1.1.tgz
3.安装jdk1.8.0
[root@kafka-1 bin]# yum -y install java-1.8.0-openjdk.x86_64 java-1.8.0-openjdk-devel.x86_64
4.配置主机名映射
5.其余selinux,防火墙配置略
kafka包里的文件夹结构,bin目录下为启动管理服务的脚本,config目录下为服务配置
一、配置kafka内置zookeeper集群
kafka集群依赖于zk集群,这里就用kafka包内置的zk服务配置集群
//编辑zookeeper.properties配置文件,添加以下配置
[root@kafka-1 config]# vim zookeeper.properties initLimit=10 syncLimit=5 server.1=kafka1:2888:3888 server.2=kafka2:2888:3888 server.3=kafka3:2888:3888 quorumListenOnAllIPs=true
//创建myid表示zk服务节点,写入id号(对应配置文件中的server.x中的x)到myid文件中
[root@kafka-1 ~]# mkdir -p /tmp/zookeeper [root@kafka-1 ~]# echo 1 > /tmp/zookeeper/myid
//启动zk
[root@kafka-1 bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
//启动完成后的端口监听,在其中一台leader有监听2888端口
[root@kafka-2 zookeeper]# ss -ntlp State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 50 :::2181 :::* users:(("java",pid=30223,fd=96)) LISTEN 0 50 :::2888 :::* users:(("java",pid=30223,fd=99)) LISTEN 0 50 :::3888 :::* users:(("java",pid=30223,fd=97)) LISTEN 0 128
2181:对cline端提供服务
3888:选举leader使用
2888:集群内机器通讯使用(Leader监听此端口)
二、创建kafka集群
//修改kafka的配置文件server.properties。
[root@kafka-1 config]# vim server.properties broker.id=1 //id对应节点号,有所区分 port=9092 advertised.host.name=kafka1 //端口监听的IP或主机名 zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181 //连接各节点的IP或主机名:端口
//启动kafka
[root@kafka-1 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
//启动完成后的端口监听
[root@kafka-1 ~]# ss -ntlp State Recv-Q Send-Q Local Address:Port Peer Address:Port LISTEN 0 50 :::9092 :::* users:(("java",pid=27495,fd=101)) LISTEN 0 50 :::2181 :::* users:(("java",pid=521,fd=96))
//查看java进程,显示kafka
[root@kafka-1 ~]# jps 2049 Jps 27495 Kafka 521 QuorumPeerMain
三、测试kafka服务
//在节点1上创建topic
[root@kafka-1 bin]# ./kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic taoresetTEST Created topic "taoresetTEST".
//在节点2上查看topic
[root@kafka-2 bin]# ./kafka-topics.sh --list --zookeeper kafka2:2181 taoresetTEST
//在节点2上查看topic
[root@kafka-3 bin]# ./kafka-topics.sh --list --zookeeper kafka3:2181 taoresetTEST