RabbitMQ中的消息可靠性问题(上)

简介: RabbitMQ中的消息可靠性问题(上)

image.png

其中的每一步都可能导致消息丢失,常见的丢失原因包括:

  • 发送时丢失:
  • 生产者发送的消息未送达exchange
  • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

针对这些问题,RabbitMQ分别给出了解决方案:

  • 生产者确认机制
  • mq持久化
  • 消费者确认机制
  • 失败重试机制

本篇博客就来带大家解决消息的可靠性。

1、导入Demo工程

导入Demo课程。

image.png

把配置文件application.yml配置修改完毕。

然后我们在consumer 的config包的通用配置类创建一个队列。

image.png

运行consumer的启动类。

image.png

2、生产者消息确认

生产者确认机制:

RabbitMQ提供了publisherconfirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求: 1. publisher-confirm,发送者确认

  1. 消息成功投递到交换机,返回Ack(acknowledge 告知已收到)。
  2. 消息未投递到交换机,返回Nack(未收到)。

2. publisher-return,发送者回执 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

image.png

注:

确认机制发送消息时,需要给每个消息设置一个全局唯一id以区分不同消息,避免ACK冲突。

2.1 修改配置

首先,修改publisher服务中的application.yml文件,添加下面的内容:

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

image.png

配置说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
  • simple:同步等待confirm结果,直到超时(可能会导致消息堵塞,不推荐)。
  • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback。(推荐)
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback。
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息。

2.2 定义Return回调

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置

修改publisher服务,添加一个:

image.png

package com.jie.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @description:生产者通用配置
 * @author: jie
 * @time: 2022/2/25 15:34
 */
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    /**
     * @description:每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置
     * @author: jie 
     * @time: 2022/2/25 15:35
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 投递失败,记录日志
            log.info("消息发送到队列失败,响应码{},失败原因{},交换机{},路由键{},消息{}",
                    replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有业务需要,可以重发消息
        });
    }
}

2.3 发送消息

我们发送消息通过一个单元测试来发。

image.png

这里面有一个最简单的消息发送代码。

image.png

我们要去为amq.topic这个交换机绑定一下simple.queue这个队列。这里我用的是手动的方式,大家可以选择使用代码的方式。

打开浏览器

image.png

点击进去

image.png

绑定完成,回到代码区,我现在发送消息,符合要求,那一定能发送成功,所以我们要修改一下代码。


@Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        String routingKey = "simple.test";
        //1、准备消息
        String message = "hello, spring amqp!";
        //2、准备CorrelationData
        //2.1 消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //2.2 准备ConfirmCallback
        //成功回调
        correlationData.getFuture().addCallback(confirm -> {
            //判断结果
            if(confirm.isAck()){
                //ACK 消息成功
                log.error("消息成功投递到交换机!消息ID:{}",correlationData.getId());
            }else {
                //NACK 消息失败
                log.error("消息投递到交换机失败!消息ID:{}",correlationData.getId());
                //重发消息
            }
            //失败回调
        }, throwable -> {
            //记录日志
            log.error("消息发送失败!",throwable);
        });
        //3、发送消息
        rabbitTemplate.convertAndSend("amq.topic", routingKey, message,correlationData);
    }

我们可以先运行代码查看控制消息。

image.png

这个是成功的情况,接下来演示一下失败的情况。比如消息根本没有到达交换机,可能是交换机名称填错了。image.png

还有一种就是交换机到达了,没有到达队列。比如队列的名称填错了。

image.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
8月前
|
消息中间件 存储 监控
|
8月前
|
消息中间件 安全 Java
【RabbitMQ高级篇】消息可靠性问题
【RabbitMQ高级篇】消息可靠性问题
181 0
|
8月前
|
消息中间件 存储 数据库
RabbitMQ之MQ的可靠性
RabbitMQ之MQ的可靠性
105 0
|
8月前
|
消息中间件 存储 运维
|
15天前
|
消息中间件 Java 中间件
MQ四兄弟:如何保证消息可靠性
本文介绍了RabbitMQ、RocketMQ、Kafka和Pulsar四种消息中间件的可靠性机制。这些中间件通过以下几种方式确保消息的可靠传输:1. 消息持久化,确保消息在重启后不会丢失;2. 确认机制,保证消息从生产者到消费者都被成功处理;3. 重试机制,处理失败后的重试;4. 死信队列,处理无法消费的消息。每种中间件的具体实现略有不同,但核心思想相似,都是从生产者、中间件本身和消费者三个角度来保障消息的可靠性。
17 0
|
8月前
|
消息中间件 SQL Java
RabbitMQ之消费者可靠性
RabbitMQ之消费者可靠性
|
5月前
|
消息中间件 存储 运维
RabbitMQ-消息消费时的可靠性保障
将这些实践融入到消息消费的处理逻辑中,可以很大程度上保障RabbitMQ中消息消费的可靠性,确保消息系统的稳定性和数据的一致性。这些措施的实施,需要在系统的设计和开发阶段充分考虑,以及在后续的维护过程中不断的调整和完善。
66 0
|
8月前
|
消息中间件 存储 运维
深入理解MQ消息队列的高可用与可靠性策略
深入理解MQ消息队列的高可用与可靠性策略
1353 3
|
消息中间件 存储 Kafka
如何保证MQ中消息的可靠性传输?
如何保证MQ中消息的可靠性传输?
123 1
|
8月前
|
消息中间件 Java API
【微服务系列笔记】MQ消息可靠性
消息可靠性涉及防止丢失,包括生产者发送时丢失、未到达队列以及消费者消费失败处理后丢失。 确保RabbitMQ消息可靠性的方法有:开启生产者确认机制,确保消息到达队列;启用消息持久化以防止未消费时丢失;使用消费者确认机制,如设置为auto,由Spring确认处理成功后ack。此外,可开启消费者失败重试机制,多次失败后将消息投递到异常交换机。
138 1