Spring boot + RabbitMQ延迟队列实战

简介: Spring boot + RabbitMQ延迟队列实战

一、背景


延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。

那么,为什么需要延迟消费呢?我们来看以下的场景:


订单业务: 在电商/点餐中,都有下单后 30 分钟内没有付款,就自动取消订单。

短信通知: 下单成功后 60s 之后给用户发送短信通知。

失败重试: 业务操作失败后,间隔一定的时间进行失败重试。


传统订单处理:


采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。


当然传统的手法还可以再优化一下,即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,然后再做其他的业务操作。


但是通过定时任务来执行,实时性始终不是很好。


二、实现方案


方案一:通过死信队列实现


参考:https://www.cnblogs.com/xmf3628/p/12097101.html

16.png


方案二:通过延迟路由插件来实现rabbitmq-delayed-message-exchange


参考:https://www.cnblogs.com/wintercloud/p/10877399.html


这里演示通过插件来实现


三、插件安装


下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0


RabbitMQ的有些插件没有集成在初始的安装中,它们需要额外安装,这些文件的后缀为.ez,安装时需要将.ez文件拷贝到安装的插件目录。以下是不同系统中默认安装的插件目录路径:

15.png


//查看已安装的插件
rabbitmq-plugins list
//启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
//重启服务
service rabbitmq-server restart
//再次查看,插件是否生效
rabbitmq-plugins list


说明插件安装成功,并成功启用。



四、机制解释


安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。

14.png



五、代码实战


1、依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.44</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>


2、队列连接信息

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/

3、生产者代码


声明延迟队列和延迟队列的交换器,并建立绑定关系

@Configuration
public class TestDelayQueueConfig {
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(ExchangeEnum.DELAY_EXCHANGE.getValue(), "x-delayed-message", true, false, args);
    }
    /**
     * 延迟消息队列
     * @return
     */
    @Bean
    public Queue delayQueue() {
        return new Queue(QueueEnum.TEST_DELAY.getName(), true);
    }
    @Bean
    public Binding deplyBinding() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(QueueEnum.TEST_DELAY.getRoutingKey()).noargs();
    }
}


@Getter
public enum ExchangeEnum {
    DELAY_EXCHANGE("test.deply.exchange");
    private String value;
    ExchangeEnum(String value) {
        this.value = value;
    }
}


@Getter
public enum QueueEnum {
    /**
     * delay
     */
    TEST_DELAY("test.delay.queue", "delay");
    /**
     * 队列名称
     */
    private String name;
    /**
     * 队列路由键
     */
    private String routingKey;
    QueueEnum(String name, String routingKey) {
        this.name = name;
        this.routingKey = routingKey;
    }
}


延迟队列生产者服务:

@Component
@Slf4j
public class DeplyProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send(String msg, int delayTime) {
        log.info("msg= " + msg + ".delayTime" + delayTime);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDelay(delayTime);
        Message message = new Message(msg.getBytes(), messageProperties);
        rabbitTemplate.send(ExchangeEnum.DELAY_EXCHANGE.getValue(), QueueEnum.TEST_DELAY.getRoutingKey(), message);
    }
}

单元测试:

@Test
public void sendDeplyMsgTest() {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String currentTime = sdf.format(new Date());
    log.info("发送测试消息的时间:" +currentTime );
    deplyProducer.send(currentTime + "发送一个测试消息,延迟10秒", 10000);//10秒
    deplyProducer.send(currentTime + "发送一个测试消息,延迟20秒", 20000);//2秒
    deplyProducer.send(currentTime + "发送一个测试消息,延迟30秒", 30000);//1秒
}


消费者:


@Component
@RabbitListener(queues = "test.delay.queue")
@Slf4j
public class DeplyConsumer {
    @RabbitHandler
    public void onMessage(byte[] message,
                          @Headers Map<String, Object> headers,
                          Channel channel) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        //log.info("收到延时消息时间:" + sdf.format(new Date()) + " Delay sent.");
        log.info(sdf.format(new Date())+"接收到延时消息:" + new String(message));
    }
}


执行测试结果:

13.png

执行消息发送代码的时候,不会立刻把消息推送到对应的队列中。

只要到了对应的时候,才会将消息推送到队列里面。


可以在执行单元测试的时候,通过管理界面,观察队列中消息的增长:

每隔10s,增加1条。

12.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
10天前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
58 6
|
1月前
|
消息中间件 JSON Java
|
1月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
1月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
58 0
|
2月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
64 2
|
2月前
|
消息中间件 Kafka Apache
kafka vs rocketmq: 不要只顾着吞吐量而忘了延迟这个指标
这篇文章讨论了Apache RocketMQ和Kafka的对比,强调RocketMQ在低延迟、消息重试与追踪、海量Topic、多租户等方面进行了优化,特别是在小包非批量和大量分区场景下的吞吐量超越Kafka,适合电商和金融领域等高并发、高可靠和高可用场景。
69 0
|
3月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4天前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
6天前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
30 4
|
10天前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
45 4