八、SpringBoot+RabbitMQ削峰

简介: 八、SpringBoot+RabbitMQ削峰

当大量的客户访问请求打到后端,去访问数据库等,瞬间会爆炸的。
经过前端或者其他的方案进行限流外。
还是有大量的请求,这个时候需要削峰了

1. 削峰示例1

application.yaml

rabbitmq:
    listener:
          simple:
            acknowledge-mode: manual  #消息手动确认
            #削峰限流
            prefetch: 1  #消费者每次从队列中取几个消息
            concurrency: 1 #消费者数量
            max-concurrency: 1  #启动消费者最大数量

生产者:

@RestController
public class DirectProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostMapping("direct")
    public String sendMessage(){
        String message = "消息";
        for(int i =0;i<10;i++) {
            rabbitTemplate.convertAndSend("myExchange", "myKey", message+":"+i);
            System.out.println("生产者发送消息:" + message+":"+i);
        }
        return "send message successfully";
    }
}

消费者:

先设置小一点,然后循环往队列里面放消息,消费的时候延迟2

@Component
public class DirectConsumer {
    // 监听并接收队列中的消息
    @RabbitHandler
    @RabbitListener(queues = "myQueue")
    public void getMessage(String msg, Channel channel, Message message) throws InterruptedException {
        Thread.sleep(2000L);
        System.out.println("消费者接收到了消息:" + msg);
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

效果:此时消息会全部放到列队,但是会一条一条消费。简单的实现了削峰处理

2. 削峰示例2

调整消费者的数量:

rabbitmq:
    listener:
          simple:
            acknowledge-mode: manual  #消息手动确认
            #削峰限流
            prefetch: 1  #消费者每次从队列中取几个消息
            concurrency: 3 #消费者数量
            max-concurrency: 10  #启动消费者最大数量

生产者发出50条message到MQ中

@PostMapping("direct")
    public String sendMessage(){
        String message = "消息";
        for(int i =0;i<50;i++) {
            rabbitTemplate.convertAndSend("myExchange", "myKey", message+":"+i);
            System.out.println("生产者发送消息:" + message+":"+i);
        }
        return "send message successfully";
    }

此时就会有3个消费者同时去消费队列中的消息。所以这个消费者数量需要根据实际的情况去设置所能承受的一个值,也就是峰值

3. 重试策略

如果说消费者在消费的过程中失败了,那么会一直消费,一直到成功为止。

但是也可以添加重试策略,比如失败三次就不在消费了。

rabbitmq:
    listener:
          simple:
            acknowledge-mode: manual  #消息手动确认
            #削峰限流
            prefetch: 1  #消费者每次从队列中取几个消息
            concurrency: 3 #消费者数量
            max-concurrency: 10  #启动消费者最大数量
            default-requeue-rejected: true #消息消费失败后,重新进入消费队列中
            #重试策略
            retry:
              initial-interval: 1000 #1秒后重试
              enabled: true #启用发布重试
              max-attempts: 3 #传递消息的最大尝试次数
              max-interval: 10000 #尝试的最大时间间隔
              multiplier: 1.0 #应用于先前传递重试时间间隔的乘数

消费者处理有异常:

@RabbitHandler
    @RabbitListener(queues = "myQueue")
    public void getMessage(String msg, Channel channel, Message message) throws InterruptedException {
        Thread.sleep(2000L);
         System.out.println("消费者接收到了消息:" + msg);
        int a = 10/0;  // 有异常
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

运行效果:

每次接收有异常,会将消息重新放到队列中

重试3次后,还有异常,则不再重试,直接报异常

4. 如何保证rabbitmq消息不丢失

丢失数据场景:

  1. 生产者没有生产成功,即生产者丢失
  2. rabbitmq丢失了
  3. 消费端丢失,即消费端没消费成功。

开启confirm回调,启动手动确定消息消费。

rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated  # 生产者发送消息到交换机 消息确认
    publisher-returns: true  # 交换机绑定到队列 消息确认
    listener:
      simple:
        acknowledge-mode: manual  #消息手动确认
        #削峰限流
        prefetch: 1  #消费者每次从队列中取几个消息
        concurrency: 3 #消费者数量
        max-concurrency: 10  #启动消费者最大数量
        default-requeue-rejected: true #消息消费失败后,重新进入消费队列中
        #重试策略
        retry:
          initial-interval: 1000 #1秒后重试
          enabled: true #启用发布重试
          max-attempts: 3 #传递消息的最大尝试次数
          max-interval: 10000 #尝试的最大时间间隔
          multiplier: 1.0 #应用于先前传递重试时间间隔的乘数
    template:
      mandatory: true
        #在消息没有被路由到合适队列情况下会将消息返还给消息发布者
        #当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,
        # 那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,
        # 出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中, 否则就将消息return给发送者;
        #: true # 启用强制信息
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
2092 1
|
5月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
366 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
5月前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
309 2
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
9月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
302 32
|
8月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
1610 0
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
341 6
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
消息中间件 Java Maven

热门文章

最新文章