Spring boot + RabbitMQ延迟队列实战

简介: Spring boot + RabbitMQ延迟队列实战

一、背景


延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。

那么,为什么需要延迟消费呢?我们来看以下的场景:


订单业务: 在电商/点餐中,都有下单后 30 分钟内没有付款,就自动取消订单。

短信通知: 下单成功后 60s 之后给用户发送短信通知。

失败重试: 业务操作失败后,间隔一定的时间进行失败重试。


传统订单处理:


采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。


当然传统的手法还可以再优化一下,即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,然后再做其他的业务操作。


但是通过定时任务来执行,实时性始终不是很好。


二、实现方案


方案一:通过死信队列实现


参考:https://www.cnblogs.com/xmf3628/p/12097101.html

16.png


方案二:通过延迟路由插件来实现rabbitmq-delayed-message-exchange


参考:https://www.cnblogs.com/wintercloud/p/10877399.html


这里演示通过插件来实现


三、插件安装


下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0


RabbitMQ的有些插件没有集成在初始的安装中,它们需要额外安装,这些文件的后缀为.ez,安装时需要将.ez文件拷贝到安装的插件目录。以下是不同系统中默认安装的插件目录路径:

15.png


//查看已安装的插件
rabbitmq-plugins list
//启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
//重启服务
service rabbitmq-server restart
//再次查看,插件是否生效
rabbitmq-plugins list


说明插件安装成功,并成功启用。



四、机制解释


安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。

14.png



五、代码实战


1、依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.44</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>


2、队列连接信息

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/

3、生产者代码


声明延迟队列和延迟队列的交换器,并建立绑定关系

@Configuration
public class TestDelayQueueConfig {
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(ExchangeEnum.DELAY_EXCHANGE.getValue(), "x-delayed-message", true, false, args);
    }
    /**
     * 延迟消息队列
     * @return
     */
    @Bean
    public Queue delayQueue() {
        return new Queue(QueueEnum.TEST_DELAY.getName(), true);
    }
    @Bean
    public Binding deplyBinding() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(QueueEnum.TEST_DELAY.getRoutingKey()).noargs();
    }
}


@Getter
public enum ExchangeEnum {
    DELAY_EXCHANGE("test.deply.exchange");
    private String value;
    ExchangeEnum(String value) {
        this.value = value;
    }
}


@Getter
public enum QueueEnum {
    /**
     * delay
     */
    TEST_DELAY("test.delay.queue", "delay");
    /**
     * 队列名称
     */
    private String name;
    /**
     * 队列路由键
     */
    private String routingKey;
    QueueEnum(String name, String routingKey) {
        this.name = name;
        this.routingKey = routingKey;
    }
}


延迟队列生产者服务:

@Component
@Slf4j
public class DeplyProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send(String msg, int delayTime) {
        log.info("msg= " + msg + ".delayTime" + delayTime);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDelay(delayTime);
        Message message = new Message(msg.getBytes(), messageProperties);
        rabbitTemplate.send(ExchangeEnum.DELAY_EXCHANGE.getValue(), QueueEnum.TEST_DELAY.getRoutingKey(), message);
    }
}

单元测试:

@Test
public void sendDeplyMsgTest() {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String currentTime = sdf.format(new Date());
    log.info("发送测试消息的时间:" +currentTime );
    deplyProducer.send(currentTime + "发送一个测试消息,延迟10秒", 10000);//10秒
    deplyProducer.send(currentTime + "发送一个测试消息,延迟20秒", 20000);//2秒
    deplyProducer.send(currentTime + "发送一个测试消息,延迟30秒", 30000);//1秒
}


消费者:


@Component
@RabbitListener(queues = "test.delay.queue")
@Slf4j
public class DeplyConsumer {
    @RabbitHandler
    public void onMessage(byte[] message,
                          @Headers Map<String, Object> headers,
                          Channel channel) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        //log.info("收到延时消息时间:" + sdf.format(new Date()) + " Delay sent.");
        log.info(sdf.format(new Date())+"接收到延时消息:" + new String(message));
    }
}


执行测试结果:

13.png

执行消息发送代码的时候,不会立刻把消息推送到对应的队列中。

只要到了对应的时候,才会将消息推送到队列里面。


可以在执行单元测试的时候,通过管理界面,观察队列中消息的增长:

每隔10s,增加1条。

