java源码-ScheduledThreadPoolExecutor

简介: 开篇  ScheduledThreadPoolExecutor的文章主要需要讲清楚一个核心概念(划重点),如何通过数据结构实现定时调度功能,顺带了解下如何实现循环调度功能。

开篇

  ScheduledThreadPoolExecutor的文章主要需要讲清楚一个核心概念(划重点),如何通过数据结构实现定时调度功能,顺带了解下如何实现循环调度功能。
 ScheduledThreadPoolExecutor的周期性在于执行完任务后重新计算下一次周期并提交到DelayedWorkQueue当中,判断是否到执行时间通过DelayedWorkQueue(堆排序)实现。


类图

 ScheduledThreadPoolExecutor的类依赖图当中,我们可以得出以下几个结论:

  • 继承自ThreadPoolExecutor,所以具备ThreadPoolExecutor的特性。
  • 实现ScheduledExecutoService,提供一系列Schedule特性相关的接口。
img_18e49126468395627f192e551ce31a3b.png
ScheduledThreadPoolExecutor类依赖图
img_3209358ef2e0bfc7f4aa4c0b3830d01d.png
ScheduledThreadPoolExecutor接口


类源码

  • ScheduledThreadPoolExecutor的构造函数内部通过ThreadPoolExecutor的构造函数去实现核心结构的构造的。
  • ScheduledThreadPoolExecutor通过DelayedWorkQueue去保存任务,这样保证了按照时间的倒序访问,直接可以通过判断第一个节点的时间来判定是否需要执行调度任务。
  • ScheduledThreadPoolExecutor的keepAliveTime设置为0,那么如果有空闲线程就立即回收
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }


任务提交

  • ScheduledThreadPoolExecutor的任务提交方式有两种:submit和shedule。
  • 内部创建ScheduledFutureTask对象对运行的task进行简单封装。
  • 内部执行delayedExecute()方法提交任务。
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {

    public Future<?> submit(Runnable task) {
        return schedule(task, 0, NANOSECONDS);
    }

    public <T> Future<T> submit(Runnable task, T result) {
        return schedule(Executors.callable(task, result), 0, NANOSECONDS);
    }

    public <T> Future<T> submit(Callable<T> task) {
        return schedule(task, 0, NANOSECONDS);
    }

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
}

  • delayedExecute()方法内部判断线程池是否已经关闭,关闭则直接拒绝任务。
  • delayedExecute()方法直接通过DelayedWorkQueue的add方法添加任务。
  • DelayedWorkQueue其实是类似DelayedQueue的基于最小堆的数据结构。
  • DelayedWorkQueue按照提交任务的过期时间进行排序,最快过期数据在堆顶。
  • delayedExecute()方法内部会判断当前核心线程池的大小,如果过少就增加核心线程。
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                // 启动工作线程来执行定时调度任务
                ensurePrestart();
        }
    }

    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        // 启动核心工作线程
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }


任务单元

 ScheduledFutureTask的核心变量主要包括:

  • time:延迟周期也就是执行任务时间点
  • period:周期执行的时间配置
  • outerTask:执行的任务

  ScheduledFutureTask的执行逻辑run()方法负责执行调度任务:

  • 如果是非周期性任务,那么就执行一次。
  • 如果是周期性任务,那么先执行任务后重新计算下一次周期重新投递到队列中。
private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

        private final long sequenceNumber;
        // 延迟周期也就是执行任务时间点
        private long time;
        // 周期执行的时间配置
        private final long period;
        // 执行任务
        RunnableScheduledFuture<V> outerTask = this;
        int heapIndex;

        ScheduledFutureTask(Runnable r, V result, long ns) {
            super(r, result);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        ScheduledFutureTask(Callable<V> callable, long ns) {
            super(callable);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        // 计算是否到期,用下一次执行时间减去当前时间来判断
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), NANOSECONDS);
        }

        public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

        public boolean isPeriodic() {
            return period != 0;
        }
        
        // 设置下一个运行时间,如果周期性的就在当前时间+周期性时间
        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }

        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            if (cancelled && removeOnCancel && heapIndex >= 0)
                remove(this);
            return cancelled;
        }

        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            // 如果不是周期性任务,那么就执行一次
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                // 如果是周期性任务,那么就重新计算下一次任务周期并重新添加任务队列当中
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }
    }

    // 重新提交任务到DelayedWorkQueue
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

