搭建kafka运行环境

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,182元/月
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

1.前言

由于项目涉及到kafka,自己以前没有接触过这方面的,学习了下,将搭建kafka运行环境同大家分享。


2.搭建步骤



第一步,到Apache Kafka官网下载最新的压缩包,比如我下载的就是:

1
kafka_2.9.2-0.8.1.1.tgz


第二步,解压并启动Zookeeper

1
2
3
tar  -xzvf kafka_2.9.2-0.8.1.1.tgz
cd  kafka_2.9.2-0.8.1.1
bin /zookeeper-server-start .sh config /zookeeper .properties &


说明:

  • 由于kafka用到了zookeeper,所以应该首先启动它。

  • 应该确保已经安装JDK,并设定好JAVA_HOME,CLASSPATH,PATH这些环境变量。否则会提示:

java command not found


wKioL1QeTWiC80BcAAZgcsalBjE065.jpg



查看端口信息:


wKioL1QeTbHiD2WTAAFtqrDfcrI689.jpg



查看zookeeper.properties配置信息:

1
2
# the port at which the clients will connect
clientPort=2181


第三步:启动kafka

1
bin /kafka-server-start .sh config /server .properties

我在启动中遇到了下面的2个问题:


Unrecognized VM option '+UseCompressedOops'

原因及解决办法:

kafka用了很多优化运行的jvm参数,而我安装的jdk所带的jvm不一定支持这些参数,比如: 

-XX:+UseCompressedOops 

所以需要编辑kafka-run-class.sh,将这个选项注释掉:

1
2
3
4
5
  KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC 
-XX:+UseConcMarkSweepGC 
-XX:+CMSClassUnloadingEnabled 
-XX:+CMSScavengeBeforeRemark 
-XX:+DisableExplicitGC -Djava.awt.headless= true "

 

Error occurred during initialization of VM Could not reserve enough space for object heap

原因及解决办法:

查看kafka-server-start.sh配置文件,发现有heap设置信息:


1
export  KAFKA_HEAP_OPTS= "-Xmx1G -Xms1G"


我们可以java -X来看看这些参数的意义:


wKiom1QeUNnCeNxMAAMWoSYtIfU419.jpg


-Xms表示JAVA堆内存的初始化大小,而-Xmx表示最大值。

我在创建虚拟机时,指定的内存大小为256M,很显然不够,因此我尝试改为:

1
#export KAFKA_HEAP_OPTS="-Xmx100m -Xms200m"

但是依然报错,看来这个参数不能调到这么小,最后我将虚拟机内存调到1G,启动成功。



wKiom1QeUpSgUUWJAAbAC-ciE8c891.jpg



注意输出信息中,端口信息:9092



进程验证:


wKioL1QeYVuCVuhBAAAwdD9ld8Y413.jpg


端口验证:


wKiom1QeYkexrHeOAABv0xPvBnw149.jpg


说明:

Kafka的进程ID为9300,占用端口为9092

QuorumPeerMain为对应的zookeeper实例,进程ID为6379,在2181端口监听




3.一些基本概念



  • kafka是什么?


记住几个关键点,分布式、高吞吐量 的 订阅、发布 消息系统


  • kafka有什么?


producer   消息的生成者,即发布消息

consumer   消息的消费者,即订阅消息

broker     Kafka以集群的方式运行,可以由一个或多个服务组成,服务即broker

zookeeper  协调转发 


  • kafka的工作图


wKioL1QeZSji0xPbAABVRKnvLJE735.jpg

producers通过网络将消息发送到Kafka集群,集群向消费者提供消息

kafka对消息进行归纳,即topic,也就是说producer发布topic,consumer订阅topic


下面,我们来一个初步的测试,来加深这些概念的理解。




4.模拟客户端发送、接受消息初步测试



Step 1 : 创建一个topic


wKioL1QeaQ2C4SzRAAB26yLebNA178.jpg


命令运行后提示:

Created topic "my_first_topic".


注意,创建topic时需要指明zookeeper的socket(IP+PORT)在哪里,以及topic名称。

(至于partitions,replication-factor这些分区,副本的概念以后再说,暂放)


此时:

zookeeper进程提示:

wKioL1QeabnjK073AAOMyEsPH0U599.jpg


kafka进程提示:

wKiom1QeaduSXtPtAAFXcwcLPU4105.jpg




Step 2 : 查看topic list


wKioL1QebB7gc9JcAABrP9DMrKM472.jpg



Step 3 : 发送、接受消息


下面,我启动2个XSHELL客户端,一个用于生产者发送消息,一个用于消费者接受消息。


XSHELL-A


wKioL1Qebd7ybw32AAD0PjKC2yk691.jpg


XSHELL-B


wKiom1Qebe2QOLySAADxhrQ7Tbk791.jpg


只要我们在XSHELL-A中输入消息回车,那么马上XSHELL-B中就会有消息显示。


注意:

producer,指定的Socket(localhost+9092),说明生产者的消息要发往kafka,也即是broker

consumer, 指定的Socket(localhost+2181),说明消费者的消息来自zookeeper(协调转发)



上面的只是一个单个的broker,下面我们来实验一个多broker的集群。


5.搭建一个多个broker的集群



刚才只是启动了单个broker,现在启动有3个broker组成的集群,这些broker节点

也都是在本机上的


Step 1 : 为每一个broker提供配置文件


我们先看看config/server.properties配置信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[root@localhost config] # grep -v '#' server.properties  | sort
broker. id =0
port=9092
num.network.threads=2 
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log. dirs = /tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner. enable = false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=1000000

说明:

broker.id为集群中唯一的标注一个节点,因为在同一个机器上,所以必须指定不同的

