RabbitMQ手动ACK与死信队列

简介: RabbitMQ手动ACK与死信队列

为了保证消息从队列可靠的达到消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。

默认情况下RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ自己又没有这条消息了。所以在实际项目中会使用手动Ack。

1、手动应答

  1. Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。
  2. Channel.basicNack (用于否定确认)
  3. Channel.basicReject (用于否定确认):与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。

消费者端的配置,相关属性值改为自己的:

server.port=8082
#rabbitmq服务器ip
spring.rabbitmq.host=localhost
#rabbitmq的端口
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=lonewalker
#密码
spring.rabbitmq.password=XX
#配置虚拟机
spring.rabbitmq.virtual-host=demo
#设置消费端手动 ack   none不确认  auto自动确认  manual手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

修改消费代码:请勿复制使用,会卡死

packagecom.example.consumer.service;
importcom.alibaba.fastjson.JSONObject;
importcom.example.consumer.entity.User;
importcom.rabbitmq.client.Channel;
importlombok.extern.slf4j.Slf4j;
importorg.springframework.amqp.core.Message;
importorg.springframework.amqp.rabbit.annotation.RabbitListener;
importorg.springframework.stereotype.Service;
importjava.io.IOException;
/*** @description:* @author: LoneWalker* @create: 2022-04-04**/@Service@Slf4jpublicclassConsumerService {
@RabbitListener(queues="publisher.addUser")
publicvoidaddUser(StringuserStr,Channelchannel,Messagemessage){
longdeliveryTag=message.getMessageProperties().getDeliveryTag();
try {
log.info("我一直在重试");
inta=1/0;
Useruser=JSONObject.parseObject(userStr,User.class);
log.info(user.toString());
//手动ack  第二个参数为false是表示仅仅确认当前消息 true表示确认之前所有的消息channel.basicAck(deliveryTag,false);
        } catch (Exceptione) {
//手动nack 告诉rabbitmq该消息消费失败  第三个参数:如果被拒绝的消息应该被重新请求,而不是被丢弃或变成死信,则为truetry {
channel.basicNack(deliveryTag,false,true);
            } catch (IOExceptionex) {
ex.printStackTrace();
            }
        }
    }
}


先启动发布者发送消息,查看控制台:有一条消息待消费·

启动消费端,因为代码中有除0,所以会报错,这里就会出现一条unacked消息:

因为设置的是将消息重新请求,所以它会陷入死循环

防止出现这种情况,可以将basicNack最后一个参数改为false,让消息进去死信队列

2、什么是死信队列

说简单点就是备胎队列,而死信的来源有以下几种:

  1. 消息被否定确认,使用 channel.basicNackchannel.basicReject ,并且此时requeue 属性被设置为false
  2. 消息在队列的存活时间超过设置的TTL时间。
  3. 消息队列的消息数量已经超过最大队列长度。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

3、配置死信队列

一般会为每个重要的业务队列配置一个死信队列。可以分为以下步骤:

  1. 配置业务队列,绑定到业务交换机上
  2. 为业务队列配置死信交换机和路由key
  3. 为死信交换机配置死信队列

从控制台将之前的交换机都删除,然后修改代码。

首先看一下发布者的配置代码:

packagecom.example.publisher.config;
importlombok.extern.slf4j.Slf4j;
importorg.springframework.amqp.core.*;
importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;
importorg.springframework.amqp.rabbit.connection.CorrelationData;
importorg.springframework.amqp.rabbit.core.RabbitTemplate;
importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
importorg.springframework.amqp.support.converter.MessageConverter;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importjava.util.HashMap;
importjava.util.Map;
/*** @author LoneWalker* @date 2023/4/8* @description*/@Slf4j@ConfigurationpublicclassRabbitMqConfigimplementsRabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@BeanpublicRabbitTemplaterabbitTemplate(CachingConnectionFactoryconnectionFactory) {
RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
//设置给rabbitTemplaterabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.setMandatory(true);
returnrabbitTemplate;
    }
@BeanpublicMessageConverterjackson2JsonMessageConverter() {
returnnewJackson2JsonMessageConverter();
    }
/************ 正常配置 ******************//*** 正常交换机,开启持久化*/@BeanDirectExchangenormalExchange() {
returnnewDirectExchange("normalExchange", true, false);
    }
@BeanpublicQueuenormalQueue() {
// durable: 是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive: 默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。// autoDelete: 是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。Map<String, Object>args=deadQueueArgs();
// 队列设置最大长度args.put("x-max-length", 5);
returnnewQueue("normalQueue", true, false, false, args);
    }
@BeanpublicQueuettlQueue() {
Map<String, Object>args=deadQueueArgs();
// 队列设置消息过期时间 60 秒args.put("x-message-ttl", 60*1000);
returnnewQueue("ttlQueue", true, false, false, args);
    }
@BeanBindingnormalRouteBinding() {
returnBindingBuilder.bind(normalQueue())
                .to(normalExchange())
                .with("normalRouting");
    }
@BeanBindingttlRouteBinding() {
returnBindingBuilder.bind(ttlQueue())
                .to(normalExchange())
                .with("ttlRouting");
    }
