【RocketMQ 系列三】RocketMQ集群搭建(2m-2s-sync)

本文涉及的产品
云防火墙,500元 1000GB
简介: 【RocketMQ 系列三】RocketMQ集群搭建(2m-2s-sync)

一、集群特点

  1. NameServer是一个几乎无状态的节点,可集群部署,节点之间不会互相通信。
  2. Broker部署相对比较复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master。Master与Slave可以通过指定相同的BrokerName,不同的BrokerId来区分,BrokerId为0表示Master,BrokerId大于0表示Slave。
  3. Producer与NameServer集群中的其中一个节点(随机选择)建立长链接,定期从NameServer取Topic路由信息,并向Topic服务的Master建立长链接(也就是说根据Topic的路由信息找到向哪个Broker发送消息),且定时向Master发送心跳,Producer完全无状态,可集群部署。
  4. Consumer与NameServer集群中的其中一个节点(随机选择)建立长链接,定期从NameServer取Topic路由信息,并向Topic服务的Master,Slave建立长链接,且定时向Master,Slave发送心跳,Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅关系由Broker配置决定。

二、服务器环境

1.1.部署架构

使用的模式是 2m-2s-sync,双主双从,同步复制

1.2.服务器信息

IP 角色 架构模式
192.31.186.198 nameserver,broker master1,slave2
192.31.184.89 nameserver,broker master2,slave1

1.3.配置Host (两台服务器都需要)

  1. nameserver机器信息
#nameserver
172.31.186.180 rocketmq-nameserver1
172.31.184.89  rocketmq-nameserver2
#broker 机器信息
172.31.186.180  rocketmq-master1
172.31.186.180  rocketmq-slave2
172.31.184.89  rocketmq-master2
172.31.184.89  rocketmq-slave1
  1. 配置完成之后,重启网卡。
systemctl restart network

1.4. 安装jdk(两台服务器都需要)

安装jdk-8u192-linux-x64.rpm,此处省略。

1.5. 关闭防火墙(两台服务器都需要)

首先查看防火墙的状态,如果状态是 active 则表示防火墙开启

$ systemctl status firewalld

关闭防火墙

$ systemctl stop firewalld

1.6. 创建数据存储目录(两台服务器都需要)

  1. 创建master节点的存储路径
mkdir -p /data/server/feige/rocketmq/store
mkdir -p /data/server/feige/rocketmq/store/commitlog
mkdir -p /data/server/feige/rocketmq/store/consumequeue
mkdir -p /data/server/feige/rocketmq/store/index
  1. 创建slave节点的存储路径
mkdir -p /data/server/feige/rocketmq/store-s
mkdir -p /data/server/feige/rocketmq/store-s/commitlog
mkdir -p /data/server/feige/rocketmq/store-s/consumequeue
mkdir -p /data/server/feige/rocketmq/store-s/index

三、下载与安装配置

2.1.下载rocketmq并解压(两天服务器都要)

cd /data/server/feige
wget https://archive.apache.org/dist/rocketmq/5.1.3/rocketmq-all-5.1.3-bin-release.zip
unzip rocketmq-all-5.1.3-bin-release.zip -d /data/server/feige/rocketmq
cd /data/server/feige/rocketmq/rocketmq-all-5.1.3-bin-release

2.2. 环境变量配置(两台服务器都需要)

vim /etc/profile

在profile文件的末尾加入如下命令:

ROCKETMQ_HOME=/data/server/feige/rocketmq/rocketmq-all-5.1.3-bin-release
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH

执行 source /etc/profile 使文件生效

2.3. 配置 rocketmq broker-a (在Master1上操作)

vim conf/2m-2s-sync/broker-a.properties

在此配置文件中添加,master默认的端口是10911。

#所属集群名称
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样;主从节点一样,也就是这个值区分broker组的;
brokerName=broker-a
#主从标识:0 表示Master, >0 表示Slave;同一个组中区分主从的标识,只能有一个主;
brokerId=0
#当前broker监听的IP
brokerIP1=172.31.186.180
#用于HA主从同步
brokerIP2=172.31.186.180
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
haListenPort=10912
#删除长期无用文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间(h),默认48小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#检测物理文件磁盘空间使用率
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/data/server/feige/rocketmq/store
#commitLog存储路径
storePathCommitLog=/data/server/feige/rocketmq/store/commitlog
#消费队列存储路径
storePathConsumeQueue=/data/server/feige/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/data/server/feige/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckPoint=/data/server/feige/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/data/server/feige/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#broker角色
#- ASYNC_MASTER    异步复制Master
#- SYNC_MASTER     同步双写Master
#- SLAVE    从节点
brokerRole=SYNC_MASTER
#刷盘策略
#- ASYNC_FLUSH    异步刷盘
#- SYNC_FLUSH     同步刷盘
flushDiskType=ASYNC_FLUSH

2.4. 配置 rocketmq broker-b-s (在Slave2上配置)

这里为了避免与master的端口冲突,将slave的默认端口改成11011,另外还需要将从节点broker的存储路径改下,不能与master节点共用。

#所属集群名称
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#主从标识:0 表示Master, >0 表示Slave
brokerId=1
#当前broker监听的IP
brokerIP1=172.31.186.180
#用于HA主从同步
brokerIP2=172.31.186.180
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
haListenPort=11012
#删除长期无用文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间(h),默认48小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/data/server/feige/rocketmq/store-s
#commitLog存储路径
storePathCommitLog=/data/server/feige/rocketmq/store-s/commitlog
#消费队列存储路径
storePathConsumeQueue=/data/server/feige/rocketmq/store-s/consumequeue
#消息索引存储路径
storePathIndex=/data/server/feige/rocketmq/store-s/index
#checkpoint 文件存储路径
storeCheckPoint=/data/server/feige/rocketmq/store-s/checkpoint
#abort 文件存储路径
abortFile=/data/server/feige/rocketmq/store-s/abort
#限制的消息大小
maxMessageSize=65536
#broker角色
#- ASYNC_MASTER    异步复制Master
#- SYNC_MASTER     同步双写Master
#- SLAVE    从节点
brokerRole=SLAVE
#刷盘策略
#- ASYNC_FLUSH    异步刷盘
#- SYNC_FLUSH     同步刷盘
flushDiskType=ASYNC_FLUSH

2.5. 配置 rocketmq broker-b.properties(在Master2上操作)

执行

vim conf/2m-2s-sync/broker-b.properties

在此配置文件中添加:master默认的端口是10911。

#所属集群名称
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#主从标识:0 表示Master, >0 表示Slave
brokerId=0
#当前broker监听的IP
brokerIP1=172.31.184.89
#用于HA主从同步
brokerIP2=172.31.184.89
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
haListenPort=10912
#删除长期无用文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间(h),默认48小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/data/server/feige/rocketmq/store
#commitLog存储路径
storePathCommitLog=/data/server/feige/rocketmq/store/commitlog
#消费队列存储路径
storePathConsumeQueue=/data/server/feige/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/data/server/feige/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckPoint=/data/server/feige/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/data/server/feige/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#broker角色
#- ASYNC_MASTER    异步复制Master
#- SYNC_MASTER     同步双写Master
#- SLAVE    从节点
brokerRole=SYNC_MASTER
#刷盘策略
#- ASYNC_FLUSH    异步刷盘
#- SYNC_FLUSH     同步刷盘
flushDiskType=ASYNC_FLUSH

2.6. 配置rocketmq broker-a-s.properties(在Slave1上操作)

vim conf/2m-2s-sync/broker-a-s.properties

在此配置文件中添加:将slave的默认端口改成11011。这里从节点的存储路径要与主节点区分,主和从都必须有

#所属集群名称
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#主从标识:0 表示Master, >0 表示Slave
brokerId=1
#当前broker监听的IP
brokerIP1=172.31.184.89
#用于HA主从同步
brokerIP2=172.31.184.89
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
haListenPort=11012
#删除长期无用文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间(h),默认48小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/data/server/feige/rocketmq/store-s
#commitLog存储路径
storePathCommitLog=/data/server/feige/rocketmq/store-s/commitlog
#消费队列存储路径
storePathConsumeQueue=/data/server/feige/rocketmq/store-s/consumequeue
#消息索引存储路径
storePathIndex=/data/server/feige/rocketmq/store-s/index
#checkpoint 文件存储路径
storeCheckPoint=/data/server/feige/rocketmq/store-s/checkpoint
#abort 文件存储路径
abortFile=/data/server/feige/rocketmq/store-s/abort
#限制的消息大小
maxMessageSize=65536
#broker角色
#- ASYNC_MASTER    异步复制Master
#- SYNC_MASTER     同步双写Master
#- SLAVE    从节点
brokerRole=SLAVE
#刷盘策略
#- ASYNC_FLUSH    异步刷盘
#- SYNC_FLUSH     同步刷盘
flushDiskType=ASYNC_FLUSH

2.7. 测试环境JVM配置(两台机器都要执行)

RocketMQ默认占用的内存是8G,比较大,这里需要修改下占用的堆内存,不然在一个机器是上同时启动两个 broker可能会启动失败。

vim bin/runbroker.sh
# 根据机器实际内存调整,这里演示的机器内存比较小,配置的值比较小
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn128m"
 vim bin/runserver.sh
# 根据机器实际内存调整,这里演示的机器内存比较小,配置的值比较小
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
 vim bin/tools.sh
# 根据机器实际内存调整,这里演示的机器内存比较小,配置的值比较小
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m"

2.8. 生产环境JVM配置

vim bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
 vim bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
 vim bin/tools.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m"

四、启动集群

