RabbitMQ(二)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RabbitMQ(二)

五、交换机


5.1 Exchange


路由 ( routing )就是通过互联的 网络 把 信息 从源地址传输到目的地址的活动


RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。


相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。


5.1.2 交换机类型


共有以下类型:


直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)


5.1.3 无名交换机


前面我们其实都用到了交换机(默认),但它们属于交换机的无名类型,我们通过空字符串(“”)进行标识。


5.2 临时队列


每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。


创建临时队列的方式如下:


String queueName = channel.queueDeclare().getQueue();


44.png


5.3 绑定(binding)

binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系


5.3.1 实现绑定


45.png

和我们图中写的内容一致,生产者把消息发送给交换机。交换机根据Routingkey将消息发送给对应的队列,最后队列把消息传递给消费者


46.png


5.4 Fanout


Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。系统中默认有些 exchange 类型


47.png


5.4.1 Fanout 实战

我们写两个logs接受消息,看看交换机处于fanout状态下消息的处理情况


48.png


49.png


ReceiveLogs01 将接收到的消息打印在控制台

package com.caq.rabbitmq.five;import com.caq.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;/** * 负责消息的接受 */public class ReceiveLogs01 {    private static final String EXCHANGE_NAME = "logs";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMqUtils.getChannel();        //声明一个交换机        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");        //声明一个队列,临时队列(连接的时候存在,不连接的时候自动存在,名称随机)        String queue = channel.queueDeclare().getQueue();        /**         * 绑定交换机与队列         */        channel.queueBind(queue, EXCHANGE_NAME, "");        System.out.println("等待接受消息,把接收到的消息打印在屏幕上.....");        //接受消息        DeliverCallback deliverCallback = (consumerTag, message) -> {            System.out.println("ReceiveLogs01控制台打印接受到的消息" + new String(message.getBody()));        };        //接受消息        channel.basicConsume(queue, true, deliverCallback, consumerTag -> {});    }}

ReceiveLogs02 将接收到的消息存储在磁盘


和上面类似


EmitLog 发送消息给两个消费者接收


package com.caq.rabbitmq.five;import com.caq.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import java.util.Scanner;/** * 扇出就是发布订阅 */public class EmitLog {    //交换机的名字    private static final String EXCHANGE_NAME = "logs";    //发送消息    public static void main(String[] args) throws Exception {        Channel channel = RabbitMqUtils.getChannel();        //声明一个交换机        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");        Scanner scanner = new Scanner(System.in);        while (scanner.hasNext()) {            String message = scanner.next();            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());            System.out.println("生产者发出消息" + message);        }    }}

处理的结果


50.png


51.png


52.png


5.5 Direct exchange


回顾一下什么是 bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解: 队列只对它绑定的交换机的消息感兴趣。


绑定用参数:routingKey 来表示也可称该参数为 binding key,

创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”);

绑定之后的意义由其交换类型决定。


5.5.1 Direct exchange介绍


前面我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志消息避免浪费磁盘空间。Fanout 这种交换类型并不能给我们带来很大的灵活性—它只能进行无意识的广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey 队列中去。


53.png


5.5.2 多重绑定


54.png

当然如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多


5.5.3 实战


55.png

交换机绑定关系如下:


不要桥接校园网!!!


56.png


57.png


只有绑定了info的队列才会消费消息


58.png


只有绑定了error的队列才会消费消息


59.png


60.png


5.6 Topics


5.6.1 Topic 的介绍

就很像我们之前学的正则表达式一样,可以匹配多个


尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性——比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候就只能使用 topic 类型


发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词


比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。


当然这个单词列表最多不能超过 255 个字节。


在这个规则列表中,其中有两个替换符是大家需要注意的:


*(星号)可以代替一个单词

#(井号)可以替代零个或多个单词


5.6.2 Topic 匹配案例


下图绑定关系如下


61.png


Q1–>绑定的是


中间带 orange 带 3 个单词的字符串 (*.orange.*)

Q2–>绑定的是


最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)

第一个单词是 lazy 的多个单词 (lazy.#)

上图是一个队列绑定关系图,我们来看看他们之间数据接收

例子

说明
quick.orange.rabbit 被队列 Q1Q2 接收到
azy.orange.elephant 被队列 Q1Q2 接收到
quick.orange.fox 被队列 Q1 接收到
lazy.brown.fox 被队列 Q2 接收到
lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit 是四个单词但匹配 Q2


注意:


当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了

如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了


5.6.3 Topic 实战

62.png


生产者代码


package com.caq.rabbitmq.topiclogs;
import com.caq.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;
/**
 * 生产者
 */
public class EmitLogTopic {
    //交换机的名字
    private static final String EXCHANGE_NAME = "topic_logs";
    //发送消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        Map<String, String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
        bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
        bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
        bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
        /**
         * 返回此映射中包含的映射的 Set 视图。
         * 注意:Set 视图意思是 HashMap 中所有的键值对都被看作是一个 set 集合。
         */
        for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
            String routingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            System.out.println("生产者发出消息"+message);
        }
    }
}


