【RocketMQ系列十四】RocketMQ中消息堆积如何处理

简介: 【RocketMQ系列十四】RocketMQ中消息堆积如何处理

1. 消息堆积

消息堆积顾名思义就是消息队列中堆积了大量未被处理的消息,主要发生在高并发的场景下,生产者发送消息的速率远大于消费者组消息的速度。在物联网的AIOT场景中比较常见。

在RocketMQ的Console上可以查看某个Topic上消息堆积的情况。

这里有个延迟就表示目前堆积的消息数。

2. 消息堆积出现的原因

消息堆积的本质原因还是消费者消费消息的速度赶不上生产者发送消息的速度。可能的情况有:

  1. 第一种情况: 新上线的消费者的消费逻辑存在Bug,导致消息不能被正常消费。这种场景主要存在于代码逻辑不严谨导致某些消息消费失败,或者消费超时,从而导致消息被大量堆积。
  2. 第二种情况:消费者实例宕机或者由于网络的原因不能连上Broker集群。这种情况主要是消费者实例可能是单节点或者机房网络不好的情况。
  3. 第三种情况:生产者短时间内大量发送消息到Broker端,消费者的消费能力不足。消费者消费消息往往是一些比较耗时的IO操作,比如操作数据库,调用其他服务。这导致消费者的消费速率远低于生产者发送速率。这种情况也是消息堆积的常见场景。

3. 如何解决消息堆积

  1. 解决第一种情况:对需要上线的消费者进行严格的测试,确保每种消息的场景都能覆盖到。另外,在上线的时候采用灰度发布,先灰度小范围的用户进行使用,确认没有问题了,在全量放开所有用户使用。
  2. 解决第二种情况:在上线消费者实例时需要,采用多实例,异地多活的方式,确保极端的情况下都能有消费者能够正常消费消息。
  3. 解决第三种情况:这种情况的解决本质上是如何提高消费者的消费速率。主要可以从如下方面解决:
  1. 同一个消费者组下,增加消费者实例。比如Topic中有8个队列,那么可以将消费者数量最多增加到8个。那么有同学会问为啥只增加到8个,我增加到9个,乃至10个行不行?答案是你可以增加10个消费者,但是多余的2个消费者是分不到Queue的。这是因为 在RocketMQ中某个topic下的某个队列只能被同一消费者组中的某个消费者消费。 如果消费者数量少于Queue的数量,那么有可能会出现消费不均的情况。
  2. 提高单个消费者的消费并行线程。RocketMQ  支持批量消费消息,可以通过修改DefaultMQPushConsumer  消费者类的consumeThreadMin(最少消费线程数),以及consumeThreadMax(最大消费线程数)来提高单个消费者的消费能力。
  3. 批量消费消息:
    某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时  1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量。建议使用5.x  SDK的SimpleConsumer,每次接口调用设置批次大小,一次性拉取消费多条消息。
  1. 下面就让我们来看个例子:
    生产者:使用的是DefaultMQProducer;
//4.创建消息
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    for (int i = 0; i < 20000; i++) {
      // 创建消息,指定topic,以及消息体
      Message message = new Message("heap_topic", ("消息堆积测试" + i).getBytes());
      //5.发送消息
      SendResult send = defaultMQProducer.send(message);
      System.out.println(send);
    }
    stopWatch.stop();
    System.out.println("生产者发送2万条消息用时="+stopWatch.getTotalTimeSeconds()+"秒");
  1. 消费者:使用的是DefaultMQPushConsumer;
// 4.创建一个回调函数
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
      System.out.println("本批次收到的消息数="+msgs.size());
      // 5.处理消息
      for (MessageExt msg : msgs) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
        System.out.println("当前时间="+System.currentTimeMillis()+" 收到的消息内容:" + new String(msg.getBody()));
      }
      // 返回消费成功的对象
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
  1. 生产者329秒内发送了2万条消息,平均60条,

    而消费者消费一条消息需要一秒,所以生产者发送完消息之后,两个消费者还在消费。

这里消费者使用的是DefaultMQPushConsumer消费者 每批次Broker端会向消费者推送32条消息,通过pullBatchSize字段设置,而消费者,每次消费1条消息,通过consumeMessageBatchMaxSize字段设置。

