RibbitMQ学习笔记延迟队列(二)

简介: RibbitMQ学习笔记延迟队列

7.5.3. 消息生产者代码

@Slf4j 
@RequestMapping("ttl")
 @RestController
public class SendMsgController { 
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
rabbitTemplate.convertAndSend("X",  "XA",  "消息来自 ttl 为 10S 的队列: "+message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: "+message);
}
}

7.5.4. 消息消费者代码

@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
 String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
}
}

发起一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

7.6. 延时队列优化

7.6.1. 代码架构图

在这里新增了一个队列 QC,绑定关系如下,该队列不设置TTL 时间

7.6.2. 配置文件类代码

@Component
public class MsgTtlQueueConfig {
public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String QUEUE_C = "QC";
//声明队列 C 死信交换机
@Bean("queueC") public Queue queueB(){
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//没有声明 TTL 属性
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
//声明队列 B 绑定 X 交换机
@Bean
public Binding queuecBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}

7.6.3. 消息生产者代码

@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) { 
rabbitTemplate.convertAndSend("X", "XC", message, correlationData -
>
{ correlationData.getMessageProperties().setExpiration(ttlTime); return correlationData;
});
log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttlTime, message);
}

发起请求代码

package com.atguigu.springbootmq.consumer;
import com.atguigu.springbootmq.config.TtlQueueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
 * 队列 ttl的消费者
 */
@Slf4j
@Component  //装饰器语法而已,
public class DeadLetterQueueConsumer {
    //接受消息
    @RabbitListener(queues="QD")
  //  @RabbitListener(queues = {TtlQueueConfig.DEAD_LETTER_QUEUE})
    public void receiveD(Message message, Channel channel)throws Exception{
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列信息{}",new Date().toString(),msg);
    }
}
package com.atguigu.springbootmq.Controller;
import com.atguigu.springbootmq.config.TtlQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
 * 发送延迟信息
 * 生产者
 */
@Slf4j //日志
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //开始发送消息
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条信息给两个TTl队列:{}",new Date().toString(),message);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: "+message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: "+message);
//        rabbitTemplate.convertAndSend(TtlQueueConfig.X_EXCHANGE,"XA","消息来自10s的"+message);
//        rabbitTemplate.convertAndSend(TtlQueueConfig.X_EXCHANGE,"XB","消息来自40s的"+message);
    }
//开始发消息  和ttl
    @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
        log.info("当前时间:{},发送一条时长:{}毫秒信息给TTl队列C:{}",new Date().toString(),ttlTime,message);
        rabbitTemplate.convertAndSend(TtlQueueConfig.X_EXCHANGE,"XC",message,msg->{
            //发送消息 延迟时长
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }
}

http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

7.7. Rabbitmq 插件实现延迟队列

上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的 TTL,并使其在设置的TTL 时间及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题。

7.7.1. 安装延时队列插件

在官网上下载 https://www.rabbitmq.com/community-plugins.html,下载

rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。

进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ

/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

把插件放到指定的目录

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
• 1

安装成功了

重启了

systemctl restart rabbitmq-server

https://www.bilibili.com/video/BV1cb4y1o7zz?p=66

7.7.2. 代码架构图

在这里新增了一个队列delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

7.7.3. 配置文件类代码

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
 public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; 
 public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
//自定义交换机 我们在这里定义的是一个延迟交换机
@Bean
public CustomExchange delayedExchange()
{ Map<String, Object> args = new HashMap<>();
//自定义交换机的类型 args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange
delayedExchange) { return
BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}

7.7.4. 消息生产者代码

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) { rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
correlationData ->{ correlationData.getMessageProperties().setDelay(delayTime); return correlationData;
});
log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟                                            {} 毫 秒 的 信 息 给 队 列                                                                                             delayed.queue:{}",         new
Date(),delayTime, message);
}

7.7.5. 消息消费者代码

public static final String DELAYED_QUEUE_NAME = "delayed.queue"; 
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){ 
String msg = new String(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);

发起请求:

http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000

第二个消息被先消费掉了,符合预期

7.8. 总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz

或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
8月前
|
消息中间件 存储 NoSQL
rocketmq实现延迟队列思路探讨
本文介绍了两种实现RocketMQ延迟消息的方法。非任意时间延迟可通过在服务器端配置`messageDelayLevel`实现,但需重启服务。任意时间延迟则分为两种策略:一是结合原生逻辑和时间轮,利用RocketMQ的默认延迟等级组合支持任意延迟,但可能丢失1分钟内的数据;二是使用存储介质(如Redis)加时间轮,消息存储和定时发送结合,能处理数据不一致和丢失问题,但涉及更多组件。推荐项目[civism-rocket](https://github.com/civism/civism-rocket)作为参考。
255 1
|
8月前
|
消息中间件 存储 Java
RabbitMQ之延迟队列(手把手教你学习延迟队列)
【1月更文挑战第12天】延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。
1932 16
|
8月前
|
消息中间件
第十五章 RabbitMQ 延迟队列
第十五章 RabbitMQ 延迟队列
46 0
|
8月前
|
消息中间件 前端开发 算法
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
129 1
|
消息中间件 RocketMQ 索引
RocketMQ消费者如何实现重平衡
RocketMQ消费者如何实现重平衡
545 0
|
消息中间件 Java Docker
RabbitMQ 如何实现延迟队列?
RabbitMQ 如何实现延迟队列?
437 1
|
消息中间件 Java 数据库
RibbitMQ学习笔记延迟队列(一)
RibbitMQ学习笔记延迟队列
79 0
|
消息中间件
RibbitMQ学习笔记之死信队列
RibbitMQ学习笔记之死信队列
85 0
|
消息中间件 Java Kafka
15、RabbitMQ没有延时队列?学会这一招玩转延时队列
15、RabbitMQ没有延时队列?学会这一招玩转延时队列
252 0
15、RabbitMQ没有延时队列?学会这一招玩转延时队列