【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)

简介: 承接上一篇文章【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(上)】我们基本上对层级时间轮算法的基本原理有了一定的认识,本章节就从落地的角度进行分析和介绍如何通过Java进行实现一个属于我们自己的时间轮服务组件,最后,在告诉大家一下,其实时间轮的技术是来源于生活中的时钟。

承接上文

承接上一篇文章【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(上)】我们基本上对层级时间轮算法的基本原理有了一定的认识,本章节就从落地的角度进行分析和介绍如何通过Java进行实现一个属于我们自己的时间轮服务组件,最后,在告诉大家一下,其实时间轮的技术是来源于生活中的时钟。

时间轮演示结构总览

无序列表时间轮

【无序列表时间轮】主要是由LinkedList链表和启动线程、终止线程实现。

遍历定时器中所有节点,将剩余时间为 0s 的任务进行过期处理,在执行一个周期。

  • 无序链表:每一个延时任务都存储在该链表当中(无序存储)。
  • 启动线程: 直接在链表后面push ,时间复杂度 O(1)。
  • 终止线程: 直接在链表中删除节点,时间复杂度 O(1) 。
遍历周期:需要遍历链表中所有节点,时间复杂度 O(n),所以伴随着链表中的元素越来越多,速度也会越来越慢!

无序列表时间轮的长度限制了其适用场景,这里对此进行优化。因此引入了有序列表时间轮。

有序列表时间轮

与无序列表时间轮一样,同样使用链表进行实现和设计,但存储的是绝对延时时间点

  • 启动线程有序插入,比较时间按照时间大小有序插入,时间复杂度O(n),主要耗时在插入操作
  • 终止线程链表中查找任务,删除节点,时间复杂度O(n),主要耗时在插入操作
找到执行最后一个过期任务即可,无需遍历整个链表,时间复杂度 O(1),从上面的描述「有序列表定时器」的性能瓶颈在于插入时的任务排序,但是换来的就是缩短了遍历周期。

所以我们如果要提高性,就必须要提升一下插入和删除以及检索的性能,因此引入了「树形有序列表时间轮」在「有序列表定时器」的基础上进行优化,以有序树的形式进行任务存储。

树形有序列表时间轮

  • 启动定时器: 有序插入,比较时间按照时间大小有序插入,时间复杂度 O(logn)
  • 终止定时器: 在链表中查找任务,删除节点,时间复杂度 O(logn)
  • 周期清算: 找到执行最后一个过期任务即可,无需遍历整个链表,时间复杂度 O(1)

层级时间轮

整体流程架构图,如下所示。

对应的原理,在这里就不进行赘述了,之前本人已经有两篇文章对层级式时间轮进行了较为详细的介绍了,有需要的小伙伴,可以直接去前几篇文章去学习,接下来我们进行相关的实现。

时间轮数据模型

时间轮(TimingWheel)是一个存储定时任务的环形队列,数组中的每个元素可以存放一个定时任务列表,其中存放了真正的定时任务,如下图所示。

时间轮的最基本逻辑模型,由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs),所以我们先来设计和定义开发对应的时间轮的轮盘模型。命名为Roulette类。

轮盘抽象类-Roulette

之所以定义这个抽象类

public abstract class Roulette {
    // 链表数据-主要用于存储每个延时任务节点
    List<TimewheelTask> tasks = null;
    // 游标指针索引
    protected int index;
    // 时间轮轮盘的容量大小,如果是分钟级别,一般就是60个格
    protected int capacity;
    // 时间轮轮盘的层级,如果是一级,它的上级就是二级
    protected Integer level;
    private AtomicInteger num = new AtomicInteger(0);

