- DelayQueue
- Timer
- ScheduledThreadPoolExecutor
- RocketMQ
- RabbitMQ
- 监听Redis过期key
- Redisson的RDelayedQueue
- Netty的HashedWheelTimer
- Hutool的SystemTimer
- Qurtaz
- 无限轮询延迟任务
- 最后
由于Redis具有过期监听的功能,于是就有人拿它来实现订单超时自动关闭的功能,但是这个方案并不完美。今天来聊聊11种实现订单超时自动关闭的方案,总有一种适合你!这些方案并没有绝对的好坏之分,只是适用场景的不大相同。
DelayQueue
DelayQueue是JDK提供的api,是一个延迟队列
DelayQueue泛型参数得实现Delayed接口,Delayed继承了Comparable接口。
getDelay
方法返回这个任务还剩多久时间可以执行,小于0的时候说明可以这个延迟任务到了执行的时间了。
compareTo
这个是对任务排序的,保证最先到延迟时间的任务排到队列的头。
来个demo
@Getter public class SanYouTask implements Delayed { private final String taskContent; private final Long triggerTime; public SanYouTask(String taskContent, Long delayTime) { this.taskContent = taskContent; this.triggerTime = System.currentTimeMillis() + delayTime * 1000; } @Override public long getDelay(TimeUnit unit) { return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return this.triggerTime.compareTo(((SanYouTask) o).triggerTime); } }
SanYouTask实现了Delayed接口,构造参数
- taskContent:延迟任务的具体的内容
- delayTime:延迟时间,秒为单位
测试
@Slf4j public class DelayQueueDemo { public static void main(String[] args) { DelayQueue<SanYouTask> sanYouTaskDelayQueue = new DelayQueue<>(); new Thread(() -> { while (true) { try { SanYouTask sanYouTask = sanYouTaskDelayQueue.take(); log.info("获取到延迟任务:{}", sanYouTask.getTaskContent()); } catch (Exception e) { } } }).start(); log.info("提交延迟任务"); sanYouTaskDelayQueue.offer(new SanYouTask("芋道源码5s", 5L)); sanYouTaskDelayQueue.offer(new SanYouTask("芋道源码3s", 3L)); sanYouTaskDelayQueue.offer(new SanYouTask("芋道源码8s", 8L)); } }
开启一个线程从DelayQueue中获取任务,然后提交了三个任务,延迟时间分为别5s,3s,8s。
测试结果:
成功实现了延迟任务。
实现原理
offer
方法在提交任务的时候,会通过根据compareTo
的实现对任务进行排序,将最先需要被执行的任务放到队列头。
take
方法获取任务的时候,会拿到队列头部的元素,也就是队列中最早需要被执行的任务,通过getDelay返回值判断任务是否需要被立刻执行,如果需要的话,就返回任务,如果不需要就会等待这个任务到延迟时间的剩余时间,当时间到了就会将任务返回。
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
Timer
Timer也是JDK提供的api
先来demo
@Slf4j public class TimerDemo { public static void main(String[] args) { Timer timer = new Timer(); log.info("提交延迟任务"); timer.schedule(new TimerTask() { @Override public void run() { log.info("执行延迟任务"); } }, 5000); } }
通过schedule
提交一个延迟时间为5s的延迟任务
实现原理
提交的任务是一个TimerTask
public abstract class TimerTask implements Runnable { //忽略其它属性 long nextExecutionTime; }
TimerTask内部有一个nextExecutionTime
属性,代表下一次任务执行的时间,在提交任务的时候会计算出nextExecutionTime
值。
Timer内部有一个TaskQueue对象,用来保存TimerTask任务的,会根据nextExecutionTime
来排序,保证能够快速获取到最早需要被执行的延迟任务。
在Timer内部还有一个执行任务的线程TimerThread,这个线程就跟DelayQueue demo中开启的线程作用是一样的,用来执行到了延迟时间的任务。
所以总的来看,Timer有点像整体封装了DelayQueue demo中的所有东西,让用起来简单点。
虽然Timer用起来比较简单,但是在阿里规范中是不推荐使用的,主要是有以下几点原因:
- Timer使用单线程来处理任务,长时间运行的任务会导致其他任务的延时处理
- Timer没有对运行时异常进行处理,一旦某个任务触发运行时异常,会导致整个Timer崩溃,不安全
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
ScheduledThreadPoolExecutor
由于Timer在使用上有一定的问题,所以在JDK1.5版本的时候提供了ScheduledThreadPoolExecutor,这个跟Timer的作用差不多,并且他们的方法的命名都是差不多的,但是ScheduledThreadPoolExecutor解决了单线程和异常崩溃等问题。
来个demo
@Slf4j public class ScheduledThreadPoolExecutorDemo { public static void main(String[] args) { ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, new ThreadPoolExecutor.CallerRunsPolicy()); log.info("提交延迟任务"); executor.schedule(() -> log.info("执行延迟任务"), 5, TimeUnit.SECONDS); } }
结果
实现原理
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,也就是继承了线程池,所以可以有很多个线程来执行任务。
ScheduledThreadPoolExecutor在构造的时候会传入一个DelayedWorkQueue阻塞队列,所以线程池内部的阻塞队列是DelayedWorkQueue。
在提交延迟任务的时候,任务会被封装一个任务会被封装成ScheduledFutureTask
对象,然后放到DelayedWorkQueue阻塞队列中。
ScheduledFutureTask
ScheduledFutureTask
实现了前面提到的Delayed接口,所以其实可以猜到DelayedWorkQueue会根据ScheduledFutureTask
对于Delayed接口的实现来排序,所以线程能够获取到最早到延迟时间的任务。
当线程从DelayedWorkQueue中获取到需要执行的任务之后就会执行任务。
RocketMQ
RocketMQ是阿里开源的一款消息中间件,实现了延迟消息的功能。
RocketMQ延迟消息的延迟时间默认有18个等级。
当发送消息的时候只需要指定延迟等级即可。如果这18个等级的延迟时间不符和你的要求,可以修改RocketMQ服务端的配置文件。
来个demo
依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version> <!--web依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.2.5.RELEASE</version> </dependency>
配置文件
rocketmq: name-server: 192.168.200.144:9876 #服务器ip:nameServer端口 producer: group: sanyouProducer
controller类,通过DefaultMQProducer发送延迟消息到sanyouDelayTaskTopic这个topic,延迟等级为2,也就是延迟时间为5s的意思。
@RestController @Slf4j public class RocketMQDelayTaskController { @Resource private DefaultMQProducer producer; @GetMapping("/rocketmq/add") public void addTask(@RequestParam("task") String task) throws Exception { Message msg = new Message("sanyouDelayTaskTopic", "TagA", task.getBytes(RemotingHelper.DEFAULT_CHARSET)); msg.setDelayTimeLevel(2); // 发送消息并得到消息的发送结果,然后打印 log.info("提交延迟任务"); producer.send(msg); } }
创建一个消费者,监听sanyouDelayTaskTopic的消息。
@Component @RocketMQMessageListener(consumerGroup = "sanyouConsumer", topic = "sanyouDelayTaskTopic") @Slf4j public class SanYouDelayTaskTopicListener implements RocketMQListener<String> { @Override public void onMessage(String msg) { log.info("获取到延迟任务:{}", msg); } }
启动应用,浏览器输入以下链接添加任务
http://localhost:8080/rocketmq/add?task=sanyou
测试结果:
实现原理
生产者发送延迟消息之后,RocketMQ服务端在接收到消息之后,会去根据延迟级别是否大于0来判断是否是延迟消息
- 如果不大于0,说明不是延迟消息,那就会将消息保存到指定的topic中
- 如果大于0,说明是延迟消息,此时RocketMQ会进行一波偷梁换柱的操作,将消息的topic改成
SCHEDULE_TOPIC_XXXX
中,XXXX不是占位符,然后存储。
在BocketMQ内部有一个延迟任务,相当于是一个定时任务,这个任务就会获取SCHEDULE_TOPIC_XXXX
中的消息,判断消息是否到了延迟时间,如果到了,那么就会将消息的topic存储到原来真正的topic(拿我们的例子来说就是sanyouDelayTaskTopic
)中,之后消费者就可以从真正的topic中获取到消息了。
定时任务
RocketMQ这种实现方式相比于前面提到的三种更加可靠,因为前面提到的三种任务内容都是存在内存的,服务器重启任务就丢了,如果要实现任务不丢还得自己实现逻辑,但是RocketMQ消息有持久化机制,能够保证任务不丢失。
RabbitMQ
RabbitMQ也是一款消息中间件,通过RabbitMQ的死信队列也可以是先延迟任务的功能。
demo
引入RabbitMQ的依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.2.5.RELEASE</version> </dependency>
配置文件
spring: rabbitmq: host: 192.168.200.144 #服务器ip port: 5672 virtual-host: /
RabbitMQ死信队列的配置类,后面说原理的时候会介绍干啥的
@Configuration public class RabbitMQConfiguration { @Bean public DirectExchange sanyouDirectExchangee() { return new DirectExchange("sanyouDirectExchangee"); } @Bean public Queue sanyouQueue() { return QueueBuilder //指定队列名称,并持久化 .durable("sanyouQueue") //设置队列的超时时间为5秒,也就是延迟任务的时间 .ttl(5000) //指定死信交换机 .deadLetterExchange("sanyouDelayTaskExchangee") .build(); } @Bean public Binding sanyouQueueBinding() { return BindingBuilder.bind(sanyouQueue()).to(sanyouDirectExchangee()).with(""); } @Bean public DirectExchange sanyouDelayTaskExchange() { return new DirectExchange("sanyouDelayTaskExchangee"); } @Bean public Queue sanyouDelayTaskQueue() { return QueueBuilder //指定队列名称,并持久化 .durable("sanyouDelayTaskQueue") .build(); } @Bean public Binding sanyouDelayTaskQueueBinding() { return BindingBuilder.bind(sanyouDelayTaskQueue()).to(sanyouDelayTaskExchange()).with(""); } }
RabbitMQDelayTaskController用来发送消息,这里没指定延迟时间,是因为在声明队列的时候指定了延迟时间为5s
@RestController @Slf4j public class RabbitMQDelayTaskController { @Resource private RabbitTemplate rabbitTemplate; @GetMapping("/rabbitmq/add") public void addTask(@RequestParam("task") String task) throws Exception { // 消息ID,需要封装到CorrelationData中 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("提交延迟任务"); // 发送消息 rabbitTemplate.convertAndSend("sanyouDirectExchangee", "", task, correlationData); } }
启动应用,浏览器输入以下链接添加任务
http://localhost:8080/rabbitmq/add?task=sanyou
测试结果,成功实现5s的延迟任务
实现原理
整个工作流程如下:
- 消息发送的时候会将消息发送到sanyouDirectExchange这个交换机上
- 由于sanyouDirectExchange绑定了sanyouQueue,所以消息会被路由到sanyouQueue这个队列上
- 由于sanyouQueue没有消费者消费消息,并且又设置了5s的过期时间,所以当消息过期之后,消息就被放到绑定的sanyouDelayTaskExchange死信交换机中
- 消息到达sanyouDelayTaskExchange交换机后,由于跟sanyouDelayTaskQueue进行了绑定,所以消息就被路由到sanyouDelayTaskQueue中,消费者就能从sanyouDelayTaskQueue中拿到消息了
上面说的队列与交换机的绑定关系,就是上面的配置类所干的事。
其实从这个单从消息流转的角度可以看出,RabbitMQ跟RocketMQ实现有相似之处。
消息最开始都并没有放到最终消费者消费的队列中,而都是放到一个中间队列中,等消息到了过期时间或者说是延迟时间,消息就会被放到最终的队列供消费者消息。
只不过RabbitMQ需要你显示的手动指定消息所在的中间队列,而RocketMQ是在内部已经做好了这块逻辑。
除了基于RabbitMQ的死信队列来做,RabbitMQ官方还提供了延时插件,也可以实现延迟消息的功能,这个插件的大致原理也跟上面说的一样,延时消息会被先保存在一个中间的地方,叫做Mnesia,然后有一个定时任务去查询最近需要被投递的消息,将其投递到目标队列中。