zookeeper 和 kafka 的安装使用

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
日志服务 SLS,月写入数据量 50GB 1个月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
简介:

前提都安装JDK 6版本以上

java -version

java version "1.7.0_79"



测试环境集群结构(三台集群的hosts一定要互相解析 不然会很坑)


server1: 192.168.100.10

server1: 192.168.100.13

server1: 192.168.100.20


安装包:

zookeeper-3.4.9.tar.gz 

kafka_2.11-0.10.1.0.tgz 


背景:

为了获得可靠的 ZooKeeper 服务,用户应该在一个集群上部署 ZooKeeper 。只要集群上大多数的 ZooKeeper 服务启动了,那么总的 ZooKeeper 服务将是可用的。另外,最好使用奇数台机器。 如果 zookeeper 拥有 5 台机器,那么它就能处理 2 台机器的故障了。




### zookeeper的集群安装##################


1、安装包下载到/usr/local/src

2、解压至安装路径/usr/localcd 

cd /usr/local/src/

tar -xvf zookeeper-3.4.9.tar.gz   -C /usr/local/

cd /usr/local

ln -s zookeeper-3.4.9/ zookeeper



3、修改配置文件

cd  cd /usr/local/zookeeper/conf/

cp zoo_sample.cfg zoo.cfg


配置文件(每台zookeeper的配置文件都一样)

[root@master conf]# cat zoo.cfg 

# The number of milliseconds of each tick

tickTime=2000

# The number of ticks that the initial 

# synchronization phase can take

initLimit=10

# The number of ticks that can pass between 

# sending a request and getting an acknowledgement

syncLimit=5

# the directory where the snapshot is stored.

# do not use /tmp for storage, /tmp here is just 

# example sakes.

dataDir=/usr/local/zookeeper/data

# the port at which the clients will connect

clientPort=2181

# the maximum number of client connections.

# increase this if you need to handle more clients

#maxClientCnxns=60

#

# Be sure to read the maintenance section of the 

# administrator guide before turning on autopurge.

#

# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance

#

# The number of snapshots to retain in dataDir

#autopurge.snapRetainCount=3

# Purge task interval in hours

# Set to "0" to disable auto purge feature

#autopurge.purgeInterval=1


#第一个端口( port )是从( follower )机器连接到主( leader )机器的端口

#第二个端口是用来进行 leader 选举的端口

server.1=192.168.100.10:2888:3888

server.2=192.168.100.13:2888:3888

server.3=192.168.100.20:2888:3888





### 注释:

在这个文件中,我们需要指定 dataDir 的值,它指向了一个目录,这个目录在开始的时候需要为空。下面是每个参数的含义:

tickTime :基本事件单元,以毫秒为单位。它用来指示心跳,最小的 session 过期时间为两倍的 tickTime. 。

dataDir :存储内存中数据库快照的位置,如果不设置参数,更新事务日志将被存储到默认位置。

clientPort :监听客户端连接的端口

#server.A=B:C:D  其中A是一个数字,代表这是第几号服务器;B是服务器的IP地址;C表示服务器与群集中的“领导者”交换信息的端口;当领导者失效后,D表示用来执行选举时服务器相互通信的端口。





4、创建数据目录和创建myid文件

mkdir /usr/local/zookeeper/data

echo "1" > /usr/local/zookeeper/data/myid






### 配置其他节点

myid要不一样



5、启动集群

按照上述进行配置即可。

第五步:启动ZooKeeper集群

在ZooKeeper集群的每个结点上,执行启动ZooKeeper服务的脚本,如下所示:

cd /usr/local/zookeeper/

bin/zkServer.sh start  

bin/zkServer.sh start  

bin/zkServer.sh start  



查看日志:(日志生成在执行启动的目录下)

tail -f zookeeper.out



查看监听的端口:(Leader端 才监听2888 端口,follower是不监听的,只监听3888端口)

[root@agent zookeeper]# netstat -tulnp |grep 88

tcp        0      0 ::ffff:192.168.100.13:3888  :::*                        LISTEN      18526/java          

tcp        0      0 ::ffff:192.168.100.13:2888  :::*                        LISTEN      18526/java          

[root@agent zookeeper]# netstat -tulnp |grep 2181

tcp        0      0 :::2181                     :::*                        LISTEN      18526/java          

[root@agent zookeeper]# 




6 验证


./bin/zkServer.sh  status



注释: 因为启动顺序是从第一台开始的 所以看日志第一台会有日志因为第二台第三台还未启动,一会就正常了忽略



看到各个角色(leader选举出来表示集群正常) 通过上面状态查询结果可见,第二台是集群的Leader,其余的两个结点是Follower。

[root@agent zookeeper]# ./bin/zkServer.sh  status

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg

Mode: leader



7、测试客户端命令连接

./bin/zkCli.sh -server 192.168.100.10:2181


连接上去之后:

执行命令

[zk: 192.168.100.10:2181(CONNECTED) 4] ls /

[zookeeper]




注: 当前根路径为/zookeeper。








################# 安装kafka

下载包

[root@master src]# ll kafka_2.11-0.10.1.0.tgz 

-rw-r--r-- 1 root root 34373824 Oct 20  2016 kafka_2.11-0.10.1.0.tgz


1、解压至安装目录

tar -xvf kafka_2.11-0.10.1.0.tgz -C /usr/local/

cd /usr/local/

ln -s kafka_2.11-0.10.1.0/ kafka





