微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理

简介: 微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理

1. Rocket概述

消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:

  • 削峰填谷:主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题
  • 系统解耦:解决不同重要程度、不同能力级别系统之间依赖导致一死全死
  • 提升性能:当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统
  • 蓄流压测:线上有些链路不好压测,可以通过堆积一定量消息再放开来压测

1.1 特点

Apache Alibaba RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:

  • 支持严格的消息顺序
  • 支持 Topic 与 Queue 两种模式
  • 亿级消息堆积能力
  • 比较友好的分布式特性
  • 同时支持 Push 与 Pull 方式消费消息
  • 历经多次天猫双十一海量消息考验

目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,对比其主要优势有:

  • 支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
  • 支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
  • 支持 18 个级别的延迟消息(RabbitMQ 和 Kafka 不支持)
  • 支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
  • 支持 Consumer 端 Tag 过滤,减少不必要的网络传输(RabbitMQ 和 Kafka 不支持)
  • 支持重复消费(RabbitMQ 不支持,Kafka 支持)

1.2 网络部署架构

  • NameServer:是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步
  • Broker:部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slaver,但是一个Slaver只能对应一个Master,Master与Slaver的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slaver。Master可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有的NameServer
  • Producer:与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Produce完全无状态,可集群部署
  • Consumer:与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slaver建立长连接,且定时向Master、Slaver发送心跳。Consumer即可从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定

1.3 存储特点

1.3.1 零拷贝原理

Consumer消费消息过程,使用了零拷贝,零拷贝包括以下两种方式,RocketMQ使用第一种方式,因小块数据传输的要求效果比sendfile方式好。

使用mmap+write方式

  • 优点:即使频繁调用,使用小文件块传输,效率也很高
  • 缺点:不能很好的利用DMA方式,会比sendfile多消耗CPU资源,内存安全性控制复杂,需要避免JVM Crash问题

使用sendfile方式

  • 优点:可以利用DMA方式,消耗CPU资源少,大块文件传输效率高,无内存安全新问题
  • 缺点:小块文件效率低于mmap方式,只能是BIO方式传输,不能使用NIO
1.3.2 数据存储结构

1.3.3 顺序消息和幂等性

