RabbiMQ中的消息回调与消息确认

简介: 和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。 所以,消息接收的确认机制主要存在三种模式 1.自动确认 这也是默认的消息确认情况。AcknowledgeMode.NONE,RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即 认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递 当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除 2.不确认(不介绍) 3.手动确认(多数选择的模式) 消费者收
  1. 什么是消息回调
    消息回调,其实就是消息确认(生产者推送消息成功,消费者接收消息成功)
  2. 为什么要进行消息确认
    经常会听到丢消息的字眼, 对于程序来说,发送者没法确认是否发送成功,消费者处理失败也无法反馈,
    没有消息确认机制,就会出现消息莫名其妙的没了,也不知道什么情况
  3. 生产者推送消息[确认]
    0.前提:使用直连交换机完成消息的发送和接收

    1.在rabbitmq-provider项目的application.yml文件上,添加消息确认的配置项

    #1.开启 confirm 确认机制
    spring.rabbitmq.publisher-confirms=true
    #2.开启 return 确认机制
    spring.rabbitmq.publisher-returns=true
    #3.设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
    spring.rabbitmq.template.mandatory=true
    
    注1:设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
         spring.rabbitmq.template.mandatory=true//此行配置与正面代码效果一样 
         rabbitTemplate.setMandatory(true);//此行代码也配置项3的效果一致
    

    2.创建RabbitTemplateConfig,在里面创建自定义RabbitTemplate并添加2个相关的回调函数

    RabbitTemplate.ConfirmCallback
      通过实现ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
      还需要在配置文件添加配置spring.rabbitmq.publisher-returns=true
    
    RabbitTemplate.ReturnCallback
      通过实现 ReturnCallback 接口,启动消息失败返回,比如路由不到队列时触发回调
      还需要在配置文件添加配置spring.rabbitmq.publisher-returns=true
    
    见:资料/RabbitTemplateConfig.java   
    

    3.ConfirmCallback和ReturnCallback这2个回调函数在什么情况会触发

    先从总体的情况分析,推送消息存在四种情况
    A.消息推送到server,找不到交换机
    B.消息推送到server,找到交换机了,但是没找到队列
    C.消息推送到server,交换机和队列啥都没找到(C和A效果是一样,交换机都没找到,更不用说队列了)
    D.消息推送成功
    

    4.小结:

    流程:由生产者 -------> 交换机(Exchange)
    ConfirmCallBack:
    如果消息没有到exchange,则confirm回调,ack=false
    如果消息到达exchange,则confirm回调,ack=true
    
    流程:由交换机(Exchange)--------> 队列(Queue)
    ReturnCallBack:
    exchange到queue成功,则不回调return
    exchange到queue失败,则回调return(需设置mandatory=true,否则不执行回调函数,消息就丢了) 
    

    5.注意:spring-rabbit和原生的rabbit-client ,表现是不一样的.测试的时候,原生的client,exchange错误的话,直接就报错了,

    是不会到confirmListener和returnListener的
    
    
  4. 消费者接收消息[确认]
    和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。
    所以,消息接收的确认机制主要存在三种模式
    1.自动确认

    这也是默认的消息确认情况。AcknowledgeMode.NONE,RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即
    认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递
    
    当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除
    

    2.不确认(不介绍)

    3.手动确认(多数选择的模式)

    消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功
    basic.ack用于肯定确认 
    basic.nack用于否定确认
    basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息
    
    消费者端以上的3个方法都表示[消息已经被正确投递],但是basic.ack表示消息已经被正确处理,
    但是basic.nack,basic.reject表示没有被正确处理,但是RabbitMQ中仍然需要删除这条消息 
    
    

    ## 消息接收手动确认编码开始
    4.修改FirstQueueReceiver代码,继承AbstractAdaptableMessageListener并重写onMessage(Message message, Channel channel)方法

    1.消费者消息确认相关类和接口的体系结构
      MessageListener
        ChannelAwareMessageListener//在此接口中,注入了通道Channel,Aware英文意思为“意识到、感知到、察觉到”
          AbstractAdaptableMessageListener
            public void onMessage(Message message, Channel channel) throws Exception
    
    
    ## 请将之前的handlerMessage方法注释掉,因为只需要一个处理方法
    ## //@RabbitHandler
    ## //public void handlerMessage(Map<String, Object> map) {} 
    2.重写onMessage(Message message, Channel channel)方法,进行消息接收[确认]
      关键代码:
      long deliveryTag = message.getMessageProperties().getDeliveryTag();//消息唯一标识ID
      boolean multiple = false;  
      channel.basicAck(deliveryTag, multiple); 
      channel.basicReject(deliveryTag,true);//为true会重新放回队列
      channel.basicNack(deliveryTag, multiple,true)
              
    
      注1: basicAck(long deliveryTag, boolean multiple)方法的两个参数
            deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用
              basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 
              投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
            multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 
              小于等于传入值的所有消息
    

    5.新建MessageListenerConfig,添加消息接收确认机制相关代码

    源码见:资料/MessageListenerConfig
    关键代码:
    container.setConnectionFactory(connectionFactory);//设置mq连接工厂对象
    container.setConcurrentConsumers(1);//设置并发消费者
    container.setMaxConcurrentConsumers(1);//设置最多的并发消费者
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
    
    //注意:此处不能使用Autowired根据类型自动注入队列,必须调用rabbitmqDirectConfig.firstQueue()获得,why?
    // 因为项目中可能存在多个队列,它们的类型都是Queue,自动注入会报错
    container.setQueues(rabbitmqDirectConfig.firstQueue());
    container.setMessageListener(directReceiver);
    
     
    注1:SimpleMessageListenerContainer即简单消息监听容器
         这个类非常的强大,我们可以对他进行很多的设置,用对于消费者的配置项,这个类都可以满足
         1.它可以设置事务特性、事务管理器、事务属性、事务并发、是否开启事务、回滚消息等。但是我们在实际生产中,很少使用事务,基本都是采用补偿机制
         2.它可以设置消费者数量、最小最大数量、批量消费
         3.它可以设置消息确认和自动确认模式、是否重回队列、异常捕获 Handler 函数
         4.它可以设置消费者标签生成策略、是否独占模式、消费者属性等
         5.它还可以设置具体的监听器、消息转换器等等
         另外,SimpleMessageListenerContainer 还可以进行动态设置,比如在运行中的应用可以动态的修改
         其消费者数量的大小、接收消息的模式等。
    注2:消息确认模式
         AcknowledgeMode.NONE:自动确认
         AcknowledgeMode.AUTO:根据情况确认
         AcknowledgeMode.MANUAL:手动确认  
    
    
    
  5. Rabbitmq的消息确认的两种方式
    Rabbitmq为了确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)
    在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。
