RabbitMQ没有延时队列?我就教你一招,玩转延时队列

简介: 延时队列:顾名思义,是一个用于做消息延时消费的队列。但是它也是一个普通队列,所以它具备普通队列的特性,相比之下,延时的特性就是它最大的特点。所谓的延时就是将我们需要的消息,延迟多久之后被消费。普通队列是即时消费的,延时队列是根据延时时间,多久之后才能消费的。

什么是延时队列

延时队列:顾名思义,是一个用于做消息延时消费的队列。但是它也是一个普通队列,所以它具备普通队列的特性,相比之下,延时的特性就是它最大的特点。所谓的延时就是将我们需要的消息,延迟多久之后被消费。普通队列是即时消费的,延时队列是根据延时时间,多久之后才能消费的。

网络异常,图片无法展示
|

延时队列使用场景

  • 订单在十分钟之内未支付则自动取消。
  • 会员续费的定时推送
  • 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
  • 优惠券过期提醒

核心的应用内容基本都是基于需要设定过期时间的

RabbitMQ如何实现延时队列

  • 方式1、通过RabbitMQ的高级特性TTL和配合死信队列
  • 方式2、安装rabbitmq_delayed_message_exchange插件

RabbitMQ中的高级特性TTL

TTL是什么呢?TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒,为什么延时队列要介绍它?TTL就是一种消息过期策略。给我们的消息做过期处理,当消息在队列中存活了指定时候之后,该队列就会将这个消息直接丢弃。在RabbitMQ中并没有直接实现好的延时队列,我们可以使用TTL这种高级特性,然后配合死信队列,即可实现延时队列的功能。

那么,如何设置这个TTL值呢?有两种方式,第一种是在创建队列的时候设置队列的“x-message-ttl”属性,如下: 方式一:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

使用这种方式,消息被设定TTL,一旦消息过期,就会被队列丢弃

方式二:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

使用这种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。

另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

RabbitMQ到底怎么实现延时队列

  • 步骤一:创建一个正常的队列,指定消息过期时间,并且指定消息过期后需要投递的死信交换器和死信交换队列。
  • 步骤二:创建死信队列和死信交换器

RabbitMQ实现延时队列实例

package com.example.demo;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
 * @author echo
 * @date 2021-01-14 14:35
 */
public class TopicDealProductTest {
    /**
     * 延时队列交换机
     */
    private static final String DIRECT_EXCHANGE_DELAY = "dir_exchange_delay";
    /**
     * 死信队列交换机
     */
    private static final String DIRECT_EXCHANGE_DEAD = "dir_exchange_dead";
    /**
     * 延时队列
     */
    private static final String DIRECT_QUEUE_DELAY = "dir.queue.delay";
    /**
     * 死信队列
     */
    private static final String DIRECT_QUEUE_DEAD = "dir.queue.dead";
    /**
     * 延时队列ROUTING_KEY
     */
    private static final String DIRECT_DELAY_ROUTING_KEY = "delay.queue.routingKey";
    /**
     * 延时队列ROUTING_KEY
     */
    private static final String DIRECT_DEAD_ROUTING_KEY = "dead.queue.routingKey";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = createConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        sendMsg(channel);
        Thread.sleep(10000);
        closeConnection(connection, channel);
    }
    private static void sendMsg(Channel channel) throws IOException {
        // 创建延时队列和延时交换器
        channel.exchangeDeclare(DIRECT_EXCHANGE_DELAY, BuiltinExchangeType.DIRECT);
        Map<String, Object> map = new HashMap<>(16);
        // 在延时交换器上指定死信交换器
        map.put("x-dead-letter-exchange", DIRECT_EXCHANGE_DEAD);
        // 在延时交换器上指定死信队列的routing-key
        map.put("x-dead-letter-routing-key", DIRECT_DEAD_ROUTING_KEY);
        // 设定延时队列的延长时长 10s
        map.put("x-message-ttl", 10000);
        // 创建延时队列
        channel.queueDeclare(DIRECT_QUEUE_DELAY, true, false, false, map);
        // 在延时交换器上绑定延时队列
        channel.queueBind(DIRECT_QUEUE_DELAY, DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY);
        // 创建死信队列和死信交换器
        channel.exchangeDeclare(DIRECT_EXCHANGE_DEAD, BuiltinExchangeType.TOPIC, true, false, null);
        // 创建死信队列
        channel.queueDeclare(DIRECT_QUEUE_DEAD, true, false, false, null);
        // 在死信交换器上绑定死信队列
        channel.queueBind(DIRECT_QUEUE_DEAD, DIRECT_EXCHANGE_DEAD, DIRECT_DEAD_ROUTING_KEY);
        channel.basicPublish(DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY, null, "hello world".getBytes());
    }
    private static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException {
        // 关闭资源
        channel.close();
        connection.close();
    }
    private static Connection createConnection() throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ的链接参数
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("echo");
        factory.setPassword("123456");
        // 和RabbitMQ建立一个链接
        return factory.newConnection();
    }
}

