RabbitMQ消息确认

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: RabbitMQ消息确认

消息确认机制

mq 中,消费者和生产者并不直接进行通信,生产者只负责把消息发送到队列,消费者只负责从队列获取消息(不管是push还是pull)。

消费者从队列中获取到消息之后,这条消息就不存在队列中了,但是如果此时消费者所在的信道因为网络中断没有消费到,那这条消息就被永远的丢失了,所以,我们希望等待消费者成功消费掉这个消息之后再删除这条消息。

而在发送消息的时候也是这样的,生产者发消息给交换机,也不能保证消息准确发送过去了,消息就像石沉大海一样,所以这样需要一个消息确认。

消息确认流程

在流程图中,可以看到消息确认是分为生产者确认和消费者确认的。

这两个机制都是受到 TCP 协议的启发,它们对数据安全非常重要。

在RabbitMQ 中有两种事务机制来确保消息的安全送达,分别是事务机制和确认机制。事务机制需要每个消息或每组消息发布提交的通道设置为事务性的,非常耗费性能,降低了消息吞吐量。因此,实际中通常采用确认机制即可。

我们可以利用这两个 callback 接口来控制消息的一致性和处理一部分的异常情况。

  1. 在配置文件中需要添加
spring:
  rabbitmq
    publisher-confirm-type:
  1. 它有三个值:
  • NONE:禁用发布确认模式,是默认值
  • CORRELATED:发布消息成功到交换器后触发回调方法
  • SIMPLE:值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
  1. 配置类
@Configuration
public class ConfirmConfig {
    // 交换机
    public static final String confirm_exchange_name = "confirm_exchange";
    // 队列
    public static final String confirm_queue_name="confirm_queue";
    // routingkey
    public static final String confirm_routing_key = "key1";
    // 声明交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(confirm_exchange_name);
    }
    // 声明队列
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(confirm_queue_name).build();
    }
    // 绑定队列到交换机
    @Bean
    public Binding queueBingExchange(){
        return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(confirm_routing_key);
    }
}
  1. 回调接口
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 向rabbitTemplate 注入回调失败的类
     * 后置处理器:其他注解都执行结束才执行。
     */
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
    /**
     * 交换机确认回调方法
     *  发消息 交换机接收到了  回调
     * @param correlationData :保存回调消息的 ID 及相关信息,交换机收到消息 ack=true代表成功;ack=false 代表失败
     * @param ack :true 代表交换机收到了
     * @param cause : 原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            log.info("交换机已经收到 ID 为:{} 的消息",correlationData.getId());
        }else{
            log.info("交换机未收到 ID为 {} 的消息,原因是 {}",correlationData.getId(),cause);
        }
    }
    /**
     * 当消息传送到队列过程中不可抵达的时候 将消息返回给生产者
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息 {} ,被交换机 {} 退回原因 {}",message,exchange,replyText);
    }
}
  1. 交换机确认–生产消费测试使用交换机回调,就配置 publisher-confirm-type: 为CORRELATED
@GetMapping("/sendMsg/{message}")
    public void sendConfirmMsg(@PathVariable String message){
        rabbitTemplate.convertAndSend(ConfirmConfig.confirm_exchange_name+'1', ConfirmConfig.confirm_routing_key,message,new CorrelationData("1"));
        log.info("发送消息内容:{}",message);
    }
  1. 当生产者获取不到消息的时候进入回调函数执行 false 的代码。实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause。
  • correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
  • ack:消息投递到broker 的状态,true表示成功。
  • cause:表示投递失败的原因。
  1. 队列确认–回退接口交换机接收到消息后可以判断当前的路径发送没有问题,但是不能保证消息能够发送到路由队列的。而发送者是不知道这个消息有没有送达队列的,因此,我们需要在队列中进行消息确认。这就是回退消息。实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。添加注解:
publisher-returns: true
  1. 执行代码:
rabbitTemplate.convertAndSend(ConfirmConfig.confirm_exchange_name, ConfirmConfig.confirm_routing_key,message+"1",new CorrelationData("1"));
rabbitTemplate.convertAndSend(ConfirmConfig.confirm_exchange_name, ConfirmConfig.confirm_routing_key+"2",message+"2",new CorrelationData("2"));
  1. 回退执行
消息 (Body:'hello22' MessageProperties [headers={spring_returned_message_correlation=2}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) ,被交换机 confirm_exchange 退回原因 NO_ROUTE
  1. 生产者的消息确认机制有两种:
  • 一个是从生产者发送到交换机的确认回调;
  • 一个是从交换机发送到队列的确认回调;
    生产者发送到交换机,需要配置一个参数 publisher-confirm-type ,它默认是 none,没有开启,可以把它改为 correlated ,即消息成功发送后会触发一个回调;然后我们根据 ack 的一个状态进行判断,如果为 true ,则代表发送成功。
    还有一个交换机到队列的回调,将 publisher-returns 改为 true 即可,触发 returnedMessage
  1. 消费者确认rabbitmq 消费消息有两种模式,一个是推送 push ,一个是自己拉取pull。
  • 推模式:消息中间件主动将消息推送给消费者
  • 拉模式:消费者主动从消息中间件拉取消息。
  1. 实际使用中,拉取消息是会降低系统吞吐量的,以及消费者很难实时获取消息,因此,一般使用的是push 模式。
    在 mq 推消息给消费者不是等消费者消费完一个再推一个,而是根据prefetch_count 参数来决定可以推多个消息到消费者的缓存里面。
    在消费者确认中,为了保证数据不会丢失,RabbitMQ 支持消息确定ACK。ACK 机制是消费者从 RabbitMQ 收到消息并处理完成后,返回给RabbitMQ,RabbitMQ 收到反馈后才将此消息从队列中删除。
    自动确认
    自动确认是指消费者在消费消息的时候,当消费者收到消息后,消息就会被 RabbitMQ 从队列中删除掉。这种模式认为 “发送即成功”。这是不安全的,因为消费者可能在业务中并没有成功消费完就中断了。
    下面我们通过 debug 来测试下这个逻辑。可以看到在debug 状态下,消费者还未消费,该队列中就没有任何数据了。

    手动确认 autoAck:false
    手动确认又分为肯定确认和否定确认。
    肯定确认 BasicAck
// false 表示只确认 b.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
channel.basicAck(b.getEnvelope().getDeliveryTag(),false);

  1. 当消费者消费完数据后,队列中一共还剩18条,有一条消息待确认。

    否定确认: BasicNack、BasicReject
    否定确认的场景不多,但有时候某个消费者因为某种原因无法立即处理某条消息时,就需要否定确认了.
    否定确认时,需要指定是丢弃掉这条消息,还是让这条消息重新排队,过一会再来,又或者是让这条消息重新排队,并尽快让另一个消费者接收并处理它.
丢弃:requeue: false:channel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: false);
重新排队( requeue: true): channel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: true);
  1. 一般来说,如果出现异常,就使用channel.BasicNack 把消费失败的消息重新放入到队列中去。
    springboot 版本确认
    Springboot 的确认模式有三种,配置如下:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • NONE : 不确认 :
    1、默认所有消息消费成功,会不断的向消费者推送消息
    2、因为 rabbitmq 认为所有消息都被消费成功。所以队列中存在丢失消息风险。
  • AUTO:自动确认
    1、根据消息处理逻辑是否抛出异常自动发送 ack(正常)和nack(异常)给服务端,如果消费者本身逻辑没有处理好这条数据就存在丢失消息的风险。
    2、使用自动确认模式时,需要考虑的另一件事情就是消费者过载。
  • MANUAL:手动确认
    1、手动确认在业务失败后进行一些操作,消费者调用 ack、nack、reject 几种方法进行确认,如果消息未被 ACK 则发送到下一个消费者或重回队列。
    2、ack 用于肯定确认;nack 用于 否定确认 ;reject 用于否定确认(一次只能拒绝单条消息)
@Component
@Slf4j
public class MsgConfirmController {
    @RabbitListener(queues = ConfirmConfig.confirm_queue_name)
    public void consumerConfirm(Message message, Channel channel) throws IOException {
        if(message.getBody().equals("2")){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 
            //basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
            log.info("接收的消息为:{}",message);
        }else{
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
            log.info("未消费数据");
        }
    }
}
  1. 发现它可以将消息返回给队列,然后又消费这个数据,不断消费,造成了死循环,消息无限投递。

    这时候可以改成 false,然后配置下死信队列,将该消息发送到死信队列中
  • channel.basicAck
import com.rabbitmq.client.Channel; //导入方法依赖的package包/类
/**
 * 普通消息监听
 * 
 * @param message 消息实体
 * @param channel channel 就是当前的会话通道
 * @throws Exception 备注: 手动ack就是在当前channel里面调用basicAsk的方法,并传入当前消息的tagId就可以了。
 */
