一、集群规划
Kafka从早期的消息传输系统转型为开源分布式事件流处理平台系统,所以很多核心组件,核心操作都是基于分布式多节点的,以下是分布式软件环境安装计划。
服务节点 |
kafka-broker1 |
kafka-broker2 |
kafka-broker3 |
服务进程 |
QuorumPeerMain |
QuorumPeerMain |
QuorumPeerMain |
Kafka |
Kafka |
Kafka |
二、安装虚拟机
生产环境中,我们会使用多台服务器搭建Kafka集群系统,但是对于学习来讲,准备多台独立的服务器还是比较困难的,所以我们这里采用虚拟机的方式进行学习。
1、模板机导入
Kafka集群搭建相关资料链接: https://pan.baidu.com/s/1qwW6BltYoqkBFssuIM55MA 提取码: ghcn
模板机属性 |
模板机属性值 |
IP地址 |
192.168.8.100 |
主机名称 |
hadoop100 |
内存(虚拟) |
4G |
硬盘(虚拟) |
50G |
登录账号(全小写) |
root |
密码 |
000000 |
在VMware软件中选择打开虚拟机,导入模板机hadoop100,接下来进行克隆模板机。
2、克隆虚拟机
(1) 利用模板机hadoop100,克隆三台虚拟机:kafka-broker1、kafka-broker2、kafka-broker3。克隆时,要先关闭hadoop100。
- 在模板机上点击右键选择管理 -> 克隆
- 选择创建完整克隆
- 分别填写虚拟机名称以及存储的位置,点击完成即可。
(2) 启动三台克隆机,分别修改克隆机IP,以下以kafka-broker2举例说明
- 使用root用户登录,密码为000000
- 启动终端窗口,修改克隆虚拟机的静态IP
# 修改IP文件 vim /etc/sysconfig/network-scripts/ifcfg-ens33
- 修改为:
DEVICE=ens33 TYPE=Ethernet ONBOOT=yes BOOTPROTO=static NAME="ens33" IPADDR=192.168.8.102 PREFIX=24 GATEWAY=192.168.8.2 DNS1=192.168.8.2
- kafka-broker1:192.168.8.101
- kafka-broker2:192.168.8.102
- kafka-broker3:192.168.8.103
(3) 修改克隆机主机名,以下以kafka-broker1举例说明
- 使用root用户登录,修改主机名
# 修改主机名 vim /etc/hostname kafka-broker1
- 配置Linux克隆机主机名称映射hosts文件,打开/etc/hosts
# 修改主机名称映射 vim /etc/hosts # 添加如下内容: 192.168.8.101 kafka-broker1 192.168.8.102 kafka-broker2 192.168.8.103 kafka-broker3 192.168.8.104 kafka-broker4
- 重启三台克隆机kafka-broker1、kafka-broker2、kafka-broker3。
3、分发脚本
在分布式环境中,一般都需要在多个服务器节点安装软件形成服务集群。但是在每个服务器中单独安装软件的过程是非常麻烦的,所以我们可以采用在单一的服务器节点中安装软件,一般安装成功后,将安装好的软件分发(复制)到其他服务器节点的方式,这种方式非常方便且实用的,但是需要注意的是,软件分发完成后,需要根据软件要求修改每个服务器节点自己的配置内容。
(1) 在kafka-broker1虚拟机中创建xsync分发脚本文件,用于向多个虚拟机同步文件。
# 进入/root目录 cd /root # 创建bin目录 mkdir bin # 进入/root/bin目录 cd bin # 创建xsync文件 vim xsync
(2) 然后增加文件内容:
#!/bin/bash #1. 判断参数个数 if [ $# -lt 1 ] then echo Not Enough Arguement! exit; fi #2. 遍历集群所有机器 for host in kafka-broker1 kafka-broker2 kafka-broker3 do echo ==================== $host ==================== #3. 遍历所有目录,挨个发送 for file in $@ do #4 判断文件是否存在 if [ -e $file ] then #5. 获取父目录 pdir=$(cd -P $(dirname $file); pwd) #6. 获取当前文件的名称 fname=$(basename $file) ssh $host "mkdir -p $pdir" rsync -av $pdir/$fname $host:$pdir else echo $file does not exists! fi done done
(3) 修改xsync文件权限
# 修改权限 chmod 777 /root/bin/xsync
4、SSH无密登录配置
分发文件时,需要通过脚本切换主机进行指令操作,切换主机时,是需要输入密码的,每一次都输入就显得有点麻烦,所以这里以虚拟机kafka-broker1为例配置SSH免密登录(其他节点执行同样步骤即可),配置完成后,脚本执行时就不需要输入密码了。
(1) 生成公钥和私钥
# 生产公钥和私钥 ssh-keygen -t rsa
然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)。
(2) 将公钥拷贝到要免密登录的目标机器上,拷贝过程需要输入目标机器密码
# ssh-copy-id 目标机器 ssh-copy-id kafka-broker1 ssh-copy-id kafka-broker2 ssh-copy-id kafka-broker3
三、安装JDK
1、卸载现有JDK
# 不同节点都要执行操作 rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps
2、上传并解压Java压缩包
将jdk-8u212-linux-x64.tar.gz文件上传到虚拟机的/opt/software目录中
# 进入/opt/software目录 cd /opt/software/ # 解压缩文件到指定目录 tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/ # 进入/opt/module目录 cd /opt/module # 改名 mv jdk1.8.0_212/ java
3、配置Java环境变量
(1) 新建 /etc/profile.d/my_env.sh文件
vim /etc/profile.d/my_env.sh
(2) 添加内容
#JAVA_HOME export JAVA_HOME=/opt/module/java export PATH=$PATH:$JAVA_HOME/bin
(3) 让环境变量生效
source /etc/profile.d/my_env.sh
(4) 安装测试
java -version
4、分发软件
# 分发环境变量文件 xsync /etc/profile.d/my_env.sh # 进入/opt/module路径 cd /opt/module # 调用分发脚本将本机得Java安装包分发到其他两台机器 xsync java # 在每个节点让环境变量生效
四、安装ZooKeeper
1、上传并解压ZooKeeper压缩包
将apache-zookeeper-3.7.1-bin.tar.gz文件上传到三台虚拟机的/opt/software目录中
# 进入到/opt/software目录中 cd /opt/software/ # 解压缩文件到指定目录 tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /opt/module/ # 进入/opt/module目录 cd /opt/module # 文件目录改名 mv apache-zookeeper-3.7.1-bin/ zookeeper
2、配置服务器编号
(1) 在/opt/module/zookeeper/目录下创建zkData
# 进入/opt/module/zookeeper目录 cd /opt/module/zookeeper # 创建zkData文件目录 mkdir zkData
(2) 创建myid文件
# 进入/opt/module/zookeeper/zkData目录 cd /opt/module/zookeeper/zkData # 创建myid文件 vim myid
(3) 在文件中增加内容(后面步骤会说明)
1
3、修改配置文件
(1) 重命名/opt/module/zookeeper/conf目录下的zoo_sample.cfg文件为zoo.cfg文件
# 进入cd /opt/module/zookeeper/conf文件目录 cd /opt/module/zookeeper/conf # 修改文件名称 mv zoo_sample.cfg zoo.cfg # 修改文件内容 vim zoo.cfg
(2) 修改zoo.cfg文件
# 以下内容为修改内容 dataDir=/opt/module/zookeeper/zkData # 以下内容为新增内容 ####################### cluster ########################## # server.A=B:C:D # # A是一个数字,表示这个是第几号服务器 # B是A服务器的主机名 # C是A服务器与集群中的主服务器(Leader)交换信息的端口 # D是A服务器用于主服务器(Leader)选举的端口 ######################################################### server.1=kafka-broker1:2888:3888 server.2=kafka-broker2:2888:3888 server.3=kafka-broker3:2888:3888
4、分发软件
# 进入/opt/module路径 cd /opt/module # 调用分发脚本将本机得ZooKeeper安装包分发到其他两台机器 xsync zookeeper # 分别将不同虚拟机/opt/module/zookeeper/zkData目录下myid文件进行修改 vim /opt/module/zookeeper/zkData/myid # kafka-broker1:1 # kafka-broker2:2 # kafka-broker3:3
5、启停脚本
ZooKeeper软件的启动和停止比较简单,但是每一次如果都在不同服务器节点执行相应指令,也会有点麻烦,所以我们这里将指令封装成脚本文件,方便我们的调用。
(1) 在虚拟机kafka-broker1的/root/bin目录下创建zk.sh脚本文件
在/root/bin这个目录下存放的脚本,root用户可以在系统任何地方直接执行
# 进入/root/bin目录 cd /root/bin # 创建zk.sh脚本文件 vim zk.sh
在脚本中增加内容:
#!/bin/bash case $1 in "start"){ for i in kafka-broker1 kafka-broker2 kafka-broker3 do echo ---------- zookeeper $i 启动 ------------ ssh $i "/opt/module/zookeeper/bin/zkServer.sh start" done };; "stop"){ for i in kafka-broker1 kafka-broker2 kafka-broker3 do echo ---------- zookeeper $i 停止 ------------ ssh $i "/opt/module/zookeeper/bin/zkServer.sh stop" done };; "status"){ for i in kafka-broker1 kafka-broker2 kafka-broker3 do echo ---------- zookeeper $i 状态 ------------ ssh $i "/opt/module/zookeeper/bin/zkServer.sh status" done };; esac
(2) 增加脚本文件权限
# 给zk.sh文件授权 chmod 777 zk.sh
(3) 脚本调用方式
# 启动ZK服务 zk.sh start # 查看ZK服务状态 zk.sh status # 停止ZK服务 zk.sh stop
五、安装Kafka
1、上传并解压Kafka压缩包
将kafka_2.12-3.6.1.tgz文件上传到三台虚拟机的/opt/software目录中
# 进入/opt/software目录 cd /opt/software # 解压缩文件到指定目录 tar -zxvf kafka_2.12-3.6.1.tgz -C /opt/module/ # 进入/opt/module目录 cd /opt/module # 修改文件目录名称 mv kafka_2.12-3.6.1/ kafka
2、修改配置文件
# 进入cd /opt/module/kafka/config文件目录 cd /opt/module/kafka/config # 修改配置文件 vim server.properties
输入以下内容(主要修改上面四项配置):
#broker的全局唯一编号,每个服务节点不能重复,只能是数字。 broker.id=1 #broker对外暴露的IP和端口 (每个节点单独配置) advertised.listeners=PLAINTEXT://kafka-broker1:9092 #kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔 log.dirs=/opt/module/kafka/datas #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理) zookeeper.connect=kafka-broker1:2181,kafka-broker2:2181,kafka-broker3:2181/kafka #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘IO的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #topic在当前broker上的分区个数 num.partitions=1 #用来恢复和清理data下数据的线程数量 num.recovery.threads.per.data.dir=1 # 每个topic创建时的副本数,默认时1个副本 offsets.topic.replication.factor=1 #segment文件保留的最长时间,超时将被删除 log.retention.hours=168 #每个segment文件的大小,默认最大1G log.segment.bytes=1073741824 # 检查过期数据的时间,默认5分钟检查一次是否数据过期 log.retention.check.interval.ms=300000
3、分发kafka软件
# 进入 /opt/module目录 cd /opt/module # 执行分发指令 xsync kafka # 按照上面的配置文件内容,在每一个Kafka节点进行配置,请注意配置文件中的前四项主要配置内容 vim /opt/module/kafka/config/server.properties
4、配置环境变量
(1) 修改 /etc/profile.d/my_env.sh文件
vim /etc/profile.d/my_env.sh
(2) 添加内容
#KAFKA_HOME export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:$KAFKA_HOME/bin
(3) 让环境变量生效
source /etc/profile.d/my_env.sh
(4) 分发环境变量,并让环境变量生效
xsync /etc/profile.d/my_env.sh # 每个节点执行刷新操作 source /etc/profile.d/my_env.sh
5、启停脚本
启动前请先启动ZooKeeper服务
(1) 在虚拟机kafka-broker1的/root/bin目录下创建kfk.sh脚本文件,对kafka服务的启动停止等指令进行封装
# 进入/root/bin目录 cd /root/bin # 创建kfk.sh脚本文件 vim kfk.sh
在脚本中增加内容:
#! /bin/bash case $1 in "start"){ for i in kafka-broker1 kafka-broker2 kafka-broker3 do echo " --------启动 $i Kafka-------" ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties" done };; "stop"){ for i in kafka-broker1 kafka-broker2 kafka-broker3 do echo " --------停止 $i Kafka-------" ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh " done };; esac
(2) 增加脚本文件权限
# 给文件授权 chmod 777 kfk.sh
(3) 脚本调用方式
# 启动kafka kfk.sh start # 停止Kafka kfk.sh stop
注意:停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止ZooKeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。
(4) 联合脚本
因为Kafka启动前,需要先启动ZooKeeper,关闭时,又需要将所有Kafka全部关闭后,才能关闭ZooKeeper,这样操作起来感觉比较麻烦,所以可以将之前的2个脚本再做一次封装。
- 在虚拟机kafka-broker1的/root/bin目录下创建xcall脚本文件
# 创建xcall文件 vim /root/bin/xcall
- 在脚本中增加内容:
#! /bin/bash for i in kafka-broker1 kafka-broker2 kafka-broker3 do echo --------- $i ---------- ssh $i "$*" done
- 增加脚本文件权限
# 增加权限 chmod 777 /root/bin/xcall
- 在虚拟机kafka-broker1的/root/bin目录下创建cluster.sh脚本文件
# 创建cluster.sh脚本文件 vim /root/bin/cluster.sh
- 在脚本中增加内容:
#!/bin/bash case $1 in "start"){ echo ================== 启动 Kafka集群 ================== #启动 Zookeeper集群 zk.sh start #启动 Kafka采集集群 kfk.sh start };; "stop"){ echo ================== 停止 Kafka集群 ================== #停止 Kafka采集集群 kfk.sh stop #循环直至 Kafka 集群进程全部停止 kafka_count=$(xcall jps | grep Kafka | wc -l) while [ $kafka_count -gt 0 ] do sleep 1 kafka_count=$(xcall | grep Kafka | wc -l) echo "当前未停止的 Kafka 进程数为 $kafka_count" done #停止 Zookeeper集群 zk.sh stop };; esac
- 增加脚本文件权限
# 增加权限 chmod 777 /root/bin/cluster.sh
- 脚本调用方式
# 集群启动 cluster.sh start # 集群关闭 cluster.sh stop
六、测试集群
1、启动Kafka集群
因为已经将ZooKeeper和Kafka的启动封装为脚本,所以可以分别调用脚本启动或调用集群脚本启动
# 启动集群 cluster.sh start
输入指令查看进程
# xcall 后面跟着linux指令操作,可以同时对多个服务器节点同时执行相同指令 xcall jps
2、查看Kafka状态
使用客户端工具访问kafka
3、关闭Kafka集群
# 关闭集群 cluster.sh stop # 查看进程 xcall jps