当然,官方推荐使用SimpleConsumer进行批量消费消息。

//每批次拉取16条消息
    int maxMessageNum = 16;
    // Set message invisible duration after it is received.
    Duration invisibleDuration = Duration.ofSeconds(15);
    // Receive message, multi-threading is more recommended.
    do {
      final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
      log.info("Received {} message(s)", messages.size());
      for (MessageView message : messages) {
        final MessageId messageId = message.getMessageId();
        try {
          consumer.ack(message);
          log.info("Message is acknowledged successfully, messageId={}", messageId);
        } catch (Throwable t) {
          log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
        }
      }
    } while (true);

官方的代码示例

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
消息中间件 存储 运维
Rabbitmq消息大量堆积怎么办?
该文讨论了一个系统架构问题,主要涉及RabbitMQ在处理订单消息时遇到的性能瓶颈。首先,系统使用RabbitMQ是为了解耦和提高性能,前端创建订单后通过RabbitMQ发送消息给订单履约系统消费并执行后续操作。当订单流量激增时,消息堆积导致服务器压力增加。 排查解决方案: 1. 增加消费者以提高消费速度,但发现即使增加消费者,消息堆积问题仍未解决。 2. 分析消费者逻辑,发现调用库存系统接口可能导致处理速度慢。库存系统压力大,接口响应慢,加剧问题。 3. 实施清空堆积消息的策略,新建消费者快速消费消息并存储在表中,减轻服务器压力。待库存服务恢复后,再将消息推回RabbitMQ处理。
362 1
|
消息中间件 Dubbo Java
如何应对RocketMQ消息堆积
很多同学都在使用 RocketMQ 时,经常会遇到消息堆积的问题。这篇文章,我们聊聊消息堆积的概念,以及如何应对消息堆积。
1584 1
|
消息中间件 存储 Java
《RabbitMQ》| 解决消息延迟和堆积问题
本文主要介绍 RabbitMQ的常见问题
895 1
|
2月前
|
消息中间件 存储 监控
MQ线上大规模消息堆积问题处理及使用场景详解
【11月更文挑战第21天】在如今的高并发互联网应用中,消息队列(Message Queue,简称MQ)扮演着至关重要的角色
148 1
|
7月前
|
消息中间件 负载均衡 开发工具
消息队列 MQ产品使用合集之当一个服务出现堆积后,为什么不把后面的流量负载到其它服务上
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ产品使用合集之当一个服务出现堆积后,为什么不把后面的流量负载到其它服务上
|
5月前
|
消息中间件 固态存储 RocketMQ
RocketMQ消息堆积常见场景与处理方案
文章分析了在使用RocketMQ时消息堆积的常见场景,如消费者注册失败或消费速度慢于生产速度,并提供了相应的处理方案,包括提高消费并行度、批量消费、跳过非重要消息以及优化消费代码业务逻辑等。
|
6月前
|
消息中间件 数据安全/隐私保护 RocketMQ
消息队列 MQ使用问题之遇到消费速度是固定的并且导致了堆积,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 监控 物联网
消息队列 MQ使用问题之如何获取和处理消息堆积数据
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
8月前
|
消息中间件 Shell 数据处理
rocket mq 查看消费进度,消息堆积,清除堆积数据命令
该内容是关于RocketMQ的消费进度管理和堆积数据处理的指导。首先,需进入RocketMQ的bin目录,然后使用`mqadmin consumerProgress`命令查看消费者或生产者的消费进度。`broker offset`和`consumer offset`的差值表示未消费消息。通过`resetOffsetByTime`命令可重置消费位点来清除堆积数据,未消费消息默认3天后会被丢弃。此外,`CONSUME_FROM WHERE`枚举类定义了消费起点选项,包括从最后、最开始或指定时间点消费。
1929 3
|
消息中间件 存储 缓存
RocketMQ 监控告警:生产环境如何快速通过监控预警发现堆积、收发失败等问题?
本文主要向大家介绍如何利用 RocketMQ 可观测体系中的指标监控,对生产环境中典型场景:消息堆积、消息收发失败等场景配置合理的监控预警,快速发现问题,定位问题。
1477 0
RocketMQ 监控告警:生产环境如何快速通过监控预警发现堆积、收发失败等问题?