ScheduledThreadPoolExecutor作为ScheduledExecutorService接口的实现,提供了延迟执行任务或者周期性执行任务的能力。通过名称可以看出,ScheduledThreadPoolExecutor基于线程池实现,它通过继承ThreadPoolExecutor实现线程池管理能力的复用,同时扩展了自己的定时任务调度能力。
首先来看ScheduledExecutorServicej接口,它继承了ExecutorService接口,作为任务执行器的一种扩展类型,提供了如下方法:
- schedule方法:用于任务的单次执行,允许指定延迟时间,当时间为0或者负数时,表示立即执行任务;
- scheduleAtFixedRate方法:以固定的时间间隔执行任务,当任务本身的执行时间超过时间间隔时,会等到任务执行完成后,立即执行下一次任务;同一个任务总是串行执行,不会并发执行;
- scheduleWithFixedDelay方法:以固定的延迟执行任务,当前任务执行时间与上一次任务执行时间相隔固定的延迟;任务每次执行完成后,会在结束时间上加上固定的延迟作为下一次执行时间。任务执行的周期会将任务本身执行耗时考虑在内,因而并非每次执行的时间间隔都相同;
ScheduledThreadPoolExecutor继承ThreadPoolExecutor,主要做了如下改变:
- 使用ScheduledFutureTask作为任务封装类,代替原先的FutureTask类;
- 使用DelayedWorkQueue作为阻塞队列,队列为无界队列;ScheduledThreadPoolExecutor的构造器仅需要传入corePoolSize,使用"corePoolSize+无界队列"实现任务调度;
- 支持run-after-shutdown参数,使得ScheduledThreadPoolExecutor重写shutdown方法,允许移除并且取消不需要在shutdown后执行的任务;
- 提供了decorateTask方法,用来定制任务操作;
ScheduledThreadPoolExecutor的组成
ScheduledThreadPoolExecutor由3部分组成:
- 任务调度控制:ScheduledThreadPoolExecutor,负责任务调度控制,实现了ScheduledExecutorService接口;
- 阻塞队列:DelayedWorkQueue,作为ScheduledThreadPoolExecutor的内部类,用于缓存线程任务的阻塞队列,仅能够存放RunnableScheduledFuture对象;该队列实现了延迟调度任务的逻辑,如果当前时间大于等于任务的延迟执行时间,任务才可以被调度。
- 调度任务:ScheduledFutureTask,作为ScheduledFutureTask的内部类,实现了RunnableScheduledFuture,封装了调度任务的执行逻辑。其中的time字段存放下一次执行时间,DelayedWorkQueue会据此判断任务是否可以被执行。period字段存放执行周期,对于周期性执行任务,每次会根据period计算time。
ScheduledThreadPoolExecutor初始化
ScheduledThreadPoolExecutor的构造器最多指定3个参数:
- corePoolSize:线程池核心工作线程数量;
- threadFactory:定制工作线程创建方式;
- handler:驳回任务处理策略;
ScheduledThreadPoolExecutor构造器会调用父类构造器进行线程池初始化,使用DelayedWorkQueue作为阻塞队列,该队列为无界队列,因而maximumPoolSize属性配置无效。又因为都是核心工作线程,没有非核心线程需要回收,因而keepAliveTime配置为0。代码如下:
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
ScheduledThreadPoolExecutor初始化时并不会预先创建工作线程,而是在提交任务的时候,通过父类java.util.concurrent.ThreadPoolExecutor#ensurePrestart方法判断线程数是否达到corePoolSize,如果未达到,则新增线程;实现逻辑如下:
void ensurePrestart() { int wc = workerCountOf(ctl.get()); // 当前线程数 < corePoolSize,则添加核心工作线程; if (wc < corePoolSize) addWorker(null, true); // 如果0 == wc >= corePoolSize,则表示corePoolSize配置为0,则初始化一个非核心工作线程; else if (wc == 0) addWorker(null, false); }
任务执行
ScheduledThreadPoolExecutor的任务执行分为单次执行和周期性执行。
- 单次执行:通过schedule方法执行的任务属于单次执行任务。Executor的execute方法、ExecutorService的submit方法都是通过调用schedule方法执行,故也是单次执行的任务。除了schedule可以指定延迟时间以外,其余方法的延迟时间均为0,即立刻执行任务。比如:execute方法实现如下:
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
- 周期性执行:通过scheduleAtFixedRate、scheduleWithFixedDelay方法执行的任务均为周期性执行任务。周期性执行的实现可以理解为每次执行完成后设定下一次执行时间,然后将任务重新放入到阻塞队列等待下一次调度。
任务入口:delayedExecute()
无论是单次执行还是周期性执行,其执行的入口都是delayedExecute方法。delayedExecute()将任务放入到阻塞队列中,复用ThreadPoolExecutor的逻辑进行任务调度。代码如下:
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 如果调度器关闭,则拒绝接收任务
if (isShutdown())
// 执行拒绝策略
reject(task);
// 如果调度器未关闭
else {
// 添加任务到阻塞队列,等待调度执行;
super.getQueue().add(task);
// 如果此时调度器关闭,则取消任务;
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
// 如果调度器正常运行,则检查开启线程数是否达到corePoolSize
// 如果未达到corePoolSize,则初始化1个工作线程;
// 如果corePoolSize设置为0,则会初始化1个非core工作线程;
else
ensurePrestart();
}
}
当ThreadPoolExecutor的Worker线程从阻塞队列取出任务执行时,会调用ScheduledFutureTask的run方法。该方法对任务类型进行判断,如果是单次执行任务,则立即执行并设置返回结果。如果是周期性执行任务,则执行任务并设置下一次执行时间,然后将任务放入到阻塞队列中,等待下一次调度。方法代码如下:
public void run() {
boolean periodic = isPeriodic();
// 如果当前不可运行任务,则取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果是单次执行的任务
else if (!periodic)
// 直接调用FutureTask.run()方法执行;
ScheduledFutureTask.super.run();
// 如果是周期性任务,则运行任务但不设置结果;
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置任务下一次执行时间
setNextRunTime();
// 将任务重新加入队列,等待下一次调度
reExecutePeriodic(outerTask);
}
}
单次执行:schedule()
schedule的执行主要分为参数封装和执行两个步骤。实现如下:
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// 封装任务参数,调用decorateTask装饰方法进行任务定制
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 执行任务
delayedExecute(t);
return t;
}
参数封装过程会调用decorateTask方法,该方法为protected的空方法,用于定制RunnableScheduledFuture的属性,可以通过重写实现定制。
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
周期性执行:scheduleAtFixedRate() / scheduleWithFixedDelay()
scheduleAtFixedRate()的实现与schedule()方法非常相似,仅是将decorateTask()返回的RunnableScheduledFuture对象设置为原有Future的outerTask属性。在重新知心任务时,会将outerTask添加到阻塞队列,从而保证decorateTask()的定制效果一直有效。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// 保存定制后的Future对象,便于再次调用;
sft.outerTask = t;
delayedExecute(t);
return t;
}
scheduleAtFixedRate()的实现与schedule()方法非常相似,仅是设置ScheduledFutureTask延迟时,使用负数,标识执行方式为scheduleAtFixedRate。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
ScheduledFutureTask并没有设置单独的字段用于标识执行类型,而是通过period字段的正负号和是否为0表示执行方式:
- 正数:fixed-rate执行方式;
- 负数:fixed-delay执行方式;
- 0:单次执行任务;
scheduleAtFixedRate() / scheduleWithFixedDelay()执行的主要区别在于设置下一次执行时间的策略不同,而执行时间通过ScheduledFutureTask的time字段保存,通过ScheduledFutureTask#setNextRunTime()进行设置,代码如下:
private void setNextRunTime() {
long p = period;
// 如果是fixed-rate执行方式:下一次执行时间 = 上一次执行时间 + period
if (p > 0)
time += p;
// 如果是fixed-delay执行方式:下一次执行时间 = now() + period
else
time = triggerTime(-p);
}
延迟功能实现:DelayedWorkQueue
DelayedWorkQueue是专门存放RunnableScheduledFuture和ScheduledFutureTask对象的优先队列,底层基于最小二叉堆实现,为了能够提升任务的查找和删除效率,ScheduledFutureTask中增加了一个heapIndex的成员变量,用于存放任务在堆数组中的索引位置,当需要查找或者删除某个特定的任务时,直接根据任务的heapIndex访问堆数组中的元素。任务是否到达执行时间的判断逻辑均在DelayedWorkQueue中实现。
主要成员变量
/**
* 存放堆的数组,初始化大小为16
*/
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
/**
* 队列中的任务个数
*/
private int size = 0;
/**
* 保证队列操作的锁
*/
private final ReentrantLock lock = new ReentrantLock();
/**
* 存放用于等待任务的leader线程引用;
* 为降低性能消耗,同一时间并不需要所有线程都轮询等待任务到达执行时间;
* 只需要一个leader线程负责轮询等待即可;
*/
private Thread leader = null;
/**
* Condition signalled when a newer task becomes available at the
* head of the queue or a new thread may need to become leader.
*/
private final Condition available = lock.newCondition();
与PriorityQueue的实现不同,DelayedWorkQueue涉及到多线程访问,因而需要保证线程同步测正确性,故使用ReentrantLock来控制操作的原子性,同时使用Condition来协调线程的执行;
设置任务索引
为了方便在DelayedWorkQueue中查找和删除任务,ScheduledFutureTask有一个heapIndex用于存放任务在堆数组中的索引位置。每当任务在队列中的位置改变时,需要同步更新任务的heapIndex。
private void setIndex(RunnableScheduledFuture<?> f, int idx) {
if (f instanceof ScheduledFutureTask)
((ScheduledFutureTask)f).heapIndex = idx;
}
上浮、下沉操作
上浮、下沉操作的实现与PriorityQueue实现相似,只多了更新索引位置的操作,且需要在加锁的环境下调用。
/**
* 上浮操作
*/
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
// 更新父节点索引位置
setIndex(e, k);
k = parent;
}
queue[k] = key;
// 更新当前节点索引位置
setIndex(key, k);
}
/**
* 下沉操作
*/
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
// 更新子节点索引
setIndex(c, k);
k = child;
}
queue[k] = key;
// 更新当前节点索引
setIndex(key, k);
}
入队操作
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
// 操作前先加锁,保证原子性
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 容量不够,则成倍扩容
if (i >= queue.length)
grow();
size = i + 1;
// 如果队列为空,则直接放入任务
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// 如果队列不为空,则执行上浮操作
siftUp(i, e);
}
// queue[0] == e包含两种情况:
// 1)e为队列中的第一个元素;
// 2)e为队列中最近要执行的任务;
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
public void put(Runnable e) {
offer(e);
}
public boolean add(Runnable e) {
return offer(e);
}
出队操作
/**
* 任务出队后,通过下沉操作使得堆有序;
*/
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
// 出队任务heapIndex设置为-1
setIndex(f, -1);
return f;
}
/**
* 出队,非阻塞
*/
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first = queue[0];
// 如果队列为空或者没有任务到执行时间,则返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
// 执行下沉操作,返回队首任务
return finishPoll(first);
} finally {
lock.unlock();
}
}
/**
* 出队,阻塞线程,直到有任务返回
*/
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加锁,允许响应中断
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// 如果队列为空,则阻塞等待
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
// 如果第一个任务已经到达时间点,则立刻返回任务
if (delay <= 0)
return finishPoll(first);
// 如果未到达时间点
first = null; // don't retain ref while waiting
// 如果已经有leader线程等待任务,则阻塞当前线程
if (leader != null)
available.await();
// 如果没有leader线程,则设置当前线程为leader线程,轮询等待任务到达执行时间点
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待队首任务到达执行时间
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 唤醒一个线程,确保至少有一个线程未被阻塞
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
/**
* 出队,阻塞线程,直到有任务返回或者超时
*/
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 加锁,允许响应中断
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// 如果队列为空
if (first == null) {
// 如果到达超时时间,则返回null
if (nanos <= 0)
return null;
// 未到达超时时间,则等待超时
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
// 如果队首任务到达执行时间,则立即返回任务;
if (delay <= 0)
return finishPoll(first);
// 如果未到达执行时间,且超时,则返回null;
if (nanos <= 0)
return null;
// 如果未到达执行时间,且没有超时
first = null; // don't retain ref while waiting
// 如果已经有leader线程,或者超时时间小于第一个任务执行时间,
// 则阻塞当前线程直至超时
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
// 如果没有leader线程,或者没有超时且没有任务到达时间点;
// 则阻塞等待任务到达执行时间点
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
// 如果当前线程是leader线程,则取消其leader属性
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 唤醒一个线程,确保至少有一个线程未被阻塞
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
根据heapIndex查找和删除任务
/**
* 查找一个任务的index,如果未找到,则返回-1
*/
private int indexOf(Object x) {
if (x != null) {
// 如果是ScheduledFutureTask类型任务,则直接返回heapIndex,效率O(1)
if (x instanceof ScheduledFutureTask) {
int i = ((ScheduledFutureTask) x).heapIndex;
// 检查ScheduledFutureTask是否属于当前pool
if (i >= 0 && i < size && queue[i] == x)
return i;
}
// 如果是RunnableScheduledFuture类型任务,则遍历查找,效率O(1)
else {
for (int i = 0; i < size; i++)
if (x.equals(queue[i]))
return i;
}
}
return -1;
}
/**
* 查找任务
*/
public boolean contains(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return indexOf(x) != -1;
} finally {
lock.unlock();
}
}
/**
* 从队列中删除任务,用于取消任务的场景
*/
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 查找任务索引
int i = indexOf(x);
// 未找到任务,则不执行删除操作,返回false
if (i < 0)
return false;
// 清空任务索引信息,减小任务队列size
// 使用队尾任务替换现有任务索引位置,然后通过下沉、上浮操作找到合适位置
setIndex(queue[i], -1);
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
if (s != i) {
siftDown(i, replacement);
// 如果任务未下沉,则执行上浮操作
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
通过上面代码我们总结DelayedWorkQueue的实现原理:
1)基于最小二叉堆实现的优先队列,根据ScheduledFutureTask.compareTo方法比较任务执行时间,使得最近要执行的任务位于队首;
2)任务出队时,通过轮询判断任务是否到达执行时间点,ScheduledFutureTask实现了Delayed接口,通过getDelay方法能够获取到任务还有多长时间执行;
3)当队列中所有任务都没有到达执行时间时,队列中会维持一个leader线程,用于轮询等待队首任务,其余线程均await()。
4)ScheduledFutureTask增加heapIndex属性,用于标记任务在堆数组中的索引,从而便于任务的快速查找(是否存在)与取消(删除);
任务取消
任务的取消通过ScheduledFutureTask.cancel()方法实现,该方法调用ThreadPoolExecutor.cancel(),在取消任务后,判断是否需要从阻塞队列中移除任务。其中removeOnCancel参数通过setRemoveOnCancelPolicy()设置。之所以要在取消任务后移除阻塞队列中任务,是为了防止队列中积压大量已被取消的任务。
public boolean cancel(boolean mayInterruptIfRunning) {
// 调用ThreadPoolExecutor.cancel方法取消任务
boolean cancelled = super.cancel(mayInterruptIfRunning);
// 从阻塞队列中移除任务
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
关闭调度器
ScheduledThreadPoolExecutor的shutdown() / shutdownNow()方法均调用ThreadPoolExecutor的相应方法实现。同时,ScheduledThreadPoolExecutor实现了ThreadPoolExecutor的onShutdown()用于在shutdown()执行过程中取消任务执行。
此处涉及2个参数:
- executeExistingDelayedTasksAfterShutdown:当执行shutdown()后,是否继续执行队列中的单次执行任务;默认为true,即执行;
- continueExistingPeriodicTasksAfterShutdown:当执行shutdown()后,是否继续执行队列中的周期性任务;默认为false,即不执行;
@Override void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
// 如果设置不执行队列中的任务,则取消队列中所有任务,清空队列;
if (!keepDelayed && !keepPeriodic) {
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
// 如果设置执行队列中的任务:周期性的或者单次的任务
else {
// 遍历队列中的任务,逐个取消并删除不需要执行的任务
// 先拷贝到数组再遍历,防止遍历时队列元素更新,导致异常;
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) {
if (q.remove(t))
t.cancel(false);
}
}
}
}
tryTerminate();
}