3.1.启动nameserver((两台机器上都执行)

nohup sh bin/mqnamesrv > /data/server/feige/rocketmq/logs/mqnamesrv.log 2>&1 &

3.2.启动broker

在192.31.186.180上执行如下命令

#1.启动Master1,在Master1上操作
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a.properties > /data/server/feige/rocketmq/logs/broker-a.log 2>&1 &
#2.启动Slave2,在Slave 2上操作
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b-s.properties > /data/server/feige/rocketmq/logs/broker-b-s.log 2>&1 &
#6、查看 NameServer(9876)和 Broker(10909、10911、10912)启动情况,在2台主机上操作
netstat -nltup|egrep "9876|10911|10912|11011|11012"

在192.31.184.89上执行如下命令:

#1.启动Master2,在Master2上操作
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b.properties > /data/server/feige/rocketmq/logs/broker-b.log 2>&1 &
#2.启动Slave1,在Slave1上操作
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a-s.properties > /data/server/feige/rocketmq/logs/broker-a-s.log 2>&1 &
#6、查看 NameServer(9876)和 Broker(10909、10911、10912)启动情况,在2台主机上操作
netstat -nltup|egrep "9876|10911|10912|11011|11012"
或者
sh bin/mqadmin clusterList -n "172.31.184.89:9876;172.31.186.180:9876"
或者
jps

3.3.测试集群生产消费情况

#在2台主机上操作
vim /etc/profile
#在文件/etc/profile中追加
export NAMESRV_ADDR="rocketmq-nameserver1:9876;rocketmq-nameserver2:9876"
#在控制台在其中1台机器上操作,启动一个生产者,在1个消费者机器就可以看到消费信息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
#在控制台其中2台机器上操作,启动2个消费者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

这里的NAMESRV_ADDR变量名不能做任何修改,不然就可能报连不上Nameserver的错误。

正常消费成功的话,则消费者出现如下日志:

3.5. 关闭命令

#关闭broker命令:
sh bin/mqshutdown broker
#关闭namesrv命令:
sh bin/mqshutdown namesrv

五、问题总结

java.lang.RuntimeException: Lock failed,MQ already started
  at org.apache.rocketmq.store.DefaultMessageStore.start(DefaultMessageStore.java:214)
  at org.apache.rocketmq.broker.BrokerController.start(BrokerController.java:827)
  at org.apache.rocketmq.broker.BrokerStartup.start(BrokerStartup.java:64)
  at org.apache.rocketmq.broker.BrokerStartup.main(BrokerStartup.java:58)
  1. 检查master节点和slave节点的端口是否冲突
  2. 检查master节点和slave节点的broker的存储路径是否冲突

六、安装管理面板

官网提供了两种部署方式,docker部署和源码部署,由于前面我们都是使用的直接部署,所以这里也采用官网提供的源码部署的方式。 (PS:docker 部署也尝试了,一直不行)。

  1. 下载源代码,源码地址:https://github.com/apache/rocketmq-dashboard/tags
  2. 在本地解压源码,然后,对源码进行编译打包,打包命令是:
$ mvn clean package -Dmaven.test.skip=true
  1. 然后,将生成的 rocketmq-dashboard-1.0.0.jar包上传到服务器,jar的相对路径是 target\rocketmq-dashboard-1.0.0.jar
  2. 运行 rocketmq-dashboard-1.0.0.jar
nohup java -jar rocketmq-dashboard-1.0.0.jar --server.port=18080 --rocketmq.config.namesrvAddr='172.31.184.89:9876;172.31.186.180:9876'>/data/server/feige/rocketmq/logs/mq-console.log 2>&1 &

这里需要指定两个参数:

  1. --server.port=18080 设置端口是18080,因为默认的端口是8080,很可能会冲突
  2. --rocketmq.config.namesrvAddr='192.31.184.89:9876;192.31.186.180:9876' 用于指定nameserver集群的地址,不指定的话会有问题。

启动成功之后访问控制台:http://192.31.186.180:18080/

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 负载均衡 监控
【面试问题】RabbitMQ 的集群
【1月更文挑战第27天】【面试问题】RabbitMQ 的集群
|
9月前
|
消息中间件 存储 Kubernetes
k8s1.20版本部署RabbitMQ集群(持久化)——2023.05
k8s1.20版本部署RabbitMQ集群(持久化)——2023.05
437 1
|
2天前
|
消息中间件 负载均衡 算法
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
10 2
|
2天前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
8 1
|
8天前
|
消息中间件 存储 缓存
RocketMQ4.2 最佳实践之集群搭建
RocketMQ4.2 最佳实践之集群搭建
|
23天前
|
消息中间件 存储 Java
RocketMQ下载安装、集群搭建保姆级教程
RocketMQ下载安装、集群搭建保姆级教程
37 0
|
25天前
|
消息中间件 网络安全 网络虚拟化
消息队列 MQ操作报错合集之如何实现公网访问内网RocketMQ集群
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
|
2月前
|
消息中间件 Java API
MQ产品使用合集之RocketMQ dledger集群模式的dledgerpeers端口是集群之间通讯吗
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件
rabbitMQ集群
rabbitMQ集群
23 0
|
9月前
|
消息中间件 存储 监控
消息中间件第八讲:消息队列 RocketMQ 版实战、集群及原理
消息中间件第八讲:消息队列 RocketMQ 版实战、集群及原理
161 0