延时队列,相信各位小伙伴并不会陌生,jdk原生提供了延时队列的使用,当然我们这里介绍的不是这种;在实际的项目中,如果我们有延时队列的场景,可以怎样去实现呢
举一个简单的例子,如下单15分钟内,若没有支付,则自动取消订单
本文将介绍一种非常非常简单的实现方式
I. 方案设计
要实现15分钟后自动取消订单,这个也太简单了,来给出一段神级代码
new Thread(() -> { // 休眠十五分钟,执行取消订单 Thread.sleep(15 * 60 * 1000); cancelOrder(); }).start(); 复制代码
好的,本文就此结束(开玩笑....)
忽略上面的段子,接下来想一想,如果让我们来实现一个延时队列,可以怎么整?
- 单机:
- DelayQueue
- 定时任务
- 分布式:
- Quartz定时任务
- rabbitmq延时队列
- redis zset
- redis 过期回调
- 时间轮
首先我们这里排除掉单机版,至于原因,现在单体单实例应用实在不多见了,直接来看多实例的情况吧
在上面的几种方案中,重心放在redis上,两种case,下面分别介绍一下
1. redis过期时间
我们知道,在使用redis做缓存时,可以设置失效时间,借助redis的失效事件,我们可以来实现延时队列的场景
比如,现在一个订单,我们在redis中新加入一个订单id,失效时间设置为15分钟;当支付成功之后,主动删除这个缓存;若一直没有付钱,则15分钟后,触发一个过期事件,然后订阅这个事件,来执行取消订单
上面这种实现,有两个问题
- key失效监听,可能存在大量的无效信息
- 广播方式消费事件,多实例接收到这个事件,怎么防并发?或者没有一个实例接收到这个事件,那么这个取消订单就会漏掉
显然上面的第二点,漏消息是不能接受的
2. redis zset
zset属于redis提供的几个基本数据结构中的一种,它的特点是有 value + score
如果我们想使用zset拉实现演示队列,那么一个可选的方案就是将score设置为触发的时间戳,value为业务值
然后写一个定时任务,不断的从zset中,取出score小于当前时间戳的数据,任务它们都是已经到期可以执行的
借助这个方案,可以相对简单的实现一个演示队列了
II. redis演示队列实现
1. 环境配置
接下来我们将以redis的zset来实现延时队列,本文借助SpringBoot来搭建一个演示工程,使用的基本配置如下
本项目借助SpringBoot 2.2.1.RELEASE
+ maven 3.5.3
+ IDEA
进行开发
核心依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- 下面这里两个非必须,主要是后面的实现演示使用到了 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> </dependency> 复制代码
redis使用默认的配置,本机 localhost + 6379
2. 核心实现
借助redis zset来实现延时队列,具体的实现代码就很简单了,无非是从zset中取出score小于当前时间戳的数据
private static final Long DELETE_SUCCESS = 1L; @Autowired private StringRedisTemplate redisTemplate; public String fetchOne(String key) { Set<String> sets = redisTemplate.opsForZSet().rangeByScore(key, 0, System.currentTimeMillis(), 0, 3); if (CollectionUtils.isEmpty(sets)) { return null; } for (String val : sets) { if (DELETE_SUCCESS.equals(redisTemplate.opsForZSet().remove(key, val))) { // 删除成功,表示抢占到 return val; } } return null; } 复制代码
注意上面的实现,有一个点需要说一下
zset:每次查询时取了三个数据,然后遍历获取到的数据,依次尝试去删除,若删除成功,则表示当前实例抢占到了这个消息
为什么这样设计?
这里有两个点,先解释第一个,为啥先查后删
如果我们按照正常的实现流程,每次从zset中取一个,但是无法保证这个时候就只有我一个人拿到了这个数据,在多实例的场景下,可能存在多个实例同时拿到了它,那么如何才能表示只有我一个人霸占了她呢(忽然进入言情的世界😓)
借助redis的单线程机制,只可能有一个实例会删除成功,所以拿到并删除成功的那个小伙伴,就是最终的幸运儿;
因此实现细节就是先查,后删,若删除成功,表示获取成功;否则表示被其他的实例捷足先登
接下来再看第二个,为啥一次拿三个
从上面的分析可以看出,如果我一次只拿一个,那么我抢占到的几率并不太大,特别是当实例比较多时,可能会做多次的无效操作;为了减少这个可能性,所以我一次多拿几个做备选,这样抢占到的概率就会高一些,至于为什么是3,这个就看实际的实例与定时任务的执行间隔了
3. 写入队列
上面是从队列中拿数据,有拿当然得有写,所以我们简单的封装一下写入队列的case
@Component public class RedisDelayListWrapper implements ApplicationContextAware { private static final Long DELETE_SUCCESS = 1L; private Set<String> topic = new CopyOnWriteArraySet<>(); public void publish(String key, Object val, long delayTime) { topic.add(key); String strVal = val instanceof String ? (String) val : JSONObject.toJSONString(val); redisTemplate.opsForZSet().add(key, strVal, System.currentTimeMillis() + delayTime); } } 复制代码
4. 定时取演示队列消息
接下来就是一个定时任务,不断的调用上面的实现,从zset中获取到期的数据
@Scheduled(fixedRate = 10_000) public void schedule() { for (String specialTopic : topic) { String cell = fetchOne(specialTopic); if (cell != null) { applicationContext.publishEvent(new DelayMsg(this, cell, specialTopic)); } } } @ToString public static class DelayMsg extends ApplicationEvent { @Getter private String msg; @Getter private String topic; public DelayMsg(Object source, String msg, String topic) { super(source); this.msg = msg; this.topic = topic; } } 复制代码
上面的定时任务,直接借助Spring的@Schedule
来实现,遍历所有的topic,捞出数据之后,通过spring的 event/listener
事件机制来实现消息处理的解耦
5. 消息消费
最终就是我们的消息消费逻辑了,主要就是消费前面抛出的DelayMsg
,我们这里借助AOP来实现消息过滤
定义一个注解Consumer
,用来指定消费哪个topic
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @EventListener public @interface Consumer { String topic(); } 复制代码
注意这个注解上面还有 @EventListener
,表明它可以监听的spring的事件
aop拦截逻辑,根据topic进行过滤
@Aspect @Component public class ConsumerAspect { @Around("@annotation(consumer)") public Object around(ProceedingJoinPoint joinPoint, Consumer consumer) throws Throwable { Object[] args = joinPoint.getArgs(); boolean check = false; for (Object obj : args) { if (obj instanceof RedisDelayListWrapper.DelayMsg) { check = consumer.topic().equals(((RedisDelayListWrapper.DelayMsg) obj).getTopic()); } } if (!check) { // 不满足条件,直接忽略 return null; } // topic匹配成功,执行 return joinPoint.proceed(); } } 复制代码
5. 测试demo
最后写一个测试demo,验证下上面的实现
@EnableScheduling @RestController @SpringBootApplication public class Application { private static final String TEST_DELAY_QUEUE = "test"; private static final String DEMO_DELAY_QUEUE = "demo"; @Autowired private RedisDelayListWrapper redisDelayListWrapper; private Random random = new Random(); public static void main(String[] args) { SpringApplication.run(Application.class); } @GetMapping(path = "publish") public String publish(String msg, Long delayTime) { if (delayTime == null) { delayTime = 10_000L; } String queue = random.nextBoolean() ? TEST_DELAY_QUEUE : DEMO_DELAY_QUEUE; msg = queue + "#" + msg + "#" + (System.currentTimeMillis() + delayTime); redisDelayListWrapper.publish(queue, msg, delayTime); System.out.println("延时: " + delayTime + "ms后消费: " + msg + " now:" + LocalDateTime.now()); return "success!"; } @Consumer(topic = TEST_DELAY_QUEUE) public void consumer(RedisDelayListWrapper.DelayMsg delayMsg) { System.out.println("TEST消费延时消息: " + delayMsg + " at:" + System.currentTimeMillis()); } @Consumer(topic = DEMO_DELAY_QUEUE) public void consumerDemo(RedisDelayListWrapper.DelayMsg delayMsg) { System.out.println("DEMO消费延时消息: " + delayMsg + " at:" + System.currentTimeMillis()); } } 复制代码
6. 小结
本文属于一个实战小技巧,借助redis的zset来灵活的实现一个简单的延时队列,实现倒是没有太大的难度,其中的一些小细节还是挺有意思的,好的,今天分享到此over,欢迎各位老铁来撩,公众号 一灰灰blog
你值得拥有