2、修改配置文件

cd /usr/local/kafka/config

vim server.properties 


需要修改的项目:

# The id of the broker. This must be set to a unique integer for each broker

broker.id=1





# A comma seperated list of directories under which to store log files

log.dirs=/usr/local/kafka/logs


# Zookeeper connection string (see zookeeper docs for details).

# This is a comma separated host:port pairs, each corresponding to a zk

zookeeper.connect=192.168.100.10:2181,192.168.100.13:2181,192.168.100.20:2181



注意:每台kafka的broer.id 是不一样的

3、创建日志目录

1
mkdir  /usr/local/kafka/logs





4、### 配置其他节点



5、启动

从后台启动Kafka集群(3台都需要启动)

1
2
cd  /usr/local/kafka/bin/
. /kafka-server-start .sh -daemon  .. /config/server .properties


[root@master bin]# jps

7449 Jps

7427 Kafka             #kafka进程

31341 QuorumPeerMain   #zk进程



如果(3个kafka进程都在表示正常,配置文件错误是服务起不了)


查看启动日志:

tail -f  /usr/local/kafka/logs/server.log 



6、测试kafka集群


1-进入kafka根目录,创建topic   test

1
. /bin/kafka-topics .sh --create --zookeeper 192.168.100.10:2181,192.168.100.13:2181,192.168.100.20:2181 --replication-factor 1 --partitions 1 --topic  test

结果:

Created topic "test".



2-列出已创建的topic列表

1
. /bin/kafka-topics .sh --list --zookeeper 192.168.100.10:2181,192.168.100.13:2181,192.168.100.20:2181

结果:

test


或者使用zookeeper命令去查看

1
. /bin/zkCli .sh -server 192.168.100.10:2181

[zk: 192.168.100.10:2181(CONNECTED) 3] ls /brokers/topics

[test]




3-模拟客户端去发送消息


./bin/kafka-console-producer.sh --broker-list 192.168.100.10:9092,192.168.100.13:9092,192.168.100.20:9092 --topic test


4-模拟客户端去接受消息(虽然看到的是消费 但是因为消费么有删除每次 执行之前的模拟信息还会看到)

./bin/kafka-console-consumer.sh --zookeeper 192.168.100.10:2181,192.168.100.13:2181,192.168.100.20:2181 --from-beginning --topic test





5、停止 在启动(重启)


集群依次kafka都需要停止

1
. /bin/kafka-server-stop .sh


集群依次kafka都需要启动

1
2
cd  /usr/local/kafka/bin/
. /kafka-server-start .sh -daemon  .. /config/server .properties




6、 删除测试的test 的那个topic

1
. /bin/kafka-topics .sh --delete --zookeeper 192.168.100.10:2181,192.168.100.13:2181,192.168.100.20:2181 --topic  test


结果                             

Topic test is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.


因为:

如果kafaka启动时加载的配置文件中server.properties没有配置(默认没有配置 默认值为false)delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion



彻底删除进入zk

1
. /bin/zkCli .sh -server 192.168.100.10:2181

rmr /brokers/topics/test

本文转自残剑博客51CTO博客,原文链接http://blog.51cto.com/cuidehua/1922639博如需转载请自行联系原作者


cuizhiliang

相关文章
|
3月前
|
消息中间件 运维 算法
Kafka 为什么要抛弃 Zookeeper?
本文探讨了Kafka为何逐步淘汰ZooKeeper。长久以来,ZooKeeper作为Kafka的核心组件,负责集群管理和协调任务。然而,随着Kafka的发展,ZooKeeper带来的复杂性增加、性能瓶颈及一致性问题日益凸显。为解决这些问题,Kafka引入了KRaft,这是一种基于Raft算法的内置元数据管理方案,不仅简化了部署流程,还提升了系统的一致性和扩展性。本文详细分析了这一转变背后的原因及其带来的优势,并展望了Kafka未来的发展方向。
265 1
|
3月前
|
消息中间件 监控 Ubuntu
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
121 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
|
3月前
|
消息中间件 Java Kafka
ELFK对接zookeeper&kafka
ELFK对接zookeeper&kafka
|
5月前
|
消息中间件 存储 Kafka
ZooKeeper助力Kafka:掌握这四大作用,让你的消息队列系统稳如老狗!
【8月更文挑战第24天】Kafka是一款高性能的分布式消息队列系统,其稳定运行很大程度上依赖于ZooKeeper提供的分布式协调服务。ZooKeeper在Kafka中承担了四大关键职责:集群管理(Broker的注册与选举)、主题与分区管理、领导者选举机制以及消费者组管理。通过具体的代码示例展示了这些功能的具体实现方式。
144 2
|
6月前
|
消息中间件 存储 Kafka
kafka 在 zookeeper 中保存的数据内容
kafka 在 zookeeper 中保存的数据内容
65 3
|
6月前
|
消息中间件 NoSQL Kafka
日志收集平台项目nginx、kafka、zookeeper、filebeat搭建的基本配置(2)
日志收集平台项目nginx、kafka、zookeeper、filebeat搭建的基本配置(2)
|
6月前
|
消息中间件 应用服务中间件 Kafka
日志收集平台项目nginx、kafka、zookeeper、filebeat搭建的基本配置(1)
日志收集平台项目nginx、kafka、zookeeper、filebeat搭建的基本配置(1)
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
147 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
68 1
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
400 9

热门文章

最新文章