RabbitMQ入门指南(七):生产者可靠性

简介: RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消息丢失的可能性、生产者可靠性中的生产者重试机制和生产者确认机制等内容。

一、消息丢失的可能性

流程图如下:


1.发送消息时丢失:

  • 生产者发送消息时连接MQ失败:当生产者尝试与RabbitMQ建立连接但失败时,发送消息的操作将无法完成。这可能是由于网络问题、RabbitMQ服务未运行或配置错误导致的。
  • 生产者发送消息到达MQ后未找到Exchange:在RabbitMQ中,生产者将消息发送到Exchange,Exchange再路由到一个或多个Queue。如果生产者发送消息到达MQ后未找到正确的Exchange,那么消息将无法被路由到正确的Queue,从而丢失。
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue:即使消息找到了Exchange,如果Exchange没有正确配置或没有找到合适的Queue来路由消息,那么消息也会丢失。这可能是由于配置错误或Queue不存在导致的。
  • 消息到达MQ后,处理消息的进程发生异常:在某些情况下,当消息到达MQ后,处理消息的进程可能会发生异常,导致消息无法被处理。这可能是由于代码错误、资源限制或其他系统问题导致的。

2.MQ导致消息丢失:

  • 消息到达MQ,保存到队列后,尚未消费就突然宕机:在某些情况下,如果RabbitMQ服务突然宕机,而消息已经保存到队列中但尚未被消费者消费,那么这些消息可能会丢失。为了防止这种情况,可以使用持久化配置来确保消息在MQ宕机后仍然保留在磁盘上。

3.消费者处理消息时消息丢失:

  • 消息接收后尚未处理突然宕机:如果消费者在接收消息后尚未处理就突然宕机,那么这些消息可能会丢失。为了防止这种情况,可以使用确认机制来确保消息在被消费者成功处理之前一直保留在队列中。
  • 消息接收后处理过程中抛出异常:在处理消息的过程中,如果消费者代码出现异常并抛出错误,那么可能会导致消息处理失败。为了防止这种情况,可以对消费者的代码进行充分的测试和异常处理,以确保其能够正确、稳定地处理消息。

二、生产者可靠性

1.生产者重试机制

由于网络波动,可能会出现客户端连接MO失败的情况,为了解决这个问题,Spring AMQP提供了消息发送时的重试机制 。

在publisher服务的application.yml文件中添加配置:

spring:
  rabbitmq:
    connection-timeout: 1s     # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true     # 开启超时重试机制
        initial-interval: 1000ms     # 失败后的初始等待时间
        multiplier: 1     # 失败后下次的等待时长倍数(下次等待时长:initial-interval * multiplier)
        max-attempts: 3     # 最大重试次数

image.gif

当网络不稳定时,重试机制可以有效提高消息发送的成功率。然而,Spring AMQP提供的重试机制是阻塞式的,这意味着在等待重试的过程中,当前线程会被阻塞。对于对业务性能有要求的应用,建议禁用重试机制。如果仍需使用,建议合理配置等待时长和重试次数,并考虑使用异步线程来执行发送消息的代码,以避免阻塞主线程。

2.生产者确认机制

RabbitMQ提供了Publisher ConfirmPublisher Return两种确认机制。开启确机制认后,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。返回的结果有以下几种情况:

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功。
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功。
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功。
  • 其他情况都会返回NACK,告知投递失败。


ACK和NACK属于Publisher Confirm机制,ACK是投递成功,NACK是投递失败。return属于Publisher Return机制。默认两种机制都是关闭状态,需要通过配置文件来开启。

开启生产者确认:

在publisher服务的application.yml文件中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated     # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true     # 开启publisher return机制

image.gif

publisher-confirm-type有三种模式(一般推荐使用correlated回调机制):

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

定义ReturnCallback:

在publisher服务定义一个配置类(每个RabbitTemplate只能配置一个ReturnCallback):

