一、任务描述
本实验任务主要完成基于ubuntu环境掌握Kafka consumer 与 producer测试的工作。通过完成本实验任务,要求学生熟练掌握Kafka consumer 与 producer的使用,为后续实验的开展奠定Kafka平台基础,也为从事大数据平台运维工程师、大数据技术支持工程师等岗位工作奠定夯实的技能基础。
二、任务目标
1、掌握Kafka consumer 与 producer测试
三、任务环境
Ubuntu(三台节点:mater:192.168.0.3,slave1:192.168.0.2,slave2:192.168.0.4)、Zookeeper3.4.5
四、任务分析
Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
学会使用Kafka consumer 与 producer测试。
五、 任务实施
步骤1、完全分布环境搭建
三台节点先做ssh免密码配置,具体可参考Hadoop完全分布式搭建。
分别在三台节点下进入/simple/zookeeper/conf使用命令【vi zoo.cfg】进行zoo.cfg配置文件修改,设置集群配置参数,添加集群配置。如图1所示。
图1 配置zoo.cfg文件
分别在三台节点的zookeeper/目录下新建zk_data文件夹,并在此zk_data/目录下新建myid文件。如图2所示。
图2 创建myid文件
在主节点zookeeper文件夹zk_data/下,对myid文件进行编译【echo 1 >> myid】。如图3所示。
图3 配置主节点myid文件
相同的步骤在第二个节点,为myid文件赋值2。如图4所示。
图4 配置slave1节点myid文件
相同的步骤在第三个节点,为myid文件赋值3。如图5所示。
图5 配置myid文件3
在主节点的linux系统中终端首先切换到simple目录,执行命令:【cd /simple】。然后执行解压命令:【tar -zxvf /simple/soft/kafka_2.10-0.8.1.1.tgz -C /simple】。如图6所示。
图6 解压
将解压好的kafka软件包重命名为Kafka。如图7所示。
图7 重命名
修改配置文件server.properties。在Kafka的config目录下执行命令【vim server.properties】,按i键之后进入编辑状态,配置如下
log.dirs=/simple/kafka/kafka-logs zookeeper.connect=192.168.0.3:2181,192.168.0.2:2181,192.168.0.4:2181
。如图8-9所示 。
图8 修改server.properties
图9 修改server.properties
将Kafka分发到第二个和第三个节点的/simple目录下。如图10-11所示。
图10 分发
图11 分发
在第二个节点上修改kafka的config目录下的vim server.properties文件。如图12所示。
图12 修改server.properties
在第三个节点上修改kafka的config目录下的server.properties文件。如图13所示。
图13 修改server.properties
步骤2、启动Kafka
分别进入三个节点到zookeeper bin文件下,通过命令【./zkServer.sh start】来启动服务。如图14-16所示。
图14 启动Zookeeper集群
图15 启动Zookeeper集群
图16 启动Zookeeper集群
在三台节点上利用【./zkServer.sh status】查看Zookeeper节点状态。如图17-19所示。
图17 master查看Zookeeper节点状态
图18 slave1查看Zookeeper节点状态
图19 slave2查看Zookeeper节点状态
分别进入三个节点到kafka bin文件下,通过命令【./kafka-server-start.sh ../config/server.properties】来启动服务。如图20-22所示。
图20 启动Kafka集群
图21 启动Kafka集群
图22 启动Kafka集群
查看是否启动。在各个节点重启终端,执行【jps】命令,可以看到新启动进程。如图23-25所示。
图23 查看进程
图24 查看进程
图25 查看进程
步骤3、Kafka consumer 与 producer测试
在主节点创建topic。执行【./kafka-topics.sh --create --zookeeper 192.168.0.3:2181,192.168.0.2:2181,192.168.0.4:2181 --replication-factor 3 --partitions 1 --topic test】命令。如图26所示。
图26 创建topic
在主节点查看指定topic的详细信息。执行【./kafka-topics.sh --describe --zookeeper 192.168.0.3:2181,192.168.0.2:2181,192.168.0.4:2181 --topic test】命令。如图27所示。
图27 查看指定topic
在主节点master创建生产者并向test主题发送消息。执行【./kafka-console-producer.sh --broker-list 192.168.0.3:9092,192.168.0.2:9092,192.168.0.4:9092 --topic test】命令。如图28所示。
图28 创建生产者
在主节点新开启一个终端创建消费者者并消费消息。执行【./kafka-console-consumer.sh --zookeeper 192.168.0.3:2181,192.168.0.2:2181,192.168.0.4:2181 --topic test --from-beginning】命令。如图29所示。
图29 创建消费者
杀掉第二个节点上的broker。如图30所示。
图30 杀掉broker
在主节点再次查看指定topic的详细信息,发现topic还正常的存在。执行【./kafka-topics.sh --describe --zookeeper 192.168.0.3:2181,192.168.0.2:2181,192.168.0.4:2181 --topic test】命令。如图31所示。
图31 查看指定topic
在主节点再次创建消费者并消费消息,还能查询到消息说明一切都是正常的。执行【./kafka-console-consumer.sh --zookeeper 192.168.0.3:2181,192.168.0.2:2181,192.168.0.4:2181 --topic test --from-beginning】命令。如图32所示。
图32 再次创建消费者
♥ 知识链接
Producer
Producers直接发送消息到broker上的leader partition,不需要经过任何中介一系列的路由转发。为了实现这个特性,kafka集群中的每个broker都可以响应producer的请求,并返回topic的一些元信息,这些元信息包括哪些机器是存活的,topic的leader partition都在哪,现阶段哪些leader partition是可以直接被访问的。
♥ 温馨提示
Producer客户端自己控制着消息被推送到哪些partition。实现的方式可以是随机分配、实现一类随机负载均衡算法,或者指定一些分区算法。