业务背景
一个业务实体的属性出现变更,会刷新用户域、订单域、商品域等多个域冗余的数据。变更数据涉及到的数据量大时,会比较耗时、耗内存。
如果同时变更的数据较多时,就超过当前服务的容量,JVM会频繁FullGC,继而pod重启。数据变更的功能不可用了。解决方案1:加机器。增加系统容量
解决方案2:数据源更新与刷新其它业务域的冗余数据解耦。使用MQ来异步刷新冗余的数据实现容量的削峰填谷。
最终使用了方案2,使用目前项目中使用的消息中间件RocketMQ。原因是这个场景并不高频,可能通过控制MQ消费线程数来减少对机器资源的消耗。此处设置为2
方案2上线运行一段时间后,出现一个现象:
变更的事件消息会偶发性的丢失
现象:
1、可以找到到发送成功的日志。
2、疑似丢失的消息,在用户域、商品域找到接收消息并消费成功的日志,但在订单域中没有找到接收消息的日志。
3、订单域一直在刷新冗余数据。未消费且过期的消息,会被Rocket服务端删除。
解决办法:
1、优化数据刷新的逻辑,减少对内存的消耗。
通过翻页获取数据的方式小步快走的方式小批量获取数据、刷新数据。
2、增加RocketMQ的消费线程数。从2调整为8。
事件消息会偶发性丢失的原因分析
过期清理机制引发消息丢失:
消息按照到达服务器的先后顺序被存储到队列中,理论上每个队列都支持无限存储。
但是在实际部署场景中,服务端节点的物理存储空间有限,消息无法做到永久存储。
RocketMQ 使用存储时长作为消息存储的依据。
在存储时长范围内的消息都会被保留,无论消息是否被消费;
超过时长限制的消息则会被清理掉。
JackieTang,公众号:的数字化之路RocketMQ系列 | 如何让消息“丢失”?
RocketMQ如何判定一个消息有没有过期呢?
要讲清楚这个问题,就不得不先聊明白消费进度管理。
消费进度原理
消息位点(Offset)
RocketMQ领域中消息是按到达服务端的先后顺序存储在指定主题[Topic]的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。
任意一个消息队列在逻辑上都是无限存储,即消息位点会从0到Long.MAX无限增加。通过主题、队列和位点就可以定位任意一条消息的位置,具体关系如下图所示:
RocketMQ领域中定义队列中
最早一条消息的位点为最小消息位点(MinOffset);
最新一条消息的位点为最大消息位点(MaxOffset)。
虽然消息队列逻辑上是无限存储,但由于服务端物理节点的存储空间有限,RocketMQ会滚动删除队列中存储最早的消息。因此,消息的最小消费位点和最大消费位点会一直递增变化。
消费位点(ConsumerOffset)
RocketMQ领域模型为发布订阅模式,每个主题的队列都可以被多个消费者分组订阅。若某条消息被某个消费者消费后直接被删除,则其他订阅了该主题的消费者将无法消费该消息。
因此,RocketMQ通过消费位点管理消息的消费进度。每条消息被某个消费者消费完成后不会立即在队列中删除,云消息队列 RocketMQ 版会基于每个消费者分组维护一份消费记录,该记录指定消费者分组消费某一个队列时,消费过的最新一条消息的位点,即消费位点。
当消费者客户端离线,又再次重新上线时,会严格按照服务端保存的消费进度继续处理消息。
如果服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点。
那么,历史消息会保存多久呢?也就是如何判定一个消息在服务端有没有过期呢?
看情况。不同的RocketMQ服务器都会不同。以阿里的云消息队列RocketMQ版为例:
- 5.0系列实例:
- 最短24小时。
- 最长720小时。
- 4.0系列实例:
- 标准版:存储时长为3天,超过时间将自动滚动删除。
- 企业铂金版:存储时长为3天,若您购买实例的存储空间规格不足时,云消息队列 RocketMQ 版会按存储时间滚动删除最早的消息,此时消息的存储时长可能不足3天。
队列中消息位点MinOffset、MaxOffset和每个消费者分组的消费位点ConsumerOffset的关系如下:
- ConsumerOffset≤MaxOffset:
- 当消费速度和生产速度一致,且全部消息都处理完成时,最大消息位点和消费位点相同,即ConsumerOffset=MaxOffset。
- 当消费速度较慢小于生产速度时,队列中会有部分消息未消费,此时消费位点小于最大消息位点,即ConsumerOffset<MaxOffset,两者之差就是该队列中堆积的消息量。
- ConsumerOffset≥MinOffset:正常情况下有效的消费位点ConsumerOffset必然大于等于最小消息位点MinOffset。消费位点小于最小消息位点时是无效的,相当于消费者要消费的消息已经从队列中删除了,是无法消费到的,此时服务端会将消费位点强制纠正到合法的消息位点。
消费位点初始值
消费位点初始值指的是消费者分组[Group ID]首次启动消费者消费消息时,服务端保存的消费位点的初始值。
RocketMQ定义消费位点的初始值为消费者首次获取消息时,该时刻队列中的最大消息位点。相当于消费者将从队列中最新的消息开始消费。
小结
结合消费进度管理和目前遇到的因为消费慢引发的消息丢失问题,我们来还原下消息丢失的原因:事件消息发出后,由于订单域消费消息的速度低于生产,然后出现消息堆积。
订单服务上线新需求,老的RocketMQ消费客户端下线。
上线完成后,启动新的RocketMQ消费客户端。
新的RocketMQ消费者[Group ID]从RocketMQ Broker服务器拉取消息。
如果RocketMQ服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点。
这些未消费且过期的消息,就会被删除。
从业务上看,这些消息是丢失了。实际上,即使订单服务没有重新发布,也会出现消息丢失。
因为过期的消息已经从RocketMQ服务端自动滚动删除了。
参考
RocketMQ系列 | 如何让消息“丢失”?
https://mp.weixin.qq.com/s/RnS675dt-wErnEuolK6Zeg