消息堆积问题究竟是什么

简介: 消息堆积问题究竟是什么

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。

解决消息堆积有三种思路:

  1. 增加更多消费者,提高消费速度
  2. 在消费者内开启线程池加快消息处理速度
  3. 扩大队列容积,提高堆积上限

1、惰性队列

上面呢,我们已经 知道解决消息队列的常见三种解决方案,其中一种方案就是想办法去提高一个队列它能存储一个消息量的上限。

但是RabbitMQ呢是内存存储的,如果说在高并发的情况下消息量非常的大,这些消息我们如果都给它丢到内存当中,显然是不合适的,所以我们就要学习一个惰性队列来解决这个问题!

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:

  1. 接收到消息后直接存入磁盘而非内存
  2. 消费者要消费消息时才会从磁盘中读取并加载到内存
  3. 支持数百万条的消息存储

1.1 基于@Bean声明lazy-queue

image.png

package com.jie.mq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class LazyConfig {
   /**
    * @description:惰性队列
    * @author: jie
    * @time: 2022/3/13 11:06
    */
    @Bean
    public Queue lazyQueue(){
        return QueueBuilder.durable("lazy.queue")
                .lazy()
                .build();
    }
    /**
     * @description:普通队列
     * @author: jie
     * @time: 2022/3/13 11:06
     */
    @Bean
    public Queue normalQueue(){
        return QueueBuilder.durable("normal.queue")
                .build();
    }
}

运行,查看浏览器。

image.png

1.2 基于@RabbitListener声明LazyQueue

/**
     * @description:声明惰性队列
     * @author: jie
     * @time: 2022/3/13 14:37
     */
    @RabbitListener(queuesToDeclare = @Queue(
            name = "lazy.queue",
            durable = "true",
            arguments = @Argument(name = "x-queue-mode",value = "lazy")
    ))
    public void listenLazyQueue(String msg){
        System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
    }

image.png1.3 发送消息

package com.jie.mq.spring;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.nio.charset.StandardCharsets;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testLazyQueue() throws InterruptedException {
        for (int i = 0; i < 1000000; i++) {
            String routingKey = "delay";
            //1、准备消息
            Message message = MessageBuilder
                    .withBody("hell,Spring".getBytes(StandardCharsets.UTF_8))
                    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                    .build();
            //2、发送消息
            rabbitTemplate.convertAndSend("lazy.queue", message);
        }
    }
    @Test
    public void testNormaQueue() throws InterruptedException {
        for (int i = 0; i < 1000000; i++) {
            String routingKey = "delay";
            //1、准备消息
            Message message = MessageBuilder
                    .withBody("hell,Spring".getBytes(StandardCharsets.UTF_8))
                    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                    .build();
            //2、发送消息
            rabbitTemplate.convertAndSend("normal.queue", message);
        }
    }

2、总结

消息堆积问题的解决方案?

  1. 队列上绑定多个消费者,提高消费速度
  2. 使用惰性队列,可以再mq中保存更多消息

惰性队列的优点有哪些?

  1. 基于磁盘存储,消息上限高
  2. 没有间歇性的page-out,性能比较稳定

惰性队列的缺点有哪些?

  1. 基于磁盘存储,消息时效性会降低
  2. 性能受限于磁盘的IO


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 存储 Java
《RabbitMQ》| 解决消息延迟和堆积问题
本文主要介绍 RabbitMQ的常见问题
867 1
|
1天前
|
消息中间件 存储 Java
MQ线上消息乱序问题处理及场景详解
【11月更文挑战第22天】在现代分布式系统中,消息队列(MQ)作为核心组件,承担着异步处理、削峰填谷和系统解耦的重任。
9 1
|
4月前
|
消息中间件 数据安全/隐私保护 RocketMQ
消息队列 MQ使用问题之遇到消费速度是固定的并且导致了堆积,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 监控 物联网
消息队列 MQ使用问题之如何获取和处理消息堆积数据
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 API RocketMQ
消息队列 MQ使用问题之消息在没有消费者的情况下丢失,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Shell 数据处理
rocket mq 查看消费进度,消息堆积,清除堆积数据命令
该内容是关于RocketMQ的消费进度管理和堆积数据处理的指导。首先,需进入RocketMQ的bin目录,然后使用`mqadmin consumerProgress`命令查看消费者或生产者的消费进度。`broker offset`和`consumer offset`的差值表示未消费消息。通过`resetOffsetByTime`命令可重置消费位点来清除堆积数据,未消费消息默认3天后会被丢弃。此外,`CONSUME_FROM WHERE`枚举类定义了消费起点选项,包括从最后、最开始或指定时间点消费。
1658 3
|
5月前
|
消息中间件 存储 监控
【消息中间件】详解mq消息积压
【消息中间件】详解mq消息积压
200 0
|
消息中间件 Arthas Java
线上kafka消息堆积,consumer掉线,怎么办?
线上kafka消息堆积,consumer掉线,怎么办?
190 0
|
6月前
|
消息中间件 存储 NoSQL
消息队列之堆积问题分析
消息队列之堆积问题分析
141 1
|
消息中间件 弹性计算 Java
RocketMQ-没有消费者的消息堆积场景分析
RocketMQ-没有消费者的消息堆积场景分析
323 1