/**************** 死信配置 *****************//*** 死信交换机*/@BeanDirectExchangedeadExchange() {
returnnewDirectExchange("deadExchange", true, false);
    }
/*** 死信队列*/@BeanpublicQueuedeadQueue() {
returnnewQueue("deadQueue", true, false, false);
    }
@BeanBindingdeadRouteBinding() {
returnBindingBuilder.bind(deadQueue())
                .to(deadExchange())
                .with("deadRouting");
    }
/*** 转发到 死信队列,配置参数*/privateMap<String, Object>deadQueueArgs() {
Map<String, Object>map=newHashMap<>();
// 绑定该队列到死信交换机map.put("x-dead-letter-exchange", "deadExchange");
map.put("x-dead-letter-routing-key", "deadRouting");
returnmap;
    }
/*** 消息成功到达交换机会触发* @param correlationData* @param ack* @param cause*/@Overridepublicvoidconfirm(CorrelationDatacorrelationData, booleanack, Stringcause) {
if (ack) {
log.info("交换机收到消息成功:"+correlationData.getId());
        }else {
log.error("交换机收到消息失败:"+correlationData.getId() +"原因:"+cause);
        }
    }
/*** 消息未成功到达队列会触发* @param returnedMessage*/@OverridepublicvoidreturnedMessage(ReturnedMessagereturnedMessage) {
log.error("{}--消息未成功到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId());
    }
}


properties

server.port=8081
#rabbitmq服务ip
spring.rabbitmq.host=localhost
#rabbitmq端口号
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=用户名改为自己的
#密码
spring.rabbitmq.password=密码改为自己的
#虚拟机
spring.rabbitmq.virtual-host=demo
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

发送消息:

@RequiredArgsConstructor@ServicepublicclassPublisherServiceImplimplementsPublisherService{
privatefinalRabbitTemplaterabbitTemplate;
@OverridepublicvoidaddUser(Useruser) {
CorrelationDatacorrelationData=newCorrelationData();
correlationData.setId(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("normalExchange","normalRouting",user,correlationData);
    }
}

4、模拟场景

4.1消息处理异常

文章开篇说到的消息手动ack,一旦出现异常会陷入死循环,那么不把消息放回原队列,而是放入死信队列,然后抛异常由人工处理:

packagecom.example.consumer.service;
importcom.alibaba.fastjson.JSONObject;
importcom.example.consumer.entity.User;
importcom.rabbitmq.client.Channel;
importlombok.extern.slf4j.Slf4j;
importorg.springframework.amqp.core.Message;
importorg.springframework.amqp.rabbit.annotation.RabbitListener;
importorg.springframework.stereotype.Service;
importjava.io.IOException;
/*** @description:* @author: LoneWalker* @create: 2022-04-04**/@Service@Slf4jpublicclassConsumerService {
@RabbitListener(queues="normalQueue")
publicvoidaddUser(StringuserStr,Channelchannel,Messagemessage){
longdeliveryTag=message.getMessageProperties().getDeliveryTag();
try {
inta=1/0;
Useruser=JSONObject.parseObject(userStr,User.class);
log.info(user.toString());
//手动ack  第二个参数为false是表示仅仅确认当前消息 true表示确认之前所有的消息channel.basicAck(deliveryTag,false);
        } catch (Exceptione) {
//手动nack 告诉rabbitmq该消息消费失败  第三个参数:如果被拒绝的消息应该被重新请求,而不是被丢弃或变成死信,则为truetry {
channel.basicNack(deliveryTag,false,false);
            } catch (IOExceptionex) {
thrownewRuntimeException("消息处理失败");
            }
        }
    }
}

注意basicNack的第三个参数,设置为false后就不会重新请求。

4.2队列达到最大长度

配置上面的代码已经有过了:

测试的话我们发6条消息,加上4.1测试产生的死信,预期死信队列中应该会有两条:

4.3消息TTL过期

过期时间TTL表示可以对消息设置预期的时间,超过这个时间就删除或者放入死信队列。修改routingKey为ttlRouting。上述代码中配置过期时间为60s

死信队列中的消息处理和正常的队列没什么区别,就不赘述了。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
2月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
202 6
|
2月前
|
消息中间件 数据采集 数据库
小说爬虫-02 爬取小说详细内容和章节列表 推送至RabbitMQ 消费ACK确认 Scrapy爬取 SQLite
小说爬虫-02 爬取小说详细内容和章节列表 推送至RabbitMQ 消费ACK确认 Scrapy爬取 SQLite
25 1
|
3月前
|
消息中间件 JSON Java
|
3月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
3月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
100 0
|
4月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
122 2
|
3月前
|
消息中间件 存储
RabbitMQ-死信交换机和死信队列
死信队列和死信交换机是RabbitMQ提供的一个非常实用的功能,通过合理使用这一机制,可以大大增强系统的健壮性和可靠性。它们不仅能有效解决消息处理失败的情况,还能为系统的错误追踪、消息延迟处理等提供支持。在设计系统的消息体系时,合理规划和使用死信队列和死信交换机,将会为系统的稳定运行提供一个有力的
72 0
|
5月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
5月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
82 0
说说RabbitMQ延迟队列实现原理?
|
5月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
155 1