springcloud:springboot整合RabbitMQ|RabbitMQ保证消息可靠性(三)

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 上一章我们讲解了rabbitmq的四种交换机类型、七种通讯方式。本章我们将整合springboot来向大家完整演示rabbitmq的使用,并说明如何保证消息的可靠性。

0. 引言

上一章我们讲解了rabbitmq的四种交换机类型、七种通讯方式。本章我们将整合springboot来向大家完整演示rabbitmq的使用,并说明如何保证消息的可靠性。

1. RabbitMQ的安装

这里为了快速部署,我们通过docker来安装,如果需要其他安装方式的可以去rabbitmq官网或者github下载对应系统安装包来安装

1、下载镜像

docker pull rabbitmq

本文书写时,其版本为rabbitmq3.8.9

2、安装镜像

docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq

3、开启远程管理,否则通过15672无法登陆管理页面
进入到docker容器中执行:

# 查看容器id
docker ps -a
# 进入容器
docker exec -it 容器id /bin/bssh
# 容器内执行
rabbitmq-plugins enable rabbitmq_management

4、访问ip:15672。如果是在虚拟机中安装的,记得开通15672,5672端口
在这里插入图片描述

2. RabbitMQ的使用

2.2 springboot整合RabbitMQ

1、创建springboot项目

2、在项目中引入amqp依赖,这里的版本与springboot保持一致

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.3.7.RELEASE</version>
</dependency>

2、配置文件中添加rabbitmq的配置

spring: 
  # rabbitmq配置
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /

3、声明及创建交换机、队列,如果不知道交换机、队列、routingKey概念的,可以先查看上一篇rabbitmq博客

这里我们创建一个队列和直接交换机来示例
(1)创建队列

    @Bean
    public Queue routingQueueA(){
        return new Queue(RabbitConstant.ROUTING_QUEUE_A);
    }

(2)创建交换机

    @Bean
    public DirectExchange testExchange(){
        return new DirectExchange("test.exchange");
    }

(3)绑定队列和交换机,并且设置routingKey

    @Bean
    public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){
        return BindingBuilder.bind(testQueue).to(testExchange).with("test.routing.key");
    }

完整代码:

注意以上代码是写在一个配置类中的,目的是为了在项目启动时能够加载该类,并且创建对应的Bean,即队列、交换机和绑定关系

@Configuration
public class RabbitMqConfig {

    @Bean
    public Queue testQueue(){
        return new Queue("test.queue");
        // 另一种创建队列的方法
        // return QueueBuilder.durable("test.queue").build();
    }

    @Bean
    public DirectExchange testExchange(){
        return new DirectExchange("test.exchange");
        // 另一种创建交换机的方法
        // return ExchangeBuilder.directExchange("test.exchange").build();
    }

    @Bean
    public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){
        return BindingBuilder.bind(testQueue).to(testExchange).with("test.routing.key");
    }
 
}

这里我们只声明了一个直接交换机,单个队列,rabbitmq的其他消息模型和交换机类型大家可以到上一篇中查看,这里不再累叙

4、生产者发送消息
(1)创建消息对象,这是我们要发送到消息队列中的自定义的消息对象

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MyMessage implements Serializable {

    private Long id;

    private String title;

    private String body;

    private Date createDate;

}

(1)创建发送方法

public class QueueController {

    private final RabbitTemplate rabbitTemplate;

    @GetMapping("sendTestQueue")
    public String sendTestQueue(){
        MyMessage message = new MyMessage(1L,"物流提醒","到达装货区域,注意上传凭证",new Date());
        rabbitTemplate.convertAndSend("test.exchange","test.routing.key", message);
        return "发送成功";
    }

}

5、创建消费者

@Component
public class QueueListener {

   @RabbitListener(queues = "test.queue")
    public void handler(MyMessage messageInfo, Message message, Channel channel) {
        System.out.println("接收的消息:"+messageInfo);
    }
}

6、测试调用我们的消息发送方法
在这里插入图片描述
7、可以看到结果中显示了我们刚刚发送的消息
在这里插入图片描述

2.3 RabbitMQ保证消息可靠性

我们在上一章没有讲解的Publisher Confirms模式就是用来保证消息可靠性的。下面我们来看看实现消息可靠性的具体代码,也就是实现Publisher Confirms模式。

2.3.1 哪些环节会导致消息丢失

首先我们要明白消息可靠性也就是保证消息不丢失,那么就要先理解消息会在哪些环节丢失,我们通过一张图来表述可能会导致消息丢失的环节
(1)生产者到交换机的过程中,如果生产者将消息发送出去了,但是还没送达之前,rabbitmq宕机了,或者因为网络原因消息在传输过程中丢失了,但生产者又不知道交换机没有收到,就会导致消息的丢失
(2)因为rabbitmq是基于内存运行的,当rabbitmq宕机或者重启,内存被初始化,就会导致消息丢失。
(3)交换机到队列的过程中,消息还没到达队列时,rabbitmq宕机了,就会导致消息丢失
(4)同2
(5)队列发送消息到消费者的过程中,当队列把消息发送出去了,在发送途中,因为网络波动或者消息者宕机导致消费者没有收到消息,但是队列并不知道消费者没有收到消息,就会导致消息丢失
(6)消费者接收到消息之后,还没有来得及处理消息,消费者就宕机了,也会导致消息丢失。
在这里插入图片描述

