1. Rocket概述
消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:
- 削峰填谷:主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题
- 系统解耦:解决不同重要程度、不同能力级别系统之间依赖导致一死全死
- 提升性能:当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统
- 蓄流压测:线上有些链路不好压测,可以通过堆积一定量消息再放开来压测
1.1 特点
Apache Alibaba RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:
- 支持严格的消息顺序
- 支持 Topic 与 Queue 两种模式
- 亿级消息堆积能力
- 比较友好的分布式特性
- 同时支持 Push 与 Pull 方式消费消息
- 历经多次天猫双十一海量消息考验
目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,对比其主要优势有:
- 支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
- 支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
- 支持 18 个级别的延迟消息(RabbitMQ 和 Kafka 不支持)
- 支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
- 支持 Consumer 端 Tag 过滤,减少不必要的网络传输(RabbitMQ 和 Kafka 不支持)
- 支持重复消费(RabbitMQ 不支持,Kafka 支持)
1.2 网络部署架构
- NameServer:是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步
- Broker:部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slaver,但是一个Slaver只能对应一个Master,Master与Slaver的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slaver。Master可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有的NameServer
- Producer:与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Produce完全无状态,可集群部署
- Consumer:与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slaver建立长连接,且定时向Master、Slaver发送心跳。Consumer即可从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定
1.3 存储特点
1.3.1 零拷贝原理
Consumer消费消息过程,使用了零拷贝,零拷贝包括以下两种方式,RocketMQ使用第一种方式,因小块数据传输的要求效果比sendfile方式好。
使用mmap+write方式
- 优点:即使频繁调用,使用小文件块传输,效率也很高
- 缺点:不能很好的利用DMA方式,会比sendfile多消耗CPU资源,内存安全性控制复杂,需要避免JVM Crash问题
使用sendfile方式
- 优点:可以利用DMA方式,消耗CPU资源少,大块文件传输效率高,无内存安全新问题
- 缺点:小块文件效率低于mmap方式,只能是BIO方式传输,不能使用NIO
1.3.2 数据存储结构
1.3.3 顺序消息和幂等性
顺序消息原理:producer
在发送消息的时候,把消息发到同一个队列(queue
)中,消费者注册消息监听器为MessageListenerOrderly
,这样就可以保证消费端只有一个线程去消费消息。(注意:把消息发到同一个队列(queue
),不是同一个topic
,默认情况下一个topic
包括4个queue
)
顺序消息缺陷:发送顺序消息无法利用集群Fail Over
特性消费顺序消息的并行度依赖于队列数量队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题遇到消息失败的消息,无法跳过,当前队列消费暂停。
消息幂等性:RocketMQ
使用的消息原语是At Least Once
,所以consumer
可能多次收到同一个消息,如果业务需要保证严格的不重复消息时需要自己做好幂等。
造成消息重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?
- 消费端处理消息的业务逻辑保持幂等性
- 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现
1.3.4 相关资料
2. 安装部署
2.1 下载
下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.5.0/
可以直接下载编译好的
2.2 修改配置
1.修改conf/broker.conf
,添加以下配置:
brokerIP1:配置
broker
所在服务器的ip
地址,以便Name Server
连接
2.修改runserver.sh
和runbroker.sh
(可不改),因为rocketMQ
默认的启动参数内存占用非常大,如果环境没有这么多内存就必需修改JAVA_OPT
参数:
runserver.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
runbroker.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m
2.3 运行
2.3.1 运行Name Server
nohup sh bin/mqnamesrv &
查看运行日志:
tail -f ~/logs/rocketmqlogs/broker.log
2.3.2 运行Broker
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf & • 1
通过
-c
参数指定配置文件查看运行日志:
tail -f ~/logs/rocketmqlogs/broker.log
2.4 安装可视化管理界面
下载:
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
编译:
mvn clean package -Dmaven.test.skip=true
运行:
nohup java -jar \ -Drocketmq.config.namesrvAddr=192.168.28.130:9876 \ -Drocketmq.config.isVIPChannel=false \ rocketmq-console-ng-1.0.0.jar &
访问:
2.5 停止服务
如果需要停止rocketMQ
的服务,在生产环境不建议直接用kill
,应该使用以下命令:
sh bin/mqshutdown broker sh bin/mqshutdown namesrv
3. 常见异常处理
3.1 MQClientException: No route info of this topic, TopicTest1
在客户端的 Producer 运行起来准备发送消息时抛异常为 “ No route info of this topic
” 异常产生的原因可能是:
Broker
禁止自动创建Topic
,且用户没有通过手工方式创建Topic
Broker
没有正确连接到Name Server
Producer
没有正确连接到Name Server
解决办法:
排查1:Broker 禁止自动创建 Topic,且用户没有通过手工方式创建 Topic
- 可以在
rocketmq
所在目录下执行" sh bin/mqbroker -m "
来查看broker
的配置参数 - 如下所示,
autoCreateTopicEnable=true
证明是没有问题的
排查2:Broker 没有正确连接到 Name Server
- 通过查看
broker
的日志tail -f ~/logs/rocketmqlogs/broker.log
看看有没有错误信息
排查3:Producer 没有正确连接到 Name Server
- 检查程序连接
Name Server
的地址有没有错 - 如果在云服务器上,检查安全组的配置
9876
端口有没有开发 - 看看有没有打开防火墙,有的话设置防火墙开放
9876
端口
[root@zlt rocketmq-all-4.5.0-bin-release]# firewall-cmd --zone=public --list-ports 8090/tcp 80/tcp 8080/tcp [root@zlt rocketmq-all-4.5.0-bin-release]# firewall-cmd --zone=public --add-port=9876/tcp --permanent success [root@zlt rocketmq-all-4.5.0-bin-release]# firewall-cmd --reload success [root@zlt rocketmq-all-4.5.0-bin-release]# firewall-cmd --zone=public --list-ports 9876/tcp 8090/tcp 80/tcp 8080/tcp • 1 • 2 • 3 • 4 • 5 • 6 • 7 • 8
3.2 RemotingTooMuchRequestException: sendDefaultImpl call timeout
在客户端的 Producer
运行起来准备发送消息时抛异常如下,通常因为Name Server
连接不上Broker
:
解决办法:
- 检查
rocketmq-console
的集群页签,broker
的地址是否正确:
broker
地址的配置方式请参考安装部署中提到的步骤:
- 修改
broker.conf
的配置,添加brokerIP1
参数 - 启动
broker
时加上-c
参数指定配置文件
3.3 消费/查看不了死信队列topic的消息
死信队列默认的perm
值为2没有查看权限
解决办法:
- 在控制台把队列的
perm
改为6就可以了
4. RocketMQ事务消息原理
4.1 原理图
分为两个逻辑:正常事务消息的发送及提交、事务消息的回查流程
事务消息发送及提交:
- 发送消息(half消息)
- 服务端响应消息写入结果
- 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
- 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
回查流程:
- 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
- Producer收到回查消息,检查回查消息对应的本地事务的状态
- 根据本地事务状态,重新Commit或者Rollback
4.2 时序图