消费者代码


package com.caq.rabbitmq.topiclogs;
import com.caq.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic01 {
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个交换机,通过枚举的形式指定交换机的类型
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //声明一个队列
        String queueName = "Q1";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
        System.out.println("等待接受消息");
        //接受消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLogs02控制台打印接受到的消息" + new String(message.getBody()));
            System.out.println("接受队列" + queueName + "绑定键" + message.getEnvelope().getRoutingKey());
        };
        //接受消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}


消费者2代码


package com.caq.rabbitmq.topiclogs;
import com.caq.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic02 {
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个交换机,通过枚举的形式指定交换机的类型
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //声明一个队列
        String queueName = "Q2";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
        System.out.println("等待接受消息");
        //接受消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLogs02控制台打印接受到的消息" + new String(message.getBody()));
            System.out.println("接受队列" + queueName + "绑定键" + message.getEnvelope().getRoutingKey());
        };
        //接受消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}


63.png


64.png


65.png


六、死信队列


6.1 死信的概念


死信,顾名思义就是无法被消费的消息字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。


应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效


6.2 死信的来源


消息 TTL 过期


TTL是Time To Live的缩写, 也就是生存时间


队列达到最大长度


队列满了,无法再添加数据到 mq 中


消息被拒绝


(basic.reject 或 basic.nack) 并且 requeue=false.


6.3 死信实战


6.3.1 代码架构图

66.png


6.3.2 消息TTL过期

生产者代码


package com.caq.rabbitmq.deadqueue;
import com.caq.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.AMQBasicProperties;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
 * 死信队列生产者
 */
public class Producer {
    //交换机的名字
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //发送消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //死信消息,设置TTL时间
        AMQP.BasicProperties properties =
                new AMQP.BasicProperties()
                        .builder().expiration("10000").build();
        for (int i = 0; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zs", properties,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发送消息:"+message);
        }
    }
}


消费者 C1 代码:


package com.caq.rabbitmq.deadqueue;
import com.caq.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.util.HashMap;
import java.util.Map;
public class Consumer01 {
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static final String DEAD_EXCHANGE = "dead_exchange";
    public static final String NORMAL_QUEUE = "normal_queue";
    public static final String DEAD_QUEUE = "dead_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
//        声明死信和普通交换机 类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//        声明普通队列
        Map<String, Object> arguments = new HashMap<>();
        //过期时间
//        正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//        设置死信routingkey
        arguments.put("x-dead-letter-routing-key", "lisi");
//        设置队列长度
//        arguments.put("x-max-length", 6);
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
//        声明死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        //绑定普通交换机与普通队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zs");
        //绑定死信交换机与死信队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        System.out.println("等待接受消息..........");
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody());
            System.out.println("Consumer01 接收到消息"+message);
        };
        //开启手动应答
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {
        });
    }
}


启动 C1 ,之后关闭消费者,模拟其接收不到消息。再启动 Producer


67.png


消费者 C2 代码:


以上步骤完成后,启动 C2 消费者,它消费死信队列里面的消息


package com.caq.rabbitmq.deadqueue;
import com.caq.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.util.HashMap;
import java.util.Map;
/**
 * 死信队列
 * 消费者2
 */
public class Consumer02 {
    public static final String NORMAL_QUEUE = "dead_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("等待接受消息..........");
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer02接受的消息是:" + new String(message.getBody()));
        };
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {
        });
    }
}

68.png


6.3.3 队列达到最大长度


消息生产者代码去掉 TTL 属性


AMQP.BasicProperties properties =

new AMQP.BasicProperties()

.builder().expiration(“10000”).build();


消息生产者代码去掉 TTL 属性


package com.caq.rabbitmq.deadqueue;
import com.caq.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.AMQBasicProperties;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
 * 死信队列生产者
 */
public class Producer {
    //交换机的名字
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //发送消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        for (int i = 0; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zs", null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发送消息:"+message);
        }
    }
}


C1 消费者修改以下代码**(启动之后关闭该消费者 模拟其接收不到消息)**


