RocketMQ消息堆积常见场景与处理方案

简介: 文章分析了在使用RocketMQ时消息堆积的常见场景,如消费者注册失败或消费速度慢于生产速度,并提供了相应的处理方案,包括提高消费并行度、批量消费、跳过非重要消息以及优化消费代码业务逻辑等。

一、前言

在我的日常开发工作中,有碰到过一些使用RocketMQ消息堆积的场景,消息堆积是某个生产者发送了消息到topic下的队列中,但是消费者未及时消费,导致消费者组的消息堆积的情况,本文将分析常见的消息堆积的场景以及实际的处理方案。

二、消息堆积出现的场景

1、消费者注册未成功

这种情况发生在新上线消费者组,我们有遇到过消费组下的消费者实例设置的instanceName都一致的情况下,这个时候消费者组下某些消费者注册不成功,从而会发生消息堆积的情况。

image.png

2、消息生产过快,而消费者消费过慢

这种情况是我们的业务消费逻辑消费慢的场景,比如消费用户访问app首页消息,这种消息生产速度是非常快的,如果我们的消费逻辑非常慢的话,就出现消息堆积的情况。

三、消息堆积解决方案

  • 如果出现了消息堆积的情况,我们需要怎么处理呢。

上面第一种情况是属于MQ使用姿势问题,消费者都没有注册上去,这种情况需要及时调整代码,将消费者注册好,对于堆积的消息进行抛弃或者加快消费。

  • 如果是第二种情况,这是非使用姿势问题,消费者消费能力的问题。我们可以怎么处理呢?

手段一、提高消费并行度

1、同一个ConsumerGroup,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。

2、提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMinconsumeThreadMax实现。RocketMQ默认消费线程是20个。我们可以将他配置到分布式配置中心,比如apollo,实现动态的调整。

image.png

手段二、批量消费方式

某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量。

通过设置 consumer的 consumeMessageBatchMaxSize 参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。

手段三、跳过非重要消息,追赶上生产者进度

发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。示例代码如下:


public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
   
   
        long offset = msgs.get(0).getQueueOffset();
        String maxOffset =
                msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
        long diff = Long.parseLong(maxOffset) - offset;
        if (diff > 100000) {
   
   
            // TODO 消息堆积情况的特殊处理
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // TODO 正常消费过程
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

手段四、优化消费代码业务逻辑

举例如下,某条消息的消费过程如下:

image.png

这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小很多。

四、总结

消息堆积通常由于使用姿势问题或者消费者消费速率过慢导致,其中消费者过慢的解决方案也有很多种,我们的解决实际问题期间需要根据实际的使用场景选择解决最快并且消耗更少资源的方案。

你有没有其他的方案呢?欢迎在评论区和JY分享。

image.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 存储 监控
MQ线上大规模消息堆积问题处理及使用场景详解
【11月更文挑战第21天】在如今的高并发互联网应用中,消息队列(Message Queue,简称MQ)扮演着至关重要的角色
164 1
|
19天前
|
消息中间件 存储 Java
招行面试:10Wqps场景,RocketMQ 顺序消费 的性能 如何提升 ?
45岁资深架构师尼恩在其读者群中分享了关于如何提升RocketMQ顺序消费性能的高并发面试题解析。面对10W QPS的高并发场景,尼恩详细讲解了RocketMQ的调优策略,包括专用方案如增加ConsumeQueue数量、优化Topic设计等,以及通用方案如硬件配置(CPU、内存、磁盘、网络)、操作系统调优、Broker配置调整、客户端配置优化、JVM调优和监控与日志分析等方面。通过系统化的梳理,帮助读者在面试中充分展示技术实力,获得面试官的认可。相关真题及答案将收录于《尼恩Java面试宝典PDF》V175版本中,助力求职者提高架构、设计和开发水平。
招行面试:10Wqps场景,RocketMQ 顺序消费 的性能 如何提升 ?
|
2月前
|
消息中间件 存储 运维
2024最全RabbitMQ集群方案汇总
本文梳理了RabbitMQ集群的几种方案,主要包括普通集群、镜像集群(高可用)、Quorum队列(仲裁队列)、Streams集群模式(高可用+负载均衡)和插件方式。重点介绍了每种方案的特点、优缺点及适用场景。搭建步骤包括安装Erlang和RabbitMQ、配置集群节点、修改hosts文件、配置Erlang Cookie、启动独立节点并创建集群,以及配置镜像队列以提高可用性和容错性。推荐使用Quorum队列与Streams模式,其中Quorum队列适合高可用集群,Streams模式则同时支持高可用和负载均衡。此外,还有Shovel和Federation插件可用于特定场景下的集群搭建。
244 2
|
2月前
|
消息中间件 RocketMQ
2024最全RocketMQ集群方案汇总
在研究RocketMQ集群方案时,发现网上存在诸多不一致之处,如组件包含NameServer、Broker、Proxy等。通过查阅官方文档,了解到v4.x和v5.x版本的差异。v4.x部署模式包括单主、多主、多主多从(异步复制、同步双写),而v5.x新增Local与Cluster模式,主要区别在于Broker和Proxy是否同进程部署。Local模式适合平滑升级,Cluster模式适合高可用需求。不同模式下,集群部署方案大致相同,涵盖单主、多主、多主多从等模式,以满足不同的高可用性和性能需求。
241 0
|
3月前
|
消息中间件 存储 Java
MQ线上消息乱序问题处理及场景详解
【11月更文挑战第22天】在现代分布式系统中,消息队列(MQ)作为核心组件,承担着异步处理、削峰填谷和系统解耦的重任。
107 1
|
4月前
|
消息中间件 存储 弹性计算
云消息队列 RabbitMQ 版方案评测
本文评估了阿里云《高弹性,低成本,云消息队列 RabbitMQ 实践》方案,从实践原理理解、部署体验、方案优势展现及业务场景匹配四个方面进行了深入分析。文中指出,该方案在解决消息积压、提高系统稳定性、支持弹性伸缩等方面表现优异,但也提出了在组件功能解释、实战案例提供等方面的改进建议,以期帮助用户更好地理解和应用该技术解决方案。
200 0
|
4月前
|
消息中间件 前端开发 Java
java高并发场景RabbitMQ的使用
java高并发场景RabbitMQ的使用
150 0
|
7月前
|
消息中间件 数据安全/隐私保护 RocketMQ
消息队列 MQ使用问题之遇到消费速度是固定的并且导致了堆积,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 监控 物联网
消息队列 MQ使用问题之如何获取和处理消息堆积数据
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
253 11