公众号merlinsea
消息确认机制ack介绍【消费者端保证】
消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息决定是删除还是重新入队 。
消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息 重新放入队列中【可能存在重复消费的问题,任何队列都不可避免存在重复消费问题】。
只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
消息的ACK自动确认机制【默认】是打开的,也可以将消息的自动确认机制改为手动确认。
消息如未被进行ACK的消息确认机制,这条消息被锁定。
Unacked Unacked状态的消息是指:消息被消费者获取了但还没有给队列ack响应的消息。消息每入队一次,msgTag都会自增1。
1、配置文件application.yml开启手工确认模式【自动确认模式开发人员啥也不需要管】
#消息队列 spring: rabbitmq: host: 39.107.221.166 port: 5672 virtual-host: /dev password: password username: admin #开启消息可靠性投递,生产者到broker的交换机 publisher-confirm-type: correlated #开启消息可靠性投递,交换机到队列的可靠性投递 publisher-returns: true #为true,则交换机处理消息到路由失败,则会返回给生产者 template: mandatory: true #消息手工确认ACK listener: simple: acknowledge-mode: manual
2、消费者代码
@Component @RabbitListener(queues = "order_queue") public class OrderMQListener { @RabbitHandler public void messageHandler(String body, Message message, Channel channel) throws IOException { long msgTag = 0; try { msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag="+msgTag); System.out.println("message="+message.toString()); System.out.println("body="+body); if(msgTag<=2){ throw new Exception("模拟异常"); } //告诉broker,消息已经被确认 //记录消息日志 TODO channel.basicAck(msgTag,false); }catch (Exception e){ /** * basicNack()方法用于告诉broker消息被拒绝消费 * 参数1: 消息的tag标识 * 参数2:false 关闭多条拒绝 * 参数3:true=消息重新入队 ,false=消息直接丢弃 */ //channel.basicNack(msgTag,false,true); /** * basicReject()用于告诉broker消息被拒绝消费 * 参数1:消息tag标识 * 参数2:true=消息重新入队 ,false=消息直接丢弃 */ System.out.println("拒绝消费消息 msgTag="+msgTag); channel.basicReject(msgTag,true); } } }