原创 默达 淘系技术 4月29日
说到消息队列,首先映入脑海的就是Kafka等,消息队列在各个领域都发挥了很大的作用。但是,在一些场景下,传统的消息队列Kafka无法满足需求,比如以下场景:
- 消息重复概率比较高时,需要对重复消息进行合并处理避免浪费有限的资源,减少消费延迟;
- 需要根据业务自定义优先级进行消息处理,高优先级的消息比低优先级的消息先处理;
- 消息需要定时消费的场景,消息只有在设定的消费时间到了之后立马被消费。
本文将介绍一种基于Redis实现的消息队列(Redis message queue, RMQ),RMQ可以作为传统消息队列的互补选择,在传统消息队列没有涉及的场景中使用RMQ。
功能介绍
RMQ设计为一个二方库,可以帮助用户基于Redis快速实现消息队列的功能,RMQ消息队列具有消息合并、区分优先级、支持定时消息等特性。RMQ消息队列可以用于异步解耦、削峰填谷,支持亿级数据堆积。RMQ消息队列目前支持三种类型的消息,分别是RangeMergeMessage(区间重复合并消息)、PriorityMessage(优先级消息)、FixedTimeMessage(任意定时消息)。
▐ 区间重复合并消息
RangeMergeMessage支持区间重复消息合并,发送消息时需要设置时间区间,消息延迟该时间区间长度后被消费,在该时间区间内如果发送重复的消息,重复消息将会被合并。如果消息在Redis服务端发生堆积,重复到来的消息依然会被合并处理。
该类型消息适用于消息重复率较高且希望重复消息合并处理的场景,对重复消息进行合并可以减少下游消费系统的压力,减少不必要的资源消耗,将有限的资源最大化的利用,提升消费效率。
▐ 优先级消息
PriorityMessage支持给消息设置任意等级的优先级,优先级高的消息会被优先消费,相同优先级的消息被随机消费。如果消息在Redis服务端发生堆积,重复的消息将被合并处理,合并后消息的优先级等于最后存储的消息的优先级。该类型消息适用于希望重复消息合并处理且需要设置优先级的场景,下游消费者资源有限时,合并重复消息且优先处理优先级高的消息将可以合理利用有限的资源。
▐ 任意定时消息
FixedTimeMessage支持给消息设置任意消费时间,只有消费时间到了之后消息才被消费,消费时间可精确到秒。消息到期后没有及时被消费时,消费者将按照时间由远及近进行消费。如果消息在Redis服务端发生堆积,重复的消息将被合并处理,合并后消息的消费时间等于最后存储的消息的消费时间。
该类型消息适用于希望重复消息合并处理且需要定时消费的场景,定时消息应用场景非常丰富,比如定时打标去标、活动结束后清理动作、订单超时关闭等。
▐ 并发消费控制
使用传统消息中间件进行集群消费的时候,为了避免并发处理同一元数据导致不一致问题,通常需要对元数据加分布式锁,频繁的锁冲突会导致消费效率低下。加分布式锁的最终目的其实就是保障属于同一元数据的消息被串行消费。加分布式锁并不是最好的方案,最好的方案应该是从根上解决并发问题,让属于同一元数据的消息串行消费。
RMQ消息队列具有并发消费控制能力,属于同一元数据的消息只会被分配给全局唯一一个线程进行消费,因此属于同一元数据的消息将被串行消费。使用方如果需要该能力,除了需要提供Redis,还需要提供ZooKeeper。
▐ 重试次数控制
RMQ消息队列支持失败重试消费16次,业务返回消费失败后,消息会被回滚并等待重试消费,重试16次后消息进入死信队列,消息不再被消费,除非人工干预。
技术原理
▐ 总体框架
RMQ消息队列由三部分组成,分别为ZooKeeper、RMQ二方库、Redis。ZooKeeper负责维护集群worker的信息,将topic的所有slot分配给全局的woker。Redis负责存储消息,采用Sorted Set结构存储,Store Queue是消息存放的队列,Prepare Queue是采用二阶段消费方式正在消费的消息存放队列,Dead Queue是死信队列。RMQ二方库由RmqClient、Consumer、Producer三部分组成。RmqClient负责RMQ的启动工作,包括上报TopicDef、Worker给ZooKeeper,分配Slot给Worker,扫描业务定义的MessageListener Bean。Producer负责根据不用消息类型将消息按照指定的方式存储到Redis。Consumer负责根据不用消息类型按照指定方式从Redis弹出消息并调用业务的MessageListener。
▐ 消息存储
- Topic的设计
Topic的定义有三部分组成,topic表示主题名称,slotAmount表示消息存储划分的槽数量,topicType表示消息的类型。主题名称是一个Topic的唯一标示,相同主题名称Topic的slotAmount和topicType一定是一样的。
消息存储采用Redis的Sorted Set结构,为了支持大量消息的堆积,需要把消息分散存储到很多个槽中,slotAmount表示该Topic消息存储共使用的槽数量,槽数量一定需要是2的n次幂。在消息存储的时候,采用对指定数据或者消息体哈希求余得到槽位置。
- StoreQueue的设计
上图中topic划分了8个槽位,编号0-7。如果发送方指定了消息的slotBasis,则计算slotBasis的CRC32值,CRC32值对槽数量进行取模得到槽序号,SlotKey设计为#{topic}_#{index}(也即Redis的键),其中#{}表示占位符。
发送方需要保证相同内容的消息的slotBasis相同,如果没有指定slotBasis则采用消息内容计算SlotKey,这样内容相同的消息体就会落在同一个Sorted Set里面,所以内容相同的消息会进行合并。
Redis的Sorted Set中的数据按照分数排序,实现不同类型的消息的关键就在于如何利用分数、如何添加消息到Sorted Set、如何从Sorted Set中弹出消息。优先级消息将优先级作为分数,消费时每次弹出分数最大的消息。任意定时消息将时间戳作为分数,消费时每次弹出分数大于当前时间戳的一个消息。
区间重复合并消息将时间戳作为分数,添加消息时将(当前时间戳+时间区间)作为分数,消费时每次弹出分数大于当前时间戳的一个消息。
- PrepareQueue的设计
为了保障RMQ消息队列的可用性,做到每条消息至少消费一次,消费者不是直接pop有序集合中的元素,而是将元素从StoreQueue移动到PrepareQueue并返回消息给消费者,等消费成功后再从PrepareQueue从删除,或者消费失败后从PreapreQueue重新移动到StoreQueue,这便是根据二阶段提交的思想实现的二阶段消费。
在后面将会详细介绍二阶段消费的实现思路,这里重点介绍下PrepareQueue的存储设计。StoreQueue中每一个Slot对应PrepareQueue中的Slot,PrepareQueue的SlotKey设计为prepare{#{topic}#{index}}。PrepareQueue采用Sorted Set作为存储,消息移动到PrepareQueue时刻对应的(秒级时间戳*1000+重试次数)作为分数,字符串存储的是消息体内容。这里分数的设计与重试次数的设计密切相关,所以在重试次数设计章节详细介绍。
PrepareQueue的SlotKey设计中需要注意的一点,由于消息从StoreQueue移动到PrepareQueue是通过Lua脚本操作的,因此需要保证Lua脚本操作的Slot在同一个Redis节点上,如何保证PrepareQueue的SlotKey和对应的StoreQueue的SlotKey被hash到同一个Redis槽中呢。Redis的hash tag功能可以指定SlotKey中只有某一部分参与计算hash,这一部分采用{}包括,因此PrepareQueue的SlotKey中采用{}包括了StoreQueue的SlotKey。
- DeadQueue的设计
消息重试消费16次后,消息将进入DeadQueue。DeadQueue的SlotKey设计为prepare{#{topic}#{index}},这里同样采用hash tag功能保证DeadQueue的SlotKey与对应StoreQueue的SlotKey存储在同一Redis节点。
▐ 生产者
生产者的任务就是将消息添加到Redis的Sorted Set中。首先,需要计算出消息添加到Redis的SlotKey,如果发送方指定了消息的slotBasis(否则采用content代替),则计算slotBasis的CRC32值,CRC32值对槽数量进行取模得到槽序号,SlotKey设计为#{topic}_#{index},其中#{}表示占位符。然后,不同类型的消息有不同的添加方式,因此分布讲述三种类型消息的添加过程。
- 区间重复合并消息
发送该消息时需要设置timeRange,timeRange必须大于0,单位为毫秒,表示消息将延迟timeRange毫秒后被消费,期间到来的重复消息将被合并,合并后的消息依然维持原来的消费时间。
因此在存储该类型消息的时候,采用(当前时间戳+timeRange)作为分数,添加消息采用Lua脚本执行,保证操作的原子性,Lua脚本首先采用zscore命令检查消息是否已经存在,如果已经存在则直接返回,如果不存在则执行zadd命令添加。
- 优先级消息
发送该消息时需要设置priority,priority必须大于16,表示消息的优先级,数值越大表示优先级越高。因此在存储该类型消息的时候,采用priority作为分数,采用zadd命令直接添加。
- 任意定时消息
发送该类型消息时需要设置fixedTime,fixedTime必须大于当前时间,表示消费时间戳,当前时间大于该消费时间戳的时候,消息才会被消费。因此在存储该类型消息的时候,采用fixedTime作为分数,采用命令zadd直接添加。
▐ 消费者
- 二阶段消费方式
三种消费模式
一般消息队列存在三种消费模式,分别是:最多消费一次、至少消费一次、只消费一次。最多消费一次模式消息可能丢失,一般不怎么使用。至少消费一次模式消息不会丢失,但是可能存在重复消费,比较常用。只消费一次模式消息被精确只消费一次,实现较困难,一般需要业务记录幂等ID来实现。RMQ实现了至少消费一次的模式,那么如何保证消息至少被消费一次呢?
至少消费一次模式实现的难点
从最简单的消费模式——最多消费一次说起,消费者端只需要从消息队列服务中取出消息就行,即执行Redis的zpopmax命令,不伦消费者是否接收到该消息并成功消费,消息队列服务都认为消息消费成功。最多一次消费模式导致消息丢失的因素可能有:网络丢包导致消费者没有接收到消息,消费者接收到消息但在消费的时候宕机了,消费者接收到消息但消费失败。针对消费失败导致消息丢失的情况比较好解决,只需要把消费失败的消息重新放入消息队列服务就行,但是网络丢包和消费系统异常导致的消息丢失问题不好解决。
可能有人会想到,我们不把元素从有序集合中pop出来,我们先查询优先级最高的元素,然后消费,再删除消费成功的元素,但是这样消息服务队列就变成了同步阻塞队列,性能会很差。
至少消费一次模式的实现
至少消费一次的问题比较类似银行转账问题,A向B账户转账100元,如何保障A账户扣减100同时B账户增加100,因此我们可以想到二阶段提交的思想。第一个准备阶段,A、B分别进行资源冻结并持久化undo和redo日志,A、B分别告诉协调者已经准备好;第二个提交阶段,协调者告诉A、B进行提交,A、B分别提交事务。RMQ基于二阶段提交的思想来实现至少消费一次的模式。
RMQ存储设计中PrepareQueue的作用就是用来冻结资源并记录事务日志,消费者端即是参与者也是协调者。第一个准备阶段,消费者端通过执行Lua脚本从StoreQueue中Pop消息并存储到PrepareQueue,同时消息传输到消费者端,消费者端消费该消息;第二个提交阶段,消费者端根据消费结果是否成功协调消息队列服务是提交还是回滚,如果消费成功则提交事务,该消息从PrepareQueue中删除,如果消费失败则回滚事务,消费者端将该消息从PrepareQueue移动到StoreQueue,如果因为各种异常导致PrepareQueue中消息滞留超时,超时后将自动执行回滚操作。二阶段消费的流程图如下所示。
实现方案的异常情况分析
我们来分析下采用二阶段消费方案可能存在的异常情况,从以下分析来看二阶段消费方案可以保障消息至少被消费一次。
- 网络丢包导致消费者没有接收到消息,这时消息已经记录到PrepareQueue,如果到了超时时间,消息被回滚放回StoreQueue,等待下次被消费,消息不丢失。
- 消费者接收到了消息,但是消费者还没来得及消费完成系统就宕机了,消息消费超时到了后,消息会被重新放入StoreQueue,等待下次被消费,消息不丢失。
- 消费者接收到了消息并消费成功,消费者端在协调事务提交的时候宕机了,消息消费超时到了后,消息会被重新放入StoreQueue,等待下次被消费,消息被重复消费。
- 消费者接收到了消息但消费失败,消费者端在协调事务提交的时候宕机了,消息消费超时到了后,消息会被重新放入StoreQueue,等待下次被消费,消息不丢失。
- 消费者接收到了消息并消费成功,但是由于fullgc等原因使消费时间太长,PrepareQueue中的消息由于超时已经回滚到StoreQueue,等待下次被消费,消息被重复消费。
- 重试次数控制的实现
采用二阶段消费方式,需要将消息在StoreQueue和PrepareQueue之间移动,如何实现重试次数控制呢,其关键在StoreQueue和PrepareQueue的分数设计。
PrepareQueue的分数需要与时间相关,正常情况下,消费者不管消费失败还是消费成功,都会从PrepareQueue删除消息,当消费者系统发生异常或者宕机的时候,消息就无法从PrepareQueue中删除,我们也不知道消费者是否消费成功,为保障消息至少被消费一次,我们需要做到超时回滚,因此分数需要与消费时间相关。当PrepareQueue中的消息发生超时的时候,将消息从PrepareQueue移动到StoreQueue。
因此PrepareQueue的分数设计为:秒级时间戳*1000+重试次数。不同类型的消息首次存储到StoreQueue中的分数表示的含义不尽相同,区间重复合并消息和任意定时消息存储时的分数表示消费时间戳,优先级消息存储时的分数表示优先级。如果消息消费失败,消息从PrepareQueue回滚到StoreQueue,所有类型的消息存储时的分数都表示剩余重试次数,剩余重试次数从16次不断降低最后为0,消息进入死信队列。消息在StoreQueue和PrepareQueue之间移动流程如下:
- Pop消息
不同类型的消息在消费的时候Pop消息的方式不一样,因此接下来分别讲述三种类型消息的Pop方式。
区间重复合并消息
该消息存储的分数设计为消费时间戳,当前时间大于消息的消费时间戳时,该消息应该被消费。因此采用Redis命令ZRANGEBYSCORE弹出分数小于当前时间戳的一条消息。
优先级消息
该消息存储的分数设计为优先级,优先级越高分数越大,因此采用Redis命令ZPOPMAX弹出分数最大的一条消息。
任意定时消息该消息存储的分数设计为消费时间戳,当前时间大于消息的消费时间戳时,该消息应该被消费。因此采用Redis命令ZRANGEBYSCORE弹出分数小于当前时间戳的一条消息。
相关应用
▐ 主图价格表达项目
在主图价格表达中需要实现一个功能,商品价格发生变化时将商品价格打印在商品主图上面,那么需要在价格发生变动的时候触发合成一张带价格的图片,每一次触发合图时计算价格都是获取当前最新的价格。上游价格变化的因素很多,变化很频繁,下游合图消耗GPU资源较大,处理容量较低。因此需要尽可能合并触发合图消息,减轻下游处理压力,于是使用了RMQ作为消息队列来进行削峰填谷、消息合并。不仅如此,还可以根据商家等级划分触发合图消息的等级,使KA商家能够优先得到处理,缩短价格变化的延迟。
在线上实际环境中,集群共130台机器,RMQ消息队列的发送消息能力和消费消息能力均可以达到5w tps,而且这并不是峰值,理论上可以达到10w tps。
▐ 在线数据圈选引擎
在线数据圈选引擎需要处理各种来源的大量动态数据,需要将一段时间区间内的消息合并处理,减少处理压力,并且在对同一元数据进行并发处理需要加分布式锁,锁冲突导致消费效率下降。RMQ的区间重复合并消息和并发消费控制能力可以帮助解决这些问题。目前,在线数据圈选引擎已经采用了RMQ消息队列作为核心组件,RMQ消息队列发挥了很大的作用。
总结
本文提出了一种可实现的基于Redis的消息队列,充分利用Sorted Set结构设计了消息合并、优先级、定时等特性,与传统消息队列形成互补,弥补传统消息队列这方面特性的缺失。为了实现高可用,本文在二阶段提交的思想上进行改进设计了二阶段消费方式,保障消息至少被消费一次。
未来将基于Redis的特性打造更多独特的功能,与传统消息中间件形成互补。在消费控制方面会增加流量自动调控能力,根据消息类型调控消费速度,减少因为某种类型消息消费瓶颈导致整体消费性能下降。