12.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
13天前
|
编解码 NoSQL Java
使用Spring Boot + Redis 队列实现视频文件上传及FFmpeg转码的技术分享
【8月更文挑战第30天】在当前的互联网应用中,视频内容的处理与分发已成为不可或缺的一部分。对于视频平台而言,高效、稳定地处理用户上传的视频文件,并对其进行转码以适应不同设备的播放需求,是提升用户体验的关键。本文将围绕使用Spring Boot结合Redis队列技术来实现视频文件上传及FFmpeg转码的过程,分享一系列技术干货。
50 3
|
24天前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
14天前
|
Java API UED
【实战秘籍】Spring Boot开发者的福音:掌握网络防抖动,告别无效请求,提升用户体验!
【8月更文挑战第29天】网络防抖动技术能有效处理频繁触发的事件或请求,避免资源浪费,提升系统响应速度与用户体验。本文介绍如何在Spring Boot中实现防抖动,并提供代码示例。通过使用ScheduledExecutorService,可轻松实现延迟执行功能,确保仅在用户停止输入后才触发操作,大幅减少服务器负载。此外,还可利用`@Async`注解简化异步处理逻辑。防抖动是优化应用性能的关键策略,有助于打造高效稳定的软件系统。
29 2
|
14天前
|
JSON Java API
解码Spring Boot与JSON的完美融合:提升你的Web开发效率,实战技巧大公开!
【8月更文挑战第29天】Spring Boot作为Java开发的轻量级框架,通过`jackson`库提供了强大的JSON处理功能,简化了Web服务和数据交互的实现。本文通过代码示例介绍如何在Spring Boot中进行JSON序列化和反序列化操作,并展示了处理复杂JSON数据及创建RESTful API的方法,帮助开发者提高效率和应用性能。
45 0
|
14天前
|
SQL Java 数据库连接
Spring Boot联手MyBatis,打造开发利器:从入门到精通,实战教程带你飞越编程高峰!
【8月更文挑战第29天】Spring Boot与MyBatis分别是Java快速开发和持久层框架的优秀代表。本文通过整合Spring Boot与MyBatis,展示了如何在项目中添加相关依赖、配置数据源及MyBatis,并通过实战示例介绍了实体类、Mapper接口及Controller的创建过程。通过本文,你将学会如何利用这两款工具提高开发效率,实现数据的增删查改等复杂操作,为实际项目开发提供有力支持。
53 0
|
14天前
|
缓存 NoSQL Java
惊!Spring Boot遇上Redis,竟开启了一场缓存实战的革命!
【8月更文挑战第29天】在互联网时代,数据的高速读写至关重要。Spring Boot凭借简洁高效的特点广受开发者喜爱,而Redis作为高性能内存数据库,在缓存和消息队列领域表现出色。本文通过电商平台商品推荐系统的实战案例,详细介绍如何在Spring Boot项目中整合Redis,提升系统响应速度和用户体验。
41 0
|
14天前
|
Java 开发者 Spring
Spring Boot实战宝典:揭秘定时任务的幕后英雄,让业务处理如流水般顺畅,轻松驾驭时间管理艺术!
【8月更文挑战第29天】在现代应用开发中,定时任务如数据备份、报告生成等至关重要。Spring Boot作为流行的Java框架,凭借其强大的集成能力和简洁的配置方式,为开发者提供了高效的定时任务解决方案。本文详细介绍了如何在Spring Boot项目中启用定时任务支持、编写定时任务方法,并通过实战案例展示了其在业务场景中的应用,同时提供了注意事项以确保任务的正确执行。
28 0
|
23天前
|
SQL 前端开发 NoSQL
SpringBoot+Vue 实现图片验证码功能需求
这篇文章介绍了如何在SpringBoot+Vue项目中实现图片验证码功能,包括后端生成与校验验证码的方法以及前端展示验证码的实现步骤。
SpringBoot+Vue 实现图片验证码功能需求
|
22天前
|
JavaScript
SpringBoot+Vue+ElementUI 实现视频播放 轮播图效果
这篇文章介绍了如何在SpringBoot+Vue+ElementUI项目中使用vue-awesome-swiper插件实现视频播放轮播图效果,包括安装插件、引入项目和使用案例的步骤。
SpringBoot+Vue+ElementUI 实现视频播放 轮播图效果
|
22天前
|
JavaScript 前端开发 Java
SpringBoot + Vue 前端后分离项目精进版本
这篇文章详细介绍了一个基于SpringBoot + Vue的前后端分离项目的搭建过程,包括前端Vue项目的初始化、依赖安装、页面创建和路由配置,以及后端SpringBoot项目的依赖添加、配置文件修改、代码实现和跨域问题的解决,最后展示了项目运行效果。
SpringBoot + Vue 前端后分离项目精进版本