端口和日志文件,避免数据被覆盖。


在上面单个broker的实验中,为什么kafka的端口为9092,这里可以看得很清楚。


注意日志目录:

1
2
3
4
5
6
7
8
9
10
11
12
13
[root@localhost kafka-logs] # pwd
/tmp/kafka-logs
[root@localhost kafka-logs] # ls -l
total 32
drwxr-xr-x 2 root root 4096 Sep 20 22:58 my_first_topic-0
-rw-r--r-- 1 root root   32 Sep 20 23:54 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root   32 Sep 20 23:54 replication-offset-checkpoint
drwxr-xr-x 2 root root 4096 Sep 20 20:22  test -0
[root@localhost kafka-logs] # tree my_first_topic-0
my_first_topic-0
|-- 00000000000000000000.index
`-- 00000000000000000000.log
0 directories, 2 files


【topic,分区,offset等等这些概念,暂放】



kafka cluster怎么同zookeeper交互的,配置信息中也有体现。


那么下面,我们仿照上面的配置文件,提供2个broker的配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[root@localhost config] # touch server-1.properties
[root@localhost config] # touch server-2.properties
[root@localhost config] # vi server-1.properties
[root@localhost config]
[root@localhost config]
[root@localhost config] # vi server-2.properties
[root@localhost config]
[root@localhost config]
[root@localhost config] # cat server-1.properties
broker. id =1
port=9093
log. dir = /tmp/kafka-logs-1
zookeeper.connect=localhost:2181
[root@localhost config] # cat server-2.properties
broker. id =2
port=9094
log. dir = /tmp/kafka-logs-2
zookeeper.connect=localhost:2181


Step 2 : 启动所有的broker


由于在上面的实验中,已经启动了zookeeper和一个broker(id=0),那么现在只需要启动

broker(id=1)和broker(id=2)。


命令如下:


1
2
bin /kafka-server-start .sh config /server-2 .properties &
bin /kafka-server-start .sh config /server-1 .properties &


会发现,zookeeper进程会有提示信息输出。


进程,端口观察:


wKioL1QeemKx4EvIAAIeZbDOnto371.jpg


一个zookeeper在2181端口上监听,3个kafka cluster(broker)分别在端口

9092,9093,9094监听。




Step 3 : 创建topic


1
2
3
4
5
6
bin /kafka-topics .sh --create --topic topic_1 --partitions 1 --replication-factor 3  \
--zookeeper localhost:2181
bin /kafka-topics .sh --create --topic topic_2 --partitions 1 --replication-factor 3  \
--zookeeper localhost:2181
bin /kafka-topics .sh --create --topic topic_3 --partitions 1 --replication-factor 3  \
--zookeeper localhost:2181

查看topic创建情况:


wKioL1QefTzy-ZYFAAF-L9pFdok724.jpg


上面的有些东西,也许还不太清楚,暂放,继续试验。需要注意的是topic_1的Leader=1



Step 4 : 模拟客户端发送,接受消息


XSHELL-A

1
bin /kafka-console-consumer .sh --topic topic_1 --zookeeper localhost:2181 --from-beginning


XSHELL-B

1
2
bin /kafka-console-producer .sh --topic topic_1 --broker-list localhost:9092,localhost:9093,
localhost:9094

需要注意,此时producer将topic发布到了3个broker中,现在分布式的概念就有点了。


在XSHELL-B中发消息,XSHELL-A中就会有消息显示出来。



Step 5 : kill some broker


测试点一:

kill broker(id=0)

首先,我们根据前面的配置,得到broker(id=0)应该在9092监听,这样就能确定它的PID了。


wKioL1QehPChlpqSAAE44i09mes219.jpg




得到broker(id=0)的PID为9300,那么接下来,我们KILL这个broker:


wKioL1QehX7T7zqWAAEPAY6whpA513.jpg


再次观察,topic在kafka cluster中的情况:


wKiom1Qehd3B9USOAAH5zv7m2dI080.jpg


需要与broker(id=0)没有被kill前,做下对比。很明显,主要变化在于Isr,以后在分析。


测试下,发送消息,接受消息,是否收到影响。


生产者:


wKiom1QehpTh5gWVAAEg3vmnALQ852.jpg


消费者:


wKiom1QehqiQye_YAAEX9hq6Byo444.jpg


结论,并没有收到影响。



测试点二:


kill broker(id=1)


同上可以得到broker(id=1)的PID为21165,同样的kill它,并测试发送,接受消息

是否收到影响。


发送端:


wKioL1QeiHfxWYVyAAAsZoJsVyI996.jpg



接受端:


wKiom1QeiGOwltVhAAAq0ik1a_E670.jpg


可见,kafka的分布式机制,容错能力还是挺好的~



本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1556650,如需转载请自行联系原作者


相关文章
|
消息中间件 Kafka Windows
windows下搭建Kafka运行环境
windows下搭建Kafka运行环境 http://www.bieryun.com/ 一、安装JDK jdk的安装就不再强调了 二、安装Zookeeper 由于Kafka的运行依赖于Zookeeper,所以在运行Kafka之前需要安装并运行Zookeeper 1.
2071 0
|
11月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
472 1
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
327 1
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
1172 9
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
257 3
|
消息中间件 存储 Kafka
kafka 在 zookeeper 中保存的数据内容
kafka 在 zookeeper 中保存的数据内容
217 3
|
消息中间件 负载均衡 Kafka
微服务数据问题之Kafka实现高可用如何解决
微服务数据问题之Kafka实现高可用如何解决
172 1
|
消息中间件 存储 负载均衡
微服务数据问题之Kafka作为元数据节点如何解决
微服务数据问题之Kafka作为元数据节点如何解决
157 1