一、问题场景
想实现一个轻量级的延迟队列,此时可以考虑基于Redis来实现,如果当前的基础设施不是阿里云Mq,开源的RocketMQ只有18个等级,1ms~2h的18个等级。当然商业版的阿里云可以实现精度的延迟。
二、基于redis实现延迟队列
那如果基于redis实现延迟队列。首先需要考虑:
业务系统调用Api:
sendMsg(topic, msgId, msg, delaySeconds, TimeUnit.SECONDS, ttlSeconds, TimeUnit.SECONDS, maxRetry)
相关参数说明:
主题:topic
消息体:msg
delaySeconds:延迟时间
TimeUnit.SECONDS:延迟单位
ttlSeconds:过期时间
TimeUnit.SECONDS:过期单位
maxRetry:重试次数
使用原因:
如果发送失败,需要执行重试。发送的时候,需要考虑实时和延迟的情况的处理。
如果是实时的,如何处理?如果是延迟的,又如何处理?
如果实时的,则可以先将消息存到Redis中,然后执行操作,基于事件,发布需要生产的消息,然后订阅需要消费的消息,执行消费。
如果是非实时的话,需要基于定时任务触发器触发,触发当前的延迟队列消息,如果到了需要发送的时候,执行发送。此时的操作基于扫描定时任务和事件触发。
而且获取消息的时候,需要考虑基于原子操作实现,也即可以基于lua脚本实现。
三、实现数据结构
而需要实现对消息和延迟的实现,可以考虑基于zset数据结构实现。其中score可以作为延迟时间。
client.zadd(key, score, member);
而对应实时的消息,可以考虑lpush操作 即可
client.lpush(key, strings);
获取可以基于eval实现
client.eval(script, toByteArray(keyCount), params);
四、具体实现时序图