1.产生原因
消息积压(Message Backlog)指的是在消息队列(MQ)系统中等待被处理的消息数量超过了正常的处理速度,导致消息在队列中积压堆积的情况。
消息积压的常见表现:
- 系统资源使用率上升:消息积压可能导致系统资源使用率上升,例如CPU利用率、内存占用、磁盘活动等。这是因为积压的消息需要占用系统资源来存储和管理。
- 消息丢失或过期:如果消息队列没有足够的容量来处理新消息,可能会导致旧消息被丢弃或过期,从而导致数据丢失。
- 系统警报和异常:一些监控系统会检测到消息积压问题,并发出警报或异常通知,以提醒运维团队或开发人员。
- 队列故障或堵塞:在极端情况下,消息积压可能导致消息队列系统的故障或堵塞,导致无法正常工作。
消息积压的常见原因:
- 生产者速度快于消费者:如果消息生产者(产生消息的系统或组件)的速度比消息消费者(处理消息的系统或组件)的速度快,就会导致消息积压。
- 消费者故障:如果消息消费者遇到故障或处理速度变慢,未能及时处理消息,也会导致积压。
- 高峰负载:系统在高峰时段接收到大量的消息,超过了正常处理速度。
- 消息处理失败:如果某些消息由于错误或异常而无法被正常处理,它们可能会在队列中积压。
消息积压可能导致的后果:
- 延迟:积压的消息会导致正常消息的处理延迟,从而影响了系统的响应时间。
- 系统负载:积压消息可能导致消息队列系统和消息处理组件的负载过高,甚至可能引发系统故障。
- 数据丢失:消息积压导致消息久久不被消费,有可能导致超时被丢弃掉,从而导致数据丢失。
- 不一致性:如果消息处理不及时,可能会导致系统数据的不一致性问题。
2.解决办法
一说到消息积压,很多同学的第一反应可能是加机器,确实加机器是能处理掉消息积压的,因为不管是何种原因造成的消息积压,上机器一定是有效果的。但是很明显这不是最合理的解法,针对mq消息积压其实存在一套成熟的打法,一共分三步:
- 事前处理机制
- 事中处理机制
- 事后处理机制
2.1.事前处理机制
事前处理机制其实就是要对系统上线前进行一个预估,预估一下系统的mq要面对的峰值流量是多大,然后用单台机器部署压测,根据压测来确定机器的大概数量。一般事前压测后都能解决绝大多数问题。直接上我们的性能测试神器,jmeter:
文章链接:
如何使用jmeter进行压测_使用jmeter压力测试__BugMan的博客-CSDN博客
2.2.事中处理机制
如果在生产中系统中确实出现了mq的消息积压,必须存在事中机制来处理了。事中机制的核心目标是——恢复正常。
怎么恢复正常喃?其实就是扩容,然后快速消费完积压的消息,扩容十倍、二十倍、三十倍都可以,达成目的即可。当然这种情况下可以通过k8s来进行动态缩/扩容是最合适的。
2.3.事后处理机制
发生生产环境的mq积压后,处理完mq的积压,需要加固一下系统,避免下一次的mq消息积压,一般事后的处理机制可以从如下几方面来预防下一次的mq消息积压:
- 提高消费并行度
- 跳过非重要的消费
- 优化每条消息的消费过程。
提供消费并行度和优化消息的消费路径这个没啥好说的,事儿都发生了,要是能力不够该扩容就扩容,要是逻辑不对,该优化耗时就优化耗时。着重要讲一下的是跳过非重要消费。
跳过非重要消费:
在实际生产中,消息的类型也是有轻重缓急的,如果实在是发生了消息的堆积,这时候我们可以先不去消费不重要的消息,先去消费重要的消息。比如先不去消费日志消息,先处理订单消息。我们可以给mq上一个监控,设置一个阈值,当超过这个阈值的时候,触发跳过机制。
这时候我想有些同学会有点疑惑,怎么去监控mq中的消息数量喃?有以下几种方式:
- JMX监控:如果你的MQ系统支持Java Management Extensions(JMX),你可以使用JMX客户端工具,如JConsole或VisualVM,连接到MQ服务器并监控队列的消息数量。这种方式能够实时获取消息数量。
- API调用:许多MQ系统提供API,允许你通过编程方式查询队列的状态信息,包括消息数量。你可以编写脚本或应用程序,定期调用API来获取消息数量,并在需要时进行处理或记录。
- 监控工具:使用第三方监控工具或平台,如Prometheus、Grafana、Datadog等,将其与MQ系统集成。这些工具可以定期查询MQ的消息数量,并通过仪表板、图表或警报提供实时监控。
- 日志分析:在MQ系统的日志文件中可能会包含消息数量的信息。你可以使用日志分析工具来实时监控日志文件,并提取消息数量的指标。