队列 TTL
7.5.1 代码架构图
它们的绑定关系如下:
7.5.2 配置类代码
package com.caq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * TTL队列 配置文件类代码 * 通过配置类的形式完成交换机,信道的声明 * 之后只用做生产者和消费者的代码 */ @Configuration public class TtlQueueConfig { //普通交换机名称 public static final String X_EXCHANGE = "X"; //死信交换机名称 public static final String Y_DEAD_LETTER_MESSAGE = "Y"; //普通队列名称 public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; //死信队列名称 public static final String DEAD_LETTER_QUEUE = "QD"; //声明xExchange 别名 @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_MESSAGE); } //声明队列 @Bean("queueA") public Queue queueA() { Map<String, Object> arguments = new HashMap<>(); // 设置死信交换机 arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_MESSAGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key","YD"); // 设置TTL 单位是ms arguments.put("x-message-ttl",10000); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } @Bean("queueB") public Queue queueB() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_MESSAGE); arguments.put("x-dead-letter-routing-key","YD"); arguments.put("x-message-ttl",40000); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } //绑定QA @Bean public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } //绑定QB @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueB).to(xExchange).with("XB"); } }
7.5.3 消息生产者代码
package com.caq.controller; import com.caq.config.DelayQueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; /** * 发送延迟消息 * http://localhost:8080/ttl/sendMsg/哈哈哈哈哈哈 * * @RestController只返回内容,不进行页面跳转 * @RequestMapping请求路径 */ @Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { /** * {}是占位符,结果执行后会被后面的所替换 * * @param message */ //通过rabbitTemplate来发送消息 @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message) { log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date().toString(), message); rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列" + message); rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列" + message); } }
7.5.4 消息消费者代码
package com.caq.consumer; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Date; @Slf4j @Component public class DeadLetterQueueConsumer { //接收消息 @RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) throws Exception{ String msg = new String(message.getBody()); log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg); } }
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。
7.6 延时队列TTL优化
我们能不能写一个队列,能适应所有情况呢?
在这里新增了一个队列 QC,绑定关系如下,该队列不设置TTL 时间
7.6.1 配置类代码
在原有代码加入队列QC并设置routingkey和绑定x交换机
package com.caq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * TTL队列 配置文件类代码 * 通过配置类的形式完成交换机,信道的声明 * 之后只用做生产者和消费者的代码 */ @Configuration public class TtlQueueConfig { //普通交换机名称 public static final String X_EXCHANGE = "X"; //死信交换机名称 public static final String Y_DEAD_LETTER_MESSAGE = "Y"; //普通队列名称 public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; //死信队列名称 public static final String DEAD_LETTER_QUEUE = "QD"; public static final String QUEUE_C = "QC"; //声明xExchange 别名 @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_MESSAGE); } //声明队列 @Bean("queueA") public Queue queueA() { Map<String, Object> arguments = new HashMap<>(); // 设置死信交换机 arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_MESSAGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key","YD"); // 设置TTL 单位是ms arguments.put("x-message-ttl",10000); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } @Bean("queueB") public Queue queueB() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_MESSAGE); arguments.put("x-dead-letter-routing-key","YD"); arguments.put("x-message-ttl",40000); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } // 声明QC @Bean("queueC") public Queue queueC(){ Map<String, Object> arguments = new HashMap<>(); // 设置死信交换机 arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_MESSAGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key","YD"); return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build(); } //死信队列 @Bean("queueD") public Queue queueD(){ return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } //绑定QA @Bean public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } //绑定QB @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueB).to(xExchange).with("XB"); } //绑定QC @Bean public Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueC).to(xExchange).with("XC"); } }
7.6.2 生产者代码
@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendExpirationOnMsg/{message}/{ttlTime}") public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) { log.info("当前时间:{},发送一条时长{}毫秒TTL信息给TTL队列QC:{}", new Date().toString(), ttlTime, message); rabbitTemplate.convertAndSend("X", "XC", message, msg -> { //发送消息的时候 延迟时长 msg.getMessageProperties().setExpiration(ttlTime); return msg; }); }
发起请求
http://localhost:8080/ttl/sendExpirationMsg/20s的消息/20000
http://localhost:8080/ttl/sendExpirationMsg/2s的消息/2000
看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“
因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
这也就是为什么第二个延时2秒,却后执行。