Spring boot + RabbitMQ延迟队列实战

简介: Spring boot + RabbitMQ延迟队列实战

一、背景


延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。

那么,为什么需要延迟消费呢?我们来看以下的场景:


订单业务: 在电商/点餐中,都有下单后 30 分钟内没有付款,就自动取消订单。

短信通知: 下单成功后 60s 之后给用户发送短信通知。

失败重试: 业务操作失败后,间隔一定的时间进行失败重试。


传统订单处理:


采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。


当然传统的手法还可以再优化一下,即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,然后再做其他的业务操作。


但是通过定时任务来执行,实时性始终不是很好。


二、实现方案


方案一:通过死信队列实现


参考:https://www.cnblogs.com/xmf3628/p/12097101.html

16.png


方案二:通过延迟路由插件来实现rabbitmq-delayed-message-exchange


参考:https://www.cnblogs.com/wintercloud/p/10877399.html


这里演示通过插件来实现


三、插件安装


下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0


RabbitMQ的有些插件没有集成在初始的安装中,它们需要额外安装,这些文件的后缀为.ez,安装时需要将.ez文件拷贝到安装的插件目录。以下是不同系统中默认安装的插件目录路径:

15.png


//查看已安装的插件
rabbitmq-plugins list
//启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
//重启服务
service rabbitmq-server restart
//再次查看,插件是否生效
rabbitmq-plugins list


说明插件安装成功,并成功启用。



四、机制解释


安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。

14.png



五、代码实战


1、依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.44</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>


2、队列连接信息

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/

3、生产者代码


声明延迟队列和延迟队列的交换器,并建立绑定关系

@Configuration
public class TestDelayQueueConfig {
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(ExchangeEnum.DELAY_EXCHANGE.getValue(), "x-delayed-message", true, false, args);
    }
    /**
     * 延迟消息队列
     * @return
     */
    @Bean
    public Queue delayQueue() {
        return new Queue(QueueEnum.TEST_DELAY.getName(), true);
    }
    @Bean
    public Binding deplyBinding() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(QueueEnum.TEST_DELAY.getRoutingKey()).noargs();
    }
}


@Getter
public enum ExchangeEnum {
    DELAY_EXCHANGE("test.deply.exchange");
    private String value;
    ExchangeEnum(String value) {
        this.value = value;
    }
}


@Getter
public enum QueueEnum {
    /**
     * delay
     */
    TEST_DELAY("test.delay.queue", "delay");
    /**
     * 队列名称
     */
    private String name;
    /**
     * 队列路由键
     */
    private String routingKey;
    QueueEnum(String name, String routingKey) {
        this.name = name;
        this.routingKey = routingKey;
    }
}


延迟队列生产者服务:

@Component
@Slf4j
public class DeplyProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send(String msg, int delayTime) {
        log.info("msg= " + msg + ".delayTime" + delayTime);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDelay(delayTime);
        Message message = new Message(msg.getBytes(), messageProperties);
        rabbitTemplate.send(ExchangeEnum.DELAY_EXCHANGE.getValue(), QueueEnum.TEST_DELAY.getRoutingKey(), message);
    }
}

单元测试:

@Test
public void sendDeplyMsgTest() {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String currentTime = sdf.format(new Date());
    log.info("发送测试消息的时间:" +currentTime );
    deplyProducer.send(currentTime + "发送一个测试消息,延迟10秒", 10000);//10秒
    deplyProducer.send(currentTime + "发送一个测试消息,延迟20秒", 20000);//2秒
    deplyProducer.send(currentTime + "发送一个测试消息,延迟30秒", 30000);//1秒
}


消费者:


@Component
@RabbitListener(queues = "test.delay.queue")
@Slf4j
public class DeplyConsumer {
    @RabbitHandler
    public void onMessage(byte[] message,
                          @Headers Map<String, Object> headers,
                          Channel channel) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        //log.info("收到延时消息时间:" + sdf.format(new Date()) + " Delay sent.");
        log.info(sdf.format(new Date())+"接收到延时消息:" + new String(message));
    }
}


执行测试结果:

13.png

执行消息发送代码的时候,不会立刻把消息推送到对应的队列中。

只要到了对应的时候,才会将消息推送到队列里面。


可以在执行单元测试的时候,通过管理界面,观察队列中消息的增长:

每隔10s,增加1条。

12.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
36 6
|
3月前
|
自然语言处理 Java API
Spring Boot 接入大模型实战:通义千问赋能智能应用快速构建
【10月更文挑战第23天】在人工智能(AI)技术飞速发展的今天,大模型如通义千问(阿里云推出的生成式对话引擎)等已成为推动智能应用创新的重要力量。然而,对于许多开发者而言,如何高效、便捷地接入这些大模型并构建出功能丰富的智能应用仍是一个挑战。
343 6
|
3月前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
173 2
|
3月前
|
Java 数据库连接 Spring
【2021Spring编程实战笔记】Spring开发分享~(下)
【2021Spring编程实战笔记】Spring开发分享~(下)
37 1
|
3月前
|
XML Java 数据格式
Spring IOC容器的深度解析及实战应用
【10月更文挑战第14天】在软件工程中,随着系统规模的扩大,对象间的依赖关系变得越来越复杂,这导致了系统的高耦合度,增加了开发和维护的难度。为解决这一问题,Michael Mattson在1996年提出了IOC(Inversion of Control,控制反转)理论,旨在降低对象间的耦合度,提高系统的灵活性和可维护性。Spring框架正是基于这一理论,通过IOC容器实现了对象间的依赖注入和生命周期管理。
86 0
|
3月前
|
XML Java 数据库连接
【2020Spring编程实战笔记】Spring开发分享~(上)
【2020Spring编程实战笔记】Spring开发分享~
61 0
|
4月前
|
消息中间件 JSON Java
|
5月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
78 0
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
243 0
|
消息中间件 中间件 微服务
RabbitMQ 入门简介及安装
RabbitMQ 入门简介及安装
130 0