上一篇blog详细了解了kafka的基本概念、生产消费者模型、基本架构,我对kafka有了一个整体的认知,其整体框架其实可以理解为如下架构【一个Partion分区的leader和foller不会存储到一个broker上】:
消息生产出来后依据topic发送到Kafka集群中去,由集群来管理消息,消费集群主动拉取到消息后消费【消费者的消费速度可以由自己来决定,可能会存在长连接轮询的浪费问题】,整体的消息和集群管理由Zookeeper来进行。了解了这些后我们来搭建一个kafka集群。由于我只有一台电脑,所以准备使用虚拟机搭建的方式来进行环境搭建,那么我先部署一个单节点虚拟机并在上面运行kafka,然后再克隆出两台虚拟机来,这样就组成了一个分布式虚拟机集群。
单节点虚拟机安装
虚拟机搭建我使用如下的组合进行:环境准备的时候我决定使用VMware+Centos7+SecureCRT+AppNode的方式来搭建和管理虚拟机,详细的搭建过程不在这里讨论,可以参照我另一篇文章【分布式集群搭建 一】虚拟机配置(VMware+Centos7+SecureCRT+AppNode),按照这篇文章搭建好单节点虚拟机后,我们可以在上边安装Kafka:
下载安装Kafka
在官方网站找一个镜像站点kafka官方站点下载Kafka,我们这里选择清华的镜像站点下载一个Kafka:
注意这里有个坑,直接用wget的方式拉取到centos再解压会报错,所以我们先下载到windows,然后上传到centos上,通过我们的AppNode:
直接解压或者使用命令:tar -xvf kafka_2.13-2.6.0.tgz
,解压完成后可以看到如上目录
修改Zookeeper配置文件
进入如下路径/kafka/kafka_2.13-2.6.0/config/修改zookeeper.properties
配置文件
dataDir=/tmp/zookeeper dataLogDir=/tmp/zookeeper/log clientPort=2181 maxClientCnxns=0 admin.enableServer=false tickTime=2000 initLimit=10 syncLimit=5 #设置broker Id的服务地址,这里的0、1、2和brokerid保持一致 server.0=192.168.5.101:2888:3888 server.1=192.168.5.102:2888:3888 server.2=192.168.5.103:2888:3888
其中,2888端口为zookeeper的通讯端口,3888端口为选举端口。然后进入dataDir的目录/tmp/zookeeper
下创建文件myid,并写入server.id具体值(建议和kafka的broker.id保持一致)
并在myid中填写和brokerid以及本机ip对应server一致的序号:
修改Kafka配置文件
进入如下路径/kafka/kafka_2.13-2.6.0/config/
,修改 server.properties的配置:
broker.id=0 zookeeper.connect=192.168.5.101:2181,192.168.5.102:2181,192.168.5.103:2181 log.dirs=/kafka/kafka_2.13-2.6.0/data
克隆虚拟机集群
克隆集群并管理可以参照我的另一篇blog-【分布式集群搭建 二】克隆虚拟机并配置集群,使用了三台机器来进行分布式集群配置,配置完成后可以看到集群运转正常:
当然克隆完成后我们需要分别修改配置zookeeper的配置文件和
创建每台机器的myid文件
进入dataDir的目录/tmp/zookeeper
下创建文件myid,并写入server.id具体值,分别在102和103的myid文件中写入1和2,保持和每台机器的server.id以及brokerid一致。
修改每台机器的Kafka配置
分别进入每台机器的如下路径/kafka/kafka_2.13-2.6.0/config/
,修改 server.properties的配置
broker.id=0 zookeeper.connect=192.168.5.101:2181,192.168.5.102:2181,192.168.5.103:2181 log.dirs=/kafka/kafka_2.13-2.6.0/data
broker.id=1 zookeeper.connect=192.168.5.101:2181,192.168.5.102:2181,192.168.5.103:2181 log.dirs=/kafka/kafka_2.13-2.6.0/data
broker.id=2 zookeeper.connect=192.168.5.101:2181,192.168.5.102:2181,192.168.5.103:2181 log.dirs=/kafka/kafka_2.13-2.6.0/data
运行kafka命令
修改完成后我们来操作一下Kafka来实现一下,这个时候用SecureCRT来同时开启三个会话:分别在三台机器上以守护进程的方式分别操作:
启动zookeeper和Kafka
启动zookeeper bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 启动kafka集群 bin/kafka-server-start.sh -daemon config/server.properties
创建topic、查看已创建topic、查看topic的详情
创建topic bin/kafka-topics.sh --zookeeper 192.168.5.101:2181 --create --replication-factor 3 --partitions 1 --topic tml-second 查看topic列表 bin/kafka-topics.sh --zookeeper 192.168.5.101:2181 --list 查看topic详情 bin/kafka-topics.sh --zookeeper 192.168.5.101:2181 --describe --topic tml-second
需要注意副本数不能超过集群机器数,同一个机器上出现两个同一分区的副本,分区的效果就没有了,如下:
查看详情时第一个行显示所有partitions的一个总结,以下每一行给出一个partition中的信息,如果我们只有一个partition,则只显示一行。
- leader 是在给出的所有partitons中负责读写的节点,每个节点都有可能成为leader,这里leader为2,也就是102机器
- replicas 显示给定partiton所有副本所存储节点的节点列表,不管该节点是否是leader或者是否存活,这里就是我们的三台机器、0、1、2。分别对应101、102、103.
- isr 副本都已同步的的节点集合,这个集合中的所有节点都是存活状态,并且跟leader同步,这里也是我们的三台机器、0、1、2。分别对应101、102、103.说明我们三台机器都在集群里没有脱机
我们来通过一个复杂案例来确定其分布机制:
发送消息、消费消息
发送消息:bin/kafka-console-producer.sh --broker-list 192.168.5.101:9092 --topic tml-second 消费消息:bin/kafka-console-consumer.sh --bootstrap-server 192.168.5.102:9092 --from-beginning --topic tml-second 消费同一个组的消息:bin/kafka-console-consumer.sh --bootstrap-server 192.168.5.102:9092 --topic tml-second --consumer.config config/consumer.properties
删除topic
bin/kafka-topics.sh --zookeeper 192.168.5.101:2181 --delete --topic tml-kafka
关闭kafka服务
bin/kafka-server-stop.sh stop
这篇blog历程比较艰难,由于linux不是很数量,所以一开始集群配置好后老是启动不了,折腾了两天才发现zookeeper和kafka没有以守护进程的方式开启,CTRL+C杀了进程,导致后续kafka老是连接不上zookeeper,不过终于搭建好一个分布式集群了,柳暗花明又一村!