Kafka consumer 与 producer测试

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: Kafka consumer 与 producer测试

一、任务描述

本实验任务主要完成基于ubuntu环境掌握Kafka consumer 与 producer测试的工作。通过完成本实验任务,要求学生熟练掌握Kafka consumer 与 producer的使用,为后续实验的开展奠定Kafka平台基础,也为从事大数据平台运维工程师、大数据技术支持工程师等岗位工作奠定夯实的技能基础。


二、任务目标

1、掌握Kafka consumer 与 producer测试


三、任务环境

Ubuntu(三台节点:mater:192.168.0.3,slave1:192.168.0.2,slave2:192.168.0.4)、Zookeeper3.4.5

四、任务分析

Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。


学会使用Kafka consumer 与 producer测试。

五、 任务实施

步骤1、完全分布环境搭建

三台节点先做ssh免密码配置,具体可参考Hadoop完全分布式搭建。

 分别在三台节点下进入/simple/zookeeper/conf使用命令【vi zoo.cfg】进行zoo.cfg配置文件修改,设置集群配置参数,添加集群配置。如图1所示。

8c9aac12ce27435b84b029b6ccf7a155.jpg



图1 配置zoo.cfg文件

 分别在三台节点的zookeeper/目录下新建zk_data文件夹,并在此zk_data/目录下新建myid文件。如图2所示。


8b31408a7aa14fc5a0df9ab186fbf2a3.png


图2 创建myid文件

 在主节点zookeeper文件夹zk_data/下,对myid文件进行编译【echo 1 >> myid】。如图3所示。

8a7a360400b144b49b057b290aed009d.png



图3 配置主节点myid文件

 相同的步骤在第二个节点,为myid文件赋值2。如图4所示。

9842442e62f84a60a611cf415ddeac05.png



图4 配置slave1节点myid文件

 相同的步骤在第三个节点,为myid文件赋值3。如图5所示。


f35bc0b32957421d84b5ca1515940416.png


图5 配置myid文件3


在主节点的linux系统中终端首先切换到simple目录,执行命令:【cd /simple】。然后执行解压命令:【tar -zxvf /simple/soft/kafka_2.10-0.8.1.1.tgz -C /simple】。如图6所示。


c8f73905e9ca4373ac3140371af728ca.jpg


图6 解压

 将解压好的kafka软件包重命名为Kafka。如图7所示。


07cb2be4cf954a8d949ea403f5c79b88.jpg


图7 重命名

 修改配置文件server.properties。在Kafka的config目录下执行命令【vim server.properties】,按i键之后进入编辑状态,配置如下

log.dirs=/simple/kafka/kafka-logs zookeeper.connect=192.168.0.3:2181,192.168.0.2:2181,192.168.0.4:2181

。如图8-9所示 。

cea245273ceb431c93adcd1c286fd267.png



图8 修改server.properties


2fc2d748d271408a99189e25775e4a15.jpg


图9 修改server.properties

 将Kafka分发到第二个和第三个节点的/simple目录下。如图10-11所示。


e8c9deb3c9554b93ad95d02d9fccd690.jpg


图10 分发

4620539132b14fb7962a2408f728ac73.jpg



图11 分发

 在第二个节点上修改kafka的config目录下的vim server.properties文件。如图12所示。


cb71de01b6be48d0ac61b0e585dcf4e9.png


图12 修改server.properties

 在第三个节点上修改kafka的config目录下的server.properties文件。如图13所示。


7e4bb510f17a40f58555d295a4c48f9d.png


图13 修改server.properties


步骤2、启动Kafka

分别进入三个节点到zookeeper bin文件下,通过命令【./zkServer.sh start】来启动服务。如图14-16所示。



4724192e3f6e4a77a8b621cfadb715cc.png

图14 启动Zookeeper集群


bd4e2a1a89394b67960430dbb1efea9c.png


图15 启动Zookeeper集群

b3623e557c414db7b82432e7dc4ecdee.png



图16 启动Zookeeper集群

 在三台节点上利用【./zkServer.sh status】查看Zookeeper节点状态。如图17-19所示。



6d31a767982e456388cfe52ea13b2993.png

图17 master查看Zookeeper节点状态


293ba36a99af45bdbef82195e80e699c.png


图18 slave1查看Zookeeper节点状态


fe0427e09f1e459bb82fbc9e0484b8bf.png


图19 slave2查看Zookeeper节点状态

 分别进入三个节点到kafka bin文件下,通过命令【./kafka-server-start.sh ../config/server.properties】来启动服务。如图20-22所示。

f9c599d6f4174ffda9a1f7f195c5ac1e.png



图20 启动Kafka集群


d7848ae91f5144cca35e004ffb19561c.png


图21 启动Kafka集群


9b4ae6ee89d24f70bb3a290f4c102ab7.png


图22 启动Kafka集群

 查看是否启动。在各个节点重启终端,执行【jps】命令,可以看到新启动进程。如图23-25所示。


e0529009146f4070805681ca80ac5ef1.png


