RabbiMQ中的消息回调与消息确认

简介: 和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。 所以,消息接收的确认机制主要存在三种模式 1.自动确认 这也是默认的消息确认情况。AcknowledgeMode.NONE,RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即 认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递 当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除 2.不确认(不介绍) 3.手动确认(多数选择的模式) 消费者收
  1. 什么是消息回调
    消息回调,其实就是消息确认(生产者推送消息成功,消费者接收消息成功)
  2. 为什么要进行消息确认
    经常会听到丢消息的字眼, 对于程序来说,发送者没法确认是否发送成功,消费者处理失败也无法反馈,
    没有消息确认机制,就会出现消息莫名其妙的没了,也不知道什么情况
  3. 生产者推送消息[确认]
    0.前提:使用直连交换机完成消息的发送和接收

    1.在rabbitmq-provider项目的application.yml文件上,添加消息确认的配置项

    #1.开启 confirm 确认机制
    spring.rabbitmq.publisher-confirms=true
    #2.开启 return 确认机制
    spring.rabbitmq.publisher-returns=true
    #3.设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
    spring.rabbitmq.template.mandatory=true
    
    注1:设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
         spring.rabbitmq.template.mandatory=true//此行配置与正面代码效果一样 
         rabbitTemplate.setMandatory(true);//此行代码也配置项3的效果一致
    

    2.创建RabbitTemplateConfig,在里面创建自定义RabbitTemplate并添加2个相关的回调函数

    RabbitTemplate.ConfirmCallback
      通过实现ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
      还需要在配置文件添加配置spring.rabbitmq.publisher-returns=true
    
    RabbitTemplate.ReturnCallback
      通过实现 ReturnCallback 接口,启动消息失败返回,比如路由不到队列时触发回调
      还需要在配置文件添加配置spring.rabbitmq.publisher-returns=true
    
    见:资料/RabbitTemplateConfig.java   
    

    3.ConfirmCallback和ReturnCallback这2个回调函数在什么情况会触发

    先从总体的情况分析,推送消息存在四种情况
    A.消息推送到server,找不到交换机
    B.消息推送到server,找到交换机了,但是没找到队列
    C.消息推送到server,交换机和队列啥都没找到(C和A效果是一样,交换机都没找到,更不用说队列了)
    D.消息推送成功
    

    4.小结:

    流程:由生产者 -------> 交换机(Exchange)
    ConfirmCallBack:
    如果消息没有到exchange,则confirm回调,ack=false
    如果消息到达exchange,则confirm回调,ack=true
    
    流程:由交换机(Exchange)--------> 队列(Queue)
    ReturnCallBack:
    exchange到queue成功,则不回调return
    exchange到queue失败,则回调return(需设置mandatory=true,否则不执行回调函数,消息就丢了) 
    

    5.注意:spring-rabbit和原生的rabbit-client ,表现是不一样的.测试的时候,原生的client,exchange错误的话,直接就报错了,

    是不会到confirmListener和returnListener的
    
    
  4. 消费者接收消息[确认]
    和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。
    所以,消息接收的确认机制主要存在三种模式
    1.自动确认

    这也是默认的消息确认情况。AcknowledgeMode.NONE,RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即
    认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递
    
    当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除
    

    2.不确认(不介绍)

    3.手动确认(多数选择的模式)

    消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功
    basic.ack用于肯定确认 
    basic.nack用于否定确认
    basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息
    
    消费者端以上的3个方法都表示[消息已经被正确投递],但是basic.ack表示消息已经被正确处理,
    但是basic.nack,basic.reject表示没有被正确处理,但是RabbitMQ中仍然需要删除这条消息 
    
    

    ## 消息接收手动确认编码开始
    4.修改FirstQueueReceiver代码,继承AbstractAdaptableMessageListener并重写onMessage(Message message, Channel channel)方法

    1.消费者消息确认相关类和接口的体系结构
      MessageListener
        ChannelAwareMessageListener//在此接口中,注入了通道Channel,Aware英文意思为“意识到、感知到、察觉到”
          AbstractAdaptableMessageListener
            public void onMessage(Message message, Channel channel) throws Exception
    
    
    ## 请将之前的handlerMessage方法注释掉,因为只需要一个处理方法
    ## //@RabbitHandler
    ## //public void handlerMessage(Map<String, Object> map) {} 
    2.重写onMessage(Message message, Channel channel)方法,进行消息接收[确认]
      关键代码:
      long deliveryTag = message.getMessageProperties().getDeliveryTag();//消息唯一标识ID
      boolean multiple = false;  
      channel.basicAck(deliveryTag, multiple); 
      channel.basicReject(deliveryTag,true);//为true会重新放回队列
      channel.basicNack(deliveryTag, multiple,true)
              
    
      注1: basicAck(long deliveryTag, boolean multiple)方法的两个参数
            deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用
              basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 
              投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
            multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 
              小于等于传入值的所有消息
    

    5.新建MessageListenerConfig,添加消息接收确认机制相关代码

    源码见:资料/MessageListenerConfig
    关键代码:
    container.setConnectionFactory(connectionFactory);//设置mq连接工厂对象
    container.setConcurrentConsumers(1);//设置并发消费者
    container.setMaxConcurrentConsumers(1);//设置最多的并发消费者
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
    
    //注意:此处不能使用Autowired根据类型自动注入队列,必须调用rabbitmqDirectConfig.firstQueue()获得,why?
    // 因为项目中可能存在多个队列,它们的类型都是Queue,自动注入会报错
    container.setQueues(rabbitmqDirectConfig.firstQueue());
    container.setMessageListener(directReceiver);
    
     
    注1:SimpleMessageListenerContainer即简单消息监听容器
         这个类非常的强大,我们可以对他进行很多的设置,用对于消费者的配置项,这个类都可以满足
         1.它可以设置事务特性、事务管理器、事务属性、事务并发、是否开启事务、回滚消息等。但是我们在实际生产中,很少使用事务,基本都是采用补偿机制
         2.它可以设置消费者数量、最小最大数量、批量消费
         3.它可以设置消息确认和自动确认模式、是否重回队列、异常捕获 Handler 函数
         4.它可以设置消费者标签生成策略、是否独占模式、消费者属性等
         5.它还可以设置具体的监听器、消息转换器等等
         另外,SimpleMessageListenerContainer 还可以进行动态设置,比如在运行中的应用可以动态的修改
         其消费者数量的大小、接收消息的模式等。
    注2:消息确认模式
         AcknowledgeMode.NONE:自动确认
         AcknowledgeMode.AUTO:根据情况确认
         AcknowledgeMode.MANUAL:手动确认  
    
    
    
  5. Rabbitmq的消息确认的两种方式
    Rabbitmq为了确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)
    在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