下面我们来针对这六个环节来谈谈如何保证消息不丢失

2.3.2 保证消息一定发送到交换机

利用消息队列的confirm机制可以保证消息发送到交换机的可靠性

  • 思路:

所谓confirm机制就是:交换机收到消息后会发送一个ack回执给生产者,接收成功ack=true,接收失败ack=false。那么我们就可以通过设置一个回调函数来监听这个ack,如果接收失败就叫消息重发或者存到数据库中后续补发,如果没有收到ack就说明消息在传输中丢失了,那么也进行补发

在开始讲解代码实现之前,要向大家普通,rabbtimq中confirm机制提供了三种类型:

SIMPLE:,发送的消息到达生产者后会触发waitForConfirms回调方法,为同步方式
CORRELATED:发送的消息到达生产者后会触发回调方法,为异步方式,相比较于SIMPLE效率更高
NONE:禁用发布确认类型,是默认值
  • 代码实现:

(1)首先将confirm机制设置为correlated
可以通过两种方式设置:一是在配置文件中设置,二是通过setPublisherConfirmType方法设置

spring:  
  rabbitmq: 
    publisher-confirm-type: correlated

(2)实现confirmCallback方法。这里的重试机制可以设置为重新发送,或者将消息存放到数据库后续再发送

   @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("消息发送交换机{}:correlationData({}),ack({}),cause({})",ack ? "成功":"失败", correlationData, ack, cause)
            if(!ack){
                // 发送失败进行重试机制

            }
        });
        return rabbitTemplate;
    }

confirmCallback方法中的三个参数分别为

correlationData:相关数据
ack:消息是否到达交换机,true是,false否
cause:发送失败原因

2.3.3 实现消息持久化

我们上述已经说明,消息存在exchange和queue时可能也会导致消息丢失,那么我们如何保证消息不丢失呢?

这里想象一下redis也是基于内存的,它怎么防止数据丢失呢?

那就是做持久化,所谓持久化,就是把数据保存到磁盘,rabbitmq中怎么实现呢,rabbitmq接收到消息后先存储到内存,然后再存储到磁盘,只有当磁盘保存完毕后,才发送回执给生产者,这样即使rabbitmq宕机了也不会导致消息丢失

  • Exchange,Queue持久化

存储消息的地方有Exchange和Queue,那么我们就要在这两个地方实现持久化

我们查看之前创建交换机和队列的源码,其实会发现,里面有一个durable属性,就是用来声明是否持久化的,我们创建时如果不声明就默认为true了。

但是需要注意的是,这里的持久化只是用来控制交换机和队列是否持久化的。当durable=false时,只要rabbitmq重启,当没有消费者监听该交换机或者队列时,该交换机或队列就会被删除。常常用在临时队列中。durable=true时,交换机和队列就会被保存至磁盘,重启后会从磁盘读取到内存。
在这里插入图片描述
在这里插入图片描述

  • 消息持久化

消息的载体是交换机和队列我们要先实现他们的持久化,然后再实现消息本身的持久化

原生的做法,是通过设置BasicProperties的deliveryMode为2来声明其消息实现持久化,如下所示:

AMQP.BasicProperties props = new AMQP.BasicProperties()
    .builder()
    .deliveryMode(2)
    .build();

但我们现在的演示都是整合了springboot的,我们来看看其发送消息的方法的源码
(1)我们的消息发送是通过RabbitTemplate.convertAndSend方法实现的。该方法中调用了this.convertMessageIfNecessary(object)方法将消息进行了转换
在这里插入图片描述
(2)我们打开convertMessageIfNecessary方法,方法中新建了一个MessageProperties对象。咱们上述原生的声明不就是通过一个Properties对象来实现的吗,于是我们点击进该对象查看源码
在这里插入图片描述
(3)会发现该对象的无参构造方法中,声明了一个deliveryMode属性,其值为常量
DEFAULT_DELIVERY_MODE
在这里插入图片描述
(4)继续追踪该常量的值,会发现其定义就是一个枚举类MessageDeliveryMode.PERSISTENT,其命名为持久化,通过其名称我们已经能够联想到什么了。
在这里插入图片描述
(5)为求真相,打开该枚举类,于是乎我们终于找到了我们想要的东西,其值就是2,与原生的设置异曲同工,这说明,amqp中默认就将消息设置为持久化的了。
在这里插入图片描述
所以呢,也不需要我们配置了,可能有同学会疑惑,都不用配置的,你讲他干嘛,这不浪费时间吗?

学习,讲究知其然,知其所以然。如果你抱有不求甚解的态度去学习,那么你能学到的永远是皮毛,经不起考究。

2.3.4 保证消息一定路由到队列

我们上述所说的confirm机制,只能保证消息发送到Exchange,并不能保证Exchange一定能将消息路由到Queue

