RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点
- 1、能够保证严格的消息顺序
- 2、提供丰富的消息拉取模式
- 3、高效的订阅者水平扩展能力
- 4、实时的消息订阅机制
- 5、亿级消息堆积能力
搭建一个双节点的RocketM
环境背景:
- 虚拟机:vmware12
- 操作系统:centos6.5
- 内存:1G RAM
- 硬盘:20G ROM
在WMWare虚拟机下实现,实现两台IP和资源的服务主机,IP分别是 192.168.1.12,192.168.1.13,分别在这两台机器的hosts文件中添加。
vim /etc/hosts #rocketmq 192.168.1.12 rocketmq-nameserver1 192.168.1.12 rocketmq-master1 192.168.1.13 rocketmq-nameserver2 192.168.1.13 rocketmq-master2 复制代码
下载安装RocketMQ
分别将alibaba-rocketmq-x.x.x.tar.gz使用rz命令上传到两台机器,也可以直接官网在线下载。
将RocketMq解压到/usr/local目录下:
root@localhost local]#tar -zxvf alibaba-rocketmq-x.x.x.tar.gz -C /usr/local/ 复制代码
建立alibaba-rocketmq到rocketmq软连接,如下:
[root@localhost local]#ln -s alibaba-rocketmq rocketmq 复制代码
创建rocketmq存储的相关路径
[root@localhost local]# mkdir /usr/local/rocketmq/store [root@localhost local]# mkdir /usr/local/rocketmq/store/commitlog [root@localhost local]# mkdir /usr/local/rocketmq/store/consumequeue [root@localhost local]# mkdir /usr/local/rocketmq/store/index 复制代码
分别配置两台机器的broker-a.properties和broker-b.properties文件
调整相关RocketMQ的broker的配置
[root@localhost local]# vim /usr/local/rocketmq/conf/2m-noslave/broker-a.properties #所属集群名字 brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true # Broker 对外服务的监听端口 listenPort=10911 # 删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir= /usr/local/rocketmq/store #commitLog 存储路径 storePathCommitLog= /usr/local/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/usr/local/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/usr/local/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/usr/local/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=ASYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128 复制代码
启动nameserver
[root@singlenode rocketmq]# cd /usr/local/rocketmq/bin/ [root@singlenode bin]# nohup sh mqnamesrv & 复制代码
启动broker
cd /usr/local/rocketmq/bin [root@singlenode bin]# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 & [root@singlenode bin]# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log 复制代码
部署RocketMq管理界面
首先下载rocketmq-console.war和tomcat(地址github.com/apache/rock…
修改config.properties文件
rocketmq.namesrv.addr=192.168.1.12:9876;192.168.1.13:9876 throwDone=true 复制代码
启动Tomcat,访问http://192.168.1.128:8080/rocketmq-console
设定环境变量:
export NAMESRV_ADDR=192.168.169.128:9876\;192.168.169.129:9876
运行测试:
bash tools.sh com.alibaba.rocketmq.example.quickstart.Producer bash tools.sh com.alibaba.rocketmq.example.quickstart.Consumer 复制代码
关闭防火墙
[root@singlenode bin]#service iptables status [root@singlenode bin]#service iptables stop 复制代码
如果是centos7以上使用
systemctl stop firewalld.service 复制代码
RocketMq的角色
- producer
- consumer
- Broker
- NameServer
创建topic
- b broker地址
- c Cluster名称
- n nameserver地址列表
- t topic名称
updateTopic -b 192.168.0.1:10911 -c RocketMq-Cluster -n 192.168.0.1:9876;192.168.0.2:9876 -t order-topic 复制代码
删除topic
deleteTopic -c RocketMq-Cluster -n 192.168.0.1:9876;192.168.0.2:9876 -t order-topic 复制代码
创建/修改订阅组
订阅组名称
updateSubGroup -b 192.168.0.1:10911 -c RocketMq-Cluster -g subGroupName -n 192.168.0.1:9876;192.168.0.2:9876 复制代码
删除订阅组
deleteSubGroup -b 192.168.0.1:10911 -c RocketMq-Cluster -g subGroupName -n 192.168.0.1:9876;192.168.0.2:9876 复制代码
更新broker配置
某些配置文件broker运行的时候可以动态修改,-k broker配置文件的key -v value
updateBrokerConfig -b 192.168.0.1:10911 -c RocketMq-Cluster -n 192.168.0.1:9876;192.168.0.2:9876 -k deleteWhen -v 05 复制代码
更新topic的读写权限
updateTopicPerm -b 192.168.0.1:10911 -c RocketMq-Cluster -n 192.168.0.1:9876;192.168.0.2:9876 -t order-topic 复制代码
查询topic路由信息
TopicRoute -b 192.168.0.1:10911 -c RocketMq-Cluster -n 192.168.0.1:9876;192.168.0.2:9876 -t order-topic 复制代码
查看topic路由信息
TopicList -n 192.168.0.1:9876;192.168.0.2:9876 复制代码
查看topic状态统计信息
TopicStats -t order_topic -n 192.168.0.1:9876;192.168.0.2:9876 复制代码
根据时间查询消息
printMsg -t order_topic -n 192.168.0.1:9876;192.168.0.2:9876 复制代码
根据Id查询消息
queryMsgById -i msgId -n -n 192.168.0.1:9876;192.168.0.2:9876 复制代码
查看集群信息
clusterList -n 192.168.0.1:9876;192.168.0.2:9876 复制代码
额外说明
如果没有安装中文语言包,出现乱码了,可以通过
[root@localhost local]# yum groupinstall chinese-support 复制代码
可以修改两台机器的日志配置文件并且把conf目录下所有xml文件中的${user.home}替换为/usr/local/rocketmq
[root@localhost rocketmq]# mkdir -p /usr/local/rocketmq/logs [root@localhost rocketmq]# cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml 复制代码
分别修改两台机器的rocketmq启动脚本
[root@localhost rocketmq]# vim /usr/local/rocketmq/bin/runbroker.sh JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn800g -XX:PermSize=128m -XX:MaxPermSize=320m" JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC" JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${HOME}/rmq_bk_gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps" JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib" #JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n" JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"