rabbitmq消息的确认机制ack

简介: rabbitmq消息的确认机制ack

公众号merlinsea


消息确认机制ack介绍【消费者端保证】

消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息决定是删除还是重新入队 。


消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息 重新放入队列中【可能存在重复消费的问题,任何队列都不可避免存在重复消费问题】。


只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。


消息的ACK自动确认机制【默认】是打开的,也可以将消息的自动确认机制改为手动确认。


消息如未被进行ACK的消息确认机制,这条消息被锁定。


Unacked Unacked状态的消息是指:消息被消费者获取了但还没有给队列ack响应的消息。消息每入队一次,msgTag都会自增1。

640.jpg

1、配置文件application.yml开启手工确认模式【自动确认模式开发人员啥也不需要管】

#消息队列
spring:
  rabbitmq:
    host: 39.107.221.166
    port: 5672
    virtual-host: /dev
    password: password
    username: admin
    #开启消息可靠性投递,生产者到broker的交换机
    publisher-confirm-type: correlated
    #开启消息可靠性投递,交换机到队列的可靠性投递
    publisher-returns: true
    #为true,则交换机处理消息到路由失败,则会返回给生产者
    template:
      mandatory: true
    #消息手工确认ACK
    listener:
      simple:
        acknowledge-mode: manual


2、消费者代码

@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {
    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {
        long msgTag = 0;
        try {
            msgTag = message.getMessageProperties().getDeliveryTag();
            System.out.println("msgTag="+msgTag);
            System.out.println("message="+message.toString());
            System.out.println("body="+body);
            if(msgTag<=2){
                throw new Exception("模拟异常");
            }
            //告诉broker,消息已经被确认
            //记录消息日志 TODO
            channel.basicAck(msgTag,false);
        }catch (Exception e){
            /**
             * basicNack()方法用于告诉broker消息被拒绝消费
             * 参数1: 消息的tag标识
             * 参数2:false 关闭多条拒绝
             * 参数3:true=消息重新入队 ,false=消息直接丢弃
             */
            //channel.basicNack(msgTag,false,true);
             /**
             * basicReject()用于告诉broker消息被拒绝消费
             * 参数1:消息tag标识
             * 参数2:true=消息重新入队 ,false=消息直接丢弃
             */
            System.out.println("拒绝消费消息 msgTag="+msgTag);
            channel.basicReject(msgTag,true);
        }
    }
}


相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
8月前
|
消息中间件 存储 监控
|
8月前
|
消息中间件 存储 运维
|
8月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
111 0
|
3月前
|
消息中间件 数据采集 数据库
小说爬虫-02 爬取小说详细内容和章节列表 推送至RabbitMQ 消费ACK确认 Scrapy爬取 SQLite
小说爬虫-02 爬取小说详细内容和章节列表 推送至RabbitMQ 消费ACK确认 Scrapy爬取 SQLite
29 1
|
3月前
|
消息中间件 Java Kafka
Kafka ACK机制详解!
本文深入剖析了Kafka的ACK机制,涵盖其原理、源码分析及应用场景,并探讨了acks=0、acks=1和acks=all三种级别的优缺点。文中还介绍了ISR(同步副本)的工作原理及其维护机制,帮助读者理解如何在性能与可靠性之间找到最佳平衡。适合希望深入了解Kafka消息传递机制的开发者阅读。
252 0
|
5月前
|
Kubernetes 监控 Perl
在k8S中,自动扩容机制是什么?
在k8S中,自动扩容机制是什么?
|
8月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
108 0
|
6月前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ使用问题之过期删除机制的触发条件是什么
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之过期删除机制的触发条件是什么
|
5月前
|
存储 网络安全 API
【Azure Service Bus】 Service Bus如何确保消息发送成功,发送端是否有Ack机制 
【Azure Service Bus】 Service Bus如何确保消息发送成功,发送端是否有Ack机制 
|
5月前
|
Kubernetes Java 调度
在K8S中,Pod突然挂掉,K8S有什么机制或功能自动清除Pod?
在K8S中,Pod突然挂掉,K8S有什么机制或功能自动清除Pod?