【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起实践RocketMQ的服务搭建及配置操作

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
性能测试 PTS,5000VUM额度
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起实践RocketMQ的服务搭建及配置操作

RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点


  • 1、能够保证严格的消息顺序
  • 2、提供丰富的消息拉取模式
  • 3、高效的订阅者水平扩展能力
  • 4、实时的消息订阅机制
  • 5、亿级消息堆积能力




搭建一个双节点的RocketM


环境背景:


  • 虚拟机:vmware12
  • 操作系统:centos6.5
  • 内存:1G RAM
  • 硬盘:20G ROM


在WMWare虚拟机下实现,实现两台IP和资源的服务主机,IP分别是 192.168.1.12,192.168.1.13,分别在这两台机器的hosts文件中添加。

vim /etc/hosts
#rocketmq
192.168.1.12 rocketmq-nameserver1
192.168.1.12 rocketmq-master1
192.168.1.13 rocketmq-nameserver2
192.168.1.13 rocketmq-master2
复制代码



下载安装RocketMQ


分别将alibaba-rocketmq-x.x.x.tar.gz使用rz命令上传到两台机器,也可以直接官网在线下载。

将RocketMq解压到/usr/local目录下:

root@localhost local]#tar -zxvf alibaba-rocketmq-x.x.x.tar.gz -C /usr/local/
复制代码


建立alibaba-rocketmq到rocketmq软连接,如下:

[root@localhost local]#ln -s alibaba-rocketmq rocketmq
复制代码

image.png



创建rocketmq存储的相关路径

[root@localhost local]# mkdir /usr/local/rocketmq/store
[root@localhost local]# mkdir /usr/local/rocketmq/store/commitlog
[root@localhost local]# mkdir /usr/local/rocketmq/store/consumequeue
[root@localhost local]# mkdir /usr/local/rocketmq/store/index
复制代码

分别配置两台机器的broker-a.properties和broker-b.properties文件

image.png



调整相关RocketMQ的broker的配置

[root@localhost local]# vim /usr/local/rocketmq/conf/2m-noslave/broker-a.properties 
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir= /usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog= /usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
复制代码



启动nameserver

[root@singlenode rocketmq]# cd /usr/local/rocketmq/bin/
[root@singlenode bin]# nohup sh mqnamesrv &
复制代码



启动broker

cd /usr/local/rocketmq/bin
[root@singlenode bin]# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
[root@singlenode bin]# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
复制代码



部署RocketMq管理界面


