微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理

简介: 微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理

1. Rocket概述

消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:

  • 削峰填谷:主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题
  • 系统解耦:解决不同重要程度、不同能力级别系统之间依赖导致一死全死
  • 提升性能:当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统
  • 蓄流压测:线上有些链路不好压测,可以通过堆积一定量消息再放开来压测

1.1 特点

Apache Alibaba RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:

  • 支持严格的消息顺序
  • 支持 Topic 与 Queue 两种模式
  • 亿级消息堆积能力
  • 比较友好的分布式特性
  • 同时支持 Push 与 Pull 方式消费消息
  • 历经多次天猫双十一海量消息考验

目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,对比其主要优势有:

  • 支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
  • 支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
  • 支持 18 个级别的延迟消息(RabbitMQ 和 Kafka 不支持)
  • 支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
  • 支持 Consumer 端 Tag 过滤,减少不必要的网络传输(RabbitMQ 和 Kafka 不支持)
  • 支持重复消费(RabbitMQ 不支持,Kafka 支持)

1.2 网络部署架构

  • NameServer:是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步
  • Broker:部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slaver,但是一个Slaver只能对应一个Master,Master与Slaver的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slaver。Master可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有的NameServer
  • Producer:与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Produce完全无状态,可集群部署
  • Consumer:与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slaver建立长连接,且定时向Master、Slaver发送心跳。Consumer即可从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定

1.3 存储特点

1.3.1 零拷贝原理

Consumer消费消息过程,使用了零拷贝,零拷贝包括以下两种方式,RocketMQ使用第一种方式,因小块数据传输的要求效果比sendfile方式好。

使用mmap+write方式

  • 优点:即使频繁调用,使用小文件块传输,效率也很高
  • 缺点:不能很好的利用DMA方式,会比sendfile多消耗CPU资源,内存安全性控制复杂,需要避免JVM Crash问题

使用sendfile方式

  • 优点:可以利用DMA方式,消耗CPU资源少,大块文件传输效率高,无内存安全新问题
  • 缺点:小块文件效率低于mmap方式,只能是BIO方式传输,不能使用NIO
1.3.2 数据存储结构

1.3.3 顺序消息和幂等性

