rabbitmq系列(四)死信队列

简介: 当消息在一个队列中变成一个死信之后,它将被重新publish到另一个交换机上,这个交换机我们就叫做死信交换机,死信交换机将死信投递到一个队列上就是死信队列。

一、什么是死信队列

当消息在一个队列中变成一个死信之后,它将被重新publish到另一个交换机上,这个交换机我们就叫做死信交换机,死信交换机将死信投递到一个队列上就是死信队列。具体原理如下图:

死信交换机.png

消息变成死信的三种情况:

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度

二、手动签收应答模式

应答模式分为两种,手动签收和自动签收,自动应答就是消费者消费了一条消息就自动告诉队列删除消息。这样的弊端就是不管消费逻辑有没有成功,都会将消息删除,这样就会造成消息丢失。而使用手动签收后,就是在消费逻辑处理成功后,手动告诉队列消费成功,然后队列再去删除这条消息。

  1. 再消费者配置文件中开启手动签收模式
spring.rabbitmq.listener.simple.acknowledge-mode = manual
  1. 在消费逻辑处理成功后手动签收,修改消费者代码
@RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(Message message,@Headers Map<String,Object> headers, Channel channel) throws Exception {

        Jedis jedis = new Jedis("localhost", 6379);

        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(),"UTF-8");
        System.out.println("接收导的消息为:"+msg+"==消息id为:"+messageId);

        String messageIdRedis = jedis.get("messageId");

        if(messageId == messageIdRedis){
            return;
        }
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String email = jsonObject.getString("email");
        String content = jsonObject.getString("timestamp");

        String httpUrl = "http://127.0.0.1:8080/email?email"+email+"&content="+content;
        // 如果发生异常则返回null
        String body = HttpUtils.httpGet(httpUrl, "utf-8");
        //
        if(body == null){
            throw new Exception();
        }
        jedis.set("messageId",messageId);
        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        // 手动签收
        channel.basicAck(deliveryTag,false);
    }

三、产生死信的三种情况演示

  • 消息被拒绝

我们继续修改一下消费者代码,尝试让消费者消费的时候发生异常。然后在catch块中拒绝消息。

// 拒绝消息,给死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

我们运行程序后发现,当消息消费异常后在队列”zb-byte1“中的消息被消费了,同时发现在死信队列”dead-byte-zb“中有一条未被消费的消息。消息到死信队列后,然后我们在创建一个消费者去消费消息就可以了。当然死信队列也需要去手动签收消息。

  • 消息TTL过期

这种模式我们也叫做延迟消费,有一种特别经典的案例就是用户在一个商品抢购系统中,用户抢到商品后需要在30分钟时间内支付,不然订单无效。这时候我们就可以通过消息TTL过期来实现,设置队列消息过期时间为30分钟,30分钟后publish到死信队列,我们在死信队列中消费订单状态是否支付成功来判断该订单是否有效。

非常简单,我们只需要在配置死信交换机的时候设置有效时间就可以了

  @Bean
    public Queue queue(){

        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME);
        map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY);
        map.put("x-message-ttl",7200); // 队列过期时间
        Queue queue = new Queue(QUEUE_NAME,true,false,false,map);
        return queue;
    }
  • 队列达到最大长度

设置队列长度即可:

 @Bean
    public Queue queue(){

        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME);
        map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY);
        map.put("x-max-length",3);
//        map.put("x-message-ttl",7200); // 队列过期时间
        Queue queue = new Queue(QUEUE_NAME,true,false,false,map);
        return queue;
    }

设置好之后,我们先不要启动消费者,然后调用生成者往队列中发送消息,当消息长度大于3时,我们发现消息进入了死信队列。

注意:前文中也提到过,队列不能被修改,也就是说已经创建好的队列设置了过期时常为7200s,然后我们注释掉,增加队列长度是3的代码,这样运行会报错,必须在rabbitmq中将该队列删除,然后重新生成队列才可以。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
11月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
601 6
|
12月前
|
消息中间件 JSON Java
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
663 2
|
12月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
12月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
238 0
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
161 4
|
12月前
|
消息中间件 存储
RabbitMQ-死信交换机和死信队列
死信队列和死信交换机是RabbitMQ提供的一个非常实用的功能,通过合理使用这一机制,可以大大增强系统的健壮性和可靠性。它们不仅能有效解决消息处理失败的情况,还能为系统的错误追踪、消息延迟处理等提供支持。在设计系统的消息体系时,合理规划和使用死信队列和死信交换机,将会为系统的稳定运行提供一个有力的
190 0
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
281 0
说说RabbitMQ延迟队列实现原理?
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
524 1
|
消息中间件
RabbitMQ 死信消息队列 重复消费 basicAck basicNack
RabbitMQ 死信消息队列 重复消费 basicAck basicNack