//设置正常队列的长度限制,例如发10个,4个则为死信

params.put(“x-max-length”,6);


注意此时需要把原先队列删除 因为参数改变了


package com.caq.rabbitmq.deadqueue;
import com.caq.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.util.HashMap;
import java.util.Map;
public class Consumer01 {
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static final String DEAD_EXCHANGE = "dead_exchange";
    public static final String NORMAL_QUEUE = "normal_queue";
    public static final String DEAD_QUEUE = "dead_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //        声明死信和普通交换机 类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //        声明普通队列
        Map<String, Object> arguments = new HashMap<>();
        //        正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //        设置死信routingkey
        arguments.put("x-dead-letter-routing-key", "lisi");
        //        设置队列长度
        arguments.put("x-max-length", 6);
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        /******************************************************************/
        //        声明死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        //绑定普通交换机与普通队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zs");
        //绑定死信交换机与死信队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        System.out.println("等待接受消息..........");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8");
                                                                      System.out.println("Consumer01 接收到消息"+message);
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {
        });
    }
}


C2 消费者代码不变(启动 C2 消费者)


69.png


70.png


6.3.4 消息被拒

生产者,C2消费者代码同上生产者一致


C1 消费者代码(启动之后关闭该消费者 模拟其接收不到消息)


package com.caq.rabbitmq.deadqueue;
import com.caq.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.util.HashMap;
import java.util.Map;
/**
 * 死信队列
 * <p>
 * 消费者1
 */
public class Consumer01 {
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static final String DEAD_EXCHANGE = "dead_exchange";
    public static final String NORMAL_QUEUE = "normal_queue";
    public static final String DEAD_QUEUE = "dead_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
//        声明死信和普通交换机 类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//        声明普通队列
        Map<String, Object> arguments = new HashMap<>();
        //过期时间
//        正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//        设置死信routingkey
        arguments.put("x-dead-letter-routing-key", "lisi");
//        设置队列长度
//        arguments.put("x-max-length", 6);
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        /******************************************************************/
//        声明死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        //绑定普通交换机与普通队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zs");
        //绑定死信交换机与死信队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        System.out.println("等待接受消息..........");
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody());
            if (msg.equals("info5")) {
                System.out.println("消费者1接受的消息是" + msg + ":此消息是被c1拒绝的");
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            } else {
                System.out.println("Consumer01接受的消息是:" + msg);
            }
        };
        //开启手动应答
        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {
        });
    }
}


71.png

72.png


七、延迟队列


7.1 延迟队列概念


延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。


7.2 延迟队列使用场景


订单在十分钟之内未支付则自动取消


新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。


用户注册成功后,如果三天内没有登陆则进行短信提醒。


用户发起退款,如果三天内没有得到处理则通知相关运营人员。


预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议


这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,


如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?


如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,


如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。


7.3 队列设置TTL


TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。


换句话说,**如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。**如果同时配置了队列的TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。


7.3.1 队列设置TTL


在创建队列的时候设置队列的“x-message-ttl”属性


73.png


7.3.2 消息设置TTL

是针对每条消息设置TTL


74.png


如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。


刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面, 成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。


7.4 整合 SpringBoot


7.4.1 添加依赖

<dependencies>
   <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!--RabbitMQ 依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--swagger-->
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger2</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger-ui</artifactId>
        <version>3.0.0</version>
    </dependency>
    <!--RabbitMQ 测试依赖-->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>


7.4.2 修改配置文件

spring:
  rabbitmq:
    host: 192.168.42.96
    port: 5672
    username: admin
    password: 123


7.4.3 添加Swagger 配置类

package com.caq.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig {
    @Bean
    public Docket webApiConfig() {
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    }
    private ApiInfo webApiInfo() {
        return new ApiInfoBuilder()
                .title("rabbitmq 接口文档")
                .description("本文档描述了 rabbitmq 微服务接口定义")
                .version("1.0")
                .contact(new Contact("enjoy6288", "http://mildcaq@163.com", "2350938432@qq.com"))
                .build();
    }
}


7.5 队列 TTL


7.5.1 代码架构图

它们的绑定关系如下:


75.png


7.5.2 配置类代码

package com.caq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * TTL队列    配置文件类代码
 * 通过配置类的形式完成交换机,信道的声明
 * 之后只用做生产者和消费者的代码
 */
