📄 死信队列的概念:
死信就是无法被消费的消息。一般来说,producer 将消息投递到 broker 或者直接到 queue 中,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:
为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的原因:
- 消息 TTL 过期
- 队列达到最大长度(队列满了无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
三种导致死信的效果:消息被拒绝、消息TTL过期、队列达到最大长度。架构图如下所示:
1. 消息TTL过期 , 代码示例
(1) application.yaml
# 项目名 server: servlet: context-path: /demo spring: rabbitmq: host: localhost port: 5672 username: guest password: guest publisher-confirm-type: correlated # 开启消息确认 publisher-returns: true # 开启发送失败回退 template: mandatory: true listener: simple: acknowledge-mode: manual #消息手动确认 concurrency: 1 #消费者数量 max-concurrency: 1 #消费者最大数量 prefetch: 1 #消费者每次从队列中取几个消息 default-requeue-rejected: true #消息消费失败后,重新进入消费队列中 retry: initial-interval: 1000 #1秒后重试 enabled: true #启用发布重试 max-attempts: 3 #传递消息的最大尝试次数 max-interval: 10000 #尝试的最大时间间隔 multiplier: 1.0 #应用于先前传递重试时间间隔的乘数
(2) RabbitConfig
@Configuration public class RabbitConfig implements BeanPostProcessor { //1. 创建交换机 @Bean public DirectExchange newExchange(){ return new DirectExchange("normalExchange",true,false); } //2. 创建队列 @Bean public Queue newQueue(){ Map<String ,Object> map = new HashMap<>(); map.put("x-message-ttl",2000); // 消息存活时间1s map.put("x-dead-letter-exchange","deadExchange");// 设置死信交换机 的名称 map.put("x-dead-letter-routing-key","key2") ;//设置死信路由键名字 return new Queue("normalQueueA",true,false,false,map); } //3. 绑定 @Bean public Binding binding(){ return BindingBuilder.bind(newQueue()).to(newExchange()).with("key1"); } //4. 创建死信交换机 @Bean public DirectExchange newDeadExchange(){ return new DirectExchange("deadExchange",true,false); } //5. 创建死信队列 @Bean public Queue newDeadQueue(){ return new Queue("deadQueueA",true,false,false); } //6. 绑定 @Bean public Binding bindingDead(){ return BindingBuilder.bind(newDeadQueue()).to(newDeadExchange()).with("key2"); } }
(3) 生产者:订单发送消息给下游服务 支付服务
@RestController public class OrderProducer { @Autowired private AmqpTemplate rabbitTemplate; @PostMapping("/submitOrder") //模拟下订单 public String submitOrder(){ String orderId = UUID.randomUUID().toString().replace("-",""); Map<String ,Object> map = new HashMap<>(); map.put("orderId",orderId); map.put("orderNum",4959044); map.put("custId",101); map.put("orderPrice",3500f); map.put("orderDate",new Date()); rabbitTemplate.convertAndSend("normalExchange","key1",map); return "生产者下单成功"; } }
(4)下游服务消费者:支付服务
@Component public class PayConsumer { @RabbitHandler @RabbitListener(queues = "normalQueueA") public void process(Map map,Channel channel,Message message) throws IOException { System.out.println("支付服务接收到的消息:" + map); String orderId = (String)map.get("orderId"); Integer custId = (Integer)map.get("custId"); Integer orderNum = (Integer)map.get("orderNum"); Float orderPrice = (Float)map.get("orderPrice"); Date orderDate = (Date)map.get("orderDate"); System.out.println("支付服务接收到的orderId:" + orderId); System.out.println("支付服务接收到的custId:" + custId); System.out.println("支付服务接收到的orderNum:" + orderNum); System.out.println("支付服务接收到的orderPrice:" + orderPrice); System.out.println("支付服务接收到的orderDate:" + orderDate); //告诉broker,消息已经被确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }
(5)消息未处理后,消费死信消息的消费者
@Component public class DeadOrderConsumer { // 获得死信队列中的消息 @RabbitHandler @RabbitListener(queues = "deadQueueA") public void process(Map map){ System.out.println("订单取消支付后,从死信队列中接收到的消息:" + map); String orderId = (String)map.get("orderId"); Integer custId = (Integer)map.get("custId"); Integer orderNum = (Integer)map.get("orderNum"); Float orderPrice = (Float)map.get("orderPrice"); Date orderDate = (Date)map.get("orderDate"); System.out.println("取消支付后,从死信队列中接收到的orderId:" + orderId); System.out.println("取消支付后,从死信队列中接收到的custId:" + custId); System.out.println("取消支付后,从死信队列中接收到的orderNum:" + orderNum); System.out.println("取消支付后,从死信队列中接收到的orderPrice:" + orderPrice); System.out.println("取消支付后,从死信队列中接收到的orderDate:" + orderDate); } }
若将PayConsumer中的注解注释了,或监听其他不存在的queue, 即 支付服务 没有消费 订单消息,则订单发送发送过来的消息 过了ttl 时间,则进入死信队列。运行效果如下:
2. 队列达到最大长度
application.yaml 不变
RabbitConfig 去掉ttl 属性, 加上 队列达到最大长度 为6
//2. 创建队列 @Bean public Queue newQueue(){ Map<String ,Object> map = new HashMap<>(); map.put("x-max-length",6); // 队列达到最大长度 为6 map.put("x-dead-letter-exchange","deadExchange");// 设置死信交换机 的名称 map.put("x-dead-letter-routing-key","key2") ;//设置死信路由键名字 return new Queue("normalQueueA",true,false,false,map); }
OrderProducer 循环发送10个消息
@RestController public class OrderProducer { @Autowired private AmqpTemplate rabbitTemplate; @PostMapping("/submitOrder") //模拟下订单 public String submitOrder(){ Map<String ,Object> map = new HashMap<>(); map.put("orderNum",4959044); map.put("custId",101); map.put("orderPrice",3500f); map.put("orderDate",new Date()); for(int i =0;i<=9;i++) { String orderId = UUID.randomUUID().toString().replace("-", ""); map.put("orderId",orderId); rabbitTemplate.convertAndSend("normalExchange", "key1", map); } return "生产者下单成功"; } }
下游消费者 支付服务PayConsumer, DeadOrderConsumer 不变
进行测试,首先启动 Consumer01 创建更改后的队列,然后关闭它,模拟其接受不到消息。然后启动消费者发送10条消息,可以看到发送到6条消息首先在 normal-queue 队列中,4条消息在 dead-queue 死信队列中。