【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起实践RocketMQ的服务搭建及配置操作

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起实践RocketMQ的服务搭建及配置操作

RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点


  • 1、能够保证严格的消息顺序
  • 2、提供丰富的消息拉取模式
  • 3、高效的订阅者水平扩展能力
  • 4、实时的消息订阅机制
  • 5、亿级消息堆积能力




搭建一个双节点的RocketM


环境背景:


  • 虚拟机:vmware12
  • 操作系统:centos6.5
  • 内存:1G RAM
  • 硬盘:20G ROM


在WMWare虚拟机下实现,实现两台IP和资源的服务主机,IP分别是 192.168.1.12,192.168.1.13,分别在这两台机器的hosts文件中添加。

vim /etc/hosts
#rocketmq
192.168.1.12 rocketmq-nameserver1
192.168.1.12 rocketmq-master1
192.168.1.13 rocketmq-nameserver2
192.168.1.13 rocketmq-master2
复制代码



下载安装RocketMQ


分别将alibaba-rocketmq-x.x.x.tar.gz使用rz命令上传到两台机器,也可以直接官网在线下载。

将RocketMq解压到/usr/local目录下:

root@localhost local]#tar -zxvf alibaba-rocketmq-x.x.x.tar.gz -C /usr/local/
复制代码


建立alibaba-rocketmq到rocketmq软连接,如下:

[root@localhost local]#ln -s alibaba-rocketmq rocketmq
复制代码

image.png



创建rocketmq存储的相关路径

[root@localhost local]# mkdir /usr/local/rocketmq/store
[root@localhost local]# mkdir /usr/local/rocketmq/store/commitlog
[root@localhost local]# mkdir /usr/local/rocketmq/store/consumequeue
[root@localhost local]# mkdir /usr/local/rocketmq/store/index
复制代码

分别配置两台机器的broker-a.properties和broker-b.properties文件

image.png



调整相关RocketMQ的broker的配置

[root@localhost local]# vim /usr/local/rocketmq/conf/2m-noslave/broker-a.properties 
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir= /usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog= /usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
复制代码



启动nameserver

[root@singlenode rocketmq]# cd /usr/local/rocketmq/bin/
[root@singlenode bin]# nohup sh mqnamesrv &
复制代码



启动broker

cd /usr/local/rocketmq/bin
[root@singlenode bin]# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
[root@singlenode bin]# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
复制代码



部署RocketMq管理界面


