RabbiMQ中的消息回调与消息确认

简介: 和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。 所以,消息接收的确认机制主要存在三种模式 1.自动确认 这也是默认的消息确认情况。AcknowledgeMode.NONE,RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即 认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递 当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除 2.不确认(不介绍) 3.手动确认(多数选择的模式) 消费者收
  1. 什么是消息回调
    消息回调,其实就是消息确认(生产者推送消息成功,消费者接收消息成功)
  2. 为什么要进行消息确认
    经常会听到丢消息的字眼, 对于程序来说,发送者没法确认是否发送成功,消费者处理失败也无法反馈,
    没有消息确认机制,就会出现消息莫名其妙的没了,也不知道什么情况
  3. 生产者推送消息[确认]
    0.前提:使用直连交换机完成消息的发送和接收

    1.在rabbitmq-provider项目的application.yml文件上,添加消息确认的配置项

    #1.开启 confirm 确认机制
    spring.rabbitmq.publisher-confirms=true
    #2.开启 return 确认机制
    spring.rabbitmq.publisher-returns=true
    #3.设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
    spring.rabbitmq.template.mandatory=true
    
    注1:设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
         spring.rabbitmq.template.mandatory=true//此行配置与正面代码效果一样 
         rabbitTemplate.setMandatory(true);//此行代码也配置项3的效果一致
    

    2.创建RabbitTemplateConfig,在里面创建自定义RabbitTemplate并添加2个相关的回调函数

    RabbitTemplate.ConfirmCallback
      通过实现ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
      还需要在配置文件添加配置spring.rabbitmq.publisher-returns=true
    
    RabbitTemplate.ReturnCallback
      通过实现 ReturnCallback 接口,启动消息失败返回,比如路由不到队列时触发回调
      还需要在配置文件添加配置spring.rabbitmq.publisher-returns=true
    
    见:资料/RabbitTemplateConfig.java   
    

    3.ConfirmCallback和ReturnCallback这2个回调函数在什么情况会触发

    先从总体的情况分析,推送消息存在四种情况
    A.消息推送到server,找不到交换机
    B.消息推送到server,找到交换机了,但是没找到队列
    C.消息推送到server,交换机和队列啥都没找到(C和A效果是一样,交换机都没找到,更不用说队列了)
    D.消息推送成功
    

    4.小结:

    流程:由生产者 -------> 交换机(Exchange)
    ConfirmCallBack:
    如果消息没有到exchange,则confirm回调,ack=false
    如果消息到达exchange,则confirm回调,ack=true
    
    流程:由交换机(Exchange)--------> 队列(Queue)
    ReturnCallBack:
    exchange到queue成功,则不回调return
    exchange到queue失败,则回调return(需设置mandatory=true,否则不执行回调函数,消息就丢了) 
    

    5.注意:spring-rabbit和原生的rabbit-client ,表现是不一样的.测试的时候,原生的client,exchange错误的话,直接就报错了,

    是不会到confirmListener和returnListener的
    
    
  4. 消费者接收消息[确认]
    和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。
    所以,消息接收的确认机制主要存在三种模式
    1.自动确认

    这也是默认的消息确认情况。AcknowledgeMode.NONE,RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即
    认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递
    
    当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除
    

    2.不确认(不介绍)

    3.手动确认(多数选择的模式)

    消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功
    basic.ack用于肯定确认 
    basic.nack用于否定确认
    basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息
    
    消费者端以上的3个方法都表示[消息已经被正确投递],但是basic.ack表示消息已经被正确处理,
    但是basic.nack,basic.reject表示没有被正确处理,但是RabbitMQ中仍然需要删除这条消息 
    
    

    ## 消息接收手动确认编码开始
    4.修改FirstQueueReceiver代码,继承AbstractAdaptableMessageListener并重写onMessage(Message message, Channel channel)方法

    1.消费者消息确认相关类和接口的体系结构
      MessageListener
        ChannelAwareMessageListener//在此接口中,注入了通道Channel,Aware英文意思为“意识到、感知到、察觉到”
          AbstractAdaptableMessageListener
            public void onMessage(Message message, Channel channel) throws Exception
    
    
    ## 请将之前的handlerMessage方法注释掉,因为只需要一个处理方法
    ## //@RabbitHandler
    ## //public void handlerMessage(Map<String, Object> map) {} 
    2.重写onMessage(Message message, Channel channel)方法,进行消息接收[确认]
      关键代码:
      long deliveryTag = message.getMessageProperties().getDeliveryTag();//消息唯一标识ID
      boolean multiple = false;  
      channel.basicAck(deliveryTag, multiple); 
      channel.basicReject(deliveryTag,true);//为true会重新放回队列
      channel.basicNack(deliveryTag, multiple,true)
              
    
      注1: basicAck(long deliveryTag, boolean multiple)方法的两个参数
            deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用
              basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 
              投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
            multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 
              小于等于传入值的所有消息
    

    5.新建MessageListenerConfig,添加消息接收确认机制相关代码

    源码见:资料/MessageListenerConfig
    关键代码:
    container.setConnectionFactory(connectionFactory);//设置mq连接工厂对象
    container.setConcurrentConsumers(1);//设置并发消费者
    container.setMaxConcurrentConsumers(1);//设置最多的并发消费者
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
    
    //注意:此处不能使用Autowired根据类型自动注入队列,必须调用rabbitmqDirectConfig.firstQueue()获得,why?
    // 因为项目中可能存在多个队列,它们的类型都是Queue,自动注入会报错
    container.setQueues(rabbitmqDirectConfig.firstQueue());
    container.setMessageListener(directReceiver);
    
     
    注1:SimpleMessageListenerContainer即简单消息监听容器
         这个类非常的强大,我们可以对他进行很多的设置,用对于消费者的配置项,这个类都可以满足
         1.它可以设置事务特性、事务管理器、事务属性、事务并发、是否开启事务、回滚消息等。但是我们在实际生产中,很少使用事务,基本都是采用补偿机制
         2.它可以设置消费者数量、最小最大数量、批量消费
         3.它可以设置消息确认和自动确认模式、是否重回队列、异常捕获 Handler 函数
         4.它可以设置消费者标签生成策略、是否独占模式、消费者属性等
         5.它还可以设置具体的监听器、消息转换器等等
         另外,SimpleMessageListenerContainer 还可以进行动态设置,比如在运行中的应用可以动态的修改
         其消费者数量的大小、接收消息的模式等。
    注2:消息确认模式
         AcknowledgeMode.NONE:自动确认
         AcknowledgeMode.AUTO:根据情况确认
         AcknowledgeMode.MANUAL:手动确认  
    
    
    
  5. Rabbitmq的消息确认的两种方式
    Rabbitmq为了确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)
    在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。
