4 高级队列(自学)

简介: RabbitMQ惰性队列(LazyQueue)将消息直接存入磁盘,减少内存占用,支持百万级消息存储,避免因消息积压导致的性能问题。适用于消息量大、消费者处理慢的场景,如日志聚合。3.12版本后已成为默认队列类型。结合优先级队列可实现高效、灵活的消息处理。常用于异步解耦,如下单后异步清空购物车。

4.5. LazyQueue
4.5.1 介绍
在默认情况下,RabbitMQ会将接收到的信息先保存在内存中然后再保存至磁盘,以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:
消费者宕机或出现网络故障
消息发送量激增,超过了消费者处理速度
消费者处理业务发生阻塞
一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。
为了解决消息堆积问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:
接收到消息后直接存入磁盘而非内存
消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
支持数百万条的消息存储
而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。
LazyQueue的特点:
Lazy Queue 通过将大部分消息存储在磁盘上而不是内存中来减少内存占用。它使用内存映射文件技术来实现高效的磁盘访问。
即使在处理大量消息的情况下,Lazy Queue 也能保持较高的性能。通过减少内存占用,可以避免频繁的垃圾回收操作,从而提高整体性能。
LazyQueue应用场景:
当消息队列非常大并且消息数量非常多时,使用 Lazy Queue 可以显著减少内存使用量,从而提高系统性能。例如,在日志聚合系统中,需要处理大量的日志消息,使用 Lazy Queue 可以有效管理内存资源。
4.5.2 控制台配置Lazy模式
在添加队列的时候,添加x-queue-mod=lazy参数即可设置队列为Lazy模式:
4.5.3 代码配置Lazy模式
在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式,
基于注解来声明队列并设置为Lazy模式:
4.5.4 测试
测试流程:
创建一个惰性队列,一次写入100条消息
写入成功观察队列信息:
100条消息全部在磁盘,内存消息个数为0.
再向非惰性队列写入100条消息观察消息存储情况
100条消息分别在内存和磁盘存储。
4.5.5 小结
LazyQueue的特点:
Lazy Queue 惰性队列将大部分消息存储在磁盘上而不是内存中来减少内存占用。
即使在处理大量消息的情况下,Lazy Queue 也能保持较高的性能。通过减少内存占用,可以避免频繁的垃圾回收操作,从而提高整体性能。
LazyQueue应用场景:
当消息队列非常大并且消息数量非常多时,使用 Lazy Queue 可以显著减少内存使用量,从而提高系统性能。
4.6. 优先级队列
4.6.1 介绍
自从 RabbitMQ 3.5.0 版本起,引入了优先级队列的功能,允许开发者根据消息的重要程度来设定不同的优先级。这在处理紧急或重要的消息时非常有用。
通过设置 x-max-priority 参数,可以实现这一功能。然而,在消费速度远高于生产速度且消息队列中没有积压消息的情况下,优先级的作用就不那么明显了。
应用场景:
交易处理
场景: 在金融交易系统中,需要处理不同类型的交易请求,如紧急交易和常规交易。
实现: 使用优先级队列,将紧急交易请求设置为最高优先级,常规交易设置为较低优先级。
好处: 确保紧急交易能够被优先处理,提高了交易系统的响应速度。
任务调度
场景: 在一个任务调度系统中,可能存在不同优先级的任务需要被处理。
实现: 使用优先级队列,高优先级的任务(如紧急任务或关键任务)会被优先处理。
好处: 保证了重要任务能够被及时处理,提高了系统的响应能力和可靠性。
4.6.2 创建优先级队列
创建优先级队列:priority.queue
设置x-max-priority ,定义优先级的最大值
4.6.3 测试
向优先级队列发100条消息,优先级用随机数生成,最大为10,优先级越大越优先出队。
发送成功观察控制台:
运行消费程序
观察控制台,按优先级消费消息,优先级高的最先消费。
5.业务改造
5.1 需求分析
案例需求:改造余额支付功能,将支付成功后基于OpenFeign远程调用交易服务更新订单状态接口由同步调用改为基于RabbitMQ的异步通知。如图:
说明:目前没有通知服务和积分服务,因此我们只关注交易服务,步骤如下:
定义direct类型交换机,命名为pay.direct
定义消息队列,命名为:pay.success.queue
将 pay.success.queue 与pay.direct绑定,BindingKey为pay.success
支付成功时不再调用交易服务更新订单状态的接口,而是发送一条消息到pay.direct,发送消息的RoutingKey 为pay.success,消息内容是订单id
交易服务监听pay.success.queue 队列,接收到消息后更新订单状态为已支付
5.2 配置MQ
不管是生产者还是消费者,都需要配置MQ的基本信息。分为两步:
1)添加依赖:
在支付服务、交易服务添加amqp依赖。
2)配置MQ地址:
在支付服务、交易服务添加MQ地址配置【这里也可以抽取一个 shared-mq.yaml 放在nacos里面】
5.3 发送消息
5.3.1 常量类
在common模块配置常量类,包括支付交换机,支付成功key等信息
5.3.2 编写代码
修改pay-service服务下的com.hmall.pay.service.impl.PayOrderServiceImpl类中的tryPayOrderByBalance方法:
屏蔽tradeClient.markOrderPaySuccess(po.getBizOrderNo())
支付成功后发送消息
代码如下:
5.4 接收消息
在trade-service服务中定义一个消息监听类,接收到消息,调用方法更新订单状态为已支付。
其代码如下:
Java
运行代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.hmall.trade.listener;