到这里,其实我们不难发现,我们无非是利用了TTL这个特性,让消息在过期的时候丢弃到指定队列,死信队列其实也是一个普通队列。

执行之后,我们来看看结果,在Exchange里面,我们创建了两个交换器和两个队列,但是两个队列和交换器还是有区别的,我们来看图片

网络异常,图片无法展示
|

我们可以看到两个队列的Features标志是不一样的

  • TTL: 消息在队列中的过期时间
  • DLX: 该队列绑定了死信交换器
  • DLK: 该队列绑定的死信队列的ROUTING_KEY

在我们执行完成之后,我们可以看到,消息先被投递到了delay,该队列里面的消息,到达过期时间之后就被投递到了dead队列中去了。

那么我们上面介绍了TTL和设置AMQP.BasicProperties,这两种有一定的区别,前一个是设置队列消息过期时间,后一个是设定每条消息的过期时间。那他们的区别在哪里?

设置每条消息和设置TTL的区别

其实这两种方式的区别就在于怎么判断该消息是否要被丢弃。TTL设定的队列,只要消息到达过期时间,立马就会将消息丢弃。如果是后者,可能我们队列里面有很多的消息,然后每条消息的过期时间又不一致,这个时候,如果队列出口处堵了很多没有设定过期时间的消息又不被消费的时候,队列后面的消息及时设定了过期时间也不会被丢弃,只有在设定了过期时间的消息到了队列该消费的位置,才会判定

怎么使用AMQP.BasicProperties?

package com.example.demo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
 * @author echo
 * @date 2021-01-14 14:35
 */
public class TopicDealProductTest {
    /**
     * 延时队列交换机
     */
    private static final String DIRECT_EXCHANGE_DELAY = "dir_exchange_delay";
    /**
     * 死信队列交换机
     */
    private static final String DIRECT_EXCHANGE_DEAD = "dir_exchange_dead";
    /**
     * 延时队列
     */
    private static final String DIRECT_QUEUE_DELAY = "dir.queue.delay";
    /**
     * 死信队列
     */
    private static final String DIRECT_QUEUE_DEAD = "dir.queue.dead";
    /**
     * 延时队列ROUTING_KEY
     */
    private static final String DIRECT_DELAY_ROUTING_KEY = "delay.queue.routingKey";
    /**
     * 延时队列ROUTING_KEY
     */
    private static final String DIRECT_DEAD_ROUTING_KEY = "dead.queue.routingKey";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = createConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        sendMsg(channel);
        Thread.sleep(10000);
        closeConnection(connection, channel);
    }
    private static void sendMsg(Channel channel) throws IOException {
        // 创建延时队列和延时交换器
        channel.exchangeDeclare(DIRECT_EXCHANGE_DELAY, BuiltinExchangeType.DIRECT);
        Map<String, Object> map = new HashMap<>(16);
        // 在延时交换器上指定死信交换器
        map.put("x-dead-letter-exchange", DIRECT_EXCHANGE_DEAD);
        map.put("x-dead-letter-routing-key", DIRECT_DEAD_ROUTING_KEY);
        // 设定延时队列的延长时长 10s
//        map.put("x-message-ttl", 10000);
        // 创建延时队列
        channel.queueDeclare(DIRECT_QUEUE_DELAY, true, false, false, map);
        // 在延时交换器上绑定延时队列
        channel.queueBind(DIRECT_QUEUE_DELAY, DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY);
        // 创建死信队列和死信交换器
        channel.exchangeDeclare(DIRECT_EXCHANGE_DEAD, BuiltinExchangeType.TOPIC, true, false, null);
        // 创建死信队列
        channel.queueDeclare(DIRECT_QUEUE_DEAD, true, false, false, null);
        // 在死信交换器上绑定死信队列
        channel.queueBind(DIRECT_QUEUE_DEAD, DIRECT_EXCHANGE_DEAD, DIRECT_DEAD_ROUTING_KEY);
        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        builder.expiration("10000");
        AMQP.BasicProperties properties = builder.build();
        channel.basicPublish(DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY, false, properties,  "hello world".getBytes());
    }
    private static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException {
        // 关闭资源
        channel.close();
        connection.close();
    }
    private static Connection createConnection() throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ的链接参数
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("echo");
        factory.setPassword("123456");
        // 和RabbitMQ建立一个链接
        return factory.newConnection();
    }
}

