RabbitMQ之消息可靠性投递解读

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RabbitMQ之消息可靠性投递解读

基本介绍

消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,那么这肯定会牺牲一些性能,性能与可靠性是无法兼得的;

如果业务实时一致性要求不是特别高的场景,可以牺牲一些可靠性来换取性能。


  • ① 代表消息从生产者发送到Exchange;
  • ② 代表消息从Exchange路由到Queue;
  • ③ 代表消息在Queue中存储;
  • ④ 代表消费者监听Queue并消费消息;

rabbitmq 整个消息投递的路径为:

   producer—>rabbitmq broker—>exchange—>queue—>consumer

   消息从 producer 到 exchange 则会返回一个 confirmCallback 。

   消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。

producer->exchange:确保消息发送到RabbitMQ服务器的交换机

消息从 producer 到 exchange 则会返回一个 confirmCallback

Confirm模式

消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器Broker里面的exchange交换机,则会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障;

具体代码设置

  • 配置文件application.yml 开启确认模式:spring.rabbitmq.publisher-confirm-type=correlated
  • 写一个类实现implements RabbitTemplate.ConfirmCallback,判断成功和失败的ack结果,可以根据具体的结果,如果ack为false,对消息进行重新发送或记录日志等处理;设置rabbitTemplate的确认回调方法
  • rabbitTemplate.setConfirmCallback(messageConfirmCallBack);
  @Component
    public class MessageConfirmCallBack implements RabbitTemplate.ConfirmCallback {
        /**
         * 交换机收到消息后,会回调该方法
         *
         * @param correlationData  相关联的数据
         * @param ack  有两个取值,true和false,true表示成功:消息正确地到达交换机,反之false就是消息没有正确地到达交换机
         * @param cause 消息没有正确地到达交换机的原因是什么
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("correlationData = " + correlationData);
            System.out.println("ack = " + ack);
            System.out.println("cause = " + cause);
            if (ack) {
                //正常
            } else {
                //不正常的,可能需要记日志或重新发送
            }
        }
    }

发消息参考代码(需要对RabbitTemplate 进行初始化)

@Service
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private MessageConfirmCallBack messageConfirmCallBack;
    @PostConstruct //bean在初始化的时候,会调用一次该方法,只调用一次,起到初始化的作用
    public void init() {
        rabbitTemplate.setConfirmCallback(messageConfirmCallBack);
    }
    /**
     * 发送消息
     */
    public void sendMessage() {
        //关联数据对象
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("O159899323"); //比如设置一个订单ID,到时候在confirm回调里面,你就可以知道是哪个订单没有发送到交换机上去
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE + 123, "info", "hello", correlationData);
        System.out.println("消息发送完毕......");
    }
}

Transaction(事务)模式

   RabbitMQ支持事务(transaction),RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback()。

  • (1)txSelect用于将当前channel设置成transaction模式,通过调用tx.select方法开启事务模式。
  • (2)txCommit用于提交事务。当开启了事务模式后,只有当一个消息被所有的镜像队列保存完毕后,RabbitMQ才会调用tx.commit-ok返回给客户端。
  • (3)txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。
channel.txSelect(); // 将当前channel设置成事务模式
/**
  * ConfirmConfig.exchangeName(交换机名称)
  * ConfirmConfig.routingKey(路由键)
  * message (消息内容)
  */
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message));
channel.txCommit(); // 提交事务
channel.txRollback(); // 回滚事务

注: 事务确实能够解决producer与broker之间消息确认的问题,只有消息成功被broker接受,事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作同时进行消息重发。

事务机制的缺点 :

  • 使用事务机制的话会降低RabbitMQ的性能。
  • 会导致生产者和RabbitMq之间产生同步(等待确认),这也违背了我们使用RabbitMq的初衷,所以一般很少采用。

exchange -> queue:确保消息从交换机发到队列

可能因为路由关键字错误,或者队列不存在,或者队列名称错误导致②失败。使用return模式,可以实现消息无法路由的时候返回给生产者;当然在实际生产环境下,我们不会出现这种问题,我们都会进行严格测试才会上线(很少有这种问题);

消息从 exchange –> queue 投递失败则会返回一个 returnCallback

return模式

开启确认模式;使用rabbitTemplate.setConfirmCallback设置回调函数,当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理;

注意配置文件中,开启 退回模式;

spring.rabbitmq.publisher-returns: true

使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,则会将消息退回给producer,并执行回调函数returnedMessage;