相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
消息中间件 监控
RabbitMQ的Web管理页面
RabbitMQ的Web管理页面
1014 0
|
5月前
|
前端开发 Java 数据库连接
java bo 对象详解_全面解析 java 中 PO,VO,DAO,BO,POJO 及 DTO 等几种对象类型
Java开发中常见的六大对象模型(PO、VO、DAO、BO、POJO、DTO)各有侧重,共同构建企业级应用架构。PO对应数据库表结构,VO专为前端展示设计,DAO封装数据访问逻辑,BO处理业务逻辑,POJO是简单的Java对象,DTO用于层间数据传输。它们在三层架构中协作:表现层使用VO,业务层通过BO调用DAO处理PO,DTO作为数据传输媒介。通过在线商城的用户管理模块示例,展示了各对象的具体应用。最佳实践包括保持分层清晰、使用工具类转换对象,并避免过度设计带来的类膨胀。理解这些对象模型的区别与联系。
391 1
|
前端开发 网络协议 Dubbo
超详细Netty入门,看这篇就够了!
本文主要讲述Netty框架的一些特性以及重要组件,希望看完之后能对Netty框架有一个比较直观的感受,希望能帮助读者快速入门Netty,减少一些弯路。
92795 32
超详细Netty入门,看这篇就够了!
|
IDE Java 程序员
IDEA创建maven项目过慢,一直卡在resolving dependencies...的解决办法
作为一个从事 Java 开发的程序员,每天离不开ide的帮助。一开始学习java的时候基本都是使用eclipse进行开发, 后来接触了idea,发现是真的香,比eclipse好用太多了,能够大大提升开发的效率。
5367 0
IDEA创建maven项目过慢,一直卡在resolving dependencies...的解决办法
|
10月前
|
Java 中间件 调度
SpringBoot整合XXL-JOB【03】- 执行器的使用
本文介绍了如何将调度中心与项目结合,通过配置“执行器”实现定时任务控制。首先新建SpringBoot项目并引入依赖,接着配置xxl-job相关参数,如调度中心地址、执行器名称等。然后通过Java代码将执行器注册为Spring Bean,并声明测试方法使用`@XxlJob`注解。最后,在调度中心配置并启动定时任务,验证任务是否按预期执行。通过这些步骤,读者可以掌握Xxl-Job的基本使用,专注于业务逻辑的编写而无需关心定时器本身的实现。
2286 10
SpringBoot整合XXL-JOB【03】-  执行器的使用
|
消息中间件 存储 Java
RabbitMQ之延迟队列(手把手教你学习延迟队列)
【1月更文挑战第12天】延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。
5328 102
|
负载均衡 监控 Dubbo
Dubbo 原理和机制详解(非常全面)
本文详细解析了 Dubbo 的核心功能、组件、架构设计及调用流程,涵盖远程方法调用、智能容错、负载均衡、服务注册与发现等内容。欢迎留言交流。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
Dubbo 原理和机制详解(非常全面)
|
消息中间件 存储 缓存
消息中间件系列教程(06) -RabbitMQ -五种队列形式
消息中间件系列教程(06) -RabbitMQ -五种队列形式
1875 1
|
Java 关系型数据库 API
介绍一款Java开发的企业接口管理系统和开放平台
YesApi接口管理平台Java版,基于Spring Boot、Vue.js等技术,提供API接口的快速研发、管理、开放及收费等功能,支持多数据库、Docker部署,适用于企业级PaaS和SaaS平台的二次开发与搭建。