https://www.rabbitmq.com/dlx.html
DLX 即 Dead-Letter-Exchange 也叫做死信交换机。
死信队列是指队列上的消息变成死信后,能够后发送到另外一个交换机,这个交换机 就是 DLX 。
一般有几种情况会变成死信:
- 消息被拒绝( Basic.reject 或者 basic.nack)并且设置 requeue 参数为 false
- 消息 过期 设置了 message TTL
- 队列达到最大的长度
死信交换机是正常的交换机,能够在任何队列上被指定。其实死信交换机和一般的交换机没啥区别,只是添加了死信交换机的属性。如果队列上存在死信, RabbitMq 会将死信消息投递到设置的 DLX 上去 ,然后被路由到一个队列上,这个队列,就是死信队列。
流程如下:
生产者:
import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setHost("xxxx"); connectionFactory.setUsername("xxx"); connectionFactory.setPassword("xxx"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "normal_exchange"; String routingkey = "dlx.dlx"; String msg = "test dlx message"; String queueName = "normal_queueName"; Map<String,Object>map =new HashMap<>(); //注意:x-dead-letter-exchange 这个key是固定这样写的,value是你自定义的。 map.put("x-dead-letter-exchange","exchange.dlx"); //注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。 channel.queueDeclare(queueName, true, false, false, map); channel.exchangeDeclare(exchangeName,"direct",true,false,null); channel.queueBind(queueName,exchangeName,"dlx.dlx"); for (int i = 0; i < 3; i++) { // deliveryMode=2 持久化,expiration 消息有效时间 AMQP.BasicProperties properties=new AMQP.BasicProperties().builder() .deliveryMode(2) .contentEncoding("utf-8") .expiration("7000") .build(); channel.basicPublish(exchangeName, routingkey, true, properties, msg.getBytes()); } } }
消费者:
import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setHost("xxxx"); connectionFactory.setPort(5672); connectionFactory.setUsername("xxxx"); connectionFactory.setPassword("xxxx"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //死信交换机声明 channel.exchangeDeclare("exchange.dlx","topic",true,false,null); //死信队列声明 channel.queueDeclare("queue.dlx",true,false,false,null); //routingkey指定为#就行,表示只要路由到死信队列的都接收 channel.queueBind("queue.dlx","exchange.dlx","#"); channel.basicConsume("queue.dlx", false, "myConsumer Tag", new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String convernType = properties.getContentType(); long deliveryTag = envelope.getDeliveryTag(); System.out.println("routingKey:"+routingKey+",convernType:"+convernType+",deliveryTag:"+deliveryTag+",Msg body:"+new String(body)); channel.basicAck(deliveryTag, false); } }); } }
上面的代码可以看到:
消息通过正常交换机 normal_exchange 到达了正常队列 normal_queue。
map.put("x-dead-letter-exchange","exchange.dlx"); //注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。 channel.queueDeclare(queueName, true, false, false, map);
正常的队列 normal_queue 声明了下面参数下设置了一个 x-dead-letter-exchange 当消息过期时,将消息发送到死信交换机 exchange.dlx
死信交换机下面绑定了一个队列 queue.dlx
channel.exchangeDeclare("exchange.dlx","topic",true,false,null); //死信队列声明 channel.queueDeclare("queue.dlx",true,false,false,null); //routingkey指定为#就行,表示只要路由到死信队列的都接收 channel.queueBind("queue.dlx","exchange.dlx","#");
最后将消息发送到了死信队列上,消费者,消费死信队列 queue.dlx 上的消息即可
channel.basicConsume("queue.dlx", false, "myConsumer Tag", new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String convernType = properties.getContentType(); long deliveryTag = envelope.getDeliveryTag(); System.out.println("routingKey:"+routingKey+",convernType:"+convernType+",deliveryTag:"+deliveryTag+",Msg body:"+new String(body)); channel.basicAck(deliveryTag, false); } });