RabbitMQ之死信队列

本文涉及的产品
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 【1月更文挑战第10天】先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

文章目录

一、死信的概念

二、死信的来源

三、实战

1、消息 TTL 过期

2、队列达到最大长度

3、消息被拒

总结


一、死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

二、死信的来源

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

三、实战

1、消息 TTL 过期

生产者代码:

publicclassProducer {
privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";
publicstaticvoidmain(String[] argv) throwsException {
try (Channelchannel=RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//设置消息的 TTL 时间AMQP.BasicPropertiesproperties=newAMQP.BasicProperties().builder().expiration("10000").build();
//该信息是用作演示队列个数限制for (inti=1; i<11 ; i++) {
Stringmessage="info"+i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
System.out.println("生产者发送消息:"+message);
            }
        }
    }
}

消费者 C1 代码(启动之后关闭该消费者 模拟其接收不到消息)

publicclassConsumer01 {
//普通交换机名称privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";
//死信交换机名称privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";
publicstaticvoidmain(String[] argv) throwsException {
Channelchannel=RabbitUtils.getChannel();
//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列StringdeadQueue="dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定死信交换机与 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常队列绑定死信队列信息Map<String, Object>params=newHashMap<>();
//正常队列设置死信交换机 参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");
StringnormalQueue="normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息.....");
DeliverCallbackdeliverCallback= (consumerTag, delivery) -> {
Stringmessage=newString(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息"+message);
        };
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag-> {
        });
    }
}

消费者 C2 代码(以上步骤完成后 启动 C2 消费者 它消费死信队列里面的消息)

publicclassConsumer02 {
privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";
publicstaticvoidmain(String[] argv) throwsException {
Channelchannel=RabbitUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
StringdeadQueue="dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信队列消息.....");
DeliverCallbackdeliverCallback= (consumerTag, delivery) -> {
Stringmessage=newString(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收死信队列的消息"+message);
        };
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag-> {
        });
    }
}

2、队列达到最大长度

消息生产者代码去掉 TTL 属性

publicclassProducer {
privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";
publicstaticvoidmain(String[] argv) throwsException {
try (Channelchannel=RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//该信息是用作演示队列个数限制for (inti=1; i<11 ; i++) {
Stringmessage="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());
System.out.println("生产者发送消息:"+message);
            }
        }
    }
}

C1 消费者修改以下代码(启动之后关闭该消费者 模拟其接收不到消息)

publicclassConsumer01 {
//普通交换机名称privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";
//死信交换机名称privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";
publicstaticvoidmain(String[] argv) throwsException {
Channelchannel=RabbitUtils.getChannel();
//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列StringdeadQueue="dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定死信交换机与 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常队列绑定死信队列信息Map<String, Object>params=newHashMap<>();
//正常队列设置死信交换机 参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");
//设置正常队列长度限制params.put("x-max-length",6);
StringnormalQueue="normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息.....");
DeliverCallbackdeliverCallback= (consumerTag, delivery) -> {
Stringmessage=newString(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息"+message);
        };
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag-> {
        });
    }
}

注意此时需要把原先队列删除 因为参数改变了

C2 消费者代码

publicclassConsumer02 {
privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";
publicstaticvoidmain(String[] argv) throwsException {
Channelchannel=RabbitUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
StringdeadQueue="dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信队列消息.....");
DeliverCallbackdeliverCallback= (consumerTag, delivery) -> {
Stringmessage=newString(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收死信队列的消息"+message);
        };
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag-> {
        });
    }
}

3、消息被拒

消息生产者代码

publicclassProducer {
privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";
publicstaticvoidmain(String[] argv) throwsException {
try (Channelchannel=RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//该信息是用作演示队列个数限制for (inti=1; i<11 ; i++) {
Stringmessage="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());
System.out.println("生产者发送消息:"+message);
            }
        }
    }
}

C1 消费者代码(启动之后关闭该消费者 模拟其接收不到消息)

publicclassConsumer01 {
//普通交换机名称privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";
//死信交换机名称privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";
publicstaticvoidmain(String[] argv) throwsException {
Channelchannel=RabbitUtils.getChannel();
//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列StringdeadQueue="dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定死信交换机与 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常队列绑定死信队列信息Map<String, Object>params=newHashMap<>();
//正常队列设置死信交换机 参数 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值params.put("x-dead-letter-routing-key", "lisi");
StringnormalQueue="normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息.....");
DeliverCallbackdeliverCallback= (consumerTag, delivery) -> {
Stringmessage=newString(delivery.getBody(), "UTF-8");
if(message.equals("info5")){
System.out.println("Consumer01 接收到消息"+message+"并拒绝签收该消息");
//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            }else {
System.out.println("Consumer01 接收到消息"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
booleanautoAck=false;
channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag-> {
        });
    }
}

C2 消费者代码

publicclassConsumer02 {
privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";
publicstaticvoidmain(String[] argv) throwsException {
Channelchannel=RabbitUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
StringdeadQueue="dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信队列消息.....");
DeliverCallbackdeliverCallback= (consumerTag, delivery) -> {
Stringmessage=newString(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收死信队列的消息"+message);
        };
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag-> {
        });
    }
}

启动消费者 1 然后再启动消费者 2


总结

以上就是RabbitMQ之死信队列的相关知识点,希望对你有所帮助。

积跬步以至千里,积怠惰以至深渊。时代在这跟着你一起努力哦!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
207 6
|
3月前
|
消息中间件 JSON Java
|
3月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
3月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
100 0
|
4月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
125 2
|
3月前
|
消息中间件 存储
RabbitMQ-死信交换机和死信队列
死信队列和死信交换机是RabbitMQ提供的一个非常实用的功能,通过合理使用这一机制,可以大大增强系统的健壮性和可靠性。它们不仅能有效解决消息处理失败的情况,还能为系统的错误追踪、消息延迟处理等提供支持。在设计系统的消息体系时,合理规划和使用死信队列和死信交换机,将会为系统的稳定运行提供一个有力的
72 0
|
5月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
5月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
83 0
说说RabbitMQ延迟队列实现原理?
|
5月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
159 1
|
5月前
|
消息中间件
RabbitMQ 死信消息队列 重复消费 basicAck basicNack
RabbitMQ 死信消息队列 重复消费 basicAck basicNack

相关产品

  • 云消息队列 MQ