RabbitMQ——死信队列的三大来源应用举例

简介: RabbitMQ——死信队列的三大来源应用举例

1.什么是死信队列


先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景: 为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

三大来源:

·       消息 TTL 过期

·       队列达到最大长度(队列满了,无法再添加数据到 mq )

·       消息被拒绝(basic.reject basic.nack)并且 requeue=false


2.三大来源之消息TTL过期


我们就参考上面的架构图来写代码。首先是生产者,其中还有一个工具类代码。

package com.szh.rabbitmq.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 *
 */
public class RabbitMqUtils {
    public static Channel getChannel() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.40.130");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return channel;
    }
}
package com.szh.rabbitmq.dead;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * 死信队列之生产者
 */
public class Producer {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //设置接收消息的过期时间,超过这个时间则转到死信队列
        AMQP.BasicProperties properties = new AMQP.BasicProperties()
                .builder().expiration("10000").build();
        for (int i = 1; i <= 10; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE,"zql",properties,message.getBytes(StandardCharsets.UTF_8));
        }
    }
}

下面是两个消费者。

package com.szh.rabbitmq.dead;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * 死信队列之消费者01
 */
public class Consumer01 {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机的名称
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列的名称
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //绑定交换机、声明交换机的类型
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
        //绑定普通队列, 正常队列绑定死信队列信息
        Map<String, Object> arguments = new HashMap<>();
        //正常队列设置死信交换机, 参数 key 是固定值
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //正常队列设置死信 routing-key, 参数 key 是固定值
        arguments.put("x-dead-letter-routing-key","szh");
        //正常队列最大长度限制
        //arguments.put("x-max-length",6);
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        //绑定死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        //普通交换机与普通队列进行绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zql");
        //死信交换机与死信队列进行绑定
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"szh");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag,message) -> {
//            String msg = new String(message.getBody());
//            if ("info5".equals(msg)) {
//                System.out.println(msg + "此消息已被Consumer01拒绝....");
//                //执行拒绝策略,被拒绝的消息将转到死信队列中
//                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
//            } else {
//                System.out.println("Consumer01接收的消息是:" + msg);
//                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
//            }
            String msg = new String(message.getBody());
            System.out.println("Consumer01接收的消息是:" + msg);
        };
        //修改autoAck为false,表示不自动应答
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag -> {});
    }
}
package com.szh.rabbitmq.dead;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
/**
 *
 */
public class Consumer02 {
    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer02接收的消息是:" + new String(message.getBody()));
        };
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {});
    }
}

下面我们测试一下,因为有关队列的声明都写在了第一个消费者中,所以先启动第一个消费者,然后模拟消息TTL过期(直接将消费者1down掉)。


然后再启动生产者,此时会向MQ中发送10条消息,这些消息此时会存在normal_queue中。由于消费者1已经down掉,它自然接收不到消息,那么等消息过期之后(在生产者代码中设定的是10s),这些消息会被转到死信队列dead_queue中,此时再启动消费者2,它就可以从死信队列中接收到这10条消息。

3.三大来源之队列达到最大长度


工具类和上面的案例是一样的,其余的生产者和消费者稍有变动。

package com.szh.rabbitmq.dead;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * 死信队列之生产者
 */
public class Producer {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //设置接收消息的过期时间,超过这个时间则转到死信队列
//        AMQP.BasicProperties properties = new AMQP.BasicProperties()
//                .builder().expiration("10000").build();
        for (int i = 1; i <= 10; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE,"zql",null,message.getBytes(StandardCharsets.UTF_8));
        }
    }
}
package com.szh.rabbitmq.dead;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * 死信队列之消费者01
 */
public class Consumer01 {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机的名称
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列的名称
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //绑定交换机、声明交换机的类型
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
        //绑定普通队列, 正常队列绑定死信队列信息
        Map<String, Object> arguments = new HashMap<>();
        //正常队列设置死信交换机, 参数 key 是固定值
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //正常队列设置死信 routing-key, 参数 key 是固定值
        arguments.put("x-dead-letter-routing-key","szh");
        //正常队列最大长度限制
        arguments.put("x-max-length",6);
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        //绑定死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        //普通交换机与普通队列进行绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zql");
        //死信交换机与死信队列进行绑定
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"szh");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag,message) -> {
//            String msg = new String(message.getBody());
//            if ("info5".equals(msg)) {
//                System.out.println(msg + "此消息已被Consumer01拒绝....");
//                //执行拒绝策略,被拒绝的消息将转到死信队列中
//                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
//            } else {
//                System.out.println("Consumer01接收的消息是:" + msg);
//                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
//            }
            String msg = new String(message.getBody());
            System.out.println("Consumer01接收的消息是:" + msg);
        };
        //修改autoAck为false,表示不自动应答
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag -> {});
    }
}
package com.szh.rabbitmq.dead;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
/**
 *
 */
public class Consumer02 {
    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer02接收的消息是:" + new String(message.getBody()));
        };
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {});
    }
}

下面我们测试一下,还是先启动消费者1确保MQ中已经有了相应的交换机和队列,然后将消费者1先停掉,去启动生产者,先向MQ中发送10条消息,看看结果。


此时由于消费者1被停掉了,它就无法接收消息,而它所承受的队列最大长度为6,所以这6个会堆积在normal_queue队列中,剩下的10-6=4 条消息会转到死信队列中。


当我们启动消费者12之后,可以看到它们能够接收到相应队列中的消息。

4.三大来源之消息被拒绝


package com.szh.rabbitmq.dead;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * 死信队列之生产者
 */
public class Producer {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //设置接收消息的过期时间,超过这个时间则转到死信队列
//        AMQP.BasicProperties properties = new AMQP.BasicProperties()
//                .builder().expiration("10000").build();
        for (int i = 1; i <= 10; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE,"zql",null,message.getBytes(StandardCharsets.UTF_8));
        }
    }
}
package com.szh.rabbitmq.dead;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * 死信队列之消费者01
 */
public class Consumer01 {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机的名称
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列的名称
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //绑定交换机、声明交换机的类型
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
        //绑定普通队列, 正常队列绑定死信队列信息
        Map<String, Object> arguments = new HashMap<>();
        //正常队列设置死信交换机, 参数 key 是固定值
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //正常队列设置死信 routing-key, 参数 key 是固定值
        arguments.put("x-dead-letter-routing-key","szh");
        //正常队列最大长度限制
        //arguments.put("x-max-length",6);
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        //绑定死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        //普通交换机与普通队列进行绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zql");
        //死信交换机与死信队列进行绑定
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"szh");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            String msg = new String(message.getBody());
            if ("info5".equals(msg)) {
                System.out.println(msg + "此消息已被Consumer01拒绝....");
                //执行拒绝策略,被拒绝的消息将转到死信队列中
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            } else {
                System.out.println("Consumer01接收的消息是:" + msg);
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }
//            String msg = new String(message.getBody());
//            System.out.println("Consumer01接收的消息是:" + msg);
        };
        //修改autoAck为false,表示不自动应答
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag -> {});
    }
}
package com.szh.rabbitmq.dead;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.szh.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
/**
 *
 */
public class Consumer02 {
    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer02接收的消息是:" + new String(message.getBody()));
        };
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {});
    }
}

下面我们测试一下,还是先启动消费者1确保MQ中已经有了相应的交换机和队列,然后将消费者1先停掉,去启动生产者,先向MQ中发送10条消息,看看结果。


生产者消息发送完毕之后,因为消费者1down掉了,所以这10条消息被堆积到了normal_queue队列中。


此时我们再启动消费者1,可以看到它正常的去MQ中消费,但是其中的info5被拒绝了,而这个拒绝的消息就会转到死信队列中。


在死信队列中就看到了info5这条消息,此时再启动消费者2,它就可以顺利的去死信队列中消费了。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
26天前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
1月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
35 0
说说RabbitMQ延迟队列实现原理?
|
1月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
43 1
|
1月前
|
消息中间件
RabbitMQ 死信消息队列 重复消费 basicAck basicNack
RabbitMQ 死信消息队列 重复消费 basicAck basicNack
|
2月前
|
消息中间件 Arthas 监控
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
|
2月前
|
数据采集 监控 物联网
MQTT协议在智能制造中的应用案例与效益分析
【6月更文挑战第8天】MQTT协议在智能制造中的应用案例与效益分析
80 1
|
2月前
|
消息中间件 存储 监控
RabbitMQ 死信队列
RabbitMQ的死信队列(DLQ)是存储无法正常消费消息的特殊队列,常见于消息被拒绝、过期或队列满时。DLQ用于异常处理、任务调度和监控,通过绑定到普通队列自动路由死信消息。通过监听死信队列,可以对异常消息进行补偿和进一步处理,提升系统稳定性和可维护性。
33 1
|
2月前
|
消息中间件 Java Spring
Spring Boot与RabbitMQ的集成应用
Spring Boot与RabbitMQ的集成应用
|
2月前
|
消息中间件
RabbitMQ配置单活模式队列
RabbitMQ配置单活模式队列
39 0