首先下载rocketmq-console.war和tomcat(地址github.com/apache/rock…

修改config.properties文件

rocketmq.namesrv.addr=192.168.1.12:9876;192.168.1.13:9876
throwDone=true
复制代码

启动Tomcat,访问http://192.168.1.128:8080/rocketmq-console

image.png


设定环境变量:


export NAMESRV_ADDR=192.168.169.128:9876\;192.168.169.129:9876

运行测试:


bash tools.sh com.alibaba.rocketmq.example.quickstart.Producer
bash tools.sh com.alibaba.rocketmq.example.quickstart.Consumer
复制代码


关闭防火墙

[root@singlenode bin]#service iptables status
[root@singlenode bin]#service iptables stop
复制代码


如果是centos7以上使用

systemctl stop firewalld.service
复制代码



RocketMq的角色


  • producer
  • consumer
  • Broker
  • NameServer


创建topic


  • b broker地址
  • c Cluster名称
  • n nameserver地址列表
  • t topic名称
updateTopic -b 192.168.0.1:10911 -c RocketMq-Cluster -n 192.168.0.1:9876;192.168.0.2:9876 -t order-topic
复制代码


删除topic
deleteTopic -c RocketMq-Cluster -n 192.168.0.1:9876;192.168.0.2:9876 -t order-topic
复制代码


创建/修改订阅组


订阅组名称

updateSubGroup -b 192.168.0.1:10911 -c RocketMq-Cluster -g subGroupName -n 192.168.0.1:9876;192.168.0.2:9876
复制代码



删除订阅组
deleteSubGroup -b 192.168.0.1:10911 -c RocketMq-Cluster -g subGroupName -n 192.168.0.1:9876;192.168.0.2:9876
复制代码



更新broker配置

某些配置文件broker运行的时候可以动态修改,-k broker配置文件的key -v value

updateBrokerConfig -b 192.168.0.1:10911 -c RocketMq-Cluster -n  192.168.0.1:9876;192.168.0.2:9876 -k deleteWhen -v 05
复制代码



更新topic的读写权限

updateTopicPerm -b 192.168.0.1:10911 -c RocketMq-Cluster -n  192.168.0.1:9876;192.168.0.2:9876 -t order-topic
复制代码



查询topic路由信息

TopicRoute -b 192.168.0.1:10911 -c RocketMq-Cluster -n  192.168.0.1:9876;192.168.0.2:9876 -t order-topic
复制代码



查看topic路由信息

TopicList -n  192.168.0.1:9876;192.168.0.2:9876
复制代码



查看topic状态统计信息

TopicStats -t order_topic -n  192.168.0.1:9876;192.168.0.2:9876
复制代码



根据时间查询消息

printMsg -t order_topic -n 192.168.0.1:9876;192.168.0.2:9876
复制代码



根据Id查询消息

queryMsgById -i msgId -n -n 192.168.0.1:9876;192.168.0.2:9876
复制代码



查看集群信息

clusterList -n 192.168.0.1:9876;192.168.0.2:9876
复制代码



额外说明


如果没有安装中文语言包,出现乱码了,可以通过

[root@localhost local]# yum groupinstall chinese-support
复制代码


可以修改两台机器的日志配置文件并且把conf目录下所有xml文件中的${user.home}替换为/usr/local/rocketmq

[root@localhost rocketmq]# mkdir -p /usr/local/rocketmq/logs
[root@localhost rocketmq]# cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml 
复制代码


分别修改两台机器的rocketmq启动脚本

[root@localhost rocketmq]# vim /usr/local/rocketmq/bin/runbroker.sh 
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn800g -XX:PermSize=128m -XX:MaxPermSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${HOME}/rmq_bk_gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"




相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 Java Maven
微服务技术系列教程(34) - SpringCloud-使用RabbitMQ实现消息驱动
微服务技术系列教程(34) - SpringCloud-使用RabbitMQ实现消息驱动
169 0
|
7月前
|
中间件
83 # 静态服务中间件 koa-static 的使用以及实现
83 # 静态服务中间件 koa-static 的使用以及实现
25 0
|
4月前
|
消息中间件 存储 Java
Alibaba开发十年,写出这本“MQ技术手册”,看完我愣住了
消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。虽然说,目前状况是Kafka更为火热,但更为广泛的应该还属老牌的RabbtiMQ和Alibaba自主研发的RocketMQ。
|
2月前
|
消息中间件 存储 Cloud Native
深度剖析 RocketMQ 5.0,IoT 消息:物联网需要什么样的消息技术?
本文来学习一个典型的物联网技术架构,以及在这个技术架构里面,消息队列所发挥的作用。在物联网的场景里面,对消息技术的要求和面向服务端应用的消息技术有什么区别?学习 RocketMQ 5.0 的子产品 MQTT,是如何解决这些物联网技术难题的。
90854 4
|
2月前
|
消息中间件 存储 NoSQL
【Redis项目实战】使用Springcloud整合Redis分布式锁+RabbitMQ技术实现高并发预约管理处理系统
【Redis项目实战】使用Springcloud整合Redis分布式锁+RabbitMQ技术实现高并发预约管理处理系统
|
2月前
|
存储 消息中间件 负载均衡
RocketMQ 5.0 分级存储背后的技术优化与挑战
RocketMQ 5.0 分级存储背后的技术优化与挑战
|
2月前
|
存储 消息中间件 Apache
谈谈 RocketMQ 5.0 分级存储背后一些有挑战的技术优化
谈谈 RocketMQ 5.0 分级存储背后一些有挑战的技术优化
|
3月前
|
消息中间件 存储 物联网
|
3月前
|
存储 消息中间件 对象存储
谈谈 RocketMQ 5.0 分级存储背后一些有挑战的技术优化
谈谈 RocketMQ 5.0 分级存储背后一些有挑战的技术优化
132225 231
|
8月前
|
消息中间件 程序员 Apache
阿里RocketMQ创始人首次分享出这份RocketMQ技术内幕神级架构手册
RocketMQ的发展史? RocketMQ的开源正是源于对这种开源文化的认同,开放是为了更好的协同创新,并将这一技术推向新的高度。在经历了阿里巴巴集团内部多年“双11”交易核心链路工业级场景在验证,2016年11月,团队将RocketMQ捐献给全球享有盛誉的Apache软件基金会正式质为孵化项目。 至此,RocketMQ开启了迈向全球顶级开源软件的新征程。

相关产品

  • 云消息队列 MQ