图23 查看进程

bb53d024aa0a4f3a9376e67c69d48745.png



图24 查看进程

c948488781494be39854a4ac38ddf357.png



图25 查看进程

步骤3、Kafka consumer 与 producer测试

在主节点创建topic。执行【./kafka-topics.sh --create --zookeeper 192.168.0.3:2181,192.168.0.2:2181,192.168.0.4:2181 --replication-factor 3 --partitions 1 --topic test】命令。如图26所示。


63767ebbb4cc48bbbfaf1ffbdcf0c9d1.jpg


图26 创建topic

 在主节点查看指定topic的详细信息。执行【./kafka-topics.sh --describe --zookeeper 192.168.0.3:2181,192.168.0.2:2181,192.168.0.4:2181 --topic test】命令。如图27所示。


3e46e86385fb4c30b30f2e60b42eb145.jpg


图27 查看指定topic

 在主节点master创建生产者并向test主题发送消息。执行【./kafka-console-producer.sh --broker-list 192.168.0.3:9092,192.168.0.2:9092,192.168.0.4:9092 --topic test】命令。如图28所示。


a53f48c6872b4828a108c93ebc02c057.jpg


图28 创建生产者

 在主节点新开启一个终端创建消费者者并消费消息。执行【./kafka-console-consumer.sh --zookeeper 192.168.0.3:2181,192.168.0.2:2181,192.168.0.4:2181 --topic test --from-beginning】命令。如图29所示。


67daf2021c104b3c8fcc3cc256d26de1.jpg


图29 创建消费者

 杀掉第二个节点上的broker。如图30所示。



c170ff990bb2448da293bd24118badc0.jpg

图30 杀掉broker

 在主节点再次查看指定topic的详细信息,发现topic还正常的存在。执行【./kafka-topics.sh --describe --zookeeper 192.168.0.3:2181,192.168.0.2:2181,192.168.0.4:2181 --topic test】命令。如图31所示。


f8d838cabd794b8885f2707849c0f3d2.jpg


图31 查看指定topic

 在主节点再次创建消费者并消费消息,还能查询到消息说明一切都是正常的。执行【./kafka-console-consumer.sh --zookeeper 192.168.0.3:2181,192.168.0.2:2181,192.168.0.4:2181 --topic test --from-beginning】命令。如图32所示。


a162243611e5410bae131d768f1013d3.jpg


图32 再次创建消费者


♥ 知识链接

Producer


Producers直接发送消息到broker上的leader partition,不需要经过任何中介一系列的路由转发。为了实现这个特性,kafka集群中的每个broker都可以响应producer的请求,并返回topic的一些元信息,这些元信息包括哪些机器是存活的,topic的leader partition都在哪,现阶段哪些leader partition是可以直接被访问的。


♥ 温馨提示

Producer客户端自己控制着消息被推送到哪些partition。实现的方式可以是随机分配、实现一类随机负载均衡算法,或者指定一些分区算法。

88c7d79a2c094be2a8bac068129a6bef.png

相关文章
|
1月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
64 4
|
25天前
|
消息中间件 中间件 Kafka
解锁Kafka等消息队列中间件的测试之道
在这个数字化时代,分布式系统和消息队列中间件(如Kafka、RabbitMQ)已成为日常工作的核心组件。本次公开课由前字节跳动资深专家KK老师主讲,深入解析消息队列的基本原理、架构及测试要点,涵盖功能、性能、可靠性、安全性和兼容性测试,并探讨其主要应用场景,如应用解耦、异步处理和限流削峰。课程最后设有互动答疑环节,助你全面掌握消息队列的测试方法。
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
55 0
|
2月前
|
消息中间件 安全 大数据
Kafka多线程Consumer是实现高并发数据处理的有效手段之一
【9月更文挑战第2天】Kafka多线程Consumer是实现高并发数据处理的有效手段之一
251 4
|
3月前
|
消息中间件 监控 算法
Kafka Producer 的性能优化技巧
【8月更文第29天】Apache Kafka 是一个分布式流处理平台,它以其高吞吐量、低延迟和可扩展性而闻名。对于 Kafka Producer 来说,正确的配置和编程实践可以显著提高其性能。本文将探讨一些关键的优化策略,并提供相应的代码示例。
136 1
|
3月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
82 4
|
3月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
|
3月前
|
开发者 图形学 前端开发
绝招放送:彻底解锁Unity UI系统奥秘,五大步骤教你如何缔造令人惊叹的沉浸式游戏体验,从Canvas到动画,一步一个脚印走向大师级UI设计
【8月更文挑战第31天】随着游戏开发技术的进步,UI成为提升游戏体验的关键。本文探讨如何利用Unity的UI系统创建美观且功能丰富的界面,包括Canvas、UI元素及Event System的使用,并通过具体示例代码展示按钮点击事件及淡入淡出动画的实现过程,助力开发者打造沉浸式的游戏体验。
95 0
|
3月前
|
消息中间件 Java Kafka
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)