首先下载rocketmq-console.war和tomcat(地址github.com/apache/rock…

修改config.properties文件

rocketmq.namesrv.addr=192.168.1.12:9876;192.168.1.13:9876
throwDone=true
复制代码

启动Tomcat,访问http://192.168.1.128:8080/rocketmq-console

image.png


设定环境变量:


export NAMESRV_ADDR=192.168.169.128:9876\;192.168.169.129:9876

运行测试:


bash tools.sh com.alibaba.rocketmq.example.quickstart.Producer
bash tools.sh com.alibaba.rocketmq.example.quickstart.Consumer
复制代码


关闭防火墙

[root@singlenode bin]#service iptables status
[root@singlenode bin]#service iptables stop
复制代码


如果是centos7以上使用

systemctl stop firewalld.service
复制代码



RocketMq的角色


  • producer
  • consumer
  • Broker
  • NameServer


创建topic


  • b broker地址
  • c Cluster名称
  • n nameserver地址列表
  • t topic名称
updateTopic -b 192.168.0.1:10911 -c RocketMq-Cluster -n 192.168.0.1:9876;192.168.0.2:9876 -t order-topic
复制代码


删除topic
deleteTopic -c RocketMq-Cluster -n 192.168.0.1:9876;192.168.0.2:9876 -t order-topic
复制代码


创建/修改订阅组


订阅组名称

updateSubGroup -b 192.168.0.1:10911 -c RocketMq-Cluster -g subGroupName -n 192.168.0.1:9876;192.168.0.2:9876
复制代码



删除订阅组
deleteSubGroup -b 192.168.0.1:10911 -c RocketMq-Cluster -g subGroupName -n 192.168.0.1:9876;192.168.0.2:9876
复制代码



更新broker配置

某些配置文件broker运行的时候可以动态修改,-k broker配置文件的key -v value

updateBrokerConfig -b 192.168.0.1:10911 -c RocketMq-Cluster -n  192.168.0.1:9876;192.168.0.2:9876 -k deleteWhen -v 05
复制代码



更新topic的读写权限

updateTopicPerm -b 192.168.0.1:10911 -c RocketMq-Cluster -n  192.168.0.1:9876;192.168.0.2:9876 -t order-topic
复制代码



查询topic路由信息

TopicRoute -b 192.168.0.1:10911 -c RocketMq-Cluster -n  192.168.0.1:9876;192.168.0.2:9876 -t order-topic
复制代码



查看topic路由信息

TopicList -n  192.168.0.1:9876;192.168.0.2:9876
复制代码



查看topic状态统计信息

TopicStats -t order_topic -n  192.168.0.1:9876;192.168.0.2:9876
复制代码



根据时间查询消息

printMsg -t order_topic -n 192.168.0.1:9876;192.168.0.2:9876
复制代码



根据Id查询消息

queryMsgById -i msgId -n -n 192.168.0.1:9876;192.168.0.2:9876
复制代码



查看集群信息

clusterList -n 192.168.0.1:9876;192.168.0.2:9876
复制代码



额外说明


如果没有安装中文语言包,出现乱码了,可以通过

[root@localhost local]# yum groupinstall chinese-support
复制代码


可以修改两台机器的日志配置文件并且把conf目录下所有xml文件中的${user.home}替换为/usr/local/rocketmq

[root@localhost rocketmq]# mkdir -p /usr/local/rocketmq/logs
[root@localhost rocketmq]# cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml 
复制代码


分别修改两台机器的rocketmq启动脚本

[root@localhost rocketmq]# vim /usr/local/rocketmq/bin/runbroker.sh 
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn800g -XX:PermSize=128m -XX:MaxPermSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${HOME}/rmq_bk_gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"




相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
26天前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
2月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
2月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
19天前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
EMQ
|
4月前
|
物联网 Linux C语言
在 Windows 平台搭建 MQTT 服务
NanoMQ 有着强大的跨平台和可兼容能力,不仅可以用于以 Linux 为基础的各类平台,也为 Windows 平台提供了 MQTT 服务的新选择。
EMQ
115 8
在 Windows 平台搭建 MQTT 服务
|
3月前
|
存储 中间件 PHP
Python编程入门:从零到一的代码实践深入理解 PHP 中的中间件模式
【8月更文挑战第28天】本文旨在通过浅显易懂的方式,向初学者介绍Python编程的基础知识,并结合具体代码示例,带领读者一步步实现从零基础到能够独立编写简单程序的转变。文章将围绕Python语言的核心概念进行讲解,并通过实例展示如何应用这些概念解决实际问题。无论你是编程新手还是希望扩展技能的专业人士,这篇文章都将为你打开编程世界的大门。 【8月更文挑战第28天】在PHP的世界中,设计模式是构建可维护和可扩展软件的重要工具。本文将通过浅显易懂的语言和生动的比喻,带领读者深入理解中间件模式如何在PHP应用中发挥魔力,实现请求处理的高效管理。我们将一步步揭开中间件的神秘面纱,从它的定义、工作原理到
|
4月前
|
消息中间件 Java 物联网
消息队列 MQ操作报错合集之建立连接时发生了超时错误,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ操作报错合集之建立连接时发生了超时错误,该如何解决
|
4月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
|
4月前
|
JSON 中间件 数据处理
实践出真知:通过项目学习Python Web框架的路由与中间件设计
【7月更文挑战第19天】探索Python Web开发,掌握Flask或Django的关键在于理解路由和中间件。路由连接URL与功能,如Flask中@app.route()定义请求响应路径。中间件在请求处理前后执行,提供扩展功能,如日志、认证。通过实践项目,不仅学习理论,还能提升构建高效Web应用的能力。示例代码展示路由定义及模拟中间件行为,强调动手实践的重要性。
57 1

相关产品

  • 云消息队列 MQ