六、死信队列

简介: 六、死信队列

📄 死信队列的概念:

死信就是无法被消费的消息。一般来说,producer 将消息投递到 broker 或者直接到 queue 中,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:

为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

死信的原因:

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
三种导致死信的效果:消息被拒绝、消息TTL过期、队列达到最大长度。架构图如下所示:

1. 消息TTL过期 , 代码示例

(1) application.yaml

# 项目名
server:
  servlet:
    context-path: /demo
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated # 开启消息确认
    publisher-returns: true # 开启发送失败回退
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual  #消息手动确认
        concurrency: 1 #消费者数量
        max-concurrency: 1  #消费者最大数量
        prefetch: 1  #消费者每次从队列中取几个消息
        default-requeue-rejected: true #消息消费失败后,重新进入消费队列中
        retry:
          initial-interval: 1000 #1秒后重试
          enabled: true #启用发布重试
          max-attempts: 3 #传递消息的最大尝试次数
          max-interval: 10000 #尝试的最大时间间隔
          multiplier: 1.0 #应用于先前传递重试时间间隔的乘数

(2) RabbitConfig

@Configuration
public class RabbitConfig implements BeanPostProcessor {
    //1. 创建交换机
    @Bean
    public DirectExchange newExchange(){
        return new DirectExchange("normalExchange",true,false);
    }
    //2. 创建队列
    @Bean
    public Queue newQueue(){
        Map<String ,Object> map = new HashMap<>();
        map.put("x-message-ttl",2000); // 消息存活时间1s
        map.put("x-dead-letter-exchange","deadExchange");// 设置死信交换机 的名称
        map.put("x-dead-letter-routing-key","key2") ;//设置死信路由键名字
        return new Queue("normalQueueA",true,false,false,map);
    }
    //3. 绑定
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(newQueue()).to(newExchange()).with("key1");
    }
    //4. 创建死信交换机
    @Bean
    public DirectExchange newDeadExchange(){
        return new DirectExchange("deadExchange",true,false);
    }
    //5. 创建死信队列
    @Bean
    public Queue newDeadQueue(){
        return new Queue("deadQueueA",true,false,false);
    }
    //6. 绑定
    @Bean
    public Binding bindingDead(){
        return BindingBuilder.bind(newDeadQueue()).to(newDeadExchange()).with("key2");
    }
}

(3) 生产者:订单发送消息给下游服务 支付服务

@RestController
public class OrderProducer {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    @PostMapping("/submitOrder")
    //模拟下订单
    public String submitOrder(){
        String orderId = UUID.randomUUID().toString().replace("-","");
        Map<String ,Object> map = new HashMap<>();
        map.put("orderId",orderId);
        map.put("orderNum",4959044);
        map.put("custId",101);
        map.put("orderPrice",3500f);
        map.put("orderDate",new Date());
        rabbitTemplate.convertAndSend("normalExchange","key1",map);
        return "生产者下单成功";
    }
}

(4)下游服务消费者:支付服务