顺序消息原理producer在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息。(注意:把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue

顺序消息缺陷:发送顺序消息无法利用集群Fail Over特性消费顺序消息的并行度依赖于队列数量队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题遇到消息失败的消息,无法跳过,当前队列消费暂停。

消息幂等性RocketMQ使用的消息原语是At Least Once,所以consumer可能多次收到同一个消息,如果业务需要保证严格的不重复消息时需要自己做好幂等。

造成消息重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

  • 消费端处理消息的业务逻辑保持幂等性
  • 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现
1.3.4 相关资料

2. 安装部署

2.1 下载

下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.5.0/

可以直接下载编译好的

2.2 修改配置

1.修改conf/broker.conf,添加以下配置:

brokerIP1:配置broker所在服务器的ip地址,以便Name Server连接

2.修改runserver.shrunbroker.sh(可不改),因为rocketMQ默认的启动参数内存占用非常大,如果环境没有这么多内存就必需修改JAVA_OPT参数:

runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"


runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m

2.3 运行

2.3.1 运行Name Server
nohup sh bin/mqnamesrv &

查看运行日志:tail -f ~/logs/rocketmqlogs/broker.log

2.3.2 运行Broker
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
• 1

通过-c参数指定配置文件

查看运行日志:tail -f ~/logs/rocketmqlogs/broker.log

2.4 安装可视化管理界面

下载:

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

编译:

mvn clean package -Dmaven.test.skip=true

运行:

nohup java -jar \
    -Drocketmq.config.namesrvAddr=192.168.28.130:9876 \
    -Drocketmq.config.isVIPChannel=false \
    rocketmq-console-ng-1.0.0.jar &

访问:

http://192.168.28.130:8080/

2.5 停止服务

如果需要停止rocketMQ的服务,在生产环境不建议直接用kill,应该使用以下命令:

sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

3. 常见异常处理

3.1 MQClientException: No route info of this topic, TopicTest1

在客户端的 Producer 运行起来准备发送消息时抛异常为 “ No route info of this topic” 异常产生的原因可能是:

  • Broker 禁止自动创建 Topic,且用户没有通过手工方式创建 Topic
  • Broker没有正确连接到Name Server
  • Producer没有正确连接到Name Server

解决办法:

排查1:Broker 禁止自动创建 Topic,且用户没有通过手工方式创建 Topic

  • 可以在rocketmq所在目录下执行" sh bin/mqbroker -m " 来查看 broker的配置参数
  • 如下所示,autoCreateTopicEnable=true 证明是没有问题的

排查2:Broker 没有正确连接到 Name Server

  • 通过查看broker的日志tail -f ~/logs/rocketmqlogs/broker.log看看有没有错误信息

排查3:Producer 没有正确连接到 Name Server

  • 检查程序连接Name Server的地址有没有错
  • 如果在云服务器上,检查安全组的配置9876端口有没有开发
  • 看看有没有打开防火墙,有的话设置防火墙开放9876端口
[root@zlt rocketmq-all-4.5.0-bin-release]# firewall-cmd --zone=public --list-ports
8090/tcp 80/tcp 8080/tcp
[root@zlt rocketmq-all-4.5.0-bin-release]# firewall-cmd --zone=public --add-port=9876/tcp --permanent
success
[root@zlt rocketmq-all-4.5.0-bin-release]# firewall-cmd --reload
success
[root@zlt rocketmq-all-4.5.0-bin-release]# firewall-cmd --zone=public --list-ports
9876/tcp 8090/tcp 80/tcp 8080/tcp
• 1
• 2
• 3
• 4
• 5
• 6
• 7
• 8

3.2 RemotingTooMuchRequestException: sendDefaultImpl call timeout

在客户端的 Producer 运行起来准备发送消息时抛异常如下,通常因为Name Server连接不上Broker

解决办法:

  • 检查rocketmq-console的集群页签,broker的地址是否正确:

broker地址的配置方式请参考安装部署中提到的步骤:

  • 修改broker.conf的配置,添加brokerIP1参数
  • 启动broker时加上-c参数指定配置文件

3.3 消费/查看不了死信队列topic的消息

死信队列默认的perm值为2没有查看权限

解决办法:

  • 在控制台把队列的perm改为6就可以了

4. RocketMQ事务消息原理

4.1 原理图

分为两个逻辑:正常事务消息的发送及提交、事务消息的回查流程

事务消息发送及提交

  • 发送消息(half消息)
  • 服务端响应消息写入结果
  • 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
  • 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

回查流程

  • 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
  • Producer收到回查消息,检查回查消息对应的本地事务的状态
  • 根据本地事务状态,重新Commit或者Rollback

4.2 时序图


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 存储 Java
RabbitMQ 在微服务架构中的高级应用
【8月更文第28天】在微服务架构中,服务之间需要通过轻量级的通信机制进行交互。其中一种流行的解决方案是使用消息队列,如 RabbitMQ,来实现异步通信和解耦。本文将探讨如何利用 RabbitMQ 作为服务间通信的核心组件,并构建高效的事件驱动架构。
184 2
|
4月前
|
消息中间件 监控 开发工具
微服务(三)-实现自动刷新配置(不重启项目情况下)
微服务(三)-实现自动刷新配置(不重启项目情况下)
|
3月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
3月前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
167 2
|
4月前
|
前端开发 Java UED
"揭秘!如何以戏剧性姿态,利用SpringCloud铸就无懈可击的异常处理铁壁,让你的微服务架构稳如泰山,震撼业界!"
【9月更文挑战第8天】随着微服务架构的普及,Spring Cloud作为一套完整的微服务解决方案被广泛应用。在微服务架构中,服务间调用频繁且复杂,异常处理成为保障系统稳定性和用户体验的关键。传统的异常处理方式导致代码冗余,降低系统可维护性和一致性。因此,基于Spring Cloud封装统一的异常处理机制至关重要。这样不仅可以减少代码冗余、提升一致性,还增强了系统的可维护性,并通过统一的错误响应格式优化了用户体验。具体实现包括定义全局异常处理器、自定义业务异常以及在服务中抛出这些异常。这种方式体现了微服务架构中的“服务治理”和“契约先行”原则,有助于构建健壮、可扩展的系统。
83 2
|
5月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
5月前
|
前端开发 Java UED
"揭秘!如何以戏剧性姿态,利用SpringCloud铸就无懈可击的异常处理铁壁,让你的微服务架构稳如泰山,震撼业界!"
【8月更文挑战第8天】随着Spring Cloud在微服务架构中的广泛应用,统一异常处理成为确保系统稳定性和提升用户体验的关键。传统方式在各服务中单独处理异常导致代码冗余且不一致。因此,采用Spring Cloud封装统一异常处理机制变得尤为重要:它减少了冗余代码,提升了异常处理的一致性和系统的可维护性,并通过统一错误响应格式优化了用户体验。实现这一机制可通过定义全局异常处理器、自定义业务异常并在服务中适当抛出这些异常来完成。这种方式遵循了微服务设计中的“服务治理”和“契约先行”原则,为构建健壮的微服务系统打下了基础。
77 1
|
5月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
75 0
|
6月前
|
Cloud Native PHP Docker
PHP 中的异常处理:提升代码的健壮性云原生时代的微服务架构实践
【7月更文挑战第31天】在PHP开发中,异常处理是确保应用程序稳定性和可靠性的关键。本文将引导您了解如何在PHP中实现有效的异常处理机制,通过实际代码示例展示如何捕获和处理异常,以及如何使用自定义异常类来增强错误管理的灵活性。我们将探索不同的异常处理策略,并讨论它们对提升代码质量的影响。 【7月更文挑战第31天】在数字化浪潮的推动下,云原生技术正成为企业转型的关键。本文将深入探讨如何在云平台上利用微服务架构实现敏捷开发和高效运维,通过具体的代码示例,揭示微服务与容器化部署的协同优势,同时指出在实施过程中可能遇到的挑战及应对策略。
39 1
|
6月前
|
负载均衡 Java 开发者
如何在Spring Boot项目中实现微服务架构?
如何在Spring Boot项目中实现微服务架构?