一、引言
适用读者:一切使用RocketMQ的人员。
文章目的:主要介绍RocketMQ的集中部署方式和方法、使开发人员快速搭建RocketQM服务器。
二、简介
RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
三、Linux环境搭建&安装包下载
- Java环境:RocketMQ依赖于宿主机Java环境,所以首先确保Java环境安装到位,如有不懂Java安装,问度娘。
- Rocket下载:https://github.com/alibaba/RocketMQ/releases/download/v3.2.6/alibaba-rocketmq-3.2.6.tar.gz 具体版本根据需求下载,此处以3.2.6作为案例版本。
- 复制、解压安装包:将安装包上传至服务器目标目录,例:opt/tpapp/。解压安装包即可: tar -zxvf alibaba-rocketmq-3.2.6.tar.gz opt/tpapp/rocketmq 。
四、RocketMQ部署服务的简述
我们需要部署NameSever和Broker服务器,这两个服务有什么作用和用途请看下图:
NameServer:
RocketMQ-nameSrv用于管理所有broker的信息,以便于Producer和Consumer能够获取到正确的Broker信息,进行业务处理;
可以看到NameSrv的主要管理内容如下:
1. 接收Broker的注册,注销请求。
2. Producer获取topic下的所有BrokerQueue,put消息。
3. Consumer获取topic下所有的BrokerQueue,get消息。
Broker:
负责存储消息,提供读写功能。
RocketMQ依赖于NameServer服务器和Broker服务器,所以我们需要分别部署NameServer和Broker(实际上只是用同一个软件文件,以不同的命令和配置区分别启动NameServer服务和Broker服务的)。
五、RocketMQ部署集中集群方式和配置
NameServer一般作多台集群部署(2台:namesrv1、namesrv2):
启动:namesrv1 : nohup opt/tpapp/rocketmq/bin/mqnamesrv &
namesrv2 : nohup opt/tpapp/rocketmq/bin/mqnamesrv &
停止:sh mqshutdown namesrv
Broker部署集群方式有以下几种(Master和Slave都是指Broker服务器):
1.单Master(一台服务器)
优点:除了配置简单没什么优点
缺点:不可靠,该机器重启或宕机,将导致整个服务不可用
启动:
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-noslave/broker-a.properties)
2.多Master(两台或多台服务器)
优点:配置简单,性能最高
缺点:可能会有少量消息丢失(配置相关),单台机器重启或宕机期间,该机器下未被消费的消息在机器恢复前不可订阅,影响消息实时性
启动:
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-noslave/broker-a.properties)
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-noslave/broker-b.properties)
3.多Master多Slave,每个Master配一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟,毫秒级()
优点:性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预
缺点:Master宕机或磁盘损坏时会有少量消息丢失
启动:
Master:
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-2s-async/broker-a.properties)
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-2s-async/broker-a-s.properties)
Slave:
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-2s-async/broker-b.properties)
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-2s-async/broker-b-s.properties)
4.多Master多Slave,每个Master配一个Slave,有多对Master-Slave,HA采用同步双写方式,主备都写成功,向应用返回成功
优点:服务可用性与数据可用性非常高
缺点:性能比一部HA略低,当前版本主宕备不能自动切换为主
Master和Slave的配置文件参考conf目录下的配置文件
Master与Slave通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数
一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分
Master:
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-2s-sync/broker-a.properties)
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-2s-sync/broker-a-s.properties)
Slave:
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-2s-sync/broker-b.properties)
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-2s-sync/broker-b-s.properties)
5. 停止broker
sh mqshutdown broker
六、配置(注意对应版本)
1.Broker配置
参数名 |
默认值 |
说明 |
consumerGroup |
Conusmer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 |
|
listenPort |
10911 |
Broker对外服务的监听端口 |
namesrvAddr |
Null |
Name Server地址 |
brokerIP1 |
本机IP |
本机IP地址,默认系统自动识别,但是某些多网卡机器会存在识别错误的情况,这种情况下可以人工配置。 |
brokerName |
本机主机名 |
|
brokerClusterName |
DefaultCluster |
Broker所属哪个集群 |
brokerId |
0 |
BrokerId,必须是大等于0的整数,0表示Master,>0表示Slave,一个Master可以挂多个Slave,Master和Slave通过BrokerName来配对 |
storePathCommitLog |
$HOME/store/commitlog |
commitLog存储路径 |
storePathConsumeQueue |
$HOME/store/consumequeue |
消费队列存储路径 |
storePathIndex |
$HOME/store/index |
消息索引存储队列 |
deleteWhen |
4 |
删除时间时间点,默认凌晨4点 |
fileReservedTime |
48 |
文件保留时间,默认48小时 |
maxTransferBytesOnMessageInMemory |
262144 |
单次pull消息(内存)传输的最大字节数 |
maxTransferCountOnMessageInMemory |
32 |
单次pull消息(内存)传输的最大条数 |
maxTransferBytesOnMessageInMemory |
65535 |
单次pull消息(磁盘)传输的最大字节数 |
maxTransferCountOnMessageInDisk |
8 |
单次pull消息(磁盘)传输的最大条数 |
messageIndexEnable |
TRUE |
是否开启消息索引功能 |
messageIndexSafe |
FALSE |
是否提供安全的消息索引机制,索引保证不丢 |
brokerRole |
ASYNC_MASTER |
Broker的角色 -ASYNC_MASTER异步复制Master -SYNC_MASTER同步双写Master -SLAVE |
flushDiskType |
ASYNC_FLUSH |
刷盘方式 -ASYNC_FLUSH异步刷盘 -SYNC_FLUSH同步刷盘 |
cleanFileForciblyEnable |
TRUE |
磁盘满,且无过期文件情况下TRUE表示强制删除文件,优先保证服务可用 FALSE标记服务不可用,文件不删除 |
显示详细信息
2.客户端公共配置类:clientconfig
参数名 | 默认值 |
说明 |
NamesrvAddr |
NameServer地址列表,多个nameServer地址用分号隔开 |
|
clientIP |
本机IP |
客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定 |
instanceName |
DEFAULT |
客户端实例名称,客户端创建的多个Producer,Consumer实际是共用一个内部实例(这个实例包含网络连接,线程资源等) |
clientCallbackExecutorThreads |
4 |
通信层异步回调线程数 |
pollNameServerInteval |
30000 |
轮训Name Server 间隔时间,单位毫秒 |
heartbeatBrokerInterval |
30000 |
向Broker发送心跳间隔时间,单位毫秒 |
persistConsumerOffsetInterval |
5000 |
持久化Consumer消费进度间隔时间,单位毫秒 |
显示详细信息
3.producer配置
参数名 |
默认值 |
说明 |
producerGroup |
DEFAULT_PRODUCER |
Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。 |
createTopicKey |
TBW102 |
在发送消息时,自动创建服务器不存在的topic,需要指定key |
defaultTopicQueueNums |
4 |
在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 |
sendMsgTimeout |
10000 |
发送消息超时时间,单位毫秒 |
compressMsgBodyOverHowmuch |
4096 |
消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 |
retryAnotherBrokerWhenNotStoreOK |
FALSE |
如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 |
maxMessageSize |
131072 |
客户端限制的消息大小,超过报错,同时服务端也会限制(默认128K) |
transactionCheckListener |
事物消息回查监听器,如果发送事务消息,必须设置 |
|
checkThreadPoolMinSize |
1 |
Broker回查Producer事务状态时,线程池大小 |
checkThreadPoolMaxSize |
1 |
Broker回查Producer事务状态时,线程池大小 |
checkRequestHoldMax |
2000 |
Broker回查Producer事务状态时,Producer本地缓冲请求队列大小 |
显示详细信息
4.pushConsumer配置
参数名 |
默认值 |
说明 |
consumerGroup |
DEFAULT_CONSUMER |
Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应将它们归为同一组 |
messageModel |
CLUSTERING |
消息模型,支持以下两种1.集群消费2.广播消费 |
consumeFromWhere |
CONSUME_FROM_LAST_OFFSET |
Consumer启动后,默认从什么位置开始消费 |
allocateMessageQueueStrategy |
AllocateMessageQueueAveragely |
Rebalance算法实现策略 |
Subscription |
{} |
订阅关系 |
messageListener |
消息监听器 |
|
offsetStore |
|
消费进度存储 |
consumeThreadMin |
10 |
消费线程池数量 |
consumeThreadMax |
20 |
消费线程池数量 |
consumeConcurrentlyMaxSpan |
2000 |
单队列并行消费允许的最大跨度 |
pullThresholdForQueue |
1000 |
拉消息本地队列缓存消息最大数 |
Pullinterval |
0 |
拉消息间隔,由于是长轮询,所以为0,但是如果应用了流控,也可以设置大于0的值,单位毫秒 |
consumeMessageBatchMaxSize |
1 |
批量消费,一次消费多少条消息 |
pullBatchSize |
32 |
批量拉消息,一次最多拉多少条 |
显示详细信息
5.pullConsumer
参数名 |
默认值 |
说明 |
consumerGroup |
Conusmer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 |
|
brokerSuspendMaxTimeMillis |
20000 |
长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒 |
consumerPullTimeoutMillis |
10000 |
非长轮询,拉消息超时时间,单位毫秒 |
consumerTimeoutMillisWhenSuspend |
30000 |
长轮询,Consumer拉消息请求咋broker挂起超过指定时间,客户端认为超时,单位毫秒 |
messageModel |
BROADCASTING |
消息模型,支持以下两种:1集群消费 2广播模式 |
messageQueueListener |
监听队列变化 |
|
offsetStore |
消费进度存储 |
|
registerTopics |
注册的topic集合 |
|
allocateMessageQueueStrategy |
Rebalance算法实现策略 |
显示详细信息