@Component
public class PayConsumer {
        @RabbitHandler
       @RabbitListener(queues = "normalQueueA")
    public void process(Map map,Channel channel,Message message) throws IOException {
        System.out.println("支付服务接收到的消息:" + map);
        String orderId = (String)map.get("orderId");
        Integer custId = (Integer)map.get("custId");
        Integer orderNum = (Integer)map.get("orderNum");
        Float orderPrice = (Float)map.get("orderPrice");
        Date orderDate = (Date)map.get("orderDate");
        System.out.println("支付服务接收到的orderId:" + orderId);
        System.out.println("支付服务接收到的custId:" + custId);
        System.out.println("支付服务接收到的orderNum:" + orderNum);
        System.out.println("支付服务接收到的orderPrice:" + orderPrice);
        System.out.println("支付服务接收到的orderDate:" + orderDate);
        //告诉broker,消息已经被确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

(5)消息未处理后,消费死信消息的消费者

@Component
public class DeadOrderConsumer {
    // 获得死信队列中的消息
     @RabbitHandler
   @RabbitListener(queues = "deadQueueA")
    public void process(Map map){
        System.out.println("订单取消支付后,从死信队列中接收到的消息:" + map);
        String orderId = (String)map.get("orderId");
        Integer custId = (Integer)map.get("custId");
        Integer orderNum = (Integer)map.get("orderNum");
        Float orderPrice = (Float)map.get("orderPrice");
        Date orderDate = (Date)map.get("orderDate");
        System.out.println("取消支付后,从死信队列中接收到的orderId:" + orderId);
        System.out.println("取消支付后,从死信队列中接收到的custId:" + custId);
        System.out.println("取消支付后,从死信队列中接收到的orderNum:" + orderNum);
        System.out.println("取消支付后,从死信队列中接收到的orderPrice:" + orderPrice);
        System.out.println("取消支付后,从死信队列中接收到的orderDate:" + orderDate);
    }
}

若将PayConsumer中的注解注释了,或监听其他不存在的queue, 即 支付服务 没有消费 订单消息,则订单发送发送过来的消息 过了ttl 时间,则进入死信队列。运行效果如下:

2. 队列达到最大长度

application.yaml 不变

RabbitConfig 去掉ttl 属性, 加上 队列达到最大长度 为6

//2. 创建队列
    @Bean
    public Queue newQueue(){
        Map<String ,Object> map = new HashMap<>();
        map.put("x-max-length",6); // 队列达到最大长度 为6
        map.put("x-dead-letter-exchange","deadExchange");// 设置死信交换机 的名称
        map.put("x-dead-letter-routing-key","key2") ;//设置死信路由键名字
        return new Queue("normalQueueA",true,false,false,map);
    }

OrderProducer 循环发送10个消息

@RestController
public class OrderProducer {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    @PostMapping("/submitOrder")
    //模拟下订单
    public String submitOrder(){
        Map<String ,Object> map = new HashMap<>();      
        map.put("orderNum",4959044);
        map.put("custId",101);
        map.put("orderPrice",3500f);
        map.put("orderDate",new Date());
        for(int i =0;i<=9;i++) {
            String orderId = UUID.randomUUID().toString().replace("-", "");
            map.put("orderId",orderId);
            rabbitTemplate.convertAndSend("normalExchange", "key1", map);
        }
        return "生产者下单成功";
    }
}

下游消费者 支付服务PayConsumer, DeadOrderConsumer 不变

进行测试,首先启动 Consumer01 创建更改后的队列,然后关闭它,模拟其接受不到消息。然后启动消费者发送10条消息,可以看到发送到6条消息首先在 normal-queue 队列中,4条消息在 dead-queue 死信队列中。

目录
相关文章
|
3月前
|
存储 监控 安全
死信队列的死信队列
死信队列的死信队列
|
消息中间件
RabbitMQ的死信队列和延时队列
RabbitMQ的死信队列和延时队列
|
2月前
|
消息中间件 存储
RabbitMQ-死信交换机和死信队列
死信队列和死信交换机是RabbitMQ提供的一个非常实用的功能,通过合理使用这一机制,可以大大增强系统的健壮性和可靠性。它们不仅能有效解决消息处理失败的情况,还能为系统的错误追踪、消息延迟处理等提供支持。在设计系统的消息体系时,合理规划和使用死信队列和死信交换机,将会为系统的稳定运行提供一个有力的
63 0
|
5月前
|
消息中间件 存储 Java
消息队列-死信队列
消息队列-死信队列
57 0
|
缓存
指令缓存队列
指令缓存队列
69 0
|
6月前
队列的实现
队列的实现
|
11月前
|
C++
c++ 队列
队列的数据结构
38 0
|
消息中间件
死信队列和延迟队列的介绍
死信队列和延迟队列的介绍
|
消息中间件
RabbitMQ 的死信队列、延迟队列
RabbitMQ 的死信队列、延迟队列
91 0
|
消息中间件 监控
什么情况下消息会成为死信 ?
在 RabbitMQ 中,消息成为死信的情况通常包括以下几种
330 0