   // 构造器
    public Roulette(int capacity, Integer level) {
        this.capacity = capacity;
        this.level = level;
        this.tasks = new ArrayList<>(capacity);
        this.index = 0;
    }
    // 获取当前下表的索引对应的时间轮的任务
    public TimewheelTask getTask() {
        return tasks.get(index);
    }
   // init初始化操作机制
    public List<TimewheelTask> init() {
        long interval = MathTool.power((capacity + 1), level);
        long add = 0;
        TimewheelTask delayTask = null;
        for (int i = 0; i < capacity; i++) {
            add += interval;
            if (level == 0) {
                delayTask = new DefaultDelayTask(level);
            } else {
                delayTask = new SplitDelayTask(level);
            }
            //已经转换为最小的时间间隔
            delayTask.setDelay(add, TimeUnitProvider.getTimeUnit());
            tasks.add(delayTask);
        }
        return tasks;
    }

   // 索引下标移动
    public void indexAdd() {
        this.index++;
        if (this.index >= capacity) {
            this.index = 0;
        }
    }
   // 添加对应的任务到对应的队列里面
    public void addTask(TimewheelTask task) {
        tasks.add(task);
    }
    // 给子类提供的方法进行实现对应的任务添加功能
    public abstract void addTask(int interval, MyTask task);
}
时间轮盘的熟悉信息介绍

链表数据-主要用于存储每个延时任务节点。

List<TimewheelTask> tasks = null;
tasks也可以改成 双向链表 + 数组的结构:即节点存贮的对象中有指针,组成环形,可以通过数组的下标灵活访问每个节点,类似 LinkedHashMap。

游标指针索引

protected int index;

时间轮轮盘的容量大小,如果是分钟级别,一般就是60个格

protected int capacity;

时间轮轮盘的层级,如果是一级,它的上级就是二级

protected Integer level;

init初始化时间轮轮盘对象模型,主要用于分配分配每一个轮盘上面元素的TimewheelTask,用于延时队列的执行任务线程,已经分配对应的每一个节点的延时时间节点数据。

 public List<TimewheelTask> init() {
       //  那么整个时间轮的总体时间跨度(interval)
        long interval = MathTool.power((capacity + 1), level);
        long add = 0;
        TimewheelTask delayTask = null;
        for (int i = 0; i < capacity; i++) {
            add += interval;
            if (level == 0) {
                delayTask = new ExecuteTimewheelTask(level);
            } else {
                delayTask = new MoveTimewheelTask(level);
            }
            //已经转换为最小的时间间隔
            delayTask.setDelay(add, TimeUnitProvider.getTimeUnit());
            tasks.add(delayTask);
        }
        return tasks;
}
  • 整数a的n次幂:interval,计算跨度,主要是各级别之间属于平方倍数

例如,第一层:20 ,第二层:20^2 ......

    //例如 n=7  二进制 0   1                 1          1
    //a的n次幂 = a的2次幂×a的2次幂  ×   a的1次幂×a的1次幂  ×a
    public static long power(long a, int n) {
        int rtn = 1;
        while (n >= 1) {
            if((n & 1) == 1){
                rtn *= a;
            }
            a *= a;
            n = n >> 1;
        }
        return rtn;
    }
TimeUnitProvider工具类

主要用于计算时间单位操作的转换

public class TimeUnitProvider {
    private static TimeUnit unit = TimeUnit.SECONDS;
    public static TimeUnit getTimeUnit() {
        return unit;
    }
}

代码简介:

  • interval:代表着初始化的延时时间数据值,主要用于不同的层次的出发时间数据
  • for (int i = 0; i < capacity; i++) :代表着进行for循环进行添加对应的延时队列任务到集合中
  • add += interval,主要用于添加对应的延时队列的延时数据值!并且分配给当前轮盘得到所有数据节点。

获取当前下标的索引对应的时间轮的任务节点

public TimewheelTask getTask() {
        return tasks.get(index);
}
层级时间轮的Bucket数据桶

在这里我们建立了一个TimewheelBucket类实现了Roulette轮盘模型,从而进行建立对应的我们的层级时间轮的数据模型,并且覆盖了addTask方法。

public class TimewheelBucket extends Roulette {

    public TimewheelBucket(int capacity, Integer level) {
        super(capacity, level);
    }