@Configuration
public class TtlQueueConfig {
    //普通交换机名称
    public static final String X_EXCHANGE = "X";
    //死信交换机名称
    public static final String Y_DEAD_LETTER_MESSAGE = "Y";
    //普通队列名称
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信队列名称
    public static final String DEAD_LETTER_QUEUE = "QD";
    //声明xExchange  别名
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_MESSAGE);
    }
    //声明队列
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> arguments = new HashMap<>();
//        设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_MESSAGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");
//        设置TTL 单位是ms
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_MESSAGE);
        arguments.put("x-dead-letter-routing-key","YD");
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }
    //绑定QA
    @Bean
    public Binding  queueABindingX(@Qualifier("queueA") Queue queueA,
                                   @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    //绑定QB
    @Bean
    public Binding  queueBBindingX(@Qualifier("queueB") Queue queueB,
                                   @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
}


7.5.3 消息生产者代码

package com.caq.controller;
import com.caq.config.DelayQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
 * 发送延迟消息
 * http://localhost:8080/ttl/sendMsg/哈哈哈哈哈哈
 *
 * @RestController只返回内容,不进行页面跳转
 * @RequestMapping请求路径
 */
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
    /**
     * {}是占位符,结果执行后会被后面的所替换
     *
     * @param message
     */
    //通过rabbitTemplate来发送消息
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date().toString(), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列" + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列" + message);
    }
}


7.5.4 消息消费者代码

package com.caq.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class DeadLetterQueueConsumer {
    //接收消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws Exception{
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
    }
}

76.png

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。


7.6 延时队列TTL优化

我们能不能写一个队列,能适应所有情况呢?


在这里新增了一个队列 QC,绑定关系如下,该队列不设置TTL 时间


77.png


7.6.1 配置类代码

在原有代码加入队列QC并设置routingkey和绑定x交换机


package com.caq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * TTL队列    配置文件类代码
 * 通过配置类的形式完成交换机,信道的声明
 * 之后只用做生产者和消费者的代码
 */
@Configuration
public class TtlQueueConfig {
    //普通交换机名称
    public static final String X_EXCHANGE = "X";
    //死信交换机名称
    public static final String Y_DEAD_LETTER_MESSAGE = "Y";
    //普通队列名称
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信队列名称
    public static final String DEAD_LETTER_QUEUE = "QD";
    public static final String QUEUE_C = "QC";
    //声明xExchange  别名
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_MESSAGE);
    }
    //声明队列
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> arguments = new HashMap<>();
//        设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_MESSAGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");
//        设置TTL 单位是ms
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_MESSAGE);
        arguments.put("x-dead-letter-routing-key","YD");
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }
//    声明QC
    @Bean("queueC")
    public Queue queueC(){
        Map<String, Object> arguments = new HashMap<>();
//        设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_MESSAGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
    }
    //死信队列
    @Bean("queueD")
    public Queue queueD(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }
    //绑定QA
    @Bean
    public Binding  queueABindingX(@Qualifier("queueA") Queue queueA,
                                   @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    //绑定QB
    @Bean
    public Binding  queueBBindingX(@Qualifier("queueB") Queue queueB,
                                   @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    //绑定QC
    @Bean
    public Binding  queueCBindingX(@Qualifier("queueC") Queue queueC,
                                   @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
}


7.6.2 生产者代码

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/sendExpirationOnMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message,
                        @PathVariable String ttlTime) {
        log.info("当前时间:{},发送一条时长{}毫秒TTL信息给TTL队列QC:{}", new Date().toString(), ttlTime, message);
        rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
            //发送消息的时候   延迟时长
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }


发起请求


http://localhost:8080/ttl/sendExpirationMsg/20s的消息/20000


http://localhost:8080/ttl/sendExpirationMsg/2s的消息/2000


78.png


看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“


因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。


这也就是为什么第二个延时2秒,却后执行。


7.7 RabbitMQ插件实现延迟队列

7.7.1 安装插件

如果不能实现在消息粒度上的 TTL,并使其在设置的TTL 时间及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题。


我们用插件来实现别人写好的功能~


安装后,重启rabbitmq-server即可


79.png


安装成功后,交换机会出现新的类型


80.png


7.7.2 代码架构图

一个队列delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下


81.png


7.7.3 配置文件类代码


在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。


package com.caq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayQueueConfig {
    //    交换机
    public static final String DELAYED_QUEUE_NAME = "delayed_queue";
    //    队列
    public static final String DELAYED_EXCHANGE_NAME = "delayed_exchange";
    //    routingKey
    public static final String DELAYED_ROUTING_KEY = "delayed_routingkey";
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }
    // public CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
    @Bean("delayedExchange")
    public CustomExchange delayedExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message",
                true, false, arguments);
    }
    //绑定
    @Bean
    public Binding delayedQueueBindingDelayedExchange(
            @Qualifier("delayedQueue") Queue delayedQueue,
            @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}


