构造函数
先来看看其中的构造函数,这里一共有两个构造函数,用于接收一个线程池及时间轮的大小。
线程池的作用会在后面讲到。
这里的时间轮大小也是有讲究的,它的长度必须得是 2∧n
,至于为什么有这个要求后面也会讲到。
默认情况下会初始化一个长度为 64 的数组。
添加任务
下面来看看添加任务的逻辑,根据我们之前的那张抽象图其实很容易实现。
首先我们要定义一个 Task
类,用于抽象任务;它本身也是一个线程,一旦延时到期便会执行其中的 run 函数,所以使用时便可继承该类,将业务逻辑写在 run()
中即可。
它其中还有两个成员变量,也很好理解。
cycleNum
用于记录该任务所在时间轮的圈数。
key
在这里其实就是延时时间。
//通过 key 计算应该存放的位置 private Set<Task> get(int key) { int index = mod(key, bufferSize); return (Set<Task>) ringBuffer[index]; } private int mod(int target, int mod) { // equals target % mod target = target + tick.get() ; return target & (mod - 1); }
首先是根据延时时间 (key
) 计算出所在的位置,其实就和 HashMap
一样的取模运算,只不过这里使用了位运算替代了取模,同时效率会高上不少。
这样也解释了为什么数组长度一定得是
2∧n
。
然后查看该位置上是否存在任务,不存在就新建一个;存在自然就是将任务写入这个集合并更新回去。
private int cycleNum(int target, int mod) { //equals target/mod return target >> Integer.bitCount(mod - 1); }
其中的
cycleNum()
自然是用于计算该任务所处的圈数,也是考虑到效率问题,使用位运算替代了除法。
private void put(int key, Set<Task> tasks) { int index = mod(key, bufferSize); ringBuffer[index] = tasks; }
而 put()
函数就非常简单了,就是将任务写入指定数组下标即可。
启动时间轮
任务写进去后下一步便是启动这个时间轮了,我这里定义了一个 start()
函数。
其实本质上就是开启了一个后台线程来做这个事情:
它会一直从时间轮中取出任务来运行,而运行这些任务的线程便是我们在初始化时传入的线程池;所以所有的延时任务都是由自定义的线程池调度完成的,这样可以避免时间轮的阻塞。
这里调用的 remove(index)
很容易猜到是用于获取当前数组中的所有任务。
逻辑很简单就不再赘述,不过其中的 size2Notify()
倒是值得说一下。
他是用于在停止任务时,主线程等待所有延时任务执行完毕的唤醒条件。这类用法几乎是所有线程间通信的常规套路,值得收入技能包。
停止时间轮
刚才提到的唤醒主线程得配合这里的停止方法使用:
如果是强制停止那便什么也不管,直接更新停止标志,同时关闭线程池即可。
但如果是软停止(等待所有任务执行完毕)时,那就得通过上文提到的方式阻塞主线程,直到任务执行完毕后被唤醒。
CIM 中的应用
介绍了核心原理和基本 API
后,我们来看看实际业务场景如何结合使用(背景是一个即时通讯项目)。
我这里所使用的场景在文初也提到了,就是真的发送一条延时消息;
现有的消息都是实时消息,所以要实现一个延时消息便是在现有的发送客户端处将延时消息放入到这个时间轮中,在任务到期时再执行真正的消息发送逻辑。
由于项目本身结合了 Spring
,所以第一步自然是配置 bean
。
bean
配置好后其实就可以使用了。
每当发送的是延时消息时,只需要将这个消息封装为一个 Job
放到时间轮中,然后在自己的业务类中完成业务即可。
后续可以优化下
api
,不用每次新增任务都要调用start()
方法。
这样一个延时消息的应用便完成了。
总结
时间轮这样的应用还非常多,比如 Netty
中的 HashedWheelTimer
工具原理也差不多,可以用于维护长连接心跳信息。
甚至 Kafka
在这基础上还优化出了层级时间轮,大家感兴趣的话可以自行搜索资料
本文的所有源码都可在此处查阅: