调度线程池ScheduledThreadPoolExecutor源码解析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 调度线程池ScheduledThreadPoolExecutor源码解析

前言


ScheduledThreadPoolExecutor可以用来很方便实现我们的调度任务,具体使用可以参考调度线程池ScheduledThreadPoolExecutor的正确使用姿势这篇文章,那大家知道它是怎么实现的吗,本文就带大家来揭晓谜底。


实现机制分析


我们先思考下,如果让大家去实现ScheduledThreadPoolExecutor可以周期性执行任务的功能,需要考虑哪些方面呢?

  1. ScheduledThreadPoolExecutor的整体实现思路是什么呢?

答: 我们是不是可以继承线程池类,按照线程池的思路,将任务先丢到阻塞队列中,等到时间到了,工作线程就从阻塞队列获取任务执行。

  1. 如何实现等到了未来的时间点就开始执行呢?

答: 我们可以根据参数获取这个任务还要多少时间执行,那么我们是不是可以从阻塞队列中获取任务的时候,通过条件队列的的awaitNanos(delay)方法,阻塞一定时间。

  1. 如何实现 任务的重复性执行呢?

答:这就更加简单了,任务执行完成后,把她再次加入到队列不就行了吗。

1671196370297.jpg


源码解析


类结构图


1671196378935.jpg

ScheduledThreadPoolExecutor的类结构图如上图所示,很明显它是在我们的线程池ThreadPoolExecutor框架基础上扩展的。

  • ScheduledExecutorService:实现了该接口,封装了调度相关的API
  • ThreadPoolExecutor:继承了该类,保留了线程池的能力和整个实现的框架
  • DelayedWorkQueue:内部类,延迟阻塞队列。
  • ScheduledFutureTask:延迟任务对象,包含了任务、任务状态、剩余的时间、结果等信息。


重要属性


通过ScheduledThreadPoolExecutor类的成员属性,我们可以了解它的数据结构。

  1. shutdown 后是否继续执行周期任务(重复执行)
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
  1. shutdown 后是否继续执行延迟任务(只执行一次)
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
  1. 调用cancel()方法后,是否将该任务从队列中移除,默认false
private volatile boolean removeOnCancel = false;
  1. 任务的序列号,保证FIFO队列的顺序,用来比较优先级
private static final AtomicLong sequencer = new AtomicLong()
  1. ScheduledFutureTask延迟任务类
  • ScheduledFutureTask 继承 FutureTask,实现 RunnableScheduledFuture 接口,无论是 runnable 还是 callable,无论是否需要延迟和定时,所有的任务都会被封装成 ScheduledFutureTask
  • 该类具有延迟执行的特点, 覆盖 FutureTaskrun 方法来实现对延时执行、周期执行的支持。
  • 对于延时任务调用 FutureTask#run,而对于周期性任务则调用 FutureTask#runAndReset 并且在成功之后根据 fixed-delay/fixed-rate 模式来设置下次执行时间并重新将任务塞到工作队列。
  • 成员属性如下:
// 任务序列号
private final long sequenceNumber;
// 任务可以被执行的时间,交付时间,以纳秒表示
private long time;  
// 0 表示非周期任务
// 正数表示 fixed-rate(两次开始启动的间隔)模式的周期,
// 负数表示 fixed-delay(一次执行结束到下一次开始启动) 模式
private final long period;  
// 执行的任务对象
RunnableScheduledFuture<V> outerTask = this;
// 任务在队列数组中的索引下标, -1表示删除
int heapIndex;
  1. DelayedWorkQueue延迟队列
  • DelayedWorkQueue 是支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue(小根堆、满二叉树)存储元素。
  • 内部数据结构是数组,所以延迟队列出队头元素后需要让其他元素(尾)替换到头节点,防止空指针异常。
  • 成员属性如下:
// 初始容量
private static final int INITIAL_CAPACITY = 16; 
// 节点数量
private int size = 0;
// 存放任务的数组
private RunnableScheduledFuture<?>[] queue = 
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; 
// 控制并发用的锁
private final ReentrantLock lock = new ReentrantLock(); 
// 条件队列
private final Condition available = lock.newCondition();
//指定用于等待队列头节点任务的线程
private Thread leader = null;


提交延迟任务schedule()原理


延迟执行方法,并指定延迟执行的时间,只会执行一次。

  1. schedule()方法是延迟任务方法的入口。
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    // 判空处理
    if (command == null || unit == null)
        throw new NullPointerException();
    // 将外部传入的任务封装成延迟任务对象ScheduledFutureTask
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    // 执行延迟任务
    delayedExecute(t);
    return t;
}
  1. decorateTask(...) 该方法是封装延迟任务
  • 调用triggerTime(delay, unit)方法计算延迟的时间。
// 返回【当前时间 + 延迟时间】,就是触发当前任务执行的时间
private long triggerTime(long delay, TimeUnit unit) {
    // 设置触发的时间
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
    // 如果 delay < Long.Max_VALUE/2,则下次执行时间为当前时间 +delay
    // 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay
    return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
// 下面这种情况很少,大家看不懂可以不用强行理解
// 如果某个任务的 delay 为负数,说明当前可以执行(其实早该执行了)。
// 阻塞队列中维护任务顺序是基于 compareTo 比较的,比较两个任务的顺序会用 time 相减。
// 那么可能出现一个 delay 为正数减去另一个为负数的 delay,结果上溢为负数,则会导致 compareTo 产生错误的结果
private long overflowFree(long delay) {
    Delayed head = (Delayed) super.getQueue().peek();
    if (head != null) {
        long headDelay = head.getDelay(NANOSECONDS);
        // 判断一下队首的delay是不是负数,如果是正数就不用管,怎么减都不会溢出
        // 否则拿当前 delay 减去队首的 delay 来比较看,如果不出现上溢,排序不会乱
    // 不然就把当前 delay 值给调整为 Long.MAX_VALUE + 队首 delay
        if (headDelay < 0 && (delay - headDelay < 0))
            delay = Long.MAX_VALUE + headDelay;
    }
    return delay;
}
  • 调用RunnableScheduledFuture的构造方法封装为延迟任务
ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
     // 任务的触发时间
    this.time = ns;
     // 任务的周期, 延迟任务的为0,因为不需要重复执行
    this.period = 0;
    // 任务的序号 + 1
    this.sequenceNumber = sequencer.getAndIncrement();
}
  • 调用decorateTask()方法装饰延迟任务
// 没有做任何操作,直接将 task 返回,该方法主要目的是用于子类扩展
protected <V> RunnableScheduledFuture<V> decorateTask(
    Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}


提交周期任务scheduleAtFixedRate()原理


按照固定的评率周期性的执行任务,捕手renwu,一次任务的启动到下一次任务的启动的间隔

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // 任务封装,【指定初始的延迟时间和周期时间】
    ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(initialDelay, unit), unit.toNanos(period));
    // 默认返回本身
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 开始执行这个任务
    delayedExecute(t);
    return t;
}


提交周期任务scheduleWithFixedDelay()原理


按照指定的延时周期性执行任务,上一个任务执行完毕后,延时一定时间,再次执行任务。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null) 
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    // 任务封装,【指定初始的延迟时间和周期时间】,周期时间为 - 表示是 fixed-delay 模式
    ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(initialDelay, unit), unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
     // 开始执行这个任务
    delayedExecute(t);
    return t;
}


执行任务delayedExecute(t)原理


上面多种提交任务的方式,殊途同归,最终都会调用delayedExecute()方法执行延迟或者周期任务。

  • delayedExecute()方法是执行延迟任务的入口
private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 线程池是 SHUTDOWN 状态,执行拒绝策略
    if (isShutdown())
        // 调用拒绝策略的方法
        reject(task);
    else {
        // 把当前任务放入阻塞队列
        super.getQueue().add(task);
        // 线程池状态为 SHUTDOWN 并且不允许执行任务了,就从队列删除该任务,并设置任务的状态为取消状态
        // 非主流程,可以跳过,不重点看了
        if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
            task.cancel(false);
        else
            // 开始执行了哈
            ensurePrestart();
    }
}
  • ensurePrestart()方法开启线程执行
