为了保证消息从队列可靠的达到消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。
默认情况下RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ自己又没有这条消息了。所以在实际项目中会使用手动Ack。
1、手动应答
- Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。
- Channel.basicNack (用于否定确认)
- Channel.basicReject (用于否定确认):与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。
消费者端的配置,相关属性值改为自己的:
server.port=8082 #rabbitmq服务器ip spring.rabbitmq.host=localhost #rabbitmq的端口 spring.rabbitmq.port=5672 #用户名 spring.rabbitmq.username=lonewalker #密码 spring.rabbitmq.password=XX #配置虚拟机 spring.rabbitmq.virtual-host=demo #设置消费端手动 ack none不确认 auto自动确认 manual手动确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual
修改消费代码:请勿复制使用,会卡死
packagecom.example.consumer.service; importcom.alibaba.fastjson.JSONObject; importcom.example.consumer.entity.User; importcom.rabbitmq.client.Channel; importlombok.extern.slf4j.Slf4j; importorg.springframework.amqp.core.Message; importorg.springframework.amqp.rabbit.annotation.RabbitListener; importorg.springframework.stereotype.Service; importjava.io.IOException; /*** @description:* @author: LoneWalker* @create: 2022-04-04**/publicclassConsumerService { queues="publisher.addUser") (publicvoidaddUser(StringuserStr,Channelchannel,Messagemessage){ longdeliveryTag=message.getMessageProperties().getDeliveryTag(); try { log.info("我一直在重试"); inta=1/0; Useruser=JSONObject.parseObject(userStr,User.class); log.info(user.toString()); //手动ack 第二个参数为false是表示仅仅确认当前消息 true表示确认之前所有的消息channel.basicAck(deliveryTag,false); } catch (Exceptione) { //手动nack 告诉rabbitmq该消息消费失败 第三个参数:如果被拒绝的消息应该被重新请求,而不是被丢弃或变成死信,则为truetry { channel.basicNack(deliveryTag,false,true); } catch (IOExceptionex) { ex.printStackTrace(); } } } }
先启动发布者发送消息,查看控制台:有一条消息待消费·
启动消费端,因为代码中有除0,所以会报错,这里就会出现一条unacked消息:
因为设置的是将消息重新请求,所以它会陷入死循环
防止出现这种情况,可以将basicNack最后一个参数改为false,让消息进去死信队列
2、什么是死信队列
说简单点就是备胎队列,而死信的来源有以下几种:
- 消息被否定确认,使用
channel.basicNack
或channel.basicReject
,并且此时requeue
属性被设置为false
。 - 消息在队列的存活时间超过设置的TTL时间。
- 消息队列的消息数量已经超过最大队列长度。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
3、配置死信队列
一般会为每个重要的业务队列配置一个死信队列。可以分为以下步骤:
- 配置业务队列,绑定到业务交换机上
- 为业务队列配置死信交换机和路由key
- 为死信交换机配置死信队列
从控制台将之前的交换机都删除,然后修改代码。
首先看一下发布者的配置代码:
packagecom.example.publisher.config; importlombok.extern.slf4j.Slf4j; importorg.springframework.amqp.core.*; importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory; importorg.springframework.amqp.rabbit.connection.CorrelationData; importorg.springframework.amqp.rabbit.core.RabbitTemplate; importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter; importorg.springframework.amqp.support.converter.MessageConverter; importorg.springframework.context.annotation.Bean; importorg.springframework.context.annotation.Configuration; importjava.util.HashMap; importjava.util.Map; /*** @author LoneWalker* @date 2023/4/8* @description*/publicclassRabbitMqConfigimplementsRabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { publicRabbitTemplaterabbitTemplate(CachingConnectionFactoryconnectionFactory) { RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter()); //设置给rabbitTemplaterabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); rabbitTemplate.setMandatory(true); returnrabbitTemplate; } publicMessageConverterjackson2JsonMessageConverter() { returnnewJackson2JsonMessageConverter(); } /************ 正常配置 ******************//*** 正常交换机,开启持久化*/DirectExchangenormalExchange() { returnnewDirectExchange("normalExchange", true, false); } publicQueuenormalQueue() { // durable: 是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive: 默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。// autoDelete: 是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。Map<String, Object>args=deadQueueArgs(); // 队列设置最大长度args.put("x-max-length", 5); returnnewQueue("normalQueue", true, false, false, args); } publicQueuettlQueue() { Map<String, Object>args=deadQueueArgs(); // 队列设置消息过期时间 60 秒args.put("x-message-ttl", 60*1000); returnnewQueue("ttlQueue", true, false, false, args); } BindingnormalRouteBinding() { returnBindingBuilder.bind(normalQueue()) .to(normalExchange()) .with("normalRouting"); } BindingttlRouteBinding() { returnBindingBuilder.bind(ttlQueue()) .to(normalExchange()) .with("ttlRouting"); } /**************** 死信配置 *****************//*** 死信交换机*/DirectExchangedeadExchange() { returnnewDirectExchange("deadExchange", true, false); } /*** 死信队列*/publicQueuedeadQueue() { returnnewQueue("deadQueue", true, false, false); } BindingdeadRouteBinding() { returnBindingBuilder.bind(deadQueue()) .to(deadExchange()) .with("deadRouting"); } /*** 转发到 死信队列,配置参数*/privateMap<String, Object>deadQueueArgs() { Map<String, Object>map=newHashMap<>(); // 绑定该队列到死信交换机map.put("x-dead-letter-exchange", "deadExchange"); map.put("x-dead-letter-routing-key", "deadRouting"); returnmap; } /*** 消息成功到达交换机会触发* @param correlationData* @param ack* @param cause*/publicvoidconfirm(CorrelationDatacorrelationData, booleanack, Stringcause) { if (ack) { log.info("交换机收到消息成功:"+correlationData.getId()); }else { log.error("交换机收到消息失败:"+correlationData.getId() +"原因:"+cause); } } /*** 消息未成功到达队列会触发* @param returnedMessage*/publicvoidreturnedMessage(ReturnedMessagereturnedMessage) { log.error("{}--消息未成功到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId()); } }
properties
server.port=8081 #rabbitmq服务ip spring.rabbitmq.host=localhost #rabbitmq端口号 spring.rabbitmq.port=5672 #用户名 spring.rabbitmq.username=用户名改为自己的 #密码 spring.rabbitmq.password=密码改为自己的 #虚拟机 spring.rabbitmq.virtual-host=demo spring.rabbitmq.publisher-confirm-type=correlated spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true
发送消息:
publicclassPublisherServiceImplimplementsPublisherService{ privatefinalRabbitTemplaterabbitTemplate; publicvoidaddUser(Useruser) { CorrelationDatacorrelationData=newCorrelationData(); correlationData.setId(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("normalExchange","normalRouting",user,correlationData); } }
4、模拟场景
4.1消息处理异常
文章开篇说到的消息手动ack,一旦出现异常会陷入死循环,那么不把消息放回原队列,而是放入死信队列,然后抛异常由人工处理:
packagecom.example.consumer.service; importcom.alibaba.fastjson.JSONObject; importcom.example.consumer.entity.User; importcom.rabbitmq.client.Channel; importlombok.extern.slf4j.Slf4j; importorg.springframework.amqp.core.Message; importorg.springframework.amqp.rabbit.annotation.RabbitListener; importorg.springframework.stereotype.Service; importjava.io.IOException; /*** @description:* @author: LoneWalker* @create: 2022-04-04**/publicclassConsumerService { queues="normalQueue") (publicvoidaddUser(StringuserStr,Channelchannel,Messagemessage){ longdeliveryTag=message.getMessageProperties().getDeliveryTag(); try { inta=1/0; Useruser=JSONObject.parseObject(userStr,User.class); log.info(user.toString()); //手动ack 第二个参数为false是表示仅仅确认当前消息 true表示确认之前所有的消息channel.basicAck(deliveryTag,false); } catch (Exceptione) { //手动nack 告诉rabbitmq该消息消费失败 第三个参数:如果被拒绝的消息应该被重新请求,而不是被丢弃或变成死信,则为truetry { channel.basicNack(deliveryTag,false,false); } catch (IOExceptionex) { thrownewRuntimeException("消息处理失败"); } } } }
注意basicNack的第三个参数,设置为false后就不会重新请求。
4.2队列达到最大长度
配置上面的代码已经有过了:
测试的话我们发6条消息,加上4.1测试产生的死信,预期死信队列中应该会有两条:
4.3消息TTL过期
过期时间TTL表示可以对消息设置预期的时间,超过这个时间就删除或者放入死信队列。修改routingKey为ttlRouting。上述代码中配置过期时间为60s
死信队列中的消息处理和正常的队列没什么区别,就不赘述了。