浅析RabbitMQ死信队列

简介: 浅析RabbitMQ死信队列

在现代分布式系统中,消息队列扮演着至关重要的角色。它们可以实现应用程序之间的异步通信,并确保数据的可靠传输和处理。而在这个领域中,RabbitMQ作为一种强大而受欢迎的消息队列解决方案,具备了高效、可靠和灵活的特性。

然而,即使使用了RabbitMQ,我们仍然会遇到一些不可预料的情况,例如消费者无法处理某些消息、消息过期或者队列溢出等。为了解决这些问题,RabbitMQ引入了死信队列(Dead Letter Queue)的概念,为开发人员提供了一种有效的错误处理机制。

那么,究竟什么是死信队列呢?

本文结合Spring Boot使用RabbitMQ的死信队列,着重从是什么、为什么、怎么用几个方面对死信队列进行简单介绍。

1. 是什么:

  • 死信队列(Dead Letter Queue)是一种特殊的消息队列,用于存储无法被消费的消息。
  • 当消息满足某些条件无法被正常消费时,将被发送到死信队列中进行处理。
  • 死信队列提供了一种延迟处理、异常消息处理等场景的解决方案。

2. 为什么

  • 用来处理消费者无法正确处理的消息,避免消息丢失或积压
  • 实现延迟消息处理,例如订单超时未支付,可以将该消息发送到死信队列,然后再进行后续处理。
  • 用于实现消息重试机制,当消费者处理失败时,将消息重新发送到死信队列进行重试。
  • 提高了系统的可伸缩性和容错性,能够应对高并发和异常情况。

3. 怎么用

  1. 在Spring Boot中配置和使用死信队列:
  • 首先,在pom.xml文件中添加RabbitMQ的依赖项。
  • 然后,在application.properties文件中配置RabbitMQ连接信息。
  • 接下来,创建生产者和消费者代码,并通过注解将队列和交换机进行绑定。
  • 在队列的声明中添加死信队列的相关参数,如x-dead-letter-exchangex-dead-letter-routing-key等。
  • 最后,在消费者中编写处理消息的逻辑,包括对异常消息进行处理,并设置是否重新发送到死信队列。

简而言之,死信队列可以认为是一个正常队列的备用队列(或者说是兜底队列),当正常队列的消息无法消费的时候mq会重新把该消息发送到死信交换机,由死信交换机根据路由键将消息投递到备用队列,启动服务备用方案。

消息从正常队列到死信队列的三种情况:

1、消息被否定确认使用 channel.basicNackchannel.basicReject ,并且此时requeue 属性被设置为false

2、消息在队列中的时间超过了设置的TTL())时间。

3、消息数量超过了队列的容量限制()。

当一个队列中的消息满足上述三种情况任一个时,改消息就会从原队列移至死信队列,若改队列没有绑定死信队列则消息被丢弃。

4. 实战

以下是一个简单的Spring Boot集成RabbitMQ的死信队列示例代码:

  • 配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=123456
# 开启消费者手动确认
spring.rabbitmq.listener.type=direct
# 发送到队列失败时的手动处理
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.publisher-returns=true
# 发送到交换机手动确认
spring.rabbitmq.publisher-confirm-type=simple
  • 配置类
@Configuration
@Slf4j
public class RabbitCof {
    @Resource
    private MqKeys mqKeys;
    @Bean("normalQueue")
    public Queue normalQueue() {
        /**
         * 为普通队列绑定交换机
         */
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", mqKeys.DIE_EXCHANGE);
        args.put("x-dead-letter-routing-key", mqKeys.DIE_ROUTING_KEY);
        args.put("x-message-ttl", 1000); // 队列中的消息未被消费则1秒后过期
        return new Queue(mqKeys.NORMAL_QUEUE, true, false, false, args);
    }
    @Bean("normalExchange")
    public Exchange normalExchange() {
        return new DirectExchange(mqKeys.NORMAL_EXCHANGE);
    }
    @Bean("normalBind")
    public Binding normalBinding(@Qualifier("normalQueue") Queue normalQueue, @Qualifier("normalExchange") Exchange normalExchange) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(mqKeys.ROUTING_KEY).noargs();
    }
    /**
     * 死信队列
     * @return
     */
    @Bean("dieQueue")
    public Queue dlQueue() {
        return new Queue(mqKeys.DIE_QUEUE, true, false, false);
    }
    /**
     * 死信交换机
     * @return
     */
    @Bean("dieExchange")
    public Exchange dlExchange() {
        return new DirectExchange(mqKeys.DIE_EXCHANGE);
    }
    @Bean("dieBind")
    public Binding dlBinding(@Qualifier("dieQueue") Queue dlQueue, @Qualifier("dieExchange") Exchange dlExchange) {
        return BindingBuilder.bind(dlQueue).to(dlExchange).with(mqKeys.DIE_ROUTING_KEY).noargs();
    }
    @Resource
    private ConnectionFactory connectionFactory;
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        /**
         * 消费者确认收到消息后,手动ack回调处理
         * spring.rabbitmq.publisher-confirm-type=simple
         */
        rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause)->{
            if(!ack) {
                log.info("消息投递到交换机失败,correlationData={} ,ack={}, cause={}", correlationData == null ? "null" : correlationData.getId(), ack, cause);
            } else {
                log.info("消息成功投递到交换机,correlationData={} ,ack={}, cause={}", correlationData == null ? "null" : correlationData.getId(), ack, cause);
            }
        });
        /**
         * 消息投递到队列失败回调处理
         * spring.rabbitmq.listener.direct.acknowledge-mode=manual
         * spring.rabbitmq.publisher-returns=true
         */
        rabbitTemplate.setReturnsCallback((returnedMessage)->{
            Message message = returnedMessage.getMessage();
            log.error("分发到到队列失败, body->{}", message.getBody());
        });
        return rabbitTemplate;
    }
}
  • 生产者类
