4.5. LazyQueue
4.5.1 介绍
在默认情况下,RabbitMQ会将接收到的信息先保存在内存中然后再保存至磁盘,以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:
消费者宕机或出现网络故障
消息发送量激增,超过了消费者处理速度
消费者处理业务发生阻塞
一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。
为了解决消息堆积问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:
接收到消息后直接存入磁盘而非内存
消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
支持数百万条的消息存储
而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。
LazyQueue的特点:
Lazy Queue 通过将大部分消息存储在磁盘上而不是内存中来减少内存占用。它使用内存映射文件技术来实现高效的磁盘访问。
即使在处理大量消息的情况下,Lazy Queue 也能保持较高的性能。通过减少内存占用,可以避免频繁的垃圾回收操作,从而提高整体性能。
LazyQueue应用场景:
当消息队列非常大并且消息数量非常多时,使用 Lazy Queue 可以显著减少内存使用量,从而提高系统性能。例如,在日志聚合系统中,需要处理大量的日志消息,使用 Lazy Queue 可以有效管理内存资源。
4.5.2 控制台配置Lazy模式
在添加队列的时候,添加x-queue-mod=lazy参数即可设置队列为Lazy模式:
4.5.3 代码配置Lazy模式
在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式,
基于注解来声明队列并设置为Lazy模式:
4.5.4 测试
测试流程:
创建一个惰性队列,一次写入100条消息
写入成功观察队列信息:
100条消息全部在磁盘,内存消息个数为0.
再向非惰性队列写入100条消息观察消息存储情况
100条消息分别在内存和磁盘存储。
4.5.5 小结
LazyQueue的特点:
Lazy Queue 惰性队列将大部分消息存储在磁盘上而不是内存中来减少内存占用。
即使在处理大量消息的情况下,Lazy Queue 也能保持较高的性能。通过减少内存占用,可以避免频繁的垃圾回收操作,从而提高整体性能。
LazyQueue应用场景:
当消息队列非常大并且消息数量非常多时,使用 Lazy Queue 可以显著减少内存使用量,从而提高系统性能。
4.6. 优先级队列
4.6.1 介绍
自从 RabbitMQ 3.5.0 版本起,引入了优先级队列的功能,允许开发者根据消息的重要程度来设定不同的优先级。这在处理紧急或重要的消息时非常有用。
通过设置 x-max-priority 参数,可以实现这一功能。然而,在消费速度远高于生产速度且消息队列中没有积压消息的情况下,优先级的作用就不那么明显了。
应用场景:
交易处理
场景: 在金融交易系统中,需要处理不同类型的交易请求,如紧急交易和常规交易。
实现: 使用优先级队列,将紧急交易请求设置为最高优先级,常规交易设置为较低优先级。
好处: 确保紧急交易能够被优先处理,提高了交易系统的响应速度。
任务调度
场景: 在一个任务调度系统中,可能存在不同优先级的任务需要被处理。
实现: 使用优先级队列,高优先级的任务(如紧急任务或关键任务)会被优先处理。
好处: 保证了重要任务能够被及时处理,提高了系统的响应能力和可靠性。
4.6.2 创建优先级队列
创建优先级队列:priority.queue
设置x-max-priority ,定义优先级的最大值
4.6.3 测试
向优先级队列发100条消息,优先级用随机数生成,最大为10,优先级越大越优先出队。
发送成功观察控制台:
运行消费程序
观察控制台,按优先级消费消息,优先级高的最先消费。
5.业务改造
5.1 需求分析
案例需求:改造余额支付功能,将支付成功后基于OpenFeign远程调用交易服务更新订单状态接口由同步调用改为基于RabbitMQ的异步通知。如图:
说明:目前没有通知服务和积分服务,因此我们只关注交易服务,步骤如下:
定义direct类型交换机,命名为pay.direct
定义消息队列,命名为:pay.success.queue
将 pay.success.queue 与pay.direct绑定,BindingKey为pay.success
支付成功时不再调用交易服务更新订单状态的接口,而是发送一条消息到pay.direct,发送消息的RoutingKey 为pay.success,消息内容是订单id
交易服务监听pay.success.queue 队列,接收到消息后更新订单状态为已支付
5.2 配置MQ
不管是生产者还是消费者,都需要配置MQ的基本信息。分为两步:
1)添加依赖:
在支付服务、交易服务添加amqp依赖。
2)配置MQ地址:
在支付服务、交易服务添加MQ地址配置【这里也可以抽取一个 shared-mq.yaml 放在nacos里面】
5.3 发送消息
5.3.1 常量类
在common模块配置常量类,包括支付交换机,支付成功key等信息
5.3.2 编写代码
修改pay-service服务下的com.hmall.pay.service.impl.PayOrderServiceImpl类中的tryPayOrderByBalance方法:
屏蔽tradeClient.markOrderPaySuccess(po.getBizOrderNo())
支付成功后发送消息
代码如下:
5.4 接收消息
在trade-service服务中定义一个消息监听类,接收到消息,调用方法更新订单状态为已支付。
其代码如下:
Java
运行代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.hmall.trade.listener;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class PayStatusListener {
private final IOrderService orderService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MqConstants.PAY_SUCCESS_QUEUE, durable = "true"),
exchange = @Exchange(name = MqConstants.PAY_EXCHANGE_NAME, type = "direct"),
key = MqConstants.PAY_SUCCESS_KEY
))
public void listenPaySuccess(Long orderId){
orderService.markOrderPaySuccess(orderId);
}
}
作业
改造下单功能
要求:
改造下单功能,将基于OpenFeign的清理购物车同步调用,改为基于RabbitMQ的异步通知:
定义topic类型交换机,命名为trade.topic
定义消息队列,命名为cart.clear.queue
将cart.clear.queue与trade.topic绑定,BindingKey为order.create
下单成功时不再调用清理购物车接口,而是发送一条消息到trade.topic,发送消息的RoutingKey 为order.create,消息内容是下单的具体商品、当前登录用户信息
购物车服务监听cart.clear.queue队列,接收到消息后清理指定用户的购物车中的指定商品
提示:
将交换机、队列等信息配置在常量类中
在hm-common 模块配置消息转换器并通过springboot自动装配