@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.info("消息的return callback:");
                log.info("exchange:{}, key:{}, msg:{}, code:{}, text:{}",
                        returned.getExchange(), returned.getRoutingKey(), returned.getMessage(), returned.getReplyCode(), returned.getReplyText());
            }
        });
    }
}

image.gif

定义ConfirmCallback:

由于每条消息的处理逻辑可能不同,因此需要在每次发送消息时定义ConfirmCallback(当调用RabbitTemplate的convertAndSend方法时,需要多传递一个参数)。

这个参数是一个CorrelationData对象,包含两个核心内容:

  • id:这是消息的唯一标识,MQ会使用它来判断不同的消息的回执,避免混淆。
  • SettableListenableFuture:这是一个Future对象,MQ将通过它返回回执结果。可以提前为CorrelationData中的Future添加回调函数来处理消息回执。

在publisher服务中的测试类添加一个测试方法,实现消息发送,并且添加ConfirmCallback:

@Test
    void testConfirmCallback() throws InterruptedException {
        // 1.创建CorrelationData
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
        // 2.添加ConfirmCallback
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("消息回调失败", ex);
            }
            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                log.debug("收到confirm callback回执");
                if (result.isAck()) {
                    // 消息发送成功
                    log.debug("消息发送成功,收到ACK");
                } else {
                    // 消息发送失败
                    log.error("消息发送失败,收到NACK,原因:{}", result.getReason());
                }
            }
        });
        rabbitTemplate.convertAndSend("test.direct", "red", "test", cd);
    }

image.gif

生产者确认会带来额外的网络和系统资源开销,因此应尽量避免使用。如果确实需要使用,则无需开启Publisher-Return机制,因为一般路由失败是业务问题。对于NACK消息,可以有限次数重试,如果仍然失败,则记录异常消息。


总结

RabbitMQ是一个开源的消息队列软件,旨在提供可靠的消息传递和消息队列功能。本文主要介绍了消息丢失的可能性、生产者可靠性中的生产者重试机制和生产者确认机制等内容,希望对大家有所帮助。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
17小时前
|
消息中间件 存储 监控
|
17小时前
|
消息中间件 安全 Java
【RabbitMQ高级篇】消息可靠性问题
【RabbitMQ高级篇】消息可靠性问题
49 0
|
17小时前
|
消息中间件 存储 运维
|
17小时前
|
消息中间件 存储 运维
深入理解MQ消息队列的高可用与可靠性策略
深入理解MQ消息队列的高可用与可靠性策略
21 3
|
17小时前
|
消息中间件 安全 Java
SpringBoot基于RabbitMQ实现消息可靠性
SpringBoot基于RabbitMQ实现消息可靠性
37 0
|
17小时前
|
消息中间件 Java API
【微服务系列笔记】MQ消息可靠性
消息可靠性涉及防止丢失,包括生产者发送时丢失、未到达队列以及消费者消费失败处理后丢失。 确保RabbitMQ消息可靠性的方法有:开启生产者确认机制,确保消息到达队列;启用消息持久化以防止未消费时丢失;使用消费者确认机制,如设置为auto,由Spring确认处理成功后ack。此外,可开启消费者失败重试机制,多次失败后将消息投递到异常交换机。
42 1
|
17小时前
|
消息中间件 Docker 微服务
RabbitMQ入门指南(十一):延迟消息-延迟消息插件
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了DelayExchange插件、延迟消息插件实现延迟消息等内容。
55 0
|
17小时前
|
消息中间件 微服务
RabbitMQ入门指南(十):延迟消息-死信交换机
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了死信交换机、死信交换机实现延迟消息等内容。
47 0
|
17小时前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
43 0
RabbitMQ入门指南(九):消费者可靠性
|
17小时前
|
消息中间件 存储 Java
RabbitMQ入门指南(八):MQ可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了MQ数据持久化、LazyQueue模式、管理控制台配置Lazy模式、代码配置Lazy模式、更新已有队列为lazy模式等内容。
65 0