    public synchronized void addTask(int interval, MyTask task) {
        interval -= 1;
        int curIndex = interval + this.index;
        if (curIndex >= capacity) {
            curIndex = curIndex - capacity;
        }
        tasks.get(curIndex).addTask(task);
    }
}

添加addTask方法,进行获取计算对应的下标,并且此方法add操作才是对外开发调用的,在这里,我们主要实现了根据层级计算出对应的下标进行获取对应的任务执行调度点,将我们外界BizTask,真正的业务操作封装到这个BizTask模型,交由我们的系统框架进行执行。

     public synchronized void addTask(int interval, BizTask task) {
        interval -= 1;
        int curIndex = interval + this.index;
        if (curIndex >= capacity) {
            curIndex = curIndex - capacity;
        }
        tasks.get(curIndex).addTask(task);
    }
时间轮轮盘上的任务点

我们针对于时间轮轮盘的任务点进行设计和定义对应的调度执行任务模型。一个调度任务点,可以帮到关系到多个BizTask,也就是用户提交上来的业务任务线程对象,为了方便采用延时队列的延时处理模式,再次实现了Delayed这个接口,对应的实现代码如下所示:

Delayed接口
public interface Delayed extends Comparable<Delayed> {
    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}
TimewheelTask时间轮刻度点
@Getter
public abstract class TimewheelTask implements Delayed {
    private List<BizTask> tasks = new ArrayList<BizTask>();
    private int level;
    private Long delay;
    private long calDelay;
    private TimeUnit calUnit;
    public TimewheelTask(int level) {
        this.level = level;
    }
    public void setDelay(Long delay, TimeUnit unit) {
        this.calDelay=delay;
        this.calUnit=unit;
    }
    public void calDelay() {
        this.delay = TimeUnit.NANOSECONDS.convert(this.calDelay, this.calUnit) + System.nanoTime(); 
    }
    public long getDelay(TimeUnit unit) {
        return this.delay - System.nanoTime();
    }
    public int compareTo(Delayed o) {
        long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }
    public void addTask(BizTask task) {
        synchronized (this) {
            tasks.add(task);
        }
    }
    public void clear() {
        tasks.clear();
    }
    public abstract void run();
}
  • 业务任务集合:private List<BizTask> tasks = new ArrayList<BizTask>();

    • 层级
    private int level;
    • 延时时间
  private Long delay;
  • 实际用于延时计算的时间,就是底层是统一化所有的延时时间到对应的延时队列

    private long calDelay;
  • 实际用于延时计算的时间,就是底层是统一化所有的延时时间到对应的延时队列(用于统一化的时间单位)

    private TimeUnit calUnit;
添加对应的业务延时任务到轮盘刻度点
    public void addTask(BizTask task) {
        synchronized (this) {
            tasks.add(task);
        }
    }
刻度点的实现类

因为对应的任务可能会需要将下游的业务任务进行升级或者降级,所以我们会针对于执行任务点分为,执行任务刻度点和跃迁任务刻度点两种类型。

  • 执行任务延时队列刻度点
public class ExecuteTimewheelTask extends TimewheelTask {
    public ExecuteTimewheelTask(int level) {
        super(level);
    }
    //到时间执行所有的任务
    public void run() {
        List<BizTask> tasks = getTasks();
        if (CollectionUtils.isNotEmpty(tasks)) {
            tasks.forEach(task -> ThreadPool.submit(task));
        }
    }
}

再次我们就定义执行这些任务的线程池为:

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 100, 3, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(10000),
            new MyThreadFactory("executor"), new ThreadPoolExecutor.CallerRunsPolicy());
  • 跃迁任务延时队列刻度点
public class MoveTimewheelTask extends TimewheelTask {
    public MoveTimewheelTask(int level) {
        super(level);
    }
    //跃迁到其他轮盘,将对应的任务
    public void run() {
        List<BizTask> tasks = getTasks();
        if (CollectionUtils.isNotEmpty(tasks)) {
            tasks.forEach(task -> {
                long delay = task.getDelay();
                TimerWheel.adddTask(task,delay, TimeUnitProvider.getTimeUnit());
            });
        }
    }
}