顺序消息原理producer在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息。(注意:把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue

顺序消息缺陷:发送顺序消息无法利用集群Fail Over特性消费顺序消息的并行度依赖于队列数量队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题遇到消息失败的消息,无法跳过,当前队列消费暂停。

消息幂等性RocketMQ使用的消息原语是At Least Once,所以consumer可能多次收到同一个消息,如果业务需要保证严格的不重复消息时需要自己做好幂等。

造成消息重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

  • 消费端处理消息的业务逻辑保持幂等性
  • 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现
1.3.4 相关资料

2. 安装部署

2.1 下载

下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.5.0/

可以直接下载编译好的

2.2 修改配置

1.修改conf/broker.conf,添加以下配置:

brokerIP1:配置broker所在服务器的ip地址,以便Name Server连接

2.修改runserver.shrunbroker.sh(可不改),因为rocketMQ默认的启动参数内存占用非常大,如果环境没有这么多内存就必需修改JAVA_OPT参数:

runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"


runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m

2.3 运行

2.3.1 运行Name Server
nohup sh bin/mqnamesrv &

查看运行日志:tail -f ~/logs/rocketmqlogs/broker.log

2.3.2 运行Broker
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
• 1

通过-c参数指定配置文件

查看运行日志:tail -f ~/logs/rocketmqlogs/broker.log

2.4 安装可视化管理界面

下载:

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

编译:

mvn clean package -Dmaven.test.skip=true

运行:

nohup java -jar \
    -Drocketmq.config.namesrvAddr=192.168.28.130:9876 \
    -Drocketmq.config.isVIPChannel=false \
    rocketmq-console-ng-1.0.0.jar &

访问:

http://192.168.28.130:8080/

2.5 停止服务

如果需要停止rocketMQ的服务,在生产环境不建议直接用kill,应该使用以下命令:

sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

3. 常见异常处理

3.1 MQClientException: No route info of this topic, TopicTest1

在客户端的 Producer 运行起来准备发送消息时抛异常为 “ No route info of this topic” 异常产生的原因可能是:

  • Broker 禁止自动创建 Topic,且用户没有通过手工方式创建 Topic
  • Broker没有正确连接到Name Server
  • Producer没有正确连接到Name Server

解决办法:

排查1:Broker 禁止自动创建 Topic,且用户没有通过手工方式创建 Topic

  • 可以在rocketmq所在目录下执行" sh bin/mqbroker -m " 来查看 broker的配置参数
  • 如下所示,autoCreateTopicEnable=true 证明是没有问题的

排查2:Broker 没有正确连接到 Name Server

  • 通过查看broker的日志tail -f ~/logs/rocketmqlogs/broker.log看看有没有错误信息

排查3:Producer 没有正确连接到 Name Server

  • 检查程序连接Name Server的地址有没有错
  • 如果在云服务器上,检查安全组的配置9876端口有没有开发
  • 看看有没有打开防火墙,有的话设置防火墙开放9876端口
[root@zlt rocketmq-all-4.5.0-bin-release]# firewall-cmd --zone=public --list-ports
8090/tcp 80/tcp 8080/tcp
[root@zlt rocketmq-all-4.5.0-bin-release]# firewall-cmd --zone=public --add-port=9876/tcp --permanent
success
[root@zlt rocketmq-all-4.5.0-bin-release]# firewall-cmd --reload
success
[root@zlt rocketmq-all-4.5.0-bin-release]# firewall-cmd --zone=public --list-ports
9876/tcp 8090/tcp 80/tcp 8080/tcp
• 1
• 2
• 3
• 4
• 5
• 6
• 7
• 8

3.2 RemotingTooMuchRequestException: sendDefaultImpl call timeout

在客户端的 Producer 运行起来准备发送消息时抛异常如下,通常因为Name Server连接不上Broker

解决办法:

  • 检查rocketmq-console的集群页签,broker的地址是否正确:

broker地址的配置方式请参考安装部署中提到的步骤:

  • 修改broker.conf的配置,添加brokerIP1参数
  • 启动broker时加上-c参数指定配置文件

3.3 消费/查看不了死信队列topic的消息

死信队列默认的perm值为2没有查看权限

解决办法:

  • 在控制台把队列的perm改为6就可以了

4. RocketMQ事务消息原理

4.1 原理图

分为两个逻辑:正常事务消息的发送及提交、事务消息的回查流程

事务消息发送及提交

  • 发送消息(half消息)
  • 服务端响应消息写入结果
  • 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
  • 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

回查流程

  • 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
  • Producer收到回查消息,检查回查消息对应的本地事务的状态
  • 根据本地事务状态,重新Commit或者Rollback

4.2 时序图


相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
Java 关系型数据库 数据库
微服务——SpringBoot使用归纳——Spring Boot事务配置管理——常见问题总结
本文总结了Spring Boot中使用事务的常见问题,虽然通过`@Transactional`注解可以轻松实现事务管理,但在实际项目中仍有许多潜在坑点。文章详细分析了三个典型问题:1) 异常未被捕获导致事务未回滚,需明确指定`rollbackFor`属性;2) 异常被try-catch“吃掉”,应避免在事务方法中直接处理异常;3) 事务范围与锁范围不一致引发并发问题,建议调整锁策略以覆盖事务范围。这些问题看似简单,但一旦发生,排查难度较大,因此开发时需格外留意。最后,文章提供了课程源代码下载地址,供读者实践参考。
83 0
|
4月前
|
Java 关系型数据库 数据库
微服务——SpringBoot使用归纳——Spring Boot事务配置管理——Spring Boot 事务配置
本文介绍了 Spring Boot 中的事务配置与使用方法。首先需要导入 MySQL 依赖,Spring Boot 会自动注入 `DataSourceTransactionManager`,无需额外配置即可通过 `@Transactional` 注解实现事务管理。接着通过创建一个用户插入功能的示例,展示了如何在 Service 层手动抛出异常以测试事务回滚机制。测试结果表明,数据库中未新增记录,证明事务已成功回滚。此过程简单高效,适合日常开发需求。
196 0
|
4月前
|
Java 数据库 微服务
微服务——SpringBoot使用归纳——Spring Boot事务配置管理——事务相关
本文介绍Spring Boot事务配置管理,阐述事务在企业应用开发中的重要性。事务确保数据操作可靠,任一异常均可回滚至初始状态,如转账、购票等场景需全流程执行成功才算完成。同时,事务管理在Spring Boot的service层广泛应用,但根据实际需求也可能存在无需事务的情况,例如独立数据插入操作。
55 0
|
4月前
|
JSON Java 数据格式
微服务——SpringBoot使用归纳——Spring Boot中的全局异常处理——处理系统异常
本文介绍了在Spring Boot项目中如何通过创建`GlobalExceptionHandler`类来全局处理系统异常。通过使用`@ControllerAdvice`注解,可以拦截项目中的各种异常,并结合`@ExceptionHandler`注解针对特定异常(如参数缺失、空指针等)进行定制化处理。文中详细展示了处理参数缺失异常和空指针异常的示例代码,并说明了通过拦截`Exception`父类实现统一异常处理的方法。虽然拦截`Exception`可一劳永逸,但为便于问题排查,建议优先处理常见异常,最后再兜底处理未知异常,确保返回给调用方的信息友好且明确。
259 0
微服务——SpringBoot使用归纳——Spring Boot中的全局异常处理——处理系统异常
|
4月前
|
JSON Java 数据格式
微服务——SpringBoot使用归纳——Spring Boot中的全局异常处理——拦截自定义异常
本文介绍了在实际项目中如何拦截自定义异常。首先,通过定义异常信息枚举类 `BusinessMsgEnum`,统一管理业务异常的代码和消息。接着,创建自定义业务异常类 `BusinessErrorException`,并在其构造方法中传入枚举类以实现异常信息的封装。最后,利用 `GlobalExceptionHandler` 拦截并处理自定义异常,返回标准的 JSON 响应格式。文章还提供了示例代码和测试方法,展示了全局异常处理在 Spring Boot 项目中的应用价值。
91 0
|
4月前
|
JSON Java 数据格式
微服务——SpringBoot使用归纳——Spring Boot中的全局异常处理——定义返回的统一 json 结构
本课主要讲解Spring Boot中的全局异常处理方法。在项目开发中,各层操作难免会遇到各种异常,若逐一处理将导致代码耦合度高、维护困难。因此,需将异常处理从业务逻辑中分离,实现统一管理与友好反馈。本文通过定义一个简化的JsonResult类(含状态码code和消息msg),结合全局异常拦截器,展示如何封装并返回标准化的JSON响应,从而提升代码质量和用户体验。
80 0
|
7月前
|
消息中间件 存储 监控
说说MQ在你项目中的应用(一)
本文总结了消息队列(MQ)在项目中的应用,主要围绕异步处理、系统解耦和流量削峰三大功能展开。通过分析短信通知和业务日志两个典型场景,介绍了MQ的实现方式及其优势。短信通知中,MQ用于异步发送短信并处理状态更新;业务日志中,Kafka作为高吞吐量的消息系统,负责收集和传输系统及用户行为日志,确保数据的可靠性和高效处理。MQ不仅提高了系统的灵活性和响应速度,还提供了重试机制和状态追踪等功能,保障了业务的稳定运行。
199 7
|
9月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
7月前
|
消息中间件 存储 中间件
说说MQ在你项目中的应用(二)商品支付
本文总结了消息队列(MQ)在支付订单业务中的应用,重点分析了RabbitMQ的优势。通过异步处理、系统解耦和流量削峰等功能,RabbitMQ确保了支付流程的高效与稳定。具体场景包括用户下单、支付请求、商品生产和物流配送等环节。相比Kafka,RabbitMQ在低吞吐量、高实时性需求下表现更优,提供了更低延迟和更高的可靠性。
204 0
|
9月前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
420 2