RabbitMQ之死信队列

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【1月更文挑战第10天】先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

文章目录

一、死信的概念

二、死信的来源

三、实战

1、消息 TTL 过期

2、队列达到最大长度

3、消息被拒

总结


一、死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

二、死信的来源

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

三、实战

1、消息 TTL 过期

生产者代码:

publicclassProducer {
privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";
publicstaticvoidmain(String[] argv) throwsException {
try (Channelchannel=RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//设置消息的 TTL 时间AMQP.BasicPropertiesproperties=newAMQP.BasicProperties().builder().expiration("10000").build();
//该信息是用作演示队列个数限制for (inti=1; i<11 ; i++) {
Stringmessage="info"+i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
System.out.println("生产者发送消息:"+message);
            }
        }
    }
}

消费者 C1 代码(启动之后关闭该消费者 模拟其接收不到消息)

publicclassConsumer01 {
//普通交换机名称privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";
//死信交换机名称privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";
publicstaticvoidmain(String[] argv) throwsException {
Channelchannel=RabbitUtils.getChannel();
//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列StringdeadQueue="dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定死信交换机与 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常队列绑定死信队列信息Map<String, Object>params=newHashMap<>();
//正常队列设置死信交换机 参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");
StringnormalQueue="normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息.....");
DeliverCallbackdeliverCallback= (consumerTag, delivery) -> {
Stringmessage=newString(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息"+message);
        };
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag-> {
        });
    }
}

消费者 C2 代码(以上步骤完成后 启动 C2 消费者 它消费死信队列里面的消息)

publicclassConsumer02 {
privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";
publicstaticvoidmain(String[] argv) throwsException {
Channelchannel=RabbitUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
StringdeadQueue="dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信队列消息.....");
DeliverCallbackdeliverCallback= (consumerTag, delivery) -> {
Stringmessage=newString(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收死信队列的消息"+message);
        };
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag-> {
        });
    }
}

2、队列达到最大长度

消息生产者代码去掉 TTL 属性

publicclassProducer {
privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";
publicstaticvoidmain(String[] argv) throwsException {
try (Channelchannel=RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//该信息是用作演示队列个数限制for (inti=1; i<11 ; i++) {
Stringmessage="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());
System.out.println("生产者发送消息:"+message);
            }
        }
    }
}

C1 消费者修改以下代码(启动之后关闭该消费者 模拟其接收不到消息)

publicclassConsumer01 {
//普通交换机名称privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";
//死信交换机名称privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";
publicstaticvoidmain(String[] argv) throwsException {
Channelchannel=RabbitUtils.getChannel();
//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列StringdeadQueue="dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定死信交换机与 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常队列绑定死信队列信息Map<String, Object>params=newHashMap<>();
//正常队列设置死信交换机 参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");
//设置正常队列长度限制params.put("x-max-length",6);
StringnormalQueue="normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息.....");
DeliverCallbackdeliverCallback= (consumerTag, delivery) -> {
Stringmessage=newString(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息"+message);
        };
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag-> {
        });
    }
}

注意此时需要把原先队列删除 因为参数改变了

C2 消费者代码

publicclassConsumer02 {
privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";
publicstaticvoidmain(String[] argv) throwsException {
Channelchannel=RabbitUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
StringdeadQueue="dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信队列消息.....");
DeliverCallbackdeliverCallback= (consumerTag, delivery) -> {
Stringmessage=newString(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收死信队列的消息"+message);
        };
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag-> {
        });
    }
}

3、消息被拒

消息生产者代码

publicclassProducer {
privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";
publicstaticvoidmain(String[] argv) throwsException {
try (Channelchannel=RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//该信息是用作演示队列个数限制for (inti=1; i<11 ; i++) {
Stringmessage="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());
System.out.println("生产者发送消息:"+message);
            }
        }
    }
}

C1 消费者代码(启动之后关闭该消费者 模拟其接收不到消息)

publicclassConsumer01 {
//普通交换机名称privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";
//死信交换机名称privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";
publicstaticvoidmain(String[] argv) throwsException {
Channelchannel=RabbitUtils.getChannel();
//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列StringdeadQueue="dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定死信交换机与 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常队列绑定死信队列信息Map<String, Object>params=newHashMap<>();
//正常队列设置死信交换机 参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");
StringnormalQueue="normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息.....");
DeliverCallbackdeliverCallback= (consumerTag, delivery) -> {
Stringmessage=newString(delivery.getBody(), "UTF-8");
if(message.equals("info5")){
System.out.println("Consumer01 接收到消息"+message+"并拒绝签收该消息");
//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            }else {
System.out.println("Consumer01 接收到消息"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
booleanautoAck=false;
channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag-> {
        });
    }
}

C2 消费者代码

publicclassConsumer02 {
privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";
publicstaticvoidmain(String[] argv) throwsException {
Channelchannel=RabbitUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
StringdeadQueue="dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信队列消息.....");
DeliverCallbackdeliverCallback= (consumerTag, delivery) -> {
Stringmessage=newString(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收死信队列的消息"+message);
        };
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag-> {
        });
    }
}

启动消费者 1 然后再启动消费者 2


总结

以上就是RabbitMQ之死信队列的相关知识点,希望对你有所帮助。

积跬步以至千里,积怠惰以至深渊。时代在这跟着你一起努力哦!

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
90 0
|
2月前
|
消息中间件 存储 NoSQL
RabbitMQ的幂等性、优先级队列和惰性队列
**摘要:** 本文讨论了RabbitMQ中的幂等性、优先级队列和惰性队列。幂等性确保了重复请求不会导致副作用,关键在于消费端的幂等性保障,如使用唯一ID和Redis的原子性操作。优先级队列适用于处理不同重要性消息,如大客户订单优先处理,通过设置`x-max-priority`属性实现。惰性队列自3.6.0版起提供,用于延迟将消息加载到内存,适合大量消息存储和消费者延迟消费的场景。
27 4
|
26天前
|
消息中间件 Java API
RabbitMQ入门指南(五):Java声明队列、交换机以及绑定
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了Java声明队列、交换机以及绑定队列和交换机等内容。
30 0
|
1月前
|
消息中间件 Java Maven
springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称
springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称
|
1月前
|
消息中间件 存储 NoSQL
rocketmq实现延迟队列思路探讨
本文介绍了两种实现RocketMQ延迟消息的方法。非任意时间延迟可通过在服务器端配置`messageDelayLevel`实现,但需重启服务。任意时间延迟则分为两种策略:一是结合原生逻辑和时间轮,利用RocketMQ的默认延迟等级组合支持任意延迟,但可能丢失1分钟内的数据;二是使用存储介质(如Redis)加时间轮,消息存储和定时发送结合,能处理数据不一致和丢失问题,但涉及更多组件。推荐项目[civism-rocket](https://github.com/civism/civism-rocket)作为参考。
71 1
|
2月前
|
消息中间件 前端开发 算法
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
45 1
|
3月前
|
消息中间件 监控 数据挖掘
兔子的后院奇遇:深入了解RabbitMQ中的死信队列【RabbitMQ 四】
兔子的后院奇遇:深入了解RabbitMQ中的死信队列【RabbitMQ 四】
51 0
|
3月前
|
消息中间件 Docker 容器
docker构建rabbitmq并配置延迟队列插件
docker构建rabbitmq并配置延迟队列插件
44 0
|
3月前
|
消息中间件
rabbitmq动态创建队列
rabbitmq动态创建队列
39 0
|
2月前
|
消息中间件 网络协议 JavaScript
MQTT常见问题之微消息队列mqtt支持ipv6失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:

相关产品

  • 云消息队列 MQ