致辞整个时间轮轮盘的数据模型就定义的差不多了,接下来我们需要定义运行在时间轮盘上面的任务模型,BizTask基础模型。

BizTask基础模型
public abstract class BizTask implements Runnable {
     protected long interval;
     protected int index;
     protected long executeTime;
     public BizTask(long interval, TimeUnit unit, int index) {
          this.interval  = interval;
          this.index = index;
          this.executeTime= TimeUnitProvider.getTimeUnit().convert(interval,unit)+TimeUnitProvider.getTimeUnit().convert(System.nanoTime(),TimeUnit.NANOSECONDS);
     }
     public long getDelay() {
          return this.executeTime - TimeUnitProvider.getTimeUnit().convert(System.nanoTime(), TimeUnit.NANOSECONDS);
     }
}

主要针对于任务执行,需要交给线程池去执行,故此,实现了Runnable接口。

  • protected long interval;:跨度操作
  • protected int index;:索引下表,在整个队列里面的下表处理
  • protected long executeTime;:对应的执行时间

其中最重要的便是获取延时时间的操作,主要提供给框架的Delayed接口进行判断是否到执行时间了。

     public long getDelay() {
          return this.executeTime - TimeUnitProvider.getTimeUnit().convert(System.nanoTime(), TimeUnit.NANOSECONDS);
     }
层级时间轮的门面TimerWheel

最后我们要进行定义和设计开发对应的整体的时间轮层级模型。

public class TimerWheel {

    private static Map<Integer, TimewheelBucket> cache = new ConcurrentHashMap<>();
    //一个轮表示三十秒
    private static int interval = 30;
    private static wheelThread wheelThread;

    public static void adddTask(BizTask task, Long time, TimeUnit unit) {
        if(task == null){
            return;
        }
        long intervalTime = TimeUnitProvider.getTimeUnit().convert(time, unit);
        if(intervalTime < 1){
            ThreadPool.submit(task);
            return;
        }
        Integer[] wheel = getWheel(intervalTime,interval);
        TimewheelBucket taskList = cache.get(wheel[0]);
        if (taskList != null) {
            taskList.addTask(wheel[1], task);
        } else {
            synchronized (cache) {
                if (cache.get(wheel[0]) == null) {
                    taskList = new TimewheelBucket(interval-1, wheel[0]);
                    wheelThread.add(taskList.init());
                    cache.putIfAbsent(wheel[0],taskList);
                }
            }
            taskList.addTask(wheel[1], task);
        }
    }
    static{
        interval = 30;
        wheelThread = new wheelThread();
        wheelThread.setDaemon(false);
        wheelThread.start();
    }
    private static Integer[] getWheel(long intervalTime,long baseInterval) {
        //转换后的延时时间
        if (intervalTime < baseInterval) {
            return new Integer[]{0, Integer.valueOf(String.valueOf((intervalTime % 30)))};
        } else {
            return getWheel(intervalTime,baseInterval,baseInterval, 1);
        }
    }

    private static Integer[] getWheel(long intervalTime,long baseInterval,long interval, int p) {
         long nextInterval = baseInterval * interval;
        if (intervalTime < nextInterval) {
            return new Integer[]{p, Integer.valueOf(String.valueOf(intervalTime / interval))};
        } else {
            return getWheel(intervalTime,baseInterval,nextInterval, (p+1));
        }
    }

    static class wheelThread extends Thread {
        DelayQueue<TimewheelTask> queue = new DelayQueue<TimewheelTask>();

        public DelayQueue<TimewheelTask> getQueue() {
            return queue;
        }

        public void add(List<TimewheelTask> tasks) {
            if (CollectionUtils.isNotEmpty(tasks)) {
                tasks.forEach(task -> add(task));
            }
        }

        public void add(TimewheelTask task) {
            task.calDelay();
            queue.add(task);
        }

