RabbitMQ 死信队列

简介: RabbitMQ 死信队列

https://www.rabbitmq.com/dlx.html


DLX 即 Dead-Letter-Exchange 也叫做死信交换机。

死信队列是指队列上的消息变成死信后,能够后发送到另外一个交换机,这个交换机 就是 DLX 。


一般有几种情况会变成死信:


  • 消息被拒绝( Basic.reject 或者 basic.nack)并且设置 requeue 参数为 false
  • 消息 过期 设置了 message TTL
  • 队列达到最大的长度

死信交换机是正常的交换机,能够在任何队列上被指定。其实死信交换机和一般的交换机没啥区别,只是添加了死信交换机的属性。如果队列上存在死信, RabbitMq 会将死信消息投递到设置的 DLX 上去 ,然后被路由到一个队列上,这个队列,就是死信队列。

 

流程如下:

640.png


生产者:

import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("xxxx");
        connectionFactory.setUsername("xxx");
        connectionFactory.setPassword("xxx");
        connectionFactory.setPort(5672);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "normal_exchange";
        String routingkey = "dlx.dlx";
        String msg = "test dlx message";
        String queueName = "normal_queueName";
        Map<String,Object>map =new HashMap<>();
        //注意:x-dead-letter-exchange 这个key是固定这样写的,value是你自定义的。
        map.put("x-dead-letter-exchange","exchange.dlx");
        //注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。
        channel.queueDeclare(queueName, true, false, false, map);
        channel.exchangeDeclare(exchangeName,"direct",true,false,null);
        channel.queueBind(queueName,exchangeName,"dlx.dlx");
        for (int i = 0; i < 3; i++) {
            // deliveryMode=2 持久化,expiration 消息有效时间
            AMQP.BasicProperties properties=new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .contentEncoding("utf-8")
                    .expiration("7000")
                    .build();
            channel.basicPublish(exchangeName, routingkey, true, properties, msg.getBytes());
        }
    }
}



消费者:

import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Consumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("xxxx");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("xxxx");
        connectionFactory.setPassword("xxxx");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //死信交换机声明
        channel.exchangeDeclare("exchange.dlx","topic",true,false,null);
        //死信队列声明
        channel.queueDeclare("queue.dlx",true,false,false,null);
        //routingkey指定为#就行,表示只要路由到死信队列的都接收
        channel.queueBind("queue.dlx","exchange.dlx","#");
        channel.basicConsume("queue.dlx", false, "myConsumer Tag", new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                  String routingKey = envelope.getRoutingKey();
                  String convernType = properties.getContentType();
                  long deliveryTag = envelope.getDeliveryTag();
                  System.out.println("routingKey:"+routingKey+",convernType:"+convernType+",deliveryTag:"+deliveryTag+",Msg body:"+new String(body));
                  channel.basicAck(deliveryTag, false);
            }
        });
    }
}


上面的代码可以看到:

消息通过正常交换机 normal_exchange 到达了正常队列 normal_queue。

map.put("x-dead-letter-exchange","exchange.dlx");
 //注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。
 channel.queueDeclare(queueName, true, false, false, map);


正常的队列 normal_queue 声明了下面参数下设置了一个 x-dead-letter-exchange  当消息过期时,将消息发送到死信交换机 exchange.dlx

死信交换机下面绑定了一个队列 queue.dlx


channel.exchangeDeclare("exchange.dlx","topic",true,false,null);
 //死信队列声明
channel.queueDeclare("queue.dlx",true,false,false,null);
//routingkey指定为#就行,表示只要路由到死信队列的都接收
channel.queueBind("queue.dlx","exchange.dlx","#");

   最后将消息发送到了死信队列上,消费者,消费死信队列 queue.dlx 上的消息即可


channel.basicConsume("queue.dlx", false, "myConsumer Tag", new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                  String routingKey = envelope.getRoutingKey();
                  String convernType = properties.getContentType();
                  long deliveryTag = envelope.getDeliveryTag();
                  System.out.println("routingKey:"+routingKey+",convernType:"+convernType+",deliveryTag:"+deliveryTag+",Msg body:"+new String(body));
                  channel.basicAck(deliveryTag, false);
            }
        });


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
127 6
|
2月前
|
消息中间件 JSON Java
|
2月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
2月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
85 0
|
3月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
90 2
|
2月前
|
消息中间件 存储
RabbitMQ-死信交换机和死信队列
死信队列和死信交换机是RabbitMQ提供的一个非常实用的功能,通过合理使用这一机制,可以大大增强系统的健壮性和可靠性。它们不仅能有效解决消息处理失败的情况,还能为系统的错误追踪、消息延迟处理等提供支持。在设计系统的消息体系时,合理规划和使用死信队列和死信交换机,将会为系统的稳定运行提供一个有力的
57 0
|
4月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
4月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
69 0
说说RabbitMQ延迟队列实现原理?
|
4月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
130 1
|
4月前
|
消息中间件
RabbitMQ 死信消息队列 重复消费 basicAck basicNack
RabbitMQ 死信消息队列 重复消费 basicAck basicNack