引言
本文代码已上传至Github,有兴趣的同学可以下载来看看:https://github.com/ylw-github/SpringBoot-RabbitMQ-Demo.git
死信队列听上去像 消息“死”了 ,其实也有点这个意思,我们也可以称他为“备胎队列”。
死信队列是当消息在一个队列因为以下原因产生的:
- 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到mq中)
在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了。
可以看下流程图:
项目整合
注意:死信交换机和配置,只在初始化的时候配置,如果之前配置过相同的交换机,需要先删除。如下:删除fanoutExchange
。
1.生产者配置
@Component public class DeadFanOutConfig { /** * 定义死信队列相关信息 */ public final static String deadQueueName = "dead_queue"; public final static String deadRoutingKey = "dead_routing_key"; public final static String deadExchangeName = "dead_exchange"; /** * 死信队列 交换机标识符 */ public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange"; /** * 死信队列交换机绑定键标识符 */ public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; // 邮件队列 private String FANOUT_EMAIL_QUEUE = "fanout_email_queue"; // 短信队列 private String FANOUT_SMS_QUEUE = "fanout_sms_queue"; // fanout 交换机 private String EXCHANGE_NAME = "fanoutExchange"; // 1.定义邮件队列 @Bean public Queue fanOutEamilQueue() { // 将普通队列绑定到死信队列交换机上 Map<String, Object> args = new HashMap<>(2); args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey); Queue queue = new Queue(FANOUT_EMAIL_QUEUE, true, false, false, args); return queue; } // 2.定义短信队列 @Bean public Queue fanOutSmsQueue() { return new Queue(FANOUT_SMS_QUEUE); } // 2.定义交换机 @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_NAME); } // 3.队列与交换机绑定邮件队列 @Bean Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange); } // 4.队列与交换机绑定短信队列 @Bean Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange); } /** * 配置死信队列 * * @return */ @Bean public Queue deadQueue() { Queue queue = new Queue(deadQueueName, true); return queue; } @Bean public DirectExchange deadExchange() { return new DirectExchange(deadExchangeName); } @Bean public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) { return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey); } }
2.消费者配置
@Component public class FanoutEamilConsumer { /** * 死信队列演示 */ @RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("邮件消费者获取生产者消息msg:" + msg + ",消息id:" + messageId); JSONObject jsonObject = JSONObject.parseObject(msg); Integer timestamp = jsonObject.getInteger("timestamp"); try { int result = 1 / timestamp; System.out.println("result:" + result); // 通知mq服务器删除该消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); // // 丢弃该消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } }
3.死信消费者配置:
@Component public class DeadConsumer { @RabbitListener(queues = "dead_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("死信邮件消费者获取生产者消息msg:" + msg + ",消息id:" + messageId); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
4.启动生产者,浏览器访问:http://localhost:8081/sendFanout?queueName=fanout_email_queue,可以看到:
5. 启动消费者
6.浏览器访问:http://localhost:8081/sendFanout?queueName=fanout_email_queue ,可以看到消费者先自动补偿,补偿结束后,还是失败,然后拒绝消息,发送给死信队列了。
自动补偿:
失败放到死信队列:
最终由死信队列来获取消息: