RMQ——支持合并和优先级的消息队列

简介: # 业务背景 在主图价格表达项目中需要实现一个功能,商品价格发生变化时将商品价格打印在商品主图上面,那么需要在价格发生变动的时候触发合成一张带价格的图片,每一次触发合图时计算价格都是获取当前最新的价格。上游价格变化的因素很多,变化很频繁,下游合图消耗GPU资源较大,处理容量较低。 上游生产速度很快,下游处理速度很慢,上下游处理速度存在巨大差距时,我们首先可以想到使用消息队列进行削峰填谷,比如R

业务背景

在主图价格表达项目中需要实现一个功能,商品价格发生变化时将商品价格打印在商品主图上面,那么需要在价格发生变动的时候触发合成一张带价格的图片,每一次触发合图时计算价格都是获取当前最新的价格。上游价格变化的因素很多,变化很频繁,下游合图消耗GPU资源较大,处理容量较低。
上游生产速度很快,下游处理速度很慢,上下游处理速度存在巨大差距时,我们首先可以想到使用消息队列进行削峰填谷,比如RocketMQ、Kafka。但是,在本项目的背景中,触发价格变化的来源很多,产生的触发消息可能存在大量重复,下游重复消费不但会浪费资源还会导致延迟。采用现有MQ消息队列的问题在于重复的消息无法合并处理以减少下游重复处理的次数。
在本项目中,由于合图资源有限,因此需要对不同等级的商家区分优先级处理。采用现有MQ消息队列的问题是,消息发生堆积后,消息只能按照FIFO(先进先出)顺序消费。由于无法区分优先级进行消费,紧急的任务也只能等待先到的任务先消费完成。
接下来,我将介绍一种可将消息合并处理并支持优先级的消息队列——RMQ,RMQ适用于重复消息比较频繁、上下游处理速度存在巨大差距的场景。

产品功能

RMQ是一个支持多Topic的消息队列,可以用作削峰填谷、异步解耦。相比已有的消息队列,他还具有消息合并和优先级的功能,这两个功能也是它存在的意义。

消息合并

RMQ是一个可合并消息的消息队列,如果消息堆积在消息队列中时,内容重复的消息会合并成一条。RMQ支持消息合并但是不支持消息去重,多条内容相同的消息堆积在RMQ中时,多条消息会被合并成一条消息,但一条消息可能由于系统宕机而被重复消费。
还有一种情况也无法避免,两条内容相同的消息先后产生,还没等到第二条消息产生,第一条消息就被消费了,紧接着第二条消息产生后也被消费了。但是,这种情况说明上下游处理速度不存在差距,业务上需要保障可以重复处理。

优先级

RMQ支持消息设置优先级,优先级分为高、中、低三个等级,优先级高的任务不管什么时候产生都会比优先级低的任务先执行,相同优先级的任务会随机被执行。

RocketMQ与RMQ功能对比

消息队列 堆积能力 顺序消息 优先级 消息合并 消息去重 可用性 应用场景
RocketMQ 海量 支持 不支持 不支持 不支持 高可用 削峰填谷、异步解耦、海量堆积、重复消息不多的场景
RMQ 亿级 不支持 支持 支持 不支持 高可用 消费填谷、异步解耦、消息存在重复、上游生产速度快,下游消费能力低的场景

消息合并与消息去重的差异?
消息合并是指,多个内容相同的消息只被消费一次。消息去重是指,同一个消息只被消费一次。

实现方案

为了快速实现RMQ并具备以上特性,我们选择站在巨人的肩膀上。我们选择Redis作为消息队列的存储,选择RocketMQ来维护消费集群。RMQ总体架构图如下所示。
总体框架.png
首先,生产者需要在配置管理服务中注册一个topic才能发送消息,消费者需要在配置管理服务绑定一个topic才能接收消息。然后,生产者发送消息到消费队列服务,配置管理服务会定时通过心跳发送绑定的topic信息到消费者,消费者根据topic信息从消费队列服务中拉取消息进行消费。
接下来将从消息队列服务、配置管理服务、生产者和消费者四个方面详细阐述。

消息队列服务

消息队列服务主要负责消息的存储,在这里实现了RMQ的消息合并和优先级的特性。消息队列服务借助Redis进行实现,Redis的有序集合中的元素具有唯一性,这个特点可以帮助RMQ实现消息的合并,Redis有序集合中的元素根据分数进行排序,这个特点可以帮助RMQ实现优先级的功能。基于Redis的ZSet数据结构设计了RMQ的存储结构,存储设计的框架图如下图所示。存储设计.png

SlotKey和StoreQueue的设计

一个Topic可以根据预估数据量划分固定的槽数量,槽数量一定需要是2的n次幂,上图中topic划分了8个槽位,编号0-7。生产者将消息体序列化成字符串,并计算字符串的CRC32值,CRC32值对槽数量进行取模得到槽序号,topic和槽序号拼接组装成SlotKey(也即Redis的键),每个SlotKey对应一个StoreQueue,StoreQueue使用有序集合ZSet作为存储结构,这样内容相同的消息体就会落在同一个StoreQueue里面,所以内容相同的消息会进行合并。
Redis的有序集合底层采用压缩列表或者跳跃表实现,当数据量小的时候采用压缩列表,数据量大的时候采用跳跃表。有序集合中的元素由分数和字符串组成,元素按照分数进行排序。在RMQ的存储设计中,使用分数来表示优先级,因此消息按照优先级进行排序,消费者每次都拉取优先级最大的消息。

PrepareQueue的设计

为了保障RMQ的可用性,做到每条消息至少消费一次,消费者不是直接pop有序集合中的元素,而是将元素从StoreQueue移动到PrepareQueue并返回消息给消费者,等消费成功后再从PrepareQueue从删除,或者消费失败后从PreapreQueue重新移动到StoreQueue,这便是根据二阶段提交的思想实现的二阶段消费。
在消费者章节将会详细介绍二阶段消费的实现思路,这里重点介绍下PrepareQueue的存储设计。一个topic只有一个PrepareQueue,对应的SlotKey为${topic}_PrepareQueue,PrepareQueue采用有序集合作为存储,消息移动到PrepareQueue时刻对应的时间戳作为分数,字符串依然是消息体内容。
为什么需要使用时间戳作为分数呢?正常情况下,消费者不管消费失败还是消费成功,都会从PrepareQueue删除消息,当消费者系统发生异常或者宕机的时候,消息就无法从PrepareQueue中删除,我们也不知道消费者是否消费成功,为保障消息至少被消费一次,我们需要做到超时回滚,因此需要保存时间戳。当PrepareQueue中的消息发生超时的时候,将消息从PrepareQueue移动到StoreQueue。判断PrepareQueue中消息是否超时只需要查询分数最小的消息是否已经超时,使用有序集合可以有效的提升性能。

死信队列的设计

如果消息消费失败,并且重试消费了16次依然失败,那么需要将消息存入到死信队列里面。一个topic只有一个死信队列,对应的SlotKey为${topic}_DeadQueue,采用Redis的列表结构存储。存储在死信队列的消费无法再被消费。

配置管理服务

为了快速实现RMQ,并没有采用类似RocketMQ的配置管理服务NameServer,而是利用RocketMQ发送心跳消息给集群消费,消费集群根据心跳消息中的topic信息从消息队列服务从拉取消息进行消费。配置管理服务的工作流程如下所示:

  1. 生产者在配置管理服务注册topic,并指定topic划分的槽数量SlotNumber。
  2. 消费者在配置管理服务中绑定消费topic。
  3. 配置管理服务通过RocketMQ定时发送心跳消息给消费集群,心跳消息中包含消费者订阅的topic信息。
  4. 消费者接收到心跳消息后,解析消息并把订阅的topic信息存储在本机。

生产者

业务系统中引入RMQ二方包后,可以调用生产者接口发送消息,生产者的主要工作就是将需要发送的内容序列化后存储在对应的位置,生产的工作流程如下所示:

  1. 生产者将需要发送的内容序列化成字符串,因为RMQ是根据消息内容进行合并的,所以业务上需要只将必要的信息存储在消息内容里面。
  2. 根据消息内容字符串计算CRC32值,并对槽数量进行取模,这里采用位运算&代替取模运算可以提升计算性能,并减少冲突、分布更均匀,因此槽数量一定要是2的n次幂。模数就是槽的序号。
  3. 根据topic和第2步骤求得的槽序号组装成SlotKey,组装规则是${topic}_${槽序号}。
  4. 将业务设置的优先级转换成double类型的分数,高优先级对应分数18.0,中优先级对应分数17.0,低优先级对应分数16.0(为何这样设计将在消费者章节中讲解)。
  5. 调用消息队列服务接口发送消息,即执行Redis命令sadd,将分数和消息体内容存储到对应的键值中。

消费者

业务系统消费消息需引入RMQ二方包,并只需实现一个消费的Handler,RMQ消费者端会自动从消息队列服务拉取消息回调业务Handler进行消费。在展开消费者端整体工作流程之前,我们先看下消费者端的两个重要问题,如何保证消息至少消费一次?消费失败重试如何实现?

至少消费一次问题

三种消费模式

一般消息队列存在三种消费模式,分别是:最多消费一次、至少消费一次、只消费一次。最多消费一次模式消息可能丢失,一般不怎么使用。至少消费一次模式消息不会丢失,但是可能存在重复消费,比较常用。只消费一次模式消息被精确只消费一次,实现较困难,一般需要业务记录幂等ID来实现。RMQ实现了至少消费一次的模式,那么如何保证消息至少被消费一次呢?

至少消费一次模式实现的难点

从最简单的消费模式——最多消费一次说起,消费者端只需要从消息队列服务中取出消息就行,即执行Redis的zpopmax命令,不伦消费者是否接收到该消息并成功消费,消息队列服务都认为消息消费成功。最多一次消费模式导致消息丢失的因素可能有:网络丢包导致消费者没有接收到消息,消费者接收到消息但在消费的时候宕机了,消费者接收到消息但消费失败。针对消费失败导致消息丢失的情况比较好解决,只需要把消费失败的消息重新放入消息队列服务就行,但是网络丢包和消费系统异常导致的消息丢失问题不好解决。
可能有人会想到,我们不把元素从有序集合中pop出来,我们先查询优先级最高的元素,然后消费,再删除消费成功的元素,但是这样消息服务队列就变成了同步阻塞队列,性能会很差。

至少消费一次模式的实现

至少消费一次的问题比较类似银行转账问题,A向B账户转账100元,如何保障A账户扣减100同时B账户增加100,因此我们可以想到二阶段提交的思想。第一个准备阶段,A、B分别进行资源冻结并持久化undo和redo日志,A、B分别告诉协调者已经准备好;第二个提交阶段,协调者告诉A、B进行提交,A、B分别提交事务。RMQ基于二阶段提交的思想来实现至少消费一次的模式。
RMQ存储设计中PrepareQueue的作用就是用来冻结资源并记录事务日志,消费者端即是参与者也是协调者。第一个准备阶段,消费者端通过Redis事务将指定消息从StoreQueue移动到PrepareQueue,同时消息传输到消费者端,消费者端消费该消息;第二个提交阶段,消费者端根据消费结果是否成功协调消息队列服务是否回滚,如果消费成功则提交事务,该消息从PrepareQueue中删除,如果消费失败则回滚事务,消费者端通过Redis事务将该消息从PrepareQueue移动到StoreQueue,如果因为各种异常导致PrepareQueue中消息滞留超时,将自动执行回滚操作。如何实现事务将指定消息在StoreQueue和PrepareQueue之间移动呢,Redis可以用Lua脚本实现。二阶段消费的流程图如下所示:

实现方案的异常情况分析

我们来分析下采用二阶段消费方案可能存在的异常情况,从以下分析来看二阶段消费方案可以保障消息至少被消费一次。

  1. 网络丢包导致消费者没有接收到消息,这时消息已经记录到PrepareQueue,如果到了超时时间,消息被回滚放回StoreQueue,等待下次被消费,消息不丢失。
  2. 消费者接收到了消息,但是消费者还没来得及消费完成系统就宕机了,消息消费超时到了后,消息会被重新放入StoreQueue,等待下次被消费,消息不丢失。
  3. 消费者接收到了消息并消费成功,消费者端在协调事务提交的时候宕机了,消息消费超时到了后,消息会被重新放入StoreQueue,等待下次被消费,消息被重复消费。
  4. 消费者接收到了消息但消费失败,消费者端在协调事务提交的时候宕机了,消息消费超时到了后,消息会被重新放入StoreQueue,等待下次被消费,消息不丢失。
  5. 消费者接收到了消息并消费成功,但是由于fullgc等原因使消费时间太长,PrepareQueue中的消息由于超时已经回滚到StoreQueue,等待下次被消费,消息被重复消费。

重试次数控制

RMQ支持消费失败后重试16次,重试16次后还是失败则转移到死信队列,死信队列中的消息无法再被消费。失败重试16次的控制是如何做到的呢?在生产者章节中我们说到,高优先级对应分数18.0,中优先级对应分数17.0,低优先级对应分数16.0,如果消息消费失败,则分数减1,直到分数等于0时放入死信队列。由此可知,重试消息的优先级会不断降低,重试消息消费的间隔时间会逐渐增长。

整体工作流程

消费者端整体的工作流程如下所示。消费线程循环随机遍历订阅topic中的所有槽SlotKey,随机遍历是为了让多个topic的多个槽被均匀消费。定时3s逻辑是为了使用消费者端实现PrepareQueue超时回滚功能,PrepareQueue中需要超时回滚的情况一般是由于系统重启、系统宕机、网络丢包导致,一般不会出现很多消息需要超时回滚,所以这里采用定时3s检查避免性能消耗。

实际效果

从实现方案中可以看出RMQ强依赖于Redis,涉及到的Redis命令时间复杂度为O(1)或O(logn),得益于Redis的高性能,RMQ的性能也是非常高。
在主图价格表达项目中,商品价格发生变化后需要进行合图,商品价格变化来源较多,触发合图消息重复概率较高,且下游合图处理速度较慢,我们需要尽可能合并触发合图消息,减轻下游处理压力,于是我们使用了RMQ作为消息中间件来进行削峰填谷、消息合并。不仅如此,我们还根据商家等级划分触发合图消息的等级,使KA商家能够优先得到处理,缩短价格变化的延迟。
下图是主图价格表达项目中的触发合图消息发送量的监控,先看最左边的尖刺,在两分钟内发送了500w条消息,消息发送的TPS达4.1w。再看右边的尖刺,由于多个来源同时触发导致触发消息大量重复,RMQ对消息进行了合并,合并率高达82%。
image.png
下图是主图价格表达项目中合图消息消费监控,在25分钟内消费了500w条消息,消费TPS达3300。当然,RMQ的消费速度远不止这些,RMQ的消费速度取决于消费的RT,在不执行任何业务逻辑的压测情况下,RMQ的消费TPS可达4W,如果增加消费线程可以达到更高的速度。
image.png

未来展望

完善配置管理服务

目前配置管理服务依赖于RocketMQ实现,实现方式很重,未来可以考虑使用zookeeper或者自己实现类似NameServer的服务。目前没有配置管理后台,注册、订阅都是代码写死,未来需要独立的可视化配置管理后台。

支持任意延迟时间的消息

RocketMQ支持延迟消息,但是只支持几个等级的延迟消息,比如延迟1s、5s、10s、30m、2h等。很多场景需要能够设置任意的延迟时间,比如许多TOC超时场景,订单超时关闭、任务超时关闭、活动结束后清理等。由于RMQ的存储设计是基于Redis的有序列表,因此可以做到设置任意延迟时间的消息。主要的实现要点就是把延迟时间作为分数,消息根据延迟时间从小到大排序,只需要不断拉取分数小于当前时间戳的元素进行消费就行。

独立化运营

RMQ想法诞生于主图价格表达项目,目前只有了个雏形,需要不断的完善其功能,未来期望它可以独立成一个中间件,为更多的业务提供支持。RMQ强依赖于Redis,存储容量受Redis的限制,也许未来它可以自己实现一个基于文件的存储系统。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
3月前
|
消息中间件 存储 NoSQL
python 使用redis实现支持优先级的消息队列详细说明和代码
python 使用redis实现支持优先级的消息队列详细说明和代码
59 0
|
消息中间件 JavaScript 前端开发
js的EventLoop事件循环机制调用栈、微任务、消息队列执行顺序优先级
js的EventLoop事件循环机制调用栈、微任务、消息队列执行顺序优先级
118 0
|
消息中间件 JavaScript 前端开发
js的EventLoop事件循环机制调用栈、微任务、消息队列执行顺序优先级
js的EventLoop事件循环机制调用栈、微任务、消息队列执行顺序优先级
76 0
|
消息中间件 JavaScript 前端开发
js的EventLoop事件循环机制调用栈、微任务、消息队列执行顺序优先级
js的EventLoop事件循环机制调用栈、微任务、消息队列执行顺序优先级
84 0
|
消息中间件 JavaScript
js的EventLoop事件循环机制调用栈、微任务、消息队列执行顺序优先级
js的EventLoop事件循环机制调用栈、微任务、消息队列执行顺序优先级
136 0
|
消息中间件 JavaScript
js的EventLoop事件循环机制调用栈、微任务、消息队列执行顺序优先级
js的EventLoop事件循环机制调用栈、微任务、消息队列执行顺序优先级
|
消息中间件 JavaScript 前端开发
js的EventLoop事件循环机制调用栈、微任务、消息队列执行顺序优先级
js的EventLoop事件循环机制调用栈、微任务、消息队列执行顺序优先级
184 0
|
6月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。