7.7.4 消息生产者代码

package com.caq.controller;
import com.caq.config.DelayQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
 * 发送延迟消息
 * http://localhost:8080/ttl/sendMsg/哈哈哈哈哈哈
 *
 * @RestController只返回内容,不进行页面跳转
 * @RequestMapping请求路径
 */
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
    /**
     * {}是占位符,结果执行后会被后面的所替换
     *
     * @param message
     */
    //通过rabbitTemplate来发送消息
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //开始发消息,基于插件的消息及延迟的时间
    @GetMapping("/sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message,
                        @PathVariable Integer delayTime) {
        log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",
                 new Date().toString(), delayTime, message);
        rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE_NAME,
                                      DelayQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
                                          msg.getMessageProperties().setDelay(delayTime);
                                          return msg;
                                      });
    }
}


7.7.5 消息消费者代码

消费者代码和之前的一样


package com.caq.consumer;
import com.caq.config.DelayQueueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class DelayQueueConsumer {
    //监听消息收消息
    @RabbitListener(queues = DelayQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延迟队列的消息:{}", new Date().toString(), msg);
    }
}


发起请求测试:


http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000


http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000


82.png


7.8 总结


延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用RabbitMQ 的特性,如:**消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。**另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。


当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
Arthas 弹性计算 安全
优雅上下线之如何安全的关闭Tomcat持久连接
优雅上下线之如何安全的关闭Tomcat持久连接
518 3
|
存储 运维 NoSQL
Redis7.0 核心特性简介
Redis自 2009 年诞生以来,已经走过了 13 年。在这漫长的 13 年中,Redis 从小小的开源项目逐步演变成为当今最受欢迎的内存数据库之一,被用于多种场景,帮助解决很多问题
4158 0
Redis7.0 核心特性简介
|
存储 Java 定位技术
gis利器之Gdal(二)shp数据读取
本文首先简单介绍了空间数据shp数据的基本知识,其常见的文件组成形式。使用qgis软件对数据进行常规预览,最后重点介绍了使用gdal对矢量信息进行读取,​包括空间信息和属性信息
1595 0
gis利器之Gdal(二)shp数据读取
|
3月前
|
C# 图形学 开发者
【Unity3D实例-功能-镜头】俯视角
本文介绍了Unity中常用的俯视角镜头实现方法,涵盖模型添加、角色Tag设置、摄像机脚本编写及测试运行,帮助开发者快速掌握俯视角在策略与模拟类游戏中的应用技巧。
254 0
|
JavaScript
js 解析 byte数组 成字符串
js 解析 byte数组 成字符串
304 5
|
12月前
|
SQL 存储 Oracle
南大通用GBase 8s数据库游标变量解析:提升数据库操作效率
南大通用GBase 8s 数据库游标变量解析:提升数据库操作效率
|
11月前
|
开发框架 物联网 API
HarmonyOS开发:串行通信开发详解
在电子设备和智能系统的设计中,数据通信是连接各个组件和设备的核心,串行通信作为一种基础且广泛应用的数据传输方式,因其简单、高效和成本效益高而被广泛采用。HarmonyOS作为一个全场景智能终端操作系统,不仅支持多种设备和场景,还提供了强大的开发框架和API,使得开发者能够轻松实现串行通信功能。随着技术的不断进步,串行通信技术也在不断发展。在HarmonyOS中,串行通信的开发不仅涉及到基本的数据发送和接收,还包括设备配置、错误处理和性能优化等多个方面。那么本文就来深入探讨在HarmonyOS中如何开发串行通信应用,包括串行通信的基础知识、HarmonyOS提供的API、开发步骤和实际代码示例
199 2
|
消息中间件 存储 Java
【微服务】RabbitMQ七种消息收发方式🌱
MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。
762 0
【微服务】RabbitMQ七种消息收发方式🌱
|
消息中间件 Java API
RabbitMQ入门指南(三):Java入门示例
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了AMQP、Spring AMQP和使用SpringAMQP实现对RabbitMQ的消息收发等内容。
249 0
RabbitMQ入门指南(三):Java入门示例
|
监控 Linux
性能分析之 Linux 系统中 ps&top 中 CPU 百分比不一致?
【8月更文挑战第18天】性能分析之 Linux 系统中 ps&top 中 CPU 百分比不一致?
494 5
下一篇
开通oss服务