调度线程池:
- 调度线程池是线程池类ThreadPoolExecutor的子类,复用了线程池的计算框架,主要用于解决任务在一定的时间间隔后重复执行的问题。
- 例子
public class ScheduledThreadPoolTest { /** * 以固定的频率调度,包括任务执行的时间 */ public static void scheduledAtFixRate(){ ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10); //任务开始执行的延迟时间 int initDelay = 1; //任务循环执行的间隔 int period = 5; scheduledExecutorService.scheduleAtFixedRate(new Task(),initDelay,period,TimeUnit.SECONDS); } /** * 以固定的时间间隔调度,不包括任务执行的时间 */ public static void scheduledWithFixDelay(){ ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10); //任务开始执行的延迟时间 int initDelay = 1; //任务循环执行的间隔 int period = 5; scheduledExecutorService.scheduleWithFixedDelay(new Task(),initDelay,period,TimeUnit.SECONDS); } static class Task implements Runnable{ @Override public void run() { System.out.println(String.format("开始执行任务调度,执行线程:%s",Thread.currentThread().getName())); try { //模拟任务的执行时间为5秒 Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行任务调度完成"); } } public static void main(String[] args){ ScheduledThreadPoolTest.scheduledAtFixRate(); ScheduledThreadPoolTest.scheduledWithFixDelay(); } }
(1)其中,ScheduledThreadPoolTest.scheduleAtFixRate()方法,会启动一个调度线程池,并以初始延迟1s,固定频率为5s,使用调度线程池的scheduleAtFixedRate执行任务Task,任务Task的执行时间为5s。
(2)ScheduledThreadPoolTest.scheduleWithFixDelay()方法,会启动一个调度线程池,并以初始延迟1s,固定频率为5s,使用调度线程池的scheduleWithFixedDelay执行任务Task,任务Task的执行时间为5s。
(3)那么,这两个方法的区别在哪里呢?scheduleAtFixRate是以固定的频率执行任务,在本例中任务开始执行的时间点(以当前时间为起点)为:1s,6s,11s,16s,…,该方法和任务本身的执行时间无关,以固定的频率触发任务执行。而scheduleWithFixDelay是以固定的延迟执行任务,在本例中任务开始执行的时间点(以当前时间为起点)为:1s,11s,21s,31s,…,该方法和任务本身的执行时间相关,在任务执行完成后间隔5s再开始执行下一次任务。
3.ScheduledExecutorService:
//Executors类的newScheduledThreadPool方法 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
//可以看到该类是ThreadPoolExecutor的子类,并且构造函数里面向父类传递了一个类型为DelayedWorkQueue的阻塞队列. public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } ... }
4.scheduleAtFixedRate:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); //将待执行的任务进行包装,生成一个调度任务,任务循环执行的秘密就在这个调度任务里 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { //将任务添加进阻塞队列,这个阻塞队列就是上面提的DelayedWorkQueue super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
5.ScheduledFutureTask是如何实现任务循环执行:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** * 任务执行的具体方法,还记得ThreadPoolExecutor的runWorker方法吗?方法内部会调用任务的run()方法以执行具体的任务 */ public void run() { //判断任务是否需要周期执行 boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); //执行具体的任务,调用最原始任务的run()方法 else if (ScheduledFutureTask.super.runAndReset()) { //计算任务下一次开始执行的时间 setNextRunTime(); //重新将任务放入阻塞队列,DelayedWorkQueue reExecutePeriodic(outerTask); } } //计算任务下一次执行时间 private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); } void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { //将任务放入阻塞队列,DelayedWorkQueue super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } } }
可以看到这里的关键是在执行任务的同时,计算任务的下一次执行时间,并用原始任务生成一个新的任务重新放入阻塞队列等待执行。
6.阻塞队列DelayedWorkQueue:
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> //线程池的Worker会从阻塞队列中取任务,调用的就是队列的take方法 public RunnableScheduledFuture<?> take() throws InterruptedException { //多线程操作,要先获取锁 final ReentrantLock lock = this.lock; //以可中断的方式获取锁,线程池被关闭的时候会中断内部线程 lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) //如果队列中无任务需要等待 available.await(); else { //计算任务是否到达执行时间,<0代表到达执行时间直接返回,否则等待 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) //从队列中出队,并维护堆(此处队列内部是用堆来实现的) return finishPoll(first); first = null; // don't retain ref while waiting //leader不为空,说明其他线程正在获取任务,当前线程必须等待 if (leader != null) available.await(); else {//leander指的是处理队列中第一个任务的线程,leader为空,将当前线程设置为leader Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 阻塞一定时间(调度任务的时间) available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) //任务获取成功后,唤醒其他等待的线程 available.signal(); lock.unlock(); } } }