03.RabbitMQ延迟队列

简介: 03.RabbitMQ延迟队列

03.RabbitMQ延迟队列



1.使用场景

“订单下单成功后,15分钟未支付自动取消”

1.1.传统处理超时订单

采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。当然传统的手法还可以再优化一下,即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,然后再做其他的业务操作


1.2.rabbitMQ延时队列方案

一台普通的rabbitmq服务器单队列容纳千万级别的消息还是没什么压力的,而且rabbitmq集群扩展支持的也是非常好的, 并且队列中的消息是可以进行持久化,即使我们重启或者宕机也能保证数据不丢失


2.TTL和DLX

rabbitMQ中是没有延时队列的,也没有属性可以设置,只能通过死信交换机(DLX)和设置过期时间(TTL)结合起来实现延迟队列


2.1.TTL

TTL是Time To Live的缩写, 也就是生存时间。

RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。

如果两种方式一起使用消息的TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。

默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消费,否则丢弃。

设置消息的过期时间用 x-message-ttl 参数实现,单位毫秒。
设置队列的过期时间用 x-expires 参数,单位毫秒,注意,不能设置为0。

消息:生产者 -> 交换机 消息在生产者制造消息的时候就开始计算了TTL  TTL=5
队列:生产者 -> 交换机 -> 路由键 -> 队列 当消息送达到队列的时候才开始计算TTL  TTL=10

2.2.DLX和死信队列

DLX即Dead-Letter-Exchange(死信交换机),它其实就是一个正常的交换机,能够与任何队列绑定。

死信队列是指队列(正常)上的消息(过期)变成死信后,能够发送到另外一个交换机(DLX),然后被路由到一个队列上,这个队列,就是死信队列

成为死信一般有以下几种情况:

消息被拒绝(basic.reject or basic.nack)且带requeue=false参数

消息的TTL-存活时间已经过期

队列长度限制被超越(队列满)


注1:如果队列上存在死信, RabbitMq会将死信消息投递到设置的DLX上去

注2:通过在队列里设置x-dead-letter-exchange参数来声明DLX,如果当前DLX是direct类型还要声明 x-dead-letter-routing-key参数来指定路由键,如果没有指定,则使用原队列的路由键


3.延迟队列

通过DLX和TTL模拟出延迟队列的功能,即,消息发送以后,不让消费者拿到,而是等待过期时间,变成死信后,发送给死信交换机再路由到死信队列进行消费


注1:延迟队列(即死信队列)产生流程

4. 开发步骤

4.1.config中添加DlxConfig

package com.zjzaki.rabbitmqprovider.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.logging.Handler;

/**
 * @Author zjzaki
 * @Package com.zjzaki.rabbitmqprovider.config
 * @Date 2023-09-06 20:53:31
 */
@Configuration
public class DlxConfig {

    public static String NORMAL_EXCHANGE = "normal_exchange";
    public static String NORMAL_QUEUE = "normal_queue";
    public static String NORMAL_ROUTING_KEY = "normal_routing_key";
    public static String DLX_EXCHANGE = "dlx_exchange";
    public static String DLX_QUEUE = "dlx_queue";
    public static String DLX_ROUTING_KEY = "normal_routing_key";

    @Bean
    public Queue normalQueue() {
        // ttl时间,当前队列绑定哪个交换机,绑定的交换机对应的routing_key
        HashMap<String, Object> arguments = new HashMap<>(16);
        //message在该队列queue的存活时间最大为10秒
        arguments.put("x-message-ttl", 10000);
        //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        arguments.put("x-dead-letter-exchange", DLX_EXCHANGE);
        //x-dead-letter-routing-key参数是给这个DLX指定路由键
        arguments.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
        // return new Queue(NORMAL_QUEUE);
        return new Queue(NORMAL_QUEUE, true, false, false, arguments);
    }

    @Bean
    public DirectExchange normalEXCHANGE() {
        return new DirectExchange(NORMAL_EXCHANGE);
    }

    @Bean
    public Binding normalBinding() {
        return BindingBuilder.bind(normalQueue()).to(normalEXCHANGE()).with(NORMAL_ROUTING_KEY);
    }

    @Bean
    public Queue dlxQueue() {
        return new Queue(DLX_QUEUE);
    }

    @Bean
    public DirectExchange dlxEXCHANGE() {
        return new DirectExchange(DLX_EXCHANGE);
    }

    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxEXCHANGE()).with(DLX_ROUTING_KEY);
    }
}

controller中添加代码

@RequestMapping("/dlx")
public String dlxMsg(String rk) {
    Map<Object, Object> map = new HashMap<>();
    map.put("msg", "此消息为[验证死信队列]路由过来的");
    map.put("time", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    rabbitTemplate.convertAndSend(DlxConfig.NORMAL_EXCHANGE, rk, map);
    return "dlx";
}

4.2.访问 http://localhost:8081/dlx?rk=normal_routing_key

4.3.rabbitmq-consumer添加DlxReceiver

package com.zjzaki.rabbitmqconsumer.config;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

@Component
@RabbitListener(queues = {"dlx_queue"})
public class DlxReceiver {
    // @RabbitListener(queues = {"direct-queue"})
    @RabbitHandler
    public void handler(Map msg) {
        System.out.println("当前的时间为: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        
        System.out.println(msg);
    }
}

4.4.启动,访问 http://localhost:8081/dlx?rk=normal_routing_key

可以注意到间隔10秒后收到消息

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
33 1
|
6天前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
15 0
说说RabbitMQ延迟队列实现原理?
|
19天前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
20 1
|
19天前
|
消息中间件 SQL RocketMQ
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
21 1
|
22天前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ产品使用合集之是否支持任意时间延迟的消息
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 存储 监控
RabbitMQ 死信队列
RabbitMQ的死信队列(DLQ)是存储无法正常消费消息的特殊队列,常见于消息被拒绝、过期或队列满时。DLQ用于异常处理、任务调度和监控,通过绑定到普通队列自动路由死信消息。通过监听死信队列,可以对异常消息进行补偿和进一步处理,提升系统稳定性和可维护性。
26 1
|
27天前
|
消息中间件
RabbitMQ配置单活模式队列
RabbitMQ配置单活模式队列
28 0
|
1月前
|
消息中间件 存储 负载均衡
消息队列 MQ产品使用合集之如何排查是哪个队列导致的异常TPS增加
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
2月前
|
消息中间件 Java
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
65 1
|
22天前
|
消息中间件 测试技术 RocketMQ
消息队列 MQ产品使用合集之在异步发送消息函数sendMessage()中出现了错误,错误代码为-3,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。