@Component
public class MessageReturnCallBack implements RabbitTemplate.ReturnsCallback {
    /**
     * 当消息从交换机 没有正确地 到达队列,则会触发该方法
     * 如果消息从交换机 正确地 到达队列了,那么就不会触发该方法
     *
     * @param returned
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.println("消息return模式:" + returned);
    }
}
@Service
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private MessageReturnCallBack messageReturnCallBack;
    @PostConstruct //bean在初始化的时候,会调用一次该方法,只调用一次,起到初始化的作用
    public void init() {
        rabbitTemplate.setReturnsCallback(messageReturnCallBack);
    }
    /**
     * 发送消息
     */
    public void sendMessage() {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, "info123", "hello");
        System.out.println("消息发送完毕......");
    }
}

备份交换机(alternate-exchange)

使用备份交换机(alternate-exchange),无法路由的消息会发送到这个备用交换机上。

备份 交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由 备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑 定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都 进入这个队列了。到达这个队列以后消费者可以进行处理,通知开发人员进行查看。

当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息,如果我们想要监测哪些消息被投递到没有对应的队列,我们可以用备用交换机来实现,可以接收备用交换机的消息,然后记录日志或发送报警信息。

设置参考代码

Map<String, Object> arguments = new HashMap<>();
//指定当前正常的交换机的备用交换机是谁
arguments.put("alternate-exchange", EXCHANGE_ALTERNATE); 
//DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
return new DirectExchange(EXCHANGE, true, false, arguments);
//return ExchangeBuilder.directExchange(EXCHANGE).withArguments(args).build();

确保消息在队列正确地存储

可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题;

队列持久化

QueueBuilder.durable(QUEUE).build();

交换机持久化

ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();

消息持久化

 MessageProperties messageProperties = new MessageProperties();
//设置消息持久化,当然它默认就是持久化,所以可以不用设置,可以查看源码
 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); 

集群,镜像队列,高可用

允许消费者和生产者在RabbitMQ节点崩溃的情况下继续运行

如果RabbitMQ集群中只有一个Broker节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过publisherconfirm机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。

      引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性

确保消息从队列正确地投递到消费者

采用消息消费时的手动ack确认机制来保证;如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement);

#开启手动ack消息消费确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消费者在订阅队列时,通过上面的配置,不自动确认,采用手动确认,RabbitMQ会等待消费者显式地回复确认信号后才从队列中删除消息;

如果消息消费失败,也可以调用basicReject()或者basicNack()来拒绝当前消息而不是确认。如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况,可以投递到新的队列中,或者只打印异常日志);

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 存储 监控
|
7月前
|
消息中间件 存储 运维
|
2天前
|
消息中间件 Java 中间件
MQ四兄弟:如何保证消息可靠性
本文介绍了RabbitMQ、RocketMQ、Kafka和Pulsar四种消息中间件的可靠性机制。这些中间件通过以下几种方式确保消息的可靠传输:1. 消息持久化,确保消息在重启后不会丢失;2. 确认机制,保证消息从生产者到消费者都被成功处理;3. 重试机制,处理失败后的重试;4. 死信队列,处理无法消费的消息。每种中间件的具体实现略有不同,但核心思想相似,都是从生产者、中间件本身和消费者三个角度来保障消息的可靠性。
12 0
|
4月前
|
消息中间件 存储 运维
RabbitMQ-消息消费时的可靠性保障
将这些实践融入到消息消费的处理逻辑中,可以很大程度上保障RabbitMQ中消息消费的可靠性,确保消息系统的稳定性和数据的一致性。这些措施的实施,需要在系统的设计和开发阶段充分考虑,以及在后续的维护过程中不断的调整和完善。
64 0
|
5月前
|
消息中间件 存储 RocketMQ
消息队列 MQ使用问题之进行超过3天的延迟消息投递,采用多次投递的策略是否有风险
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 存储 运维
深入理解MQ消息队列的高可用与可靠性策略
深入理解MQ消息队列的高可用与可靠性策略
1349 3
|
7月前
|
消息中间件 Java API
【微服务系列笔记】MQ消息可靠性
消息可靠性涉及防止丢失,包括生产者发送时丢失、未到达队列以及消费者消费失败处理后丢失。 确保RabbitMQ消息可靠性的方法有:开启生产者确认机制,确保消息到达队列;启用消息持久化以防止未消费时丢失;使用消费者确认机制,如设置为auto,由Spring确认处理成功后ack。此外,可开启消费者失败重试机制,多次失败后将消息投递到异常交换机。
133 1
|
7月前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
300 0
RabbitMQ入门指南(九):消费者可靠性
|
7月前
|
消息中间件 Java 微服务
RabbitMQ入门指南(七):生产者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消息丢失的可能性、生产者可靠性中的生产者重试机制和生产者确认机制等内容。
218 0
RabbitMQ入门指南(七):生产者可靠性
|
7月前
|
消息中间件 安全 Java
SpringBoot基于RabbitMQ实现消息可靠性
SpringBoot基于RabbitMQ实现消息可靠性
117 0
下一篇
DataWorks