rabbitmq消息的确认机制ack

简介: rabbitmq消息的确认机制ack

公众号merlinsea


消息确认机制ack介绍【消费者端保证】

消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息决定是删除还是重新入队 。


消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息 重新放入队列中【可能存在重复消费的问题,任何队列都不可避免存在重复消费问题】。


只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。


消息的ACK自动确认机制【默认】是打开的,也可以将消息的自动确认机制改为手动确认。


消息如未被进行ACK的消息确认机制,这条消息被锁定。


Unacked Unacked状态的消息是指:消息被消费者获取了但还没有给队列ack响应的消息。消息每入队一次,msgTag都会自增1。

640.jpg

1、配置文件application.yml开启手工确认模式【自动确认模式开发人员啥也不需要管】

#消息队列
spring:
  rabbitmq:
    host: 39.107.221.166
    port: 5672
    virtual-host: /dev
    password: password
    username: admin
    #开启消息可靠性投递,生产者到broker的交换机
    publisher-confirm-type: correlated
    #开启消息可靠性投递,交换机到队列的可靠性投递
    publisher-returns: true
    #为true,则交换机处理消息到路由失败,则会返回给生产者
    template:
      mandatory: true
    #消息手工确认ACK
    listener:
      simple:
        acknowledge-mode: manual


2、消费者代码

@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {
    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {
        long msgTag = 0;
        try {
            msgTag = message.getMessageProperties().getDeliveryTag();
            System.out.println("msgTag="+msgTag);
            System.out.println("message="+message.toString());
            System.out.println("body="+body);
            if(msgTag<=2){
                throw new Exception("模拟异常");
            }
            //告诉broker,消息已经被确认
            //记录消息日志 TODO
            channel.basicAck(msgTag,false);
        }catch (Exception e){
            /**
             * basicNack()方法用于告诉broker消息被拒绝消费
             * 参数1: 消息的tag标识
             * 参数2:false 关闭多条拒绝
             * 参数3:true=消息重新入队 ,false=消息直接丢弃
             */
            //channel.basicNack(msgTag,false,true);
             /**
             * basicReject()用于告诉broker消息被拒绝消费
             * 参数1:消息tag标识
             * 参数2:true=消息重新入队 ,false=消息直接丢弃
             */
            System.out.println("拒绝消费消息 msgTag="+msgTag);
            channel.basicReject(msgTag,true);
        }
    }
}


相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
消息中间件 数据采集 数据库
小说爬虫-02 爬取小说详细内容和章节列表 推送至RabbitMQ 消费ACK确认 Scrapy爬取 SQLite
小说爬虫-02 爬取小说详细内容和章节列表 推送至RabbitMQ 消费ACK确认 Scrapy爬取 SQLite
228 2
|
消息中间件 Java Kafka
Kafka ACK机制详解!
本文深入剖析了Kafka的ACK机制,涵盖其原理、源码分析及应用场景,并探讨了acks=0、acks=1和acks=all三种级别的优缺点。文中还介绍了ISR(同步副本)的工作原理及其维护机制,帮助读者理解如何在性能与可靠性之间找到最佳平衡。适合希望深入了解Kafka消息传递机制的开发者阅读。
1866 0
|
存储 网络安全 API
【Azure Service Bus】 Service Bus如何确保消息发送成功,发送端是否有Ack机制 
【Azure Service Bus】 Service Bus如何确保消息发送成功,发送端是否有Ack机制 
147 0
|
Kubernetes Java 调度
在K8S中,Pod突然挂掉,K8S有什么机制或功能自动清除Pod?
在K8S中,Pod突然挂掉,K8S有什么机制或功能自动清除Pod?
|
Kubernetes 安全 Linux
在k8S中,PodSecurityPolicy 机制能实现哪些安全策略?
在k8S中,PodSecurityPolicy 机制能实现哪些安全策略?
|
Kubernetes 安全 调度
在k8S中, PodSecurityPolicy机制是什么?
在k8S中, PodSecurityPolicy机制是什么?
|
Kubernetes 监控 Perl
在k8S中,自动扩容机制是什么?
在k8S中,自动扩容机制是什么?
|
Kubernetes 监控 Perl
在K8S中,RC的机制是什么?
在K8S中,RC的机制是什么?
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
340 0
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
500 0
下一篇
开通oss服务