计算下一次执行任务的时间,triggerTime()就是完成这个任务的。

    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }

    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

    private long overflowFree(long delay) {
        Delayed head = (Delayed) super.getQueue().peek();
        if (head != null) {
            long headDelay = head.getDelay(NANOSECONDS);
            if (headDelay < 0 && (delay - headDelay < 0))
                delay = Long.MAX_VALUE + headDelay;
        }
        return delay;
    }


任务调度

  任务调度通过ThreadPoolExecutor的worker的run()方法进入while()循环调用实现任务的消费。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }
    }


    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }


参考文章

java源码-ThreadPoolExecutor(1)
java源码-ThreadPoolExecutor(2)
java源码-ThreadPoolExecutor(3)
java源码-DelayQueue
深入理解scheduledthreadpoolexecutor


彩蛋

倚楼听风雨,淡看江湖路;莫问前尘有悔,唯愿今朝无憾。
目录
相关文章
|
2天前
|
安全 小程序 Java
基于Java实训中心管理系统设计和实现(源码+LW+调试文档+讲解等)
基于Java实训中心管理系统设计和实现(源码+LW+调试文档+讲解等)
|
2天前
|
存储 安全 Java
基于Java+MySQL停车场车位管理系统详细设计和实现(源码+LW+调试文档+讲解等)
基于Java+MySQL停车场车位管理系统详细设计和实现(源码+LW+调试文档+讲解等)
|
2天前
|
存储 小程序 Java
基于Java图书馆管理系统详细设计和实现(源码+LW+调试文档+讲解等)
基于Java图书馆管理系统详细设计和实现(源码+LW+调试文档+讲解等)
|
3天前
|
搜索推荐 算法 小程序
基于Java协同过滤算法的电影推荐系统设计和实现(源码+LW+调试文档+讲解等)
基于Java协同过滤算法的电影推荐系统设计和实现(源码+LW+调试文档+讲解等)
|
3天前
|
搜索推荐 算法 小程序
基于Java协同过滤算法的图书推荐系统设计和实现(源码+LW+调试文档+讲解等)
基于Java协同过滤算法的图书推荐系统设计和实现(源码+LW+调试文档+讲解等)
|
2天前
|
数据采集 监控 前端开发
JAVA公立医院绩效考核管理系统源码-对接HIS数据
在医院的工作和管理上,院领导需要对院内工作人员的工作情况进行了解、评价和监控。 下面将对医院绩效管理系统的HIS数据流程加以阐述。
10 1
JAVA公立医院绩效考核管理系统源码-对接HIS数据
|
3天前
|
安全 小程序 Java
基于Java考研助手网站设计和实现(源码+LW+调试文档+讲解等)
基于Java考研助手网站设计和实现(源码+LW+调试文档+讲解等)
|
2天前
|
移动开发 小程序 关系型数据库
java+ IDEA+ Uniapp+ mysql医院3D智能导诊系统源码
这是一个基于Java、IDEA、Uniapp和MySQL的医院3D智能导诊系统,采用Springboot后端框架和Redis、Mybatis Plus、RocketMQ等技术。系统通过对话式交互,精准推荐就诊科室,解决患者挂号困扰。它还具备智能预问诊功能,提升诊疗效率和准确性,确保医生能快速了解患者详情。此系统还支持小程序和H5,方便患者使用。
8 0
|
2天前
|
小程序 Java 数据库
基于Java作业管理系统设计和实现(源码+LW+调试文档+讲解等)
基于Java作业管理系统设计和实现(源码+LW+调试文档+讲解等)
|
2天前
|
小程序 Java 关系型数据库
基于Java学生选课管理系统设计和实现(源码+LW+调试文档+讲解等)
基于Java学生选课管理系统设计和实现(源码+LW+调试文档+讲解等)