// ThreadPoolExecutor#ensurePrestart
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    // worker数目小于corePoolSize,则添加一个worker。
    if (wc < corePoolSize)
        // 第二个参数 true 表示采用核心线程数量限制,false 表示采用 maximumPoolSize
        addWorker(null, true);
    // corePoolSize = 0的情况,至少开启一个线程,【担保机制】
    else if (wc == 0)
        addWorker(null, false);
}

addWorker()方法实际上父类ThreadPoolExecutor的方法,这个方法在该文章 Java线程池源码深度解析中详细介绍过,这边做个总结:

  • 如果线程池中工作线程数量小于最大线程数,创建工作线程,执行任务。
  • 如果线程池重工作线程数量大于最大线程数,直接返回。


获取延迟任务take()原理


目前工作线程已经创建好了,工作线程开始工作了,它会从阻塞队列中获取延迟任务执行,这部分也是线程池里面的原理,不做展开,那我们看下它是如何实现延迟执行的? 主要关注如何从阻塞队列中获取任务。

  1. DelayedWorkQueue#take()方法获取延迟任务
  • 该方法会在上面的addWoker()方法创建工作线程后,工作线程中循环持续调用workQueue.take()方法获取延迟任务。
  • 该方法主要获取延迟队列中任务延迟时间小于等于0 的任务。
  • 如果延迟时间不小于0,那么调用条件队列的awaitNanos(delay)阻塞方法等待一段时间,等时间到了,延迟时间自然小于等于0了。
  • 获取到任务后,工作线程就可以开始执行调度任务了。
// DelayedWorkQueue#take()
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加可中断锁
    lock.lockInterruptibly();
    try {
        // 自旋
        for (;;) {
            // 获取阻塞队列中的头结点
            RunnableScheduledFuture<?> first = queue[0];
            // 如果阻塞队列没有数据,为空
            if (first == null)
                // 等待队列不空,直至有任务通过 offer 入队并唤醒
                available.await();
            else {
                // 获取头节点的的任务还剩余多少时间才执行
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // 到达触发时间,获取头节点并调整堆,重新选择延迟时间最小的节点放入头部
                    return finishPoll(first);
                // 逻辑到这说明头节点的延迟时间还没到
                first = null;
                // 说明有 leader 线程在等待获取头节点,当前线程直接去阻塞等待
                if (leader != null)
                  // 当前线程阻塞
                    available.await();
                else {
                    // 没有 leader 线程,【当前线程作为leader线程,并设置头结点的延迟时间作为阻塞时间】
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 当前线程通过awaitNanos方法等待delay时间后,会自动唤醒,往后面继续执行
                        available.awaitNanos(delay);
                        // 到达阻塞时间时,当前线程会从这里醒来,进入下一轮循环,就有可能执行了
                    } finally {
                        // t堆顶更新,leader 置为 null,offer 方法释放锁后,
                        // 有其它线程通过 take/poll 拿到锁,读到 leader == null,然后将自身更新为leader。
                        if (leader == thisThread)
                            // leader 置为 null 用以接下来判断是否需要唤醒后继线程
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 没有 leader 线程并且头结点不为 null,唤醒阻塞获取头节点的线程,
        // 【如果没有这一步,就会出现有了需要执行的任务,但是没有线程去执行】
        if (leader == null && queue[0] != null)
            available.signal();
        // 解锁
        lock.unlock();
    }
}
  1. finishPoll()方法获取到任务后执行

该方法主要做两个事情, 获取头节点并调整堆,重新选择延迟时间最小的节点放入头部。

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    // 获取尾索引
    int s = --size;
    // 获取尾节点
    RunnableScheduledFuture<?> x = queue[s];
    // 将堆结构最后一个节点占用的 slot 设置为 null,因为该节点要尝试升级成堆顶,会根据特性下调
    queue[s] = null;
    // s == 0 说明 当前堆结构只有堆顶一个节点,此时不需要做任何的事情
    if (s != 0)
        // 从索引处 0 开始向下调整
        siftDown(0, x);
    // 出队的元素索引设置为 -1
    setIndex(f, -1);
    return f;
}


延迟任务运行的原理


从延迟队列中获取任务后,工作线程会调用延迟任务的run()方法执行任务。

  1. ScheduledFutureTask#run()方法运行任务
  • 调用isPeriodic()方法判断任务是否是周期性任务还是非周期性任务
  • 如果任务是非周期任务,就调用父类的FutureTask#run()执行一次
  • 如果任务是非周期任务,就调用父类的FutureTask#runAndReset(), 返回true会设置下一次的执行时间,重新放入线程池的阻塞队列中,等待下次获取执行
public void run() {
    // 是否周期性,就是判断 period 是否为 0
    boolean periodic = isPeriodic();
    // 根据是否是周期任务检查当前状态能否执行任务,不能执行就取消任务
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 非周期任务,直接调用 FutureTask#run 执行一次
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 周期任务的执行,返回 true 表示执行成功
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 设置周期任务的下一次执行时间
        setNextRunTime();
        // 任务的下一次执行安排,如果当前线程池状态可以执行周期任务,加入队列,并开启新线程
        reExecutePeriodic(outerTask);
    }
}
  1. FutureTask#runAndReset()执行周期性任务
  • 周期任务正常完成后任务的状态不会变化,依旧是 NEW,不会设置 outcome 属性。
  • 但是如果本次任务执行出现异常,会进入 setException 方法将任务状态置为异常,把异常保存在 outcome 中。
  • 方法返回 false,后续的该任务将不会再周期的执行