6月前
|
消息中间件 SQL RocketMQ
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
111 1
|
消息中间件 存储 安全
mq 消费者监听经常断会出现丢消息的问题吗
在消息队列(MQ)系统中,消费者监听经常断开可能会导致消息丢失的问题,具体取决于消息队列系统的设计和配置,以及你的应用程序的处理方式。以下是一些可能导致消息丢失问题的情况: 1. **消费者断开连接:** 如果消费者监听过程中发生意外断开,例如网络故障、消费者应用程序崩溃等,那么在断开连接的瞬间,可能存在未被消费的消息。 2. **消息确认机制:** 消息队列通常提供消息确认机制,确保消息在被成功处理后才被从队列中移除。如果你的消费者应用程序在处理消息时没有发送确认,或者确认机制配置不正确,可能导致消息在被处理前被从队列中移除,从而丢失。 3. **持久化设置:** 消息队列通常提供持久
191 1
|
JSON 开发工具 Android开发
通知消息和透传消息
通知消息和透传消息
893 0
通知消息和透传消息
同步消息和异步消息
同步消息和异步消息
155 0
|
编解码 Java 测试技术
消息类型-普通消息|学习笔记
快速学习消息类型-普通消息
195 0
消息类型-普通消息|学习笔记
|
消息中间件 物联网 Linux
Msgrcv 接收消息|学习笔记
快速学习 Msgrcv 接收消息
|
消息中间件 RocketMQ 开发者
消息发送1-消息校验|学习笔记
快速学习消息发送1-消息校验
消息发送1-消息校验|学习笔记
|
开发工具 开发者
消息服务-事件通知使用 | 学习笔记
快速学习消息服务-事件通知使用
消息服务-事件通知使用 | 学习笔记
|
消息中间件 Java 数据库
消息的和发送和接收|学习笔记
快速学习消息的和发送和接收
168 0
|
消息中间件 Kafka 开发者
带回调函数的生产者 | 学习笔记
快速学习带回调函数的生产者
135 0
带回调函数的生产者 | 学习笔记