Spring boot + RabbitMQ延迟队列实战

简介: Spring boot + RabbitMQ延迟队列实战

一、背景


延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。

那么,为什么需要延迟消费呢?我们来看以下的场景:


订单业务: 在电商/点餐中,都有下单后 30 分钟内没有付款,就自动取消订单。

短信通知: 下单成功后 60s 之后给用户发送短信通知。

失败重试: 业务操作失败后,间隔一定的时间进行失败重试。


传统订单处理:


采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。


当然传统的手法还可以再优化一下,即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,然后再做其他的业务操作。


但是通过定时任务来执行,实时性始终不是很好。


二、实现方案


方案一:通过死信队列实现


参考:https://www.cnblogs.com/xmf3628/p/12097101.html

16.png


方案二:通过延迟路由插件来实现rabbitmq-delayed-message-exchange


参考:https://www.cnblogs.com/wintercloud/p/10877399.html


这里演示通过插件来实现


三、插件安装


下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0


RabbitMQ的有些插件没有集成在初始的安装中,它们需要额外安装,这些文件的后缀为.ez,安装时需要将.ez文件拷贝到安装的插件目录。以下是不同系统中默认安装的插件目录路径:

15.png


//查看已安装的插件
rabbitmq-plugins list
//启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
//重启服务
service rabbitmq-server restart
//再次查看,插件是否生效
rabbitmq-plugins list


说明插件安装成功,并成功启用。



四、机制解释


安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。

14.png



五、代码实战


1、依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.44</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>


2、队列连接信息

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/

3、生产者代码


声明延迟队列和延迟队列的交换器,并建立绑定关系

@Configuration
public class TestDelayQueueConfig {
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(ExchangeEnum.DELAY_EXCHANGE.getValue(), "x-delayed-message", true, false, args);
    }
    /**
     * 延迟消息队列
     * @return
     */
    @Bean
    public Queue delayQueue() {
        return new Queue(QueueEnum.TEST_DELAY.getName(), true);
    }
    @Bean
    public Binding deplyBinding() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(QueueEnum.TEST_DELAY.getRoutingKey()).noargs();
    }
}


@Getter
public enum ExchangeEnum {
    DELAY_EXCHANGE("test.deply.exchange");
    private String value;
    ExchangeEnum(String value) {
        this.value = value;
    }
}


@Getter
public enum QueueEnum {
    /**
     * delay
     */
    TEST_DELAY("test.delay.queue", "delay");
    /**
     * 队列名称
     */
    private String name;
    /**
     * 队列路由键
     */
    private String routingKey;
    QueueEnum(String name, String routingKey) {
        this.name = name;
        this.routingKey = routingKey;
    }
}


延迟队列生产者服务:

@Component
@Slf4j
public class DeplyProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send(String msg, int delayTime) {
        log.info("msg= " + msg + ".delayTime" + delayTime);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDelay(delayTime);
        Message message = new Message(msg.getBytes(), messageProperties);
        rabbitTemplate.send(ExchangeEnum.DELAY_EXCHANGE.getValue(), QueueEnum.TEST_DELAY.getRoutingKey(), message);
    }
}

单元测试:

@Test
public void sendDeplyMsgTest() {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String currentTime = sdf.format(new Date());
    log.info("发送测试消息的时间:" +currentTime );
    deplyProducer.send(currentTime + "发送一个测试消息,延迟10秒", 10000);//10秒
    deplyProducer.send(currentTime + "发送一个测试消息,延迟20秒", 20000);//2秒
    deplyProducer.send(currentTime + "发送一个测试消息,延迟30秒", 30000);//1秒
}


消费者:


@Component
@RabbitListener(queues = "test.delay.queue")
@Slf4j
public class DeplyConsumer {
    @RabbitHandler
    public void onMessage(byte[] message,
                          @Headers Map<String, Object> headers,
                          Channel channel) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        //log.info("收到延时消息时间:" + sdf.format(new Date()) + " Delay sent.");
        log.info(sdf.format(new Date())+"接收到延时消息:" + new String(message));
    }
}


执行测试结果:

13.png

执行消息发送代码的时候,不会立刻把消息推送到对应的队列中。

只要到了对应的时候,才会将消息推送到队列里面。


可以在执行单元测试的时候,通过管理界面,观察队列中消息的增长:

每隔10s,增加1条。

12.png

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
负载均衡 监控 Java
Spring Cloud Gateway 全解析:路由配置、断言规则与过滤器实战指南
本文详细介绍了 Spring Cloud Gateway 的核心功能与实践配置。首先讲解了网关模块的创建流程,包括依赖引入(gateway、nacos 服务发现、负载均衡)、端口与服务发现配置,以及路由规则的设置(需注意路径前缀重复与优先级 order)。接着深入解析路由断言,涵盖 After、Before、Path 等 12 种内置断言的参数、作用及配置示例,并说明了自定义断言的实现方法。随后重点阐述过滤器机制,区分路由过滤器(如 AddRequestHeader、RewritePath、RequestRateLimiter 等)与全局过滤器的作用范围与配置方式,提
Spring Cloud Gateway 全解析:路由配置、断言规则与过滤器实战指南
|
1月前
|
监控 Cloud Native Java
Spring Boot 3.x 微服务架构实战指南
🌟蒋星熠Jaxonic,技术宇宙中的星际旅人。深耕Spring Boot 3.x与微服务架构,探索云原生、性能优化与高可用系统设计。以代码为笔,在二进制星河中谱写极客诗篇。关注我,共赴技术星辰大海!(238字)
Spring Boot 3.x 微服务架构实战指南
|
28天前
|
XML Java 测试技术
《深入理解Spring》:IoC容器核心原理与实战
Spring IoC通过控制反转与依赖注入实现对象间的解耦,由容器统一管理Bean的生命周期与依赖关系。支持XML、注解和Java配置三种方式,结合作用域、条件化配置与循环依赖处理等机制,提升应用的可维护性与可测试性,是现代Java开发的核心基石。
|
2月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
198 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
2月前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
170 2
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
3月前
|
人工智能 监控 安全
如何快速上手【Spring AOP】?核心应用实战(上篇)
哈喽大家好吖~欢迎来到Spring AOP系列教程的上篇 - 应用篇。在本篇,我们将专注于Spring AOP的实际应用,通过具体的代码示例和场景分析,帮助大家掌握AOP的使用方法和技巧。而在后续的下篇中,我们将深入探讨Spring AOP的实现原理和底层机制。 AOP(Aspect-Oriented Programming,面向切面编程)是Spring框架中的核心特性之一,它能够帮助我们解决横切关注点(如日志记录、性能统计、安全控制、事务管理等)的问题,提高代码的模块化程度和复用性。
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
413 0
|
消息中间件 NoSQL 关系型数据库
【Docker安装软件,一篇就够了】Docker安装,Docker安装Mysql8.0、Redis、RabbitMQ及常用命令(持续更新)
【Docker安装软件,一篇就够了】Docker安装,Docker安装Mysql8.0、Redis、RabbitMQ及常用命令(持续更新)
1108 0
|
消息中间件 中间件 微服务
RabbitMQ 入门简介及安装
RabbitMQ 入门简介及安装
295 0