        @Override
        public void run() {
            while (true) {
                try {
                    TimewheelTask task = queue.take();
                    int p = task.getLevel();
                    long nextInterval = MathTool.power(interval, Integer.valueOf(String.valueOf(MathTool.power(2, p))));
                    TimewheelBucket timewheelBucket = cache.get(p);
                    synchronized (timewheelBucket) {
                        timewheelBucket.indexAdd();
                        task.run();
                        task.clear();
                    }
                    task.setDelay(nextInterval, TimeUnitProvider.getTimeUnit());
                    task.calDelay();
                    queue.add(task);
                } catch (InterruptedException e) {

                }
            }
        }
    }
}
TimerWheel的模型定义
private static Map<Integer, TimewheelBucket> cache = new ConcurrentHashMap<>();

一个轮表示30秒的整体跨度。

private static int interval = 30;

创建整体驱动的执行线程

private static wheelThread wheelThread;

 static{
        interval = 30;
        wheelThread = new wheelThread();
        wheelThread.setDaemon(false);
        wheelThread.start();
}

    static class wheelThread extends Thread {
        DelayQueue<TimewheelTask> queue = new DelayQueue<TimewheelTask>();
        public DelayQueue<TimewheelTask> getQueue() {
            return queue;
        }
        public void add(List<TimewheelTask> tasks) {
            if (CollectionUtils.isNotEmpty(tasks)) {
                tasks.forEach(task -> add(task));
            }
        }
        public void add(TimewheelTask task) {
            task.calDelay();
            queue.add(task);
        }
        @Override
        public void run() {
            while (true) {
                try {
                    TimewheelTask task = queue.take();
                    int p = task.getLevel();
                    long nextInterval = MathTool.power(interval, Integer.valueOf(String.valueOf(MathTool.power(2, p))));
                    TimewheelBucket timewheelBucket = cache.get(p);
                    synchronized (timewheelBucket) {
                        timewheelBucket.indexAdd();
                        task.run();
                        task.clear();
                    }
                    task.setDelay(nextInterval, TimeUnitProvider.getTimeUnit());
                    task.calDelay();
                    queue.add(task);
                } catch (InterruptedException e) {

                }
            }
   }
获取对应的时间轮轮盘模型体系
    private static Integer[] getWheel(long intervalTime,long baseInterval) {
        //转换后的延时时间
        if (intervalTime < baseInterval) {
            return new Integer[]{0, Integer.valueOf(String.valueOf((intervalTime % 30)))};
        } else {
            return getWheel(intervalTime,baseInterval,baseInterval, 1);
        }
    }

    private static Integer[] getWheel(long intervalTime,long baseInterval,long interval, int p) {
         long nextInterval = baseInterval * interval;
        if (intervalTime < nextInterval) {
            return new Integer[]{p, Integer.valueOf(String.valueOf(intervalTime / interval))};
        } else {
            return getWheel(intervalTime,baseInterval,nextInterval, (p+1));
        }
    }

到这里相信大家,基本上应该是了解了如何去实现对应的时间轮盘的技术实现过程,有兴趣希望整个完整源码的,可以联系我哦。谢谢大家!

相关文章
|
1月前
|
机器学习/深度学习 算法 数据挖掘
K-means聚类算法是机器学习中常用的一种聚类方法,通过将数据集划分为K个簇来简化数据结构
K-means聚类算法是机器学习中常用的一种聚类方法,通过将数据集划分为K个簇来简化数据结构。本文介绍了K-means算法的基本原理,包括初始化、数据点分配与簇中心更新等步骤,以及如何在Python中实现该算法,最后讨论了其优缺点及应用场景。
99 4
|
2月前
|
存储 Java 开发者
Java Map实战:用HashMap和TreeMap轻松解决复杂数据结构问题!
【10月更文挑战第17天】本文深入探讨了Java中HashMap和TreeMap两种Map类型的特性和应用场景。HashMap基于哈希表实现,支持高效的数据操作且允许键值为null;TreeMap基于红黑树实现,支持自然排序或自定义排序,确保元素有序。文章通过具体示例展示了两者的实战应用,帮助开发者根据实际需求选择合适的数据结构,提高开发效率。
77 2
|
4天前
|
存储 运维 监控
探索局域网电脑监控软件:Python算法与数据结构的巧妙结合
在数字化时代,局域网电脑监控软件成为企业管理和IT运维的重要工具,确保数据安全和网络稳定。本文探讨其背后的关键技术——Python中的算法与数据结构,如字典用于高效存储设备信息,以及数据收集、异常检测和聚合算法提升监控效率。通过Python代码示例,展示了如何实现基本监控功能,帮助读者理解其工作原理并激发技术兴趣。
43 20
|
9天前
|
机器学习/深度学习 前端开发 算法
婚恋交友系统平台 相亲交友平台系统 婚恋交友系统APP 婚恋系统源码 婚恋交友平台开发流程 婚恋交友系统架构设计 婚恋交友系统前端/后端开发 婚恋交友系统匹配推荐算法优化
婚恋交友系统平台通过线上互动帮助单身男女找到合适伴侣,提供用户注册、个人资料填写、匹配推荐、实时聊天、社区互动等功能。开发流程包括需求分析、技术选型、系统架构设计、功能实现、测试优化和上线运维。匹配推荐算法优化是核心,通过用户行为数据分析和机器学习提高匹配准确性。
38 3
|
28天前
|
数据采集 存储 算法
Python 中的数据结构和算法优化策略
Python中的数据结构和算法如何进行优化?
|
1月前
|
算法
数据结构之路由表查找算法(深度优先搜索和宽度优先搜索)
在网络通信中,路由表用于指导数据包的传输路径。本文介绍了两种常用的路由表查找算法——深度优先算法(DFS)和宽度优先算法(BFS)。DFS使用栈实现,适合路径问题;BFS使用队列,保证找到最短路径。两者均能有效查找路由信息,但适用场景不同,需根据具体需求选择。文中还提供了这两种算法的核心代码及测试结果,验证了算法的有效性。
103 23
|
27天前
|
机器学习/深度学习 算法 数据挖掘
C语言在机器学习中的应用及其重要性。C语言以其高效性、灵活性和可移植性,适合开发高性能的机器学习算法,尤其在底层算法实现、嵌入式系统和高性能计算中表现突出
本文探讨了C语言在机器学习中的应用及其重要性。C语言以其高效性、灵活性和可移植性,适合开发高性能的机器学习算法,尤其在底层算法实现、嵌入式系统和高性能计算中表现突出。文章还介绍了C语言在知名机器学习库中的作用,以及与Python等语言结合使用的案例,展望了其未来发展的挑战与机遇。
44 1
|
27天前
|
并行计算 算法 测试技术
C语言因高效灵活被广泛应用于软件开发。本文探讨了优化C语言程序性能的策略,涵盖算法优化、代码结构优化、内存管理优化、编译器优化、数据结构优化、并行计算优化及性能测试与分析七个方面
C语言因高效灵活被广泛应用于软件开发。本文探讨了优化C语言程序性能的策略,涵盖算法优化、代码结构优化、内存管理优化、编译器优化、数据结构优化、并行计算优化及性能测试与分析七个方面,旨在通过综合策略提升程序性能,满足实际需求。
61 1
|
1月前
|
算法 测试技术 开发者
在Python开发中,性能优化和代码审查至关重要。性能优化通过改进代码结构和算法提高程序运行速度,减少资源消耗
在Python开发中,性能优化和代码审查至关重要。性能优化通过改进代码结构和算法提高程序运行速度,减少资源消耗;代码审查通过检查源代码发现潜在问题,提高代码质量和团队协作效率。本文介绍了一些实用的技巧和工具,帮助开发者提升开发效率。
49 3
|
2月前
|
机器学习/深度学习 存储 人工智能
数据结构在实际开发中的广泛应用
【10月更文挑战第20天】数据结构是软件开发的基础,它们贯穿于各种应用场景中,为解决实际问题提供了有力的支持。不同的数据结构具有不同的特点和优势,开发者需要根据具体需求选择合适的数据结构,以实现高效、可靠的程序设计。
128 7