相关实践学习
容器服务Serverless版ACK Serverless 快速入门:在线魔方应用部署和监控
通过本实验,您将了解到容器服务Serverless版ACK Serverless 的基本产品能力,即可以实现快速部署一个在线魔方应用,并借助阿里云容器服务成熟的产品生态,实现在线应用的企业级监控,提升应用稳定性。
云原生实践公开课
课程大纲 开篇:如何学习并实践云原生技术 基础篇: 5 步上手 Kubernetes 进阶篇:生产环境下的 K8s 实践 相关的阿里云产品:容器服务&nbsp;ACK 容器服务&nbsp;Kubernetes&nbsp;版(简称&nbsp;ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情:&nbsp;https://www.aliyun.com/product/kubernetes
相关文章
|
6月前
|
消息中间件 RocketMQ
rocketmq 超过4M消息体怎么发送
rocketmq 超过4M消息体怎么发送
147 1
|
11月前
|
JSON 开发工具 Android开发
通知消息和透传消息
通知消息和透传消息
626 0
通知消息和透传消息
|
消息中间件 Java 数据库
消息的和发送和接收|学习笔记
快速学习消息的和发送和接收
107 0
|
消息中间件 RocketMQ 开发者
消息发送1-消息校验|学习笔记
快速学习消息发送1-消息校验
99 0
消息发送1-消息校验|学习笔记
|
消息中间件 物联网 Linux
Msgrcv 接收消息|学习笔记
快速学习 Msgrcv 接收消息
267 0
Msgrcv 接收消息|学习笔记
|
消息中间件 RocketMQ 开发者
消息发送4发送消息|学习笔记
快速学习消息发送4发送消息
74 0
消息发送4发送消息|学习笔记
|
开发工具 开发者
消息服务-事件通知使用 | 学习笔记
快速学习消息服务-事件通知使用
103 0
消息服务-事件通知使用 | 学习笔记
|
消息中间件 RocketMQ 开发者
发送同步消息|学习笔记
快速学习发送同步消息
75 0
发送同步消息|学习笔记
【EventBus】事件通信框架 ( 发送事件 | 根据事件类型获取订阅者 | 调用订阅方法 )
【EventBus】事件通信框架 ( 发送事件 | 根据事件类型获取订阅者 | 调用订阅方法 )
110 0
|
消息中间件 负载均衡 Nacos
消息发送和接收演示
接下来我们使用Java代码来演示消息的发送和接收

热门文章

最新文章