import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class PayStatusListener {

private final IOrderService orderService;

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = MqConstants.PAY_SUCCESS_QUEUE, durable = "true"),
    exchange = @Exchange(name = MqConstants.PAY_EXCHANGE_NAME, type = "direct"),
    key = MqConstants.PAY_SUCCESS_KEY
))
public void listenPaySuccess(Long orderId){
    orderService.markOrderPaySuccess(orderId);
}

}
作业
改造下单功能
要求:
改造下单功能,将基于OpenFeign的清理购物车同步调用,改为基于RabbitMQ的异步通知:
定义topic类型交换机,命名为trade.topic
定义消息队列,命名为cart.clear.queue
将cart.clear.queue与trade.topic绑定,BindingKey为order.create
下单成功时不再调用清理购物车接口,而是发送一条消息到trade.topic,发送消息的RoutingKey 为order.create,消息内容是下单的具体商品、当前登录用户信息
购物车服务监听cart.clear.queue队列,接收到消息后清理指定用户的购物车中的指定商品
提示:
将交换机、队列等信息配置在常量类中
在hm-common 模块配置消息转换器并通过springboot自动装配

相关文章
|
JavaScript Java 项目管理
基于Java的大学生创新创业项目管理系统设计与实现(亮点:完整严谨的创新创业申请流程、适用于任何要求严格的审批类毕业设计)
基于Java的大学生创新创业项目管理系统设计与实现(亮点:完整严谨的创新创业申请流程、适用于任何要求严格的审批类毕业设计)
528 0
|
Nacos 微服务 监控
Nacos:微服务架构中的“服务管家”与“配置中心”
Nacos是阿里巴巴开源的微服务“服务管家”与“配置中心”,集服务注册发现、动态配置管理、健康检查、DNS发现等功能于一体,支持多语言、多协议接入,助力构建高可用、易运维的云原生应用体系。
1261 155
|
7月前
|
消息中间件 存储 人工智能
官宣上线!RocketMQ for AI:企业级 AI 应用异步通信首选方案
RocketMQ 专门为 AI 场景推出了全新Lite Topic 模型,目前已在阿里云云消息队列 RocketMQ 版 5.x 系列实例上正式发布,并会逐步贡献到 Apache RocketMQ 开源社区,欢迎大家使用。
589 65
|
12月前
|
NoSQL 安全 Java
2.2k star 单点登录框架揭秘!主流SSO太重?SpringBoot轻量级Smart‑SSO轻松接入·分布式·强踢人
Smart-SSO 是一个基于 SpringBoot 的轻量级单点登录框架,采用 OAuth2 授权码与 RBAC 权限设计,解决跨域认证、单点退出、令牌过期等痛点。支持自动续签、强制踢人、按钮级权限控制及分布式部署,适合中小团队快速构建高可用认证中台。项目已获 2.2k Star,代码开源,接入简单,是企业级应用的理想选择。[详情见 GitHub](https://github.com/a466350665/smart-sso)。
656 23
|
6月前
|
消息中间件 存储 Java
消息中间件RabbitMQ(高级)
本文深入探讨RabbitMQ在生产环境中的高级应用,涵盖消息可靠性、延迟消息、消息堆积及集群高可用等核心问题。通过生产者确认、持久化、消费者确认机制确保消息不丢失;利用TTL与死信交换机实现延迟队列;借助惰性队列提升堆积能力;最后通过普通集群、镜像集群及仲裁队列实现高可用架构。
 消息中间件RabbitMQ(高级)
|
7月前
|
分布式计算 Hadoop 大数据
到底该选谁?Hadoop、Spark、Flink、云大数据的“江湖全景图”
到底该选谁?Hadoop、Spark、Flink、云大数据的“江湖全景图”
527 6
|
6月前
|
消息中间件 存储 Java
消息中间件RabbitMQ(高级)
本节深入RabbitMQ高级特性,涵盖消息可靠性保障、持久化、消费者确认与重试机制,结合TTL与死信交换机实现延迟队列,通过惰性队列解决消息堆积,并详解普通集群、镜像集群及仲裁队列的搭建与应用,全面提升RabbitMQ在生产环境中的高可用与稳定性。
321 0
|
存储 算法 安全
分布式系统架构1:共识算法Paxos
本文介绍了分布式系统中实现数据一致性的重要算法——Paxos及其改进版Multi Paxos。Paxos算法由Leslie Lamport提出,旨在解决分布式环境下的共识问题,通过提案节点、决策节点和记录节点的协作,确保数据在多台机器间的一致性和可用性。Multi Paxos通过引入主节点选举机制,优化了基本Paxos的效率,减少了网络通信次数,提高了系统的性能和可靠性。文中还简要讨论了数据复制的安全性和一致性保障措施。
1004 1
|
NoSQL Java Linux
linux 安装 neo4j简介
Neo4j是高性能NoSQL图形数据库,利用图结构存储数据。推荐使用JDK 11配合Neo4j 3.x版本。下载3.5.9版,通过`curl`命令在Linux上获取tar.gz文件,然后解压。配置`neo4j.conf`,调整内存设置,开启远程访问。执行`./bin/neo4j start`启动,通过`http://服务器IP:7474`访问,默认凭据是username: neo4j, password: neo4j,登录后应更改密码。
2426 1
|
消息中间件 存储 监控
说说如何解决RocketMq消息积压?为什么Kafka性能比RocketMq高?它们区别是什么?
【10月更文挑战第8天】在分布式系统中,消息队列扮演着至关重要的角色,它不仅能够解耦系统组件,还能提供异步处理、流量削峰和消息持久化等功能。在众多的消息队列产品中,RocketMQ和Kafka无疑是其中的佼佼者。本文将围绕如何解决RocketMQ消息积压、为什么Kafka性能比RocketMQ高以及它们之间的区别进行深入探讨。
777 1