@[TOC]
ScheduledThreadPoolExecutor介绍
ScheduledThreadPoolExecutor是ThreadPoolExecutor的一个子类,在线程池的基础上实现了延迟执行任务以及周期性执行任务的功能。
Java最早提供的是Timer类执行定时任务,单线程串行的,效率非常低。在不采用第三方框架时,需要执行定时任务,ScheduledThreadPoolExecutor是比较好的选择。
ScheduledThreadPoolExecutor就是在线程池的基础上实现的定时执行任务的功能。
ScheduledThreadPoolExecutor提供了比较常用的四种方法执行任务:
- execute:跟普通线程池执行没区别。
- schedule:可以指定延迟时间,一次性执行任务。
- scheduleAtFixedRate:可以让任务在固定的周期下执行。
- scheduleWithFixedDelay:可以让任务在固定的周期下执行,不同与 scheduleAtFixedRate 的是,如果一次任务执行时长超过周期时间,下一次任务会在该次任务执行结束时间基础上,计算执行延时。
代码示例:
public static void main(String[] args) throws InterruptedException {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);
//1. execute
executor.execute(() -> {
System.out.println("execute");
});
//2. schedule
executor.schedule(() -> {
System.out.println("schedule");
},2000,TimeUnit.MILLISECONDS);
//3. AtFixedRate
executor.scheduleAtFixedRate(() -> {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("at:" + System.currentTimeMillis());
},3000,2000,TimeUnit.MILLISECONDS);
//4. WithFixedDelay
executor.scheduleWithFixedDelay(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("with:" + System.currentTimeMillis());
},3000,2000,TimeUnit.MILLISECONDS);
}
如果实际开发应用需要使用到定时任务,推荐一些开源的框架,比如Quartz,XXL-job,Elastic-Job。
ScheduledFutureTask
ScheduledFutureTask是ScheduledThreadPoolExecutor中用于实现可延时执行任务和周期性执行任务的特性而引入的。在ScheduledThreadPoolExecutor中,当一个任务被提交后,它会被转换成ScheduledFutureTask类,该类继承了FutureTask并重写了run方法。 间接的实现了Delayed接口,让任务可以放到延迟队列中,并且基于二叉堆做排序。
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
// 就是计数器,每个任务进来时,都会有一个全局唯一的序号。
// 如果任务的执行时间一模一样,比对sequenceNumber
private final long sequenceNumber;
// 任务执行的时间,单位是纳秒
private long time;
/*
* period == 0:表示一次性执行的任务
* period > 0:表示使用的是At!
* period < 0:表示使用的是With!
*/
private final long period;
// 周期性实行任务时,引用具体任务,方便后面重新扔到阻塞队列
RunnableScheduledFuture<V> outerTask = this;
// 有参构造。schedule时使用当前有参重载封装任务!
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// At,With时,使用当前有参重载封装任务!
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 不考虑这个,有返回结果
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 实现Delayed接口重写的方法,执行的时间
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
// 实现Delayed接口重写的方法,比较的方式,放在二叉堆内部
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
// 判断是否是周期执行
public boolean isPeriodic() {
return period != 0;
}
// 省略部分代码
}
四种方法执行任务
execute方法
这个方法最终会调用schedule。
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
schedule方法
public ScheduledFuture<?> schedule(Runnable command, long delay,TimeUnit unit) {
// 非空判断!
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));
// 延迟执行
delayedExecute(t);
return t;
}
schedule方法将普通的command封装为ScheduledFutureTask, decorateTask方法默认情况下,什么都没做,就是返回了ScheduledFutureTask,可以作为扩展方法,在这个位置修改任务需要执行的具体细节。
// 将command任务封装为ScheduledFutureTask
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
// 任务要执行的系统时间
this.time = ns;
// 任务是否是周期性执行
this.period = 0;
// 基于AtomicLong计算序列化。
this.sequenceNumber = sequencer.getAndIncrement();
}
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
执行延迟任务,先会检查线程池是否为RUNNING状态,如果不是,执行拒绝策略。否则调用阻塞队列,将任务添加进去,将任务扔到了延迟队列中(二叉堆)。在添加任务到延迟队列的数组时,会记录当前任务所在的索引位置,方便取消任务时,从数组中移除。
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
如果任务添加到了阻塞队列中,忽然线程池不是RUNNING状态,那么此时这个任务是否执行?
periodic - true:代表是周期性执行的任务
periodic - false:代表是一次性的延迟任务
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
// 判断当前任务到底执行不执行
final boolean isRunningOrShutdown(boolean shutdownOK) {
// 重新拿到线程池的ctl
int rs = runStateOf(ctl.get());
// 如果线程池是RUNNING,返回true
// 如果线程池状态是SHUTDOWN,那么就配合策略返回true、false
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
// 准备执行任务
void ensurePrestart() {
// 获取线程池中的工作线程个数。
int wc = workerCountOf(ctl.get());
// 如果工作线程个数,小于核心线程数,
if (wc < corePoolSize)
// 创建核心线程,一致在阻塞队列的位置take,等待拿任务执行
addWorker(null, true);
// 如果工作线程数不小于核心线程,但是值为0,创建非核心线程执行任务
else if (wc == 0)
// 创建非核心线程处理阻塞队列任务,而且只要阻塞队列没有任务了,当前线程立即销毁
addWorker(null, false);
}
查看任务放到延迟队列后,是如何被工作线程取出来执行的
执行addWorker方法,会创建一个工作线程,工作线程在创建成功后,会执行start方法。在start方法执行后,会调用Worker的run方法,最终执行了runWorker方法,在runWorker方法中会在阻塞队列的位置执行take方法一直阻塞拿Runnable任务,拿到任务后就返回,然后执行。
所以需要查看的就是延迟队列的take方法,套路和DelayQueue没有区别
在拿到任务后,会执行任务,也就是执行任务的run方法。
// 执行任务
public void run() {
// 获取任务是否是周期执行
// true:周期执行
// false:一次的延迟执行
boolean periodic = isPeriodic();
// 再次判断线程池状态是否不是RUNNING,如果不是RUNNING,并且SHUTDOWN情况也不允许执行,或者是STOP状态
if (!canRunInCurrentRunState(periodic))
// 取消任务
cancel(false);
else if (!periodic)
// 当前任务是一次性的延迟执行。执行任务具体的run方法,执行完,没了………………
ScheduledFutureTask.super.run();
// 后面是周期执行、省略部分代码…………
}
scheduleAtFixedRate和scheduleWithFixedDelay分析
在执行方法的初期,封装任务时:
- At会将period设置为正数,代表固定周期执行表
- With会将period设置为负数,代表在执行任务完毕后,再计算下次执行的时间
// 固定周期执行任务,如果任务的执行时间,超过周期,任务执行完,立即执行下一次任务。
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, // 具体任务
long initialDelay, // 第一次执行的时间
long period, // 周期执行时间
TimeUnit unit) {
// 时间单位
// 阿巴阿巴~~~
if (command == null || unit == null)
throw new NullPointerException();
// 如果传递的周期小于等于0,直接抛异常
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit),
unit.toNanos(period));
// 扩展
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// 将任务设置给outerTask属性,方便后期重新扔到延迟队列
sft.outerTask = t;
// 嗯哼~
delayedExecute(t);
return t;
}
// 固定周期执行任务,会在任务执行完毕后,再计算下次执行的时间。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
最终两个方法都会调用delayedExecute方法区将任务扔到阻塞队列,并尝试是否需要构建工作线程,从而执行任务
工作线程会监听延迟队列,拿到任务后会调用任务的run方法
public void run() {
// 查看At和With可确定任务是周期执行
boolean periodic = isPeriodic();
// 线程池状态对不!!
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
// 一次性的延迟执行
ScheduledFutureTask.super.run();
// 到这,先执行任务
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下一次任务的运行时间
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
// 计算任务下次执行时间,time是任务执行的时间,而这里是time的上次的执行时间
private void setNextRunTime() {
// 拿到当前任务的period
long p = period;
// period > 0:At
if (p > 0)
// 直接拿上次执行的时间,添加上周期时间,来计算下次执行的时间。
time = time + p;
else
// period < 0:With
// 任务执行完,拿当前系统时间计算下次执行的时间点
time = now() + p;
}
// 重新将任务扔到延迟队列中
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 线程池状态的判断
if (canRunInCurrentRunState(true)) {
// 将任务扔到了延迟队列中
super.getQueue().add(task);
// 扔到延迟队列后,再次判断线程池状态,是否需要取消任务!
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 需要创建线程不~
ensurePrestart();
}
}