@Component
public class Producer {
    @Resource
    private  MqKeys mqKeys;
    @Resource
    private RabbitTemplate rabbitTemplate;
    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend(mqKeys.NORMAL_EXCHANGE, mqKeys.ROUTING_KEY, message);
    }
}
  • 消费者类
@Component
@RabbitListener(queues = "normal.queue")
@Slf4j
public class Consumer {
    @RabbitHandler
    public void handleMessage(String data, Message message, Channel channel) {
        boolean success = false;
        int retryCount = 3;
        System.out.println(message.toString());
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        while (!success && retryCount-- > 0){
            try {
                // 处理消息
                log.info("收到消息: {}, deliveryTag = {}", data, deliveryTag);
                // 正常处理完毕,手动确认,此处不确认让他进入死信队列
//                success = true;
//                channel.basicAck(deliveryTag, false);
                Thread.sleep(3 * 1000L);
            }catch (Exception e){
                log.error("程序异常:{}", e.getMessage());
            }
        }
        // 达到最大重试次数后仍然消费失败
        if(!success){
            try {
                log.info("move to die queue");
                // 手动拒绝,移至死信队列
                /**
                 *
                 deliveryTag – the tag from the received AMQP.Basic.GetOk or AMQP.Basic.Deliver
                 multiple – true to reject all messages up to and including the supplied delivery tag; false to reject just the supplied delivery tag.
                 requeue – true if the rejected message(s) should be requeued rather than discarded/dead-lettered
                 */
                channel.basicNack(deliveryTag, false, false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

以上代码演示了如何在Spring Boot中配置一个普通队列和一个死信队列,然后通过生产者发送消息到普通队列,在消费者中处理消息,并模拟了当发生异常时将消息重新发送到死信队列。

参考连接

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
23天前
|
存储 算法 安全
控制局域网上网软件之 Python 字典树算法解析
控制局域网上网软件在现代网络管理中至关重要,用于控制设备的上网行为和访问权限。本文聚焦于字典树(Trie Tree)算法的应用,详细阐述其原理、优势及实现。通过字典树,软件能高效进行关键词匹配和过滤,提升系统性能。文中还提供了Python代码示例,展示了字典树在网址过滤和关键词屏蔽中的具体应用,为局域网的安全和管理提供有力支持。
50 17
|
4月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
296 6
|
4月前
|
负载均衡 监控 算法
每个程序员都应该知道的 6 种负载均衡算法
每个程序员都应该知道的 6 种负载均衡算法
500 2
|
5月前
|
算法 程序员 Python
程序员必看!Python复杂度分析全攻略,让你的算法设计既快又省内存!
在编程领域,Python以简洁的语法和强大的库支持成为众多程序员的首选语言。然而,性能优化仍是挑战。本文将带你深入了解Python算法的复杂度分析,从时间与空间复杂度入手,分享四大最佳实践:选择合适算法、优化实现、利用Python特性减少空间消耗及定期评估调整,助你写出高效且节省内存的代码,轻松应对各种编程挑战。
112 1
|
5月前
|
消息中间件 JSON Java
|
5月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
6月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
210 2
|
6月前
|
算法 搜索推荐 程序员
程序员常用算法详细讲解
每一种算法都有其适用场景,了解并熟悉这些常用算法的策略和实现,对于解决实际编程问题具有重要的意义。需要注意的是,理论知识的重要性虽然不言而喻,但真正的理解和掌握,还需要在实践中不断地尝试和错误,以达到深入理解的目的。
62 1
|
5月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
129 0
|
5月前
|
消息中间件 存储
RabbitMQ-死信交换机和死信队列
死信队列和死信交换机是RabbitMQ提供的一个非常实用的功能,通过合理使用这一机制,可以大大增强系统的健壮性和可靠性。它们不仅能有效解决消息处理失败的情况,还能为系统的错误追踪、消息延迟处理等提供支持。在设计系统的消息体系时,合理规划和使用死信队列和死信交换机,将会为系统的稳定运行提供一个有力的
94 0