RabbitMQ的延时队列

简介: RabbitMQ的延时队列

应用场景

目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如:

淘宝七天自动确认收货。在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能。

12306 购票支付确认页面。我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 30 分钟内订单不确认的话将会自动取消订单。其实在下订单那一刻开始购票业务系统就会发送一个延时消息给订单系统,延时30分钟,告诉订单系统订单未完成,如果我们在30分钟内完成了订单,则可以通过逻辑代码判断来忽略掉收到的消息。

在上面两种场景中,如果我们使用下面两种传统解决方案无疑大大降低了系统的整体性能和吞吐量:

使用 redis 给订单设置过期时间,最后通过判断 redis 中是否还有该订单来决定订单是否已经完成。这种解决方案相较于消息的延迟推送性能较低,因为我们知道 redis 都是存储于内存中,我们遇到恶意下单或者刷单的将会给内存带来巨大压力。

使用传统的数据库轮询来判断数据库表中订单的状态,这无疑增加了IO次数,性能极低。

使用 jvm 原生的 DelayQueue ,也是大量占用内存,而且没有持久化策略,系统宕机或者重启都会丢失订单信息。

消息延迟推送的实现

RabbitMQ 3.6.x 之前我们一般采用死信队列+TTL过期时间来实现延迟队列。

在 RabbitMQ 3.6.x 开始,RabbitMQ 官方提供了延迟队列的插件,可以下载放置到 RabbitMQ根目录下的 plugins 下。

 

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
import java.util.HashMap;
import java.util.Map;
 
@Configuration
public class MQConfig {
 
    public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
    public static final String LAZY_QUEUE = "MQ.LazyQueue";
    public static final String LAZY_KEY = "lazy.#";
 
    @Bean
    public TopicExchange lazyExchange(){
        //Map<String, Object> pros = new HashMap<>();
        //设置交换机支持延迟消息推送
        //pros.put("x-delayed-message", "topic");
        TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
        exchange.setDelayed(true);
        return exchange;
    }
 
    @Bean
    public Queue lazyQueue(){
        return new Queue(LAZY_QUEUE, true);
    }
 
    @Bean
    public Binding lazyBinding(){
        return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
    }
}

我们在 Exchange 的声明中可以设置exchange.setDelayed(true)来开启延迟队列,也可以设置为以下内容传入交换机声明的方法中,因为第一种方式的底层就是通过这种方式来实现的。

//Map<String, Object> pros = new HashMap<>();
//设置交换机支持延迟消息推送
//pros.put("x-delayed-message", "topic");
TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);

发送消息时我们需要指定延迟推送的时间,我们这里在发送消息的方法中传入参数 new MessagePostProcessor() 是为了获得 Message对象,因为需要借助 Message对象的api 来设置延迟时间。

import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import java.util.Date;
 
@Component
public class MQSender {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    //confirmCallback returnCallback 代码省略,请参照上一篇
  
    public void sendLazy(Object message){
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 全局唯一
        CorrelationData correlationData = new CorrelationData("12345678909"+new Date());
 
        //发送消息时指定 header 延迟时间
        rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
                new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //设置消息持久化
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                //message.getMessageProperties().setHeader("x-delay", "6000");
                message.getMessageProperties().setDelay(6000);
                return message;
            }
        }, correlationData);
    }
}

我们可以观察 setDelay(Integer i)底层代码,也是在 header 中设置 x-delay。

等同于我们手动设置 header:

message.getMessageProperties().setHeader("x-delay", "6000");
/**
 * Set the x-delay header.
 * @param delay the delay.
 * @since 1.6
 */
public void setDelay(Integer delay) {
 if (delay == null || delay < 0) {
  this.headers.remove(X_DELAY);
 }
 else {
  this.headers.put(X_DELAY, delay);
 }
}

消费端进行消费

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;
 
import java.io.IOException;
import java.util.Map;
 
@Component
public class MQReceiver {
 
    @RabbitListener(queues = "MQ.LazyQueue")
    @RabbitHandler
    public void onLazyMessage(Message msg, Channel channel) throws IOException{
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, true);
        System.out.println("lazy receive " + new String(msg.getBody()));
 
    }
测试结果
 
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
 
@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {
 
    @Autowired
    private MQSender mqSender;
 
    @Test
    public void sendLazy() throws  Exception {
        String msg = "hello spring boot";
 
        mqSender.sendLazy(msg + ":");
    }
}

果然在 6 秒后收到了消息

Spring Boot 完整教程看下这个:

