金石推荐 | 【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)(一)https://developer.aliyun.com/article/1471001
TimewheelTask时间轮刻度点
@Getter public abstract class TimewheelTask implements Delayed { private List<BizTask> tasks = new ArrayList<BizTask>(); private int level; private Long delay; private long calDelay; private TimeUnit calUnit; public TimewheelTask(int level) { this.level = level; } public void setDelay(Long delay, TimeUnit unit) { this.calDelay=delay; this.calUnit=unit; } public void calDelay() { this.delay = TimeUnit.NANOSECONDS.convert(this.calDelay, this.calUnit) + System.nanoTime(); } public long getDelay(TimeUnit unit) { return this.delay - System.nanoTime(); } public int compareTo(Delayed o) { long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); } public void addTask(BizTask task) { synchronized (this) { tasks.add(task); } } public void clear() { tasks.clear(); } public abstract void run(); }
- 业务任务集合:
private List tasks = new ArrayList();
- 层级
- java
- 复制代码
private int level;
- 延时时间
- java
- 复制代码
private Long delay;
- 实际用于延时计算的时间,就是底层是统一化所有的延时时间到对应的延时队列
- java
- 复制代码
private long calDelay;
- 实际用于延时计算的时间,就是底层是统一化所有的延时时间到对应的延时队列(用于统一化的时间单位)
- java
- 复制代码
private TimeUnit calUnit;
添加对应的业务延时任务到轮盘刻度点
java
复制代码
public void addTask(BizTask task) { synchronized (this) { tasks.add(task); } }
刻度点的实现类
因为对应的任务可能会需要将下游的业务任务进行升级或者降级,所以我们会针对于执行任务点分为,执行任务刻度点和跃迁任务刻度点两种类型。
- 执行任务延时队列刻度点
java
复制代码
public class ExecuteTimewheelTask extends TimewheelTask { public ExecuteTimewheelTask(int level) { super(level); } //到时间执行所有的任务 public void run() { List<BizTask> tasks = getTasks(); if (CollectionUtils.isNotEmpty(tasks)) { tasks.forEach(task -> ThreadPool.submit(task)); } } }
再次我们就定义执行这些任务的线程池为:
java
复制代码
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 100, 3, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(10000), new MyThreadFactory("executor"), new ThreadPoolExecutor.CallerRunsPolicy());
- 跃迁任务延时队列刻度点
java
复制代码
public class MoveTimewheelTask extends TimewheelTask { public MoveTimewheelTask(int level) { super(level); } //跃迁到其他轮盘,将对应的任务 public void run() { List<BizTask> tasks = getTasks(); if (CollectionUtils.isNotEmpty(tasks)) { tasks.forEach(task -> { long delay = task.getDelay(); TimerWheel.adddTask(task,delay, TimeUnitProvider.getTimeUnit()); }); } } }
致辞整个时间轮轮盘的数据模型就定义的差不多了,接下来我们需要定义运行在时间轮盘上面的任务模型,BizTask基础模型。
BizTask基础模型
java
复制代码
public abstract class BizTask implements Runnable { protected long interval; protected int index; protected long executeTime; public BizTask(long interval, TimeUnit unit, int index) { this.interval = interval; this.index = index; this.executeTime= TimeUnitProvider.getTimeUnit().convert(interval,unit)+TimeUnitProvider.getTimeUnit().convert(System.nanoTime(),TimeUnit.NANOSECONDS); } public long getDelay() { return this.executeTime - TimeUnitProvider.getTimeUnit().convert(System.nanoTime(), TimeUnit.NANOSECONDS); } }
主要针对于任务执行,需要交给线程池去执行,故此,实现了Runnable接口。
- protected long interval;:跨度操作
- protected int index;:索引下表,在整个队列里面的下表处理
- protected long executeTime;:对应的执行时间
其中最重要的便是获取延时时间的操作,主要提供给框架的Delayed接口进行判断是否到执行时间了。
java
复制代码
public long getDelay() { return this.executeTime - TimeUnitProvider.getTimeUnit().convert(System.nanoTime(), TimeUnit.NANOSECONDS); }
层级时间轮的门面TimerWheel
最后我们要进行定义和设计开发对应的整体的时间轮层级模型。
java
复制代码
public class TimerWheel { private static Map<Integer, TimewheelBucket> cache = new ConcurrentHashMap<>(); //一个轮表示三十秒 private static int interval = 30; private static wheelThread wheelThread; public static void adddTask(BizTask task, Long time, TimeUnit unit) { if(task == null){ return; } long intervalTime = TimeUnitProvider.getTimeUnit().convert(time, unit); if(intervalTime < 1){ ThreadPool.submit(task); return; } Integer[] wheel = getWheel(intervalTime,interval); TimewheelBucket taskList = cache.get(wheel[0]); if (taskList != null) { taskList.addTask(wheel[1], task); } else { synchronized (cache) { if (cache.get(wheel[0]) == null) { taskList = new TimewheelBucket(interval-1, wheel[0]); wheelThread.add(taskList.init()); cache.putIfAbsent(wheel[0],taskList); } } taskList.addTask(wheel[1], task); } } static{ interval = 30; wheelThread = new wheelThread(); wheelThread.setDaemon(false); wheelThread.start(); } private static Integer[] getWheel(long intervalTime,long baseInterval) { //转换后的延时时间 if (intervalTime < baseInterval) { return new Integer[]{0, Integer.valueOf(String.valueOf((intervalTime % 30)))}; } else { return getWheel(intervalTime,baseInterval,baseInterval, 1); } } private static Integer[] getWheel(long intervalTime,long baseInterval,long interval, int p) { long nextInterval = baseInterval * interval; if (intervalTime < nextInterval) { return new Integer[]{p, Integer.valueOf(String.valueOf(intervalTime / interval))}; } else { return getWheel(intervalTime,baseInterval,nextInterval, (p+1)); } } static class wheelThread extends Thread { DelayQueue<TimewheelTask> queue = new DelayQueue<TimewheelTask>(); public DelayQueue<TimewheelTask> getQueue() { return queue; } public void add(List<TimewheelTask> tasks) { if (CollectionUtils.isNotEmpty(tasks)) { tasks.forEach(task -> add(task)); } } public void add(TimewheelTask task) { task.calDelay(); queue.add(task); } @Override public void run() { while (true) { try { TimewheelTask task = queue.take(); int p = task.getLevel(); long nextInterval = MathTool.power(interval, Integer.valueOf(String.valueOf(MathTool.power(2, p)))); TimewheelBucket timewheelBucket = cache.get(p); synchronized (timewheelBucket) { timewheelBucket.indexAdd(); task.run(); task.clear(); } task.setDelay(nextInterval, TimeUnitProvider.getTimeUnit()); task.calDelay(); queue.add(task); } catch (InterruptedException e) { } } } } }
TimerWheel的模型定义
java
复制代码
private static Map<Integer, TimewheelBucket> cache = new ConcurrentHashMap<>();
一个轮表示30秒的整体跨度。
java
复制代码
private static int interval = 30;
创建整体驱动的执行线程
java
复制代码
private static wheelThread wheelThread; static{ interval = 30; wheelThread = new wheelThread(); wheelThread.setDaemon(false); wheelThread.start(); } static class wheelThread extends Thread { DelayQueue<TimewheelTask> queue = new DelayQueue<TimewheelTask>(); public DelayQueue<TimewheelTask> getQueue() { return queue; } public void add(List<TimewheelTask> tasks) { if (CollectionUtils.isNotEmpty(tasks)) { tasks.forEach(task -> add(task)); } } public void add(TimewheelTask task) { task.calDelay(); queue.add(task); } @Override public void run() { while (true) { try { TimewheelTask task = queue.take(); int p = task.getLevel(); long nextInterval = MathTool.power(interval, Integer.valueOf(String.valueOf(MathTool.power(2, p)))); TimewheelBucket timewheelBucket = cache.get(p); synchronized (timewheelBucket) { timewheelBucket.indexAdd(); task.run(); task.clear(); } task.setDelay(nextInterval, TimeUnitProvider.getTimeUnit()); task.calDelay(); queue.add(task); } catch (InterruptedException e) { } } }
获取对应的时间轮轮盘模型体系
java
复制代码
private static Integer[] getWheel(long intervalTime,long baseInterval) { //转换后的延时时间 if (intervalTime < baseInterval) { return new Integer[]{0, Integer.valueOf(String.valueOf((intervalTime % 30)))}; } else { return getWheel(intervalTime,baseInterval,baseInterval, 1); } } private static Integer[] getWheel(long intervalTime,long baseInterval,long interval, int p) { long nextInterval = baseInterval * interval; if (intervalTime < nextInterval) { return new Integer[]{p, Integer.valueOf(String.valueOf(intervalTime / interval))}; } else { return getWheel(intervalTime,baseInterval,nextInterval, (p+1)); } }
到这里相信大家,基本上应该是了解了如何去实现对应的时间轮盘的技术实现过程,有兴趣希望整个完整源码的,可以联系我哦。谢谢大家!