概述
Java Review - 并发编程_ThreadPoolExecutor原理&源码剖析 我们复习了Java中线程池ThreadPoolExecutor的原理,ThreadPoolExecutor只是Executors工具类的一部分功能。
下面来介绍另外一部分功能,也就是ScheduledThreadPoolExecutor的实现,这是一个可以在指定一定延迟时间后或者定时进行任务调度执行的线程池。
类结构
Executors其实是个工具类,它提供了好多静态方法,可根据用户的选择返回不同的线程池实例。
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口。
线程池队列是DelayedWorkQueue,其和DelayedQueue类似,是一个延迟队列
ScheduledFutureTask是具有返回值的任务,继承自FutureTask。FutureTask的内部有一个变量state用来表示任务的状态,一开始状态为NEW,所有状态为
private static final int NEW = 0; // 初始状态 private static final int COMPLETING = 1; // 执行中 private static final int NORMAL = 2; // 正常运行结束状态 private static final int EXCEPTIONAL = 3; // 运行中异常 private static final int CANCELLED = 4; // 任务被取消 private static final int INTERRUPTING = 5; // 任务正在被中断 private static final int INTERRUPTED = 6; // 任务已经被中断
可能的任务状态转换路径为
NEN-> COMPLETING-> NORMAL//初始状态->执行中ー>正常结東 NEN-> COMPILETING-> EXCEPTIONAL//初始状态->执行中ー>执行异常 NEN-> CANCELLED//初始状态一>任务取消 NEN-> INTERRUPTING-> INTERRUPTED//初始状态->被中断中->被中断
ScheduledFutureTask内部还有一个变量period用来表示任务的类型,任务类型如下
period=0,说明当前任务是一次性的,执行完毕后就退出了。
period为负数,说明当前任务为fixed-delay任务,是固定延迟的定时可重复执行任务。
period为正数,说明当前任务为fixed-rate任务,是固定频率的定时可重复执行任务
- ScheduledThreadPoolExecutor的一个构造函数如下,由该构造函数可知线程池队列是DelayedWorkQueue。
/** * Creates a new {@code ScheduledThreadPoolExecutor} with the * given core pool size. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @throws IllegalArgumentException if {@code corePoolSize < 0} */ // 使用改造后的DelayQueue public ScheduledThreadPoolExecutor(int corePoolSize) { // 调用父类ThreadPoolExecutor的构造函数 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
核心方法&源码解析
schedule(Runnable command, long delay,TimeUnit unit)
该方法的作用是提交一个延迟执行的任务,任务从提交时间算起延迟单位为unit的delay时间后开始执行。提交的任务不是周期性任务,任务只会执行一次.
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { // 1 参数校验 if (command == null || unit == null) throw new NullPointerException(); // 2 任务转换 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); // 3 添加任务到延迟队列 delayedExecute(t); return t; }
代码(1)进行参数校验,如果command或者unit为null,则抛出NPE异常。
代码(2)装饰任务,把提交的command任务转换为ScheduledFutureTask。
ScheduledFutureTask是具体放入延迟队列里面的东西。由于是延迟任务,所以ScheduledFutureTask实现了long getDelay(TimeUnit unit)和int compareTo(Delayed other)方法。triggerTime方法将延迟时间转换为绝对时间,也就是把当前时间的纳秒数加上延迟的纳秒数后的long型值。
ScheduledFutureTask的构造函数如下。
/** * Creates a one-shot action with given nanoTime-based trigger time. */ ScheduledFutureTask(Runnable r, V result, long ns) { // 调用父类FutureTask的构造函数 super(r, result); this.time = ns; this.period = 0; // 0 说明是一次性任务 this.sequenceNumber = sequencer.getAndIncrement(); }
在构造函数内部首先调用了父类FutureTask的构造函数,父类FutureTask的构造函数代码如下。
public FutureTask(Runnable runnable, V result) { // 通过适配器把runnable转换为callable this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable 设置状态为NEW }
FutureTask中的任务被转换为Callable类型后,被保存到了变量this.callable里面,并设置FutureTask的任务状态为NEW。
然后在ScheduledFutureTask构造函数内部设置time为上面说的绝对时间。需要注意,这里period的值为0,这说明当前任务为一次性的任务,不是定时反复执行任务。其中long getDelay(TimeUnit unit)方法的代码如下(该方法用来计算当前任务还有多少时间就过期了)。
public long getDelay(TimeUnit unit) { // 装饰后的时间 - 当前时间 = 即将过期剩余时间 return unit.convert(time - now(), NANOSECONDS); } public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
compareTo的作用是加入元素到延迟队列后,在内部建立或者调整堆时会使用该元素的compareTo方法与队列里面其他元素进行比较,让最快要过期的元素放到队首。所以无论什么时候向队列里面添加元素,队首的元素都是最快要过期的元素。
代码(3)将任务添加到延迟队列,delayedExecute的代码如下。
/** * Main execution method for delayed or periodic tasks. If pool * is shut down, rejects the task. Otherwise adds task to queue * and starts a thread, if necessary, to run it. (We cannot * prestart the thread to run the task because the task (probably) * shouldn't be run yet.) If the pool is shut down while the task * is being added, cancel and remove it if required by state and * run-after-shutdown parameters. * * @param task the task */ private void delayedExecute(RunnableScheduledFuture<?> task) { // 4 如果线程池拐臂了,则执行线程执行拒绝策略 if (isShutdown()) reject(task); else { // 5 添加任务到延迟队列 super.getQueue().add(task); // 6 再次校验 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 7 确保至少一个线程在处理任务 ensurePrestart(); } }
代码(4)首先判断当前线程池是否已经关闭了,如果已经关闭则执行线程池的拒绝策略 ,否则执行代码(5)将任务添加到延迟队列
添加完毕后还要重新检查线程池是否被关闭了,如果已经关闭则从延迟队列里面删除刚才添加的任务,但是此时有可能线程池中的线程已经从任务队列里面移除了该任务,也就是该任务已经在执行了,所以还需要调用任务的cancle方法取消任务。
如果代码(6)判断结果为false,则会执行代码(7)确保至少有一个线程在处理任务,即使核心线程数corePoolSize被设置为0
/** * Same as prestartCoreThread except arranges that at least one * thread is started even if corePoolSize is 0. */ void ensurePrestart() { int wc = workerCountOf(ctl.get()); // 增加核心线程数 if (wc < corePoolSize) addWorker(null, true); // 如果初始化corePoolSize=0,则也添加一个线程 else if (wc == 0) addWorker(null, false); }
如上代码首先获取线程池中的线程个数,如果线程个数小于核心线程数则新增一个线程,否则如果当前线程数为0则新增一个线程。
上面我们分析了如何向延迟队列添加任务,下面我们来看线程池里面的线程如何获取并执行任务。
前面说ThreadPoolExecutor时我们说过,具体执行任务的线程是Worker线程,Worker线程调用具体任务的run方法来执行。由于这里的任务是ScheduledFutureTask,所以我们下面看看ScheduledFutureTask的run方法
/** * Overrides FutureTask version so as to reset/requeue if periodic. */ public void run() { // 8 是否只执行一次 boolean periodic = isPeriodic(); // 9 取消任务 if (!canRunInCurrentRunState(periodic)) cancel(false); // 10 只执行一次,调用schedule方法 else if (!periodic) ScheduledFutureTask.super.run(); // 11 定时执行 else if (ScheduledFutureTask.super.runAndReset()) { // 11.1 设置time=time+period setNextRunTime(); // 11.2 重新加入该任务到delay队列 reExecutePeriodic(outerTask); } }
代码(8)中的isPeriodic的作用是判断当前任务是一次性任务还是可重复执行的任务
public boolean isPeriodic() { return period != 0; }
- 可以看到,其内部是通过period的值来判断的,由于转换任务在创建ScheduledFutureTask时传递的period的值为0 ,所以这里isPeriodic返回false。
- 代码(9)判断当前任务是否应该被取消,canRunInCurrentRunState的代码如下
boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }
传递的periodic的值为false,所以isRunningOrShutdown的参数为executeExisti ngDelayedTasksAfterShutdown。executeExistingDelayedTasksAfterShutdown默认为true,表示当其他线程调用了shutdown命令关闭了线程池后,当前任务还是要执行,否则如果为false,则当前任务要被取消。
由于periodic的值为false,所以执行代码(10)调用父类FutureTask的run方法具体执行任务。FutureTask的run方法的代码如下
public void run() { // 12 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; // 13 try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 13.1 setException(ex); } // 13.2 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
代码(12)判断如果任务状态不是NEW则直接返回,或者如果当前任务状态为NEW但是使用CAS设置当前任务的持有者为当前线程失败则直接返回
代码(13)具体调用callable的call方法执行任务。这里在调用前又判断了任务的状态是否为NEW,是为了避免在执行代码(12)后其他线程修改了任务的状态(比如取消了该任务)。\
如果任务执行成功则执行代码(13.2)修改任务状态,set方法的代码如下。
protected void set(V v) { // 如果为NEW,设置为COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; // 设置当前任务的状态为NORMAL,也就是任务正常结束 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
如上代码首先使用CAS将当前任务的状态从NEW转换到COMPLETING。这里当有多个线程调用时只有一个线程会成功。成功的线程再通过UNSAFE.putOrderedInt设置任务的状态为正常结束状态,这里没有使用CAS是因为对于同一个任务只可能有一个线程运行到这里。
在这里使用putOrderedInt比使用CAS或者putLongvolatile效率要高,并且这里的场景不要求其他线程马上对设置的状态值可见。
在什么时候多个线程会同时执行CAS将当前任务的状态从NEW转换到COMPLETING?其实当同一个command被多次提交到线程池时就会存在这样的情况,因为同一个任务共享一个状态值state。
如果任务执行失败,则执行代码(13.1)。setException的代码如下,可见与set函数类似。
protected void setException(Throwable t) { // 如果当前任务的状态为NEW,则设置为COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; // 设置当前任务的状态为EXCEPTIONAL,也就是任务非正常结束 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
到这里代码(10)的逻辑执行完毕,一次性任务也就执行完毕了
scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
该方法的作用是,当任务执行完毕后,让其延迟固定时间后再次运行(fixed-delay任务)
- initialDelay表示提交任务后延迟多少时间开始执行任务command
- delay表示当任务执行完毕后延长多少时间后再次运行command任务
- unit是initialDelay和delay的时间单位
任务会一直重复运行直到任务运行中抛出了异常,被取消了,或者关闭了线程池。
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { // 14 参数校验 if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); // 15 任务转换 ,注意这里的 poeriod = -dealy < 0 【 unit.toNanos(-delay)】 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; // 16 添加任务到队列 delayedExecute(t); return t; }
代码(14)进行参数校验,校验失败则抛出异常
代码(15)将command任务转换为ScheduledFutureTask。这里需要注意的是,传递给ScheduledFutureTask的period变量的值为-delay,period<0说明该任务为可重复执行的任务。
然后代码(16)添加任务到延迟队列后返回。
将任务添加到延迟队列后线程池线程会从队列里面获取任务,然后调用ScheduledFutureTask的run方法执行。由于这里period<0,所以isPeriodic返回true,所以执行代码(11)。runAndReset的代码如下。
/** * Executes the computation without setting its result, and then * resets this future to initial state, failing to do so if the * computation encounters an exception or is cancelled. This is * designed for use with tasks that intrinsically execute more * than once. * * @return {@code true} if successfully run and reset */ protected boolean runAndReset() { // 17 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; // 18 boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
该代码和FutureTask的run方法类似,只是任务正常执行完毕后不会设置任务的状态,这样做是为了让任务成为可重复执行的任务。
这里多了代码(19),这段代码判断如果当前任务正常执行完毕并且任务状态为NEW则返回true,否则返回false。 如果返回了true则执行代码(11.1)的setNextRunTime方法设置该任务下一次的执行时间。
/** * Sets the next time to run for a periodic task. */ private void setNextRunTime() { long p = period; if (p > 0) // ffixed-rate类型任务 time += p; else // fixed-delay 类型任务 time = triggerTime(-p); }
这里p<0说明当前任务为fixed-delay类型任务。然后设置time为当前时间加上-p的时间,也就是延迟-p时间后再次执行。
fixed-delay类型的任务的执行原理为: 当添加一个任务到延迟队列后,等待initialDelay时间,任务就会过期,过期的任务就会被从队列移除,并执行。执行完毕后,会重新设置任务的延迟时间,然后再把任务放入延迟队列,循环往复。需要注意的是,如果一个任务在执行中抛出了异常,那么这个任务就结束了,但是不影响其他任务的执行。
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
该方法相对起始时间点以固定频率调用指定的任务(fixed-rate任务)。当把任务提交到线程池并延迟initialDelay时间(时间单位为unit)后开始执行任务command 。然后从initialDelay+period时间点再次执行,而后在 initialDelay + 2 * period时间点再次执行,循环往复,直到抛出异常或者调用了任务的cancel方法取消了任务,或者关闭了线程池。
scheduleAtFixedRate的原理与scheduleWithFixedDelay类似,下面我们看下它们之间的不同点。
首先调用scheduleAtFixedRate的代码如下
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); // 装饰任务,注意这里的period=period>0 不是负的 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
在如上代码中,在将fixed-rate类型的任务command转换为ScheduledFutureTask时设置period=period,不再是-period。
所以当前任务执行完毕后,调用setNextRunTime设置任务下次执行的时间时执行的是time += p而不再是time = triggerTime(-p)。
总结:相对于fixed-delay任务来说,fixed-rate方式执行规则为,时间为initdelday +n*period时启动任务,但是如果当前任务还没有执行完,下一次要执行任务的时间到了,则不会并发执行,下次要执行的任务会延迟执行,要等到当前任务执行完毕后再执行。
小结
ScheduledThreadPoolExecutor的实现原理,其内部使用DelayQueue来存放具体任务。任务分为三种,其中一次性执行任务执行完毕就结束了,fixed-delay任务保证同一个任务在多次执行之间间隔固定时间,fixed-rate任务保证按照固定的频率执行。任务类型使用period的值来区分。