我们运行完成成之后,可以看到和我们之前那一种方式的效果是一样的

网络异常,图片无法展示
|

两种设定过期时间的方式其实区别就在于一个统一设定了过期时间,一个指定每条过期时间。但是并不影响我们延时队列的实现,那我们怎么选择呢?

怎么选择TTL设定方式?

根据两种方式的特性来选定使用场景才是最合理的。我们如果用来做延时队列的,想将延时队列的特性应用到实际场景的,并且对实时性要求比较高的,建议选择第一种方式。

总结

延时队列的实现并不难,关键是我们要知道他的一个原理,了解RabbitMQ他的TTL和死信对了。掌握了它的这些特性之后,我们就可以很好的应用延时队列。延时队列在工作中对我们的帮助也非常大,不过RabbiTMQ没有原生延时队列,我们用这种方式实现了它并不意味着我们一定要选择它。其实还有很多的方式,比如Java中的DelayQueu、kafka的时间轮等。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件
RabbitMQ的死信队列和延时队列
RabbitMQ的死信队列和延时队列
|
消息中间件
RabbitMQ延时队列插件rabbitmq_delayed_message_exchange-3.8.0.ez,有需要的朋友可自取
RabbitMQ延时队列插件rabbitmq_delayed_message_exchange-3.8.0.ez,有需要的朋友可自取
165 0
|
消息中间件 运维 Java
rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景
rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景
|
消息中间件 Java Kafka
15、RabbitMQ没有延时队列?学会这一招玩转延时队列
15、RabbitMQ没有延时队列?学会这一招玩转延时队列
245 0
15、RabbitMQ没有延时队列?学会这一招玩转延时队列
|
消息中间件 PHP 开发者
《阿里云栖开发者沙龙PHP技术专场-RabbitMQ 的延时队列和镜像队列原理与实战-钱文品》电子版地址
阿里云栖开发者沙龙PHP技术专场-RabbitMQ 的延时队列和镜像队列原理与实战-钱文品
172 0
《阿里云栖开发者沙龙PHP技术专场-RabbitMQ 的延时队列和镜像队列原理与实战-钱文品》电子版地址
|
消息中间件 Java 数据库
RabbitMQ延时队列应用场景
RabbitMQ延时队列应用场景
|
消息中间件 JSON 数据库
rabbitMQ延时队列与TTL和DLX、延迟队列的相关介绍
TTL是Time To Live的缩写, 也就是生存时间。 RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。 如果两种方式一起使用消息的TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。 默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消费,否则丢弃。
|
消息中间件 JSON 数据库
rabbitMQ延时队列与TTL以及DLX和死信队列简单介绍
TTL是Time To Live的缩写, 也就是生存时间。 RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。 如果两种方式一起使用消息的TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。 默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消费,否则丢弃。
|
消息中间件 Ubuntu Java
Docker下RabbitMQ延时队列实战两部曲之二:细说开发
在SpringBoot框架下进行RabbitMQ开发,并且在Docker环境部署和运行
133 0
Docker下RabbitMQ延时队列实战两部曲之二:细说开发
|
消息中间件 Ubuntu Java
Docker下RabbitMQ延时队列实战两部曲之一:极速体验
向RabbitMQ发出消息后,有时我们希望消费方不要立即消费,这时候延时队列就派上用场了,本篇就来快速体验其效果
184 0
Docker下RabbitMQ延时队列实战两部曲之一:极速体验