https://github.com/javastacks/spring-boot-best-practice

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
安全 Shell Linux
【Shell 命令集合 系统管理 】Linux 切换当前用户身份为另一个用户 su命令 使用指南
【Shell 命令集合 系统管理 】Linux 切换当前用户身份为另一个用户 su命令 使用指南
1223 1
|
11月前
|
敏捷开发 数据可视化 前端开发
敏捷开发任务分配工具选型指南:功能、价格与适用场景
Scrum团队任务分配常面临冲刺目标模糊、资源失衡、进度断层与协作不畅四大痛点。解决关键在于建立适配Scrum流程的任务分配机制,遵循需求拆解、资源匹配、进度可视、协作衔接与迭代优化五大原则。合理选用板栗看板、Jira等工具,并通过流程适配、团队培训与数据复盘,提升分配效率,实现敏捷开发闭环,助力团队摆脱混乱、提升效能。
敏捷开发任务分配工具选型指南:功能、价格与适用场景
|
运维 数据挖掘 数据处理
Pandas时间数据处理:从基础到进阶的实战指南
Pandas时间数据处理涵盖了从基础到高级的全面功能。其核心由Timestamp、DatetimeIndex、Period和Timedelta四个类构建,支持精准的时间点与区间操作。内容包括时间数据生成(字符串解析与序列生成)、时间索引与切片、高级运算(偏移重采样与窗口计算)、时区处理、周期性数据分析及实战案例(如智能电表数据)。此外,还涉及性能优化技巧和未来展望,帮助用户高效处理时间序列数据并应用于预测分析等场景。
560 1
|
CDN
阿里云CDN怎么收费?看这一篇就够了,CDN不同计费模式收费价格全解析
阿里云CDN收费包含基础费用与增值费用。基础费用提供三种计费模式:按流量、带宽峰值及月结95带宽峰值计费,默认按流量计费,价格因地域和用量阶梯而异。增值费用涵盖静态HTTPS、QUIC请求、WAF防护及实时日志等服务,按需使用并单独计费。此外,可通过购买资源包预付费降低整体成本。更多详情参见阿里云官方文档。
3013 8
|
算法 API 数据安全/隐私保护
LabVIEW编程LabVIEW开发 控制雷赛运动控制器SMC604A例程与相关资料
LabVIEW编程LabVIEW开发 控制雷赛运动控制器SMC604A例程与相关资料
455 0
|
存储 JavaScript 算法
JS垃圾回收机制有哪些?
本文介绍了JavaScript中的垃圾回收(GC)机制,包括其概念、产生原因及重要性。文章详细讲解了几种常见的垃圾回收算法,如引用计数、标记清除、标记整理和分代回收,并分析了它们的优缺点。最后总结了垃圾回收对JS开发的重要作用,强调了其在自动内存管理和性能优化中的关键地位。
798 2
JS垃圾回收机制有哪些?
|
存储 SQL 分布式计算
Hadoop生态系统概述:构建大数据处理与分析的基石
【8月更文挑战第25天】Hadoop生态系统为大数据处理和分析提供了强大的基础设施和工具集。通过不断扩展和优化其组件和功能,Hadoop将继续在大数据时代发挥重要作用。
1043 3
|
数据采集 存储 自然语言处理
基于网络爬虫的微博热点分析,包括文本分析和主题分析
本文介绍了一个基于Python网络爬虫的微博热点分析系统,使用requests和pandas等库收集和处理数据,结合jieba分词、LDA主题分析和snownlp情感分析技术,对微博文本进行深入分析,并利用matplotlib进行数据可视化,以洞察微博用户的关注点和情感倾向。
1226 0
基于网络爬虫的微博热点分析,包括文本分析和主题分析
|
存储 弹性计算 编解码
阿里云8核32G云服务器多少钱?2024年阿里云8核32G云服务器配置价格及性能评测
2024年阿里云8核32G云服务器的价格为7543.01元一年。该价格基于特定的配置和促销活动,可能因时间、活动政策、地域等因素而有所变动。关于阿里云8核32G云服务器的性能测评,该服务器配备了32GB的内存和8核的CPU,具有出色的计算能力和处理速度,可以轻松应对大型应用、高并发场景和复杂计算任务。同时,服务器提供了多种带宽选择,从1M到5M不等,可以满足不同用户的网络需求。40GB ESSD云盘提供了高速且稳定的存储性能,有助于提升数据读写效率。
1737 0
|
SQL Java 数据处理
【Hive】Hive的函数:UDF、UDAF、UDTF的区别?
【4月更文挑战第17天】【Hive】Hive的函数:UDF、UDAF、UDTF的区别?

热门文章

最新文章