@Override
public void onMessage(Message message, Channel channel) throws Exception {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    logger.debug("deliveryTag= " + deliveryTag);
    try {
        logger.info("------消费者处理消息------");
        logger.info("receive message" + message.getMessageProperties().getAppId());
        logger.info("receive channel" + channel.getChannelNumber() + "----");
        // 获取消息
        if (null != message.getBody()) {
            EventMessage eventMessage = (EventMessage) ObjectAndByteCovertUtil.ByteToObject(message.getBody());
            if (null != eventMessage) {
                System.out.println(Thread.currentThread().getName() + ":" +  TimeUtils.getSysTime("yyyy-MM-dd HH:mm:ss") + ":[下游应用- 消费普通消息]:" + message.getMessageProperties());
                // TODO 业务处理
            }
        }
        // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除 )
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        logger.warn("message consume failed: " + e.getMessage());
        // ack返回false,requeue-true并重新回到队列
        channel.basicNack(deliveryTag, false, true);
    }
}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 网络协议
RabbitMQ消息的应答
RabbitMQ消息的应答
56 0
|
消息中间件 存储 Java
【RabbitMQ四】——RabbitMQ发布订阅模式(Publish/Subscribe)
【RabbitMQ四】——RabbitMQ发布订阅模式(Publish/Subscribe)
344 1
|
消息中间件
【RabbitMQ七】——RabbitMQ发布确认模式(Publisher Confirms)
【RabbitMQ七】——RabbitMQ发布确认模式(Publisher Confirms)
271 0
|
6月前
|
消息中间件 Java
RabbitMQ中的消息确认机制是什么?为什么需要消息确认?
RabbitMQ中的消息确认机制是什么?为什么需要消息确认?
95 0
|
消息中间件 应用服务中间件 nginx
【RabbitMQ六】——RabbitMQ主题模式(Topic)
【RabbitMQ六】——RabbitMQ主题模式(Topic)
406 1
|
消息中间件
RabbitMq消息确认机制
RabbitMq消息确认机制
107 0
|
消息中间件 存储
RabbitMQ学习(七):交换器
RabbitMQ学习(七):交换器
RabbitMQ学习(七):交换器
|
消息中间件 负载均衡 算法
RabbitMQ中真的只有四种交换器吗?
RabbitMQ中真的只有四种交换器吗?
281 0
RabbitMQ中真的只有四种交换器吗?
|
消息中间件 网络协议
RabbitMQ消息应答
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。
RabbitMQ消息应答
|
消息中间件 存储 网络协议
RabbitMQ消息模型详解(1)
RabbitMQ消息模型详解(1)
165 0
RabbitMQ消息模型详解(1)