我们就需要Return机制来保证消息能够路由到队列

  • 思路:
    Return机制,就是当消息进入从交换机转发到消息队列,但消息队列未收到时调用回调函数,可以在回调函数中通过实现我们的重试机制来实现消息不丢失。

    这里需要注意的是,return机制提供了两种模式,通过Mandatory属性来设置
    (1)Mandatory=true,消息通过交换机无法匹配到队列时会返回给生产者,并触发returnCallback
    (2)Mandatory=false,消息通过交换机无法匹配到队列时会直接丢弃消息,默认配置

  • 代码:

(1)开启return机制
方式一:setPublisherReturns

connectionFactory.setPublisherReturns(true);

方式二:setMandatory,Mandatory为true时会自动开启return机制

rabbitTemplate.setMandatory(true);

(3)声明returnCallback方法:重试机制中可以重新发送消息,或者存储数据库后续重发

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  //      connectionFactory.setPublisherReturns(true);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
            // 路由失败,进行重试机制
            
        });
        return rabbitTemplate;
    }

returnCallback方法参数

message: 消息
replyCode:回应码
replyText:回应信息
exchange:交换机
routingKey:路由键

2.3.5 保证消息一定被消费者消费

想要保证消息一定被消费者消费,我们可以通过手动ACK的形式,我们上述讲到了消息接收后会发送一个ACK回执,通过该回执来确定消息是否达到

  • 思路:

同理,针对发送给消费者,我们也可以通过手动ACK的形式,所谓手动ACK,就是消费者自己确定什么时候发送这个ACK回执过去,于是可以等到消息消费完毕后再发送这个回执回去,这样就能确保消息已经被消费,消息队列收到ACK后才将消息删除,如果这中间被中断,那么就不会有ACK回执,那消息队列中的消息就不会被删除

  • 代码:

(1)开启手动ack

spring: 
  rabbitmq: 
    # 手动提交消息
    listener:
      simple:
        acknowledge-mode: manual
      direct:
        acknowledge-mode: manual

acknowledge-mode提供了三种模式:

NONE:自动模式,默认配置,只要有消费者接受到消息,无论消费成功都认为消费成功
MANUAL:手动模式,消费者自己控制什么时候返回ACK
AUTO:自动模式,但会根据报错来决定是否删除队列中的消息,具体规则如下

如果成功消费,没有抛出异常,则自动确认,删除队列中的消息
如果抛出AmqpRejectAndDontRequeueException异常,拒绝确认,不删除队列中的消息
如果抛出ImmediateAcknowledgeAmqpException异常,自动确认,删除队列汇总消息
如果抛出其他异常,则拒绝确认,不会删除队列中的消息

(2)消费者发送ack

     @RabbitListener(queues = "test.queue")
    public void handler(MyMessage messageInfo, Message message, Channel channel) {
        try{
            System.out.println("接收的消息:"+messageInfo.toString());
            // 返回ack
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (IOException e){
            try {
                channel.basicRecover();
            } catch (IOException ex) {
                ex.printStackTrace();
                log.error("消息处理失败:{}",e.getMessage());
            }
        }
    }

配置文件中完整代码

@Configuration
@Slf4j
public class RabbitMqConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("消息发送交换机{}:correlationData({}),ack({}),cause({})",ack ? "成功":"失败", correlationData, ack, cause);
            if(!ack){
                // 发送失败进行重试机制

            }
        });  
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
            // 路由失败,进行重试机制

        });
        return rabbitTemplate;
    }

    @Bean
    public Queue testQueue(){
        return new Queue("test.queue");
    }

    @Bean
    public DirectExchange testExchange(){
        return new DirectExchange("test.exchange");
    }

    @Bean
    public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){
        return BindingBuilder.bind(testQueue).to(testExchange).with("test.routing.key");
    }

}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 Java 网络架构
|
2月前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
429 37
|
1月前
|
负载均衡 网络协议 Java
浅谈Springboot与Springcloud的区别
浅谈Springboot与Springcloud的区别
39 1
|
3月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
2月前
|
消息中间件 存储 Java
SpringCloud基础4——RabbitMQ和SpringAMQP
消息队列MQ、RabbitMQ、SpringAMQP高级消息队列协议、发布/订阅模型、fanout、direct、topic模式
SpringCloud基础4——RabbitMQ和SpringAMQP
|
3月前
|
Java 微服务 Spring
SpringBoot+Vue+Spring Cloud Alibaba 实现大型电商系统【分布式微服务实现】
文章介绍了如何利用Spring Cloud Alibaba快速构建大型电商系统的分布式微服务,包括服务限流降级等主要功能的实现,并通过注解和配置简化了Spring Cloud应用的接入和搭建过程。
SpringBoot+Vue+Spring Cloud Alibaba 实现大型电商系统【分布式微服务实现】
|
3月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
835 2
|
3月前
|
消息中间件 Java Maven
|
3月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
67 0
|
3月前
|
Java
SpringBoot和SpringCloud对应版本
SpringBoot和SpringCloud对应版本
85 0
下一篇
无影云桌面