背景
前段时间有个小项目需要使用延迟任务,谈到延迟任务,我脑子第一时间一闪而过的就是使用消息队列来做,比如RabbitMQ的死信队列又或者RocketMQ的延迟队列,但是奈何这是一个小项目,并没有引入MQ,我也不太想因为一个延迟任务就引入MQ,增加系统复杂度,所以这个方案直接就被pass了。
虽然基于MQ这个方式走不通了,但是这个项目中使用到Redis,所以我就想是否能够使用Redis来代替MQ实现延迟队列的功能,于是我就查了一下有没有现成可用的方案,别说,还真给我查到了两种方案,并且我还仔细研究对比了这两个方案,发现要想很好的实现延迟队列,并不简单。
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
监听过期key
基于监听过期key的方式来实现延迟队列是我查到的第一个方案,为了弄懂这个方案实现的细节,我还特地去扒了扒官网,还真有所收获
1、Redis发布订阅模式
一谈到发布订阅模式,其实一想到的就是MQ,只不过Redis也实现了一套,并且跟MQ贼像,如图:
图中的channel的概念跟MQ中的topic的概念差不多,你可以把channel理解成MQ中的topic。
生产者在消息发送时需要到指定发送到哪个channel上,消费者订阅这个channel就能获取到消息。
2、keyspace notifications
在Redis中,有很多默认的channel,只不过向这些channel发送消息的生产者不是我们写的代码,而是Redis本身。当消费者监听这些channel时,就可以感知到Redis中数据的变化。
这个功能Redis官方称为keyspace notifications,字面意思就是键空间通知。
这些默认的channel被分为两类:
- 以
__keyspace@<db>__:
为前缀,后面跟的是key的名称,表示监听跟这个key有关的事件。
举个例子,现在有个消费者监听了__keyspace@0__:sanyou
这个channel,sanyou就是Redis中的一个普通key,那么当sanyou这个key被删除或者发生了其它事件,那么消费者就会收到sanyou这个key删除或者其它事件的消息 - 以
__keyevent@<db>__:
为前缀,后面跟的是消息事件类型,表示监听某个事件
同样举个例子,现在有个消费者监听了__keyevent@0__:expired
这个channel,代表了监听key的过期事件。那么当某个Redis的key过期了(expired),那么消费者就能收到这个key过期的消息。如果把expired换成del,那么监听的就是删除事件。具体支持哪些事件,可从官网查。
上述db是指具体的数据库,Redis不是默认分为16个库么,序号从0-15,所以db就是0到15的数字,示例中的0就是指0对应的数据库。
3、延迟队列实现原理
通过对上面的两个概念了解之后,应该就对监听过期key的实现原理一目了然了,其实就是当这个key过期之后,Redis会发布一个key过期的事件到__keyevent@<db>__:expired
这个channel,只要我们的服务监听这个channel,那么就能知道过期的Key,从而就算实现了延迟队列功能。
所以这种方式实现延迟队列就只需要两步:
- 发送延迟任务,key是延迟消息本身,过期时间就是延迟时间
- 监听
__keyevent@<db>__:expired
这个channel,处理延迟任务
4、demo
好了,基本概念和核心原理都说完了之后,又到了show me the code环节。
好巧不巧,Spring已经实现了监听__keyevent@*__:expired
这个channel这个功能,__keyevent@*__:expired
中的*
代表通配符的意思,监听所有的数据库。
所以demo写起来就很简单了,只需3步即可
引入pom
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.2.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.2.5.RELEASE</version> </dependency>
配置类
@Configuration public class RedisConfiguration { @Bean public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(connectionFactory); return redisMessageListenerContainer; } @Bean public KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) { return new KeyExpirationEventMessageListener(redisMessageListenerContainer); } }
KeyExpirationEventMessageListener
实现了对__keyevent@*__:expired
channel的监听
当KeyExpirationEventMessageListener
收到Redis发布的过期Key的消息的时候,会发布RedisKeyExpiredEvent
事件
所以我们只需要监听RedisKeyExpiredEvent
事件就可以拿到过期消息的Key,也就是延迟消息。
对RedisKeyExpiredEvent
事件的监听实现MyRedisKeyExpiredEventListener
@Component public class MyRedisKeyExpiredEventListener implements ApplicationListener<RedisKeyExpiredEvent> { @Override public void onApplicationEvent(RedisKeyExpiredEvent event) { byte[] body = event.getSource(); System.out.println("获取到延迟消息:" + new String(body)); } }
整个工程目录也简单
代码写好,启动应用
之后我直接通过Redis命令设置消息,就没通过代码发送消息了,消息的key为sanyou,值为task,值不重要,过期时间为5s
set sanyou task expire sanyou 5
如果上面都理论都正确,不出意外的话,5s后MyRedisKeyExpiredEventListener
应该可以监听到sanyou这个key过期的消息,也就相当于拿到了延迟任务,控制台会打印出获取到延迟消息:sanyou
。
于是我满怀希望,静静地等待了5s。。
5、4、3、2、1,时间一到,我查看控制台,但是控制台并没有按照预期打印出上面那句话。
为什么会没打印出?难道是代码写错了?正当我准备检查代码的时候,官网的一段话道出了真实原因。
我给大家翻译一下上面这段话讲的内容。
上面这段话主要讨论的是key过期事件的时效性问题,首先提到了Redis过期key的两种清除策略,就是面试八股文常背的两种:
- 惰性清除。当这个key过期之后,访问时,这个Key才会被清除
- 定时清除。后台会定期检查一部分key,如果有key过期了,就会被清除
再后面那段话是核心,意思是说,key的过期事件发布时机并不是当这个key的过期时间到了之后就发布,而是这个key在Redis中被清理之后,也就是真正被删除之后才会发布。
到这我终于明白了,上面的例子中即使我设置了5s的过期时间,但是当5s过去之后,只要两种清除策略都不满足,没人访问sanyou这个key,后台的定时清理的任务也没扫描到sanyou这个key,那么就不会发布key过期的事件,自然而然也就监听不到了。
至于后台的定时清理的任务什么时候能扫到,这个没有固定时间,可能一到过期时间就被扫到,也可能等一定时间才会被扫到,这就可能会造成了客户端从发布到监听到的消息时间差会大于等于过期时间,从而造成一定时间消息的延迟,这就着实有点坑了。。
5、坑
除了上面测试demo的时候遇到的坑之外,在我深入研究之后,还发现了一些更离谱的坑。
丢消息太频繁
Redis的丢消息跟MQ不一样,因为MQ都会有消息的持久化机制,可能只有当机器宕机了,才会丢点消息,但是Redis丢消息就很离谱,比如说你的服务在重启的时候就消息会丢消息。
Redis实现的发布订阅模式,消息是没有持久化机制,当消息发布到某个channel之后,如果没有客户端订阅这个channel,那么这个消息就丢了,并不会像MQ一样进行持久化,等有消费者订阅的时候再给消费者消费。
所以说,假设服务重启期间,某个生产者或者是Redis本身发布了一条消息到某个channel,由于服务重启,没有监听这个channel,那么这个消息自然就丢了。