消息队列的消息大量积压怎么办?

简介: 消息队列的消息大量积压怎么办?

消息队列的消息大量积压怎么办?

1 问题追溯

系统出现性能问题,来不及处理上游发的消息,导致消息积压。消息积压是正常现象,但积压太多就需要处理了。就像水库,日常蓄水是正常的,但下游泄洪能力太差,导致水库水位一直不停上涨,就不正常!

2 开发的梦魇

  • 日常开发使用MQ时,如何避免消息积压?
  • 若线上已出现积压了,如何应急?

3 性能优化

性能优化主要在生产者和消费者这俩业务逻辑。

MQ自身性能,作为API使用者,无需过于关注。因大多MQ业务,MQ本身处理能力>>业务系统。主流MQ的单个节点,消息收发性能可达几w ~ 几十w条消息/s,还可水平扩展Broker实例数倍增处理能力。而一般业务系统需处理的业务逻辑远比MQ复杂,单节点每秒可处理几百~几千次请求,已算性能佳。

所以,MQ性能优化,更关注在消息收发两端,业务代码怎么和MQ协作达到最佳性能。

3.1  生产端

此端的业务代码处理性能,和MQ关系不大,都是先执行业务逻辑,最后再发消息。若你的代码发送消息的性能上不去,优先检查是否为发消息前的业务逻辑耗时太多。

对于发消息的业务逻辑,只需注意设置合适的并发和同步大小,即可达到很好发送性能。Pro发消息给Broker,Broker收到消息后返回确认响应,是一次完整交互。假设一次交互平均时延1ms,把这1ms分解:

  1. 发送端准备数据、序列化消息、构造请求等逻辑时间,即发送端在发送网络请求前的耗时
  2. 发送消息和返回响应在网络传输中耗时
  3. Broker处理消息的时延

若单线程发送,每次只发1条,则每秒只发

1000ms / 1ms * 1条/ms =1000条消息

这并不能压榨MQ性能。

无论是增加每次发送消息的批量大小、增加并发,都能倍增发送性能。那到底选择批量发送 or 增加并发?取决于发送端的业务性质。能满足你的性能要求,就能搞。

  • 若发送端是个微服务,主要接受RPC请求处理在线业务微服务在处理每次请求时,就在当前线程直接发消息,因为所有RPC框架都是多线程支持并发,自然可并行发送消息。且在线业务比较在意请求响应时延,批量发送势必影响RPC服务时延。这时通过并发提升发送性能就更好。
  • 若是离线分析系统,并不关心时延,而注重整个系统的吞吐量发送端数据都来自DB,更适合批量发送,可批量从DB读数据,然后批量发送消息,用少量并发即可获得高吞吐量。

批量消费中,若某条消息消费失败,则重试会将整批消息重发。批量消费是一次取一批消息,等这一批消息都成功,再提交最后一条消息的位置,作为新的消费位置。若其中任一条失败,则认为整批都失败。

3.2 消费端

使用MQ,大部分性能问题都出在消费端。若消费速度跟不上发送端生产消息速度,就会造成消息积压。若这种性能倒挂的问题是暂时的,问题不大,只要消费端性能恢复后,超过发送端的性能,积压的消息是可逐渐被消化的。若消费速度一直比生产速度慢,系统就会异常:

  • MQ存储被填满无法提供服务
  • 消息丢失

所以设计系统,要保证消费端消费性能>生产端发送性能

消费端性能优化除优化消费业务逻辑,也可水平扩容,增加消费端并发数提升总体消费性能。扩容Con实例数量时,必须同步扩容主题中的分区(也叫队列)数量,确保Con实例数和分区数量相等。若Con实例数量>分区数量,这样的扩容实际上徒劳。因为对Con,在每个分区实际上只能支持单线程消费。

这一步需要业务consumer团队联系消息中间件团队一起运维配合。

很多消费程序这样解决消费慢:

它收消息处理的业务逻辑可能较慢,也很难再优化,为避免积压,在收消息的OnMessage方法,不处理任何业务,把这消息放到一个本地消息表就返回。然后可通过定时任务,启动很多业务线程,里面是真正处理消息的业务逻辑,这些线程从本地消息表取消息处理,就解决了单Consumer不能并行消费问题。

消息积压,如何处理

还有种消息积压,日常系统正常运转时,没有积压或只有少量积压很快就消费了。但某刻,突然开始积压消息且积压持续上涨。这时,得在短时间内找到消息积压原因,迅速解决问题才不至于继续影响业务。排查消息积压的原因,有成熟方案。积压突然增加,最粗粒度的原因:

  • 发送变快
  • 消费变慢

大部分MQ内置监控功能,通过监控数据,很容易确定哪种原因。若单位时间发送消息增多,如秒杀,短时内不大可能优化消费端代码以提升性能,只能通过扩容消费端实例数提升总体消费能力。若短时内无足够服务器资源扩容,那就将系统降级,关闭一些不重要业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些核心业务。

kafka不仅是扩容时候,只要是consumer和partition有一方数量变化,都会触发rebalance。

还有种不太常见的,通过监控发现,无论是发送消息速度还是消费消息速度和原来都没啥变化,这时需检查消费端,是不是消费失败,导致同一条消息反复消费,这也会拖慢整个系统的消费速度。

若监控到消费变慢,需检查消费实例,分析消费变慢原因。优先检查日志是否有大量消费错误,若无错误,可打印堆栈信息,看消费线程是不是卡在哪里不动,如触发死锁或卡在等待某些资源

消费端是否可通过同步消费提升消费性能呢?

消费端进行批量操作,感觉和上面的先将消息放在内存队列,然后再并发消费消息类似,若机器宕机,这些批量消息都会丢失,若在DB层面,批量操作在大事务,会导致锁竞争,也会导致主备不一致。若是一些不重要消息,如备份日志,即可使用批量操作,提高消费性能,因为这样一些日志消息丢失也能接受。

若Con消费异常,即使多次消费也无法成功处理(如消息格式异常),导致一直无法ack这条消息,咋办?有的MQ提供“死信队列”功能,会自动把这种反复消费都失败的消息丢到死信队列,避免一条消息卡主队列。

总结

消息积压处理:1、发送端优化,增加批量和线程并发两种方式处理2、消费端优化,优化业务逻辑代码、水平扩容增加并发并同步扩容分区数量

查看消息积压的方法:1、消息队列内置监控,查看发送端发送消息与消费端消费消息的速度变化2、查看日志是否有大量的消费错误3、打印堆栈信息,查看消费线程卡点信息

1.无法提升消费业务效率(仅受消费业务自身逻辑影响),但可提高MQ中堆积消息消费的整体吞吐量(批推比单推mq耗时较短)。2.数据增量同步,监控信息采集。(非核心业务的稳定大数据流操作)。3.批处理意味数据积累和大数据传输,这会让单次消费的最长时延变长。同时批量操作为了保证当前批量操作一致性,在个别失败的情况下会引发批量操作重试。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 监控 应用服务中间件
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
333 1
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
|
消息中间件
|
消息中间件 存储 SQL
阿里云消息队列 Kafka-消息检索实践
本文章主要介绍消息队列使用过程中所遇到的消息丢失、重复消费等痛点问题的排查办法,以及消息队列 Kafka「检索组件」的场景实践,并对其关键技术进行解读。旨在帮助大家对消息队列 Kafka「检索组件」的特点和使用方式更加熟悉,以更有效地解决消息排查过程中所遇到的问题。
阿里云消息队列 Kafka-消息检索实践
|
消息中间件 存储 前端开发
同步异步调用,并谈谈消息队列mq;RocketMQ发送消息和消费消息测试类
同步调用优点: 时效性强,打电话、直播,很快可以得到结果 同步调用的问题:
803 1
|
消息中间件 关系型数据库 MySQL
5. 消息队列中,如何保证消息的顺序性?
5. 消息队列中,如何保证消息的顺序性?
725 0
5. 消息队列中,如何保证消息的顺序性?
|
消息中间件 存储 监控
MQ的作用及如何解决消息队列的丢失、重复和积压问题
引入 MQ 消息中间件最直接的目的是:做系统解耦合流量控制,追其根源还是为了解决互联网系统的高可用和高性能问题。 系统解耦:用 MQ 消息队列,可以隔离系统上下游环境变化带来的不稳定因素,比如京豆服务的系统需求无论如何变化,交易服务不用做任何改变,即使当京豆服务出现故障,主交易流程也可以将京豆服务降级,实现交易服务和京豆服务的解耦,做到了系统的高可用。
297 0
|
消息中间件 存储 安全
如何使用消息队列的事务消息(下)
如何使用消息队列的事务消息
482 0
如何使用消息队列的事务消息(下)
|
消息中间件 NoSQL Kafka
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(下)
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(下)
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(下)
|
消息中间件 Kafka 数据库
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(上)
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(上)
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(上)
|
消息中间件 存储 SQL
消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?(下)
消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?(下)
消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?(下)