protected boolean runAndReset() {
    // 任务不是新建的状态了,或者被别的线程执行了,直接返回 false
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                // 执行方法,没有返回值
                c.call();
                ran = true;
            } catch (Throwable ex) {
                // 出现异常,把任务设置为异常状态,唤醒所有的 get 阻塞线程
                setException(ex);
            }
        }
    } finally {
    // 执行完成把执行线程引用置为 null
        runner = null;
        s = state;
        // 如果线程被中断进行中断处理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    // 如果正常执行,返回 true,并且任务状态没有被取消
    return ran && s == NEW;
}
  1. ScheduledFutureTask#setNextRunTime()设置下次执行时间
  • 如果属性period大于0,表示fixed-rate模式,直接加上period时间即可。
  • 如果属性period小于等于0, 表示是fixed-delay模式, 调用triggerTime重新计算下次时间。
// 任务下一次的触发时间
private void setNextRunTime() {
    long p = period;
    if (p > 0)
        // fixed-rate 模式,【时间设置为上一次执行任务的时间 + p】,两次任务执行的时间差
        time += p;
    else
        // fixed-delay 模式,下一次执行时间是【当前这次任务结束的时间(就是现在) + delay 值】
        time = triggerTime(-p);
}
  1. ScheduledFutureTask#reExecutePeriodic(),重新放入阻塞任务队列,等待获取,进行下一轮执行
// ScheduledThreadPoolExecutor#reExecutePeriodic
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        // 【放入任务队列】
        super.getQueue().add(task);
        // 如果提交完任务之后,线程池状态变为了 shutdown 状态,需要再次检查是否可以执行,
        // 如果不能执行且任务还在队列中未被取走,则取消任务
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            // 当前线程池状态可以执行周期任务,加入队列,并【根据线程数量是否大于核心线程数确定是否开启新线程】
            ensurePrestart();
    }
}


目录
相关文章
|
10天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
10天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
10天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
11天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
87 2
|
3月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
87 0
|
3月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
68 0
|
3月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
73 0
|
3月前
|
安全 Java 程序员
Collection-Stack&Queue源码解析
Collection-Stack&Queue源码解析
96 0
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
57 12

推荐镜像

更多