RocketMQ系列 | 容量削峰填谷后,发送的消息“少”了怎么办!!??

简介: 如果服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点。最后在被消费的消息和服务端存储最小位点之间的消息就丢失了

业务背景

一个业务实体的属性出现变更,会刷新用户域、订单域、商品域等多个域冗余的数据。变更数据涉及到的数据量大时,会比较耗时、耗内存。

如果同时变更的数据较多时,就超过当前服务的容量,JVM会频繁FullGC,继而pod重启。数据变更的功能不可用了。解决方案1:加机器。增加系统容量
解决方案2:数据源更新与刷新其它业务域的冗余数据解耦。使用MQ来异步刷新冗余的数据实现容量的削峰填谷。

最终使用了方案2,使用目前项目中使用的消息中间件RocketMQ。原因是这个场景并不高频,可能通过控制MQ消费线程数来减少对机器资源的消耗。此处设置为2
方案2上线运行一段时间后,出现一个现象:

变更的事件消息会偶发性的丢失

现象:

1、可以找到到发送成功的日志。

2、疑似丢失的消息,在用户域、商品域找到接收消息并消费成功的日志,但在订单域中没有找到接收消息的日志。

3、订单域一直在刷新冗余数据。未消费且过期的消息,会被Rocket服务端删除。

解决办法:

1、优化数据刷新的逻辑,减少对内存的消耗。
通过翻页获取数据的方式小步快走
方式小批量获取数据、刷新数据。

2、增加RocketMQ的消费线程数。从2调整为8

事件消息会偶发性丢失的原因分析

过期清理机制引发消息丢失

消息按照到达服务器的先后顺序被存储到队列中,理论上每个队列都支持无限存储。

但是在实际部署场景中,服务端节点的物理存储空间有限,消息无法做到永久存储。

RocketMQ 使用存储时长作为消息存储的依据。


在存储时长范围内的消息都会被保留,无论消息是否被消费;

超过时长限制的消息则会被清理掉

JackieTang,公众号:的数字化之路RocketMQ系列 | 如何让消息“丢失”?

RocketMQ如何判定一个消息有没有过期呢?

要讲清楚这个问题,就不得不先聊明白消费进度管理。

消费进度原理

消息位点(Offset)

RocketMQ领域中消息是按到达服务端的先后顺序存储在指定主题[Topic]的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。

任意一个消息队列在逻辑上都是无限存储,即消息位点会从0到Long.MAX无限增加。通过主题、队列和位点就可以定位任意一条消息的位置,具体关系如下图所示:

RocketMQ领域中定义队列中
最早一条消息的位点为最小消息位点(MinOffset);
最新一条消息的位点为最大消息位点(MaxOffset)。

虽然消息队列逻辑上是无限存储,但由于服务端物理节点的存储空间有限,RocketMQ会滚动删除队列中存储最早的消息。因此,消息的最小消费位点和最大消费位点会一直递增变化。

消费位点(ConsumerOffset)

RocketMQ领域模型为发布订阅模式,每个主题的队列都可以被多个消费者分组订阅。若某条消息被某个消费者消费后直接被删除,则其他订阅了该主题的消费者将无法消费该消息。

因此,RocketMQ通过消费位点管理消息的消费进度。每条消息被某个消费者消费完成后不会立即在队列中删除,云消息队列 RocketMQ 版会基于每个消费者分组维护一份消费记录,该记录指定消费者分组消费某一个队列时,消费过的最新一条消息的位点,即消费位点。

当消费者客户端离线,又再次重新上线时,会严格按照服务端保存的消费进度继续处理消息。

如果服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点

那么,历史消息会保存多久呢?也就是如何判定一个消息在服务端有没有过期呢?

看情况。不同的RocketMQ服务器都会不同。以阿里的云消息队列RocketMQ版为例:

  • 5.0系列实例:
  • 最短24小时。
  • 最长720小时。
  • 4.0系列实例:
  • 标准版:存储时长为3天,超过时间将自动滚动删除。
  • 企业铂金版:存储时长为3天,若您购买实例的存储空间规格不足时,云消息队列 RocketMQ 版会按存储时间滚动删除最早的消息,此时消息的存储时长可能不足3天。


队列中消息位点MinOffset、MaxOffset和每个消费者分组的消费位点ConsumerOffset的关系如下:

  • ConsumerOffset≤MaxOffset:
  • 当消费速度和生产速度一致,且全部消息都处理完成时,最大消息位点和消费位点相同,即ConsumerOffset=MaxOffset。
  • 当消费速度较慢小于生产速度时,队列中会有部分消息未消费,此时消费位点小于最大消息位点,即ConsumerOffset<MaxOffset,两者之差就是该队列中堆积的消息量。
  • ConsumerOffset≥MinOffset:正常情况下有效的消费位点ConsumerOffset必然大于等于最小消息位点MinOffset。消费位点小于最小消息位点时是无效的,相当于消费者要消费的消息已经从队列中删除了,是无法消费到的,此时服务端会将消费位点强制纠正到合法的消息位点。

消费位点初始值

消费位点初始值指的是消费者分组[Group ID]首次启动消费者消费消息时,服务端保存的消费位点的初始值。

RocketMQ定义消费位点的初始值为消费者首次获取消息时,该时刻队列中的最大消息位点。相当于消费者将从队列中最新的消息开始消费

小结

结合消费进度管理和目前遇到的因为消费慢引发的消息丢失问题,我们来还原下消息丢失的原因:事件消息发出后,由于订单域消费消息的速度低于生产,然后出现消息堆积。

订单服务上线新需求,老的RocketMQ消费客户端下线。

上线完成后,启动新的RocketMQ消费客户端。

新的RocketMQ消费者[Group ID]从RocketMQ Broker服务器拉取消息。

如果RocketMQ服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点。

这些未消费且过期的消息,就会被删除。

从业务上看,这些消息是丢失了。实际上,即使订单服务没有重新发布,也会出现消息丢失。

因为过期的消息已经从RocketMQ服务端自动滚动删除了


参考

消费进度管理:https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/consumer-progress-management


RocketMQ系列 | 如何让消息“丢失”?

https://mp.weixin.qq.com/s/RnS675dt-wErnEuolK6Zeg


RocketMQ系列 | 容量削峰填谷后,发送的消息“少”了怎么办!!??

https://mp.weixin.qq.com/s/kejgc_u8GHdXrI4uW9TWNw

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
23天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
63 5
|
18天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
63 7
|
21天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
29天前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
1月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
64 4
|
2月前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
80 16
|
2月前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
73 9