一、线程池初探
所谓线程池,就是将多个线程放在一个池子里面(所谓池化技术),然后需要线程的时候不是创建一个线程,而是从线程池里面获取一个可用的线程,然后执行我们的任务。线程池的关键在于它为我们管理了多个线程,我们不需要关心如何创建线程,我们只需要关系我们的核心业务,然后需要线程来执行任务的时候从线程池中获取线程。任务执行完之后线程不会被销毁,而是会被重新放到池子里面,等待机会去执行任务。
我们为什么需要线程池呢?首先一点是线程池为我们提高了一种简易的多线程编程方案,我们不需要投入太多的精力去管理多个线程,线程池会自动帮我们管理好,它知道什么时候该做什么事情,我们只要在需要的时候去获取就可以了。其次,我们使用线程池很大程度上归咎于创建和销毁线程的代价是非常昂贵的,甚至我们创建和销毁线程的资源要比我们实际执行的任务所花费的时间还要长,这显然是不科学也是不合理的,而且如果没有一个合理的管理者,可能会出现创建了过多的线程的情况,也就是在JVM中存活的线程过多,而存活着的线程也是需要销毁资源的,另外一点,过多的线程可能会造成线程过度切换的尴尬境地。
对线程池有了一个初步的认识之后,我们来看看如何使用线程池。
- 创建一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(1);
- 提交任务
executorService.submit(() -> System.out.println("run")); Future<String> stringFuture = executorService.submit(() -> "run");
- 创建一个调度线程池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
- 提交一个周期性执行的任务
scheduledExecutorService .scheduleAtFixedRate(() -> System.out.println("schedule"), 0, 1, TimeUnit.SECONDS);
- shutdown
executorService.shutdownNow(); scheduledExecutorService.shutdownNow();
可以发现使用线程池非常简单,只需要极少的代码就可以创建出我们需要的线程池,然后将我们的任务提交到线程池中去。我们只需要在结束之时记得关闭线程池就可以了。本文的重点并非在于如何使用线程池,而是试图剖析线程池的实现,比如一个调度线程池是怎么实现的?是靠什么实现的?为什么能这样实现等等问题。
二、Java线程池实现架构
Java中与线程池相关的类有下面一些:
- Executor
- ExecutorService
- ScheduledExecutorService
- ThreadPoolExecutor
- ScheduledThreadPoolExecutor
- Executors
通过上面一节中的使用示例,可以发现Executors类是一个创建线程池的有用的类,事实上,Executors类的角色也就是创建线程池,它是一个工厂类,可以产生不同类型的线程池,而Executor是线程池的鼻祖类,它有两个子类是ExecutorService和ScheduledExecutorService,而ThreadPoolExecutor和ScheduledThreadPoolExecutor则是真正的线程池,我们的任务将被这两个类交由其所管理者的线程池运行,可以发现,ScheduledThreadPoolExecutor是一个集大成者类,下面我们可以看看它的类关系图:
ScheduledThreadPoolExecutor的类关系图
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,ThreadPoolExecutor实现了一般的线程池,没有调度功能,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor的实现,然后增加了调度功能。
ScheduledThreadPoolExecutor相较于ThreadPoolExecutor增加了调度功能 |
最为原始的Executor只有一个方法execute,它接受一个Runnable类型的参数,意思是使用线程池来执行这个Runnable,可以发现Executor不提供有返回值的任务。ExecutorService继承了Executor,并且极大的增强了Executor的功能,不仅支持有返回值的任务执行,而且还有很多十分有用的方法来为你提供服务。
Executor不提供有返回值的任务,ExecutorService继承自Executor,支持有返回值的任务执行 |
下面展示了ExecutorService提供的方法:
ExecutorService提供的方法
ScheduledExecutorService继承了ExecutorService,并且增加了特有的调度(schedule)功能。关于Executor、ExecutorService和ScheduledExecutorService的关系,可以见下图:
Executor、ExecutorService和ScheduledExecutorService的关系
总结一下,经过我们的调研,可以发现其实对于我们编写多线程代码来说,最为核心的是Executors类,根据我们是需要ExecutorService类型的线程池还是ScheduledExecutorService类型的线程池调用相应的工厂方法就可以了,而ExecutorService的实现表现在ThreadPoolExecutor上,ScheduledExecutorService的实现则表现在ScheduledThreadPoolExecutor上,下文将分别剖析这两者,尝试弄清楚线程池的原理。
三、ThreadPoolExecutor解析
上文中描述了Java中线程池相关的架构,了解了这些内容其实我们就可以使用java的线程池为我们工作了,使用其提供的线程池我们可以很方便的写出高质量的多线程代码,本节将分析ThreadPoolExecutor的实现,来探索线程池的运行原理。下面的图片展示了ThreadPoolExecutor的类图:
ThreadPoolExecutor的类图
下面是几个比较关键的类成员:
// 任务队列,我们的任务会添加到该队列里面,线程将从该队列获取任务来执行 private final BlockingQueue<Runnable> workQueue; //任务的执行值集合,来消费workQueue里面的任务 private final HashSet<Worker> workers = new HashSet<Worker>(); //线程工厂 private volatile ThreadFactory threadFactory; //拒绝策略,默认会抛出异异常,还要其他几种拒绝策略如下: private volatile RejectedExecutionHandler handler; 1、CallerRunsPolicy:在调用者线程里面运行该任务 2、DiscardPolicy:丢弃任务 3、DiscardOldestPolicy:丢弃workQueue的头部任务 //最下保活work数量 private volatile int corePoolSize; //work上限 private volatile int maximumPoolSize;
我们尝试执行submit方法,下面是执行的关键路径,总结起来就是:如果Worker数量还没达到上限则继续创建,否则提交任务到workQueue,然后让worker来调度运行任务。
step 1: <ExecutorService> Future<?> submit(Runnable task); step 2:<AbstractExecutorService> public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } step 3:<Executor> void execute(Runnable command); step 4:<ThreadPoolExecutor> public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //提交我们的额任务到workQueue int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) //使用maximumPoolSize作为边界 reject(command); //还不行?拒绝提交的任务 } step 5:<ThreadPoolExecutor> private boolean addWorker(Runnable firstTask, boolean core) step 6:<ThreadPoolExecutor> w = new Worker(firstTask); //包装任务 final Thread t = w.thread; //获取线程(包含任务) workers.add(w); // 任务被放到works中 t.start(); //执行任务
上面的流程是高度概括的,实际情况远比这复杂得多,但是我们关心的是怎么打通整个流程,所以这样分析问题是没有太大的问题的。观察上面的流程,我们发现其实关键的地方在于Worker,如果弄明白它是如何工作的,那么我们也就大概明白了线程池是怎么工作的了。下面分析一下Worker类。
worker类图
上面的图片展示了Worker的类关系图,关键在于他实现了Runnable接口,所以问题的关键就在于run方法上。在这之前,我们来看一下Worker类里面的关键成员:
final Thread thread; Runnable firstTask; //我们提交的任务,可能被立刻执行,也可能被放到队列里面
thread是Worker的工作线程,上面的分析我们也发现了在addWorker中会获取worker里面的thread然后start,也就是这个线程的执行,而Worker实现了Runnable接口,所以在构造thread的时候Worker将自己传递给了构造函数,thread.start执行的其实就是Worker的run方法。下面是run方法的内容:
public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
我们来分析一下runWorker这个方法,这就是整个线程池的核心。首先获取到了我们刚提交的任务firstTask,然后会循环从workQueue里面获取任务来执行,获取任务的方法如下:
|
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
其实核心也就一句:
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
我们再回头看一下execute,其实我们上面只走了一条逻辑,在execute的时候,我们的worker的数量还没有到达我们设定的corePoolSize的时候,会走上面我们分析的逻辑,而如果达到了我们设定的阈值之后,execute中会尝试去提交任务,如果提交成功了就结束,否则会拒绝任务的提交。我们上面还提到一个成员:maximumPoolSize,其实线程池的最大的Worker数量应该是maximumPoolSize,但是我们上面的分析是corePoolSize,这是因为我们的private boolean addWorker(Runnable firstTask, boolean core)的参数core的值来控制的,core为true则使用corePoolSize来设定边界,否则使用maximumPoolSize来设定边界。
直观的解释一下,当线程池里面的Worker数量还没有到corePoolSize,那么新添加的任务会伴随着产生一个新的worker,如果Worker的数量达到了corePoolSize,那么就将任务存放在阻塞队列中等待Worker来获取执行,如果没有办法再向阻塞队列放任务了,那么这个时候maximumPoolSize就变得有用了,新的任务将会伴随着产生一个新的Worker,如果线程池里面的Worker已经达到了maximumPoolSize,那么接下来提交的任务只能被拒绝策略拒绝了。可以参考下面的描述来理解:
|---corePoolSize[创建]---||---workQueue[等待keepAliveTime]---||---maximumPoolSize[创建]---||---拒绝策略---|
* When a new task is submitted in method {@link #execute(Runnable)}, * and fewer than corePoolSize threads are running, a new thread is * created to handle the request, even if other worker threads are * idle. If there are more than corePoolSize but less than * maximumPoolSize threads running, a new thread will be created only * if the queue is full. By setting corePoolSize and maximumPoolSize * the same, you create a fixed-size thread pool. By setting * maximumPoolSize to an essentially unbounded value such as {@code * Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary * number of concurrent tasks. Most typically, core and maximum pool * sizes are set only upon construction, but they may also be changed * dynamically using {@link #setCorePoolSize} and {@link * #setMaximumPoolSize}. 在方法{@link #execute(Runnable)}中提交新任务时, 如果运行的线程小于corePoolSize,则创建新线程处理请求,即使其他工作线程闲置。 如果运行的线程大于corePoolSize,但是小于maximumPoolSize,当线程运行时,如果队列已满则会创建一个新线程 同样通过设置corePoolSize和maximumPoolSize,创建一个固定大小的线程池。通过设置maximumPoolSize到一个 本质上无界的值,比如{@code Integer.MAX_VALUE},您允许池容纳任意的并发任务的数量。 最典型的是核心池和最大池尺寸只在构造时设置,但也可以更改动态使用{@link #setCorePoolSize}和{@link #setMaximumPoolSize}。
在此需要说明一点,有一个重要的成员:keepAliveTime,当线程池里面的线程数量超过corePoolSize了,那么超出的线程将会在空闲keepAliveTime之后被terminated。可以参考下面的文档:
* If the pool currently has more than corePoolSize threads, * excess threads will be terminated if they have been idle for more * than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
四、ScheduledThreadPoolExecutor解析
ScheduledThreadPoolExecutor适用于延时执行或者周期性执行的任务调度,ScheduledThreadPoolExecutor在实现上继承了ThreadPoolExecutor,所以你依然可以将ScheduledThreadPoolExecutor当成ThreadPoolExecutor来使用,但是ScheduledThreadPoolExecutor的功能要强大得多,因为ScheduledThreadPoolExecutor可以根据设定的参数来周期性调度运行,下面的图片展示了四个和周期性相关的方法:
四个Scheduled方法
- 如果你想延时一段时间之后运行一个Runnable,那么使用第一个方法
- 如果你想延时一段时间然后运行一个Callable,那么使用的第二个方法
- 如果你想要延时一段时间,然后根据设定的参数周期执行Runnable,那么可以选择第三个和第四个方法,第三个方法和第四个方法的区别在于:第三个方法严格按照规划的时间路径来执行,比如周期为2,延时为0,那么执行的序列为0,2,4,6,8....,而第四个方法将基于上次执行时间来规划下次的执行,也就是在上次执行完成之后再次执行。比如上面的执行序列0,2,4,6,8...,如果第2秒没有被调度执行,而在第三秒的时候才被调度,那么下次执行的时间不是4,而是5,以此类推。
下面来看一下这四个方法的一些细节:
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; } public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
通过上面的代码我们可以发现,前两个方法是类似的,后两个方法也是类似的。前两个方法属于一次性调度,所以period都为0,区别在于参数不同,一个是Runnable,而一个是Callable,可笑的是,最后都变为了Callable了,见下面的构造函数:
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
对于后两个方法,区别仅仅在于period的,scheduleWithFixedDelay对参数进行了操作,将原来的时间变为负数了,而后面在计算下次被调度的时间的时候会根据这个参数的正负值来分别处理,正数代表scheduleAtFixedRate,而负数代表了scheduleWithFixedDelay。
一个需要被我们注意的细节是,以上四个方法最后都会调用一个方法: delayedExecute(t),下面看一下这个方法:
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
大概的意思就是先判断线程池是否被关闭了,如果被关闭了,则拒绝任务的提交,否则将任务加入到任务队列中去等待被调度执行。最后的ensurePrestart的意思是需要确保线程池已经被启动起来了。下面是这个方法:
void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
主要是增加了一个没有任务的worker,有什么用呢?我们还记得Worker的逻辑吗?addWorker方法的执行,会触发Worker的run方法的执行,然后runWorker方法就会被执行,而runWorker方法是循环从workQueue中取任务执行的,所以确保线程池被启动起来是重要的,而只需要简单的执行addWorker便会触发线程池的启动流程。 对于调度线程池来说,只要执行了addWorker方法,那么线程池就会一直在后台周期性的调度执行任务。到此,似乎我们还是没有闹明白ScheduledThreadPoolExecutor是如何实现周期性的,上面讲到四个scheduled方法时,我们没有提一个重要的类:ScheduledFutureTask,对,所有神奇的事情将会发生在这个类中,下面来分析一下这个类。
ScheduledFutureTask类图
看上面的类图,貌似这个类非常复杂,还好,我们发现他实现了Runnable接口,那么必然会有一个run方法,而这个run方法必然是整个类的核心,下面来看一下这个run方法的内容:
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } } }
首先,判断是否是周期性的任务,如果不是,则直接执行(一次性),否则执行,然后设置下次执行的时间,然后重新调度,等待下次执行。这里有一个方法需要注意,也就是setNextRunTime,上面我们提到scheduleAtFixedRate和scheduleWithFixedDelay在传递参数时不一样,后者将delay值变为了负数,所以下面的处理正好印证了前文所述。
private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }
下面来看一下reExecutePeriodic方法是如何做的,他的目标是将任务再次被调度执行,下面的代码展示了这个功能的实现:
void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }
可以看到,这个方法就是将我们的任务再次放到了workQueue里面,那这个参数是什么?在上面的run方法中我们调用了reExecutePeriodic方法,参数为outerTask,而这个变量是什么?看下面的代码:
/** The actual task to be re-enqueued by reExecutePeriodic */ RunnableScheduledFuture<V> outerTask = this;
这个变量指向了自己,而this的类型是什么?是ScheduledFutureTask,也就是可以被调度的task,这样就实现了循环执行任务了。
上面的分析已经到了循环执行,但是ScheduledThreadPoolExecutor的功能是周期性执行,所以我们接着分析ScheduledThreadPoolExecutor是如何根据我们的参数走走停停的。这个时候,是应该看一下ScheduledThreadPoolExecutor的构造函数了,我们来看一个最简单的构造函数:
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
我们知道ScheduledThreadPoolExecutor的父类是ThreadPoolExecutor,所以这里的super其实是ThreadPoolExecutor的构造函数,我们发现其中有一个参数DelayedWorkQueue,看名字貌似是一个延迟队列的样子,进一步跟踪代码,发现了下面的一行代码(构造函数中):
this.workQueue = workQueue;
所以在ScheduledThreadPoolExecutor中,workQueue是一个DelayedWorkQueue类型的队列,我们暂且认为DelayedWorkQueue是一种具备延迟功能的队列吧,那么,到此我们便可以想明白了,上面的分析我们明白了ScheduledThreadPoolExecutor是如何循环执行任务的,而这里我们明白了ScheduledThreadPoolExecutor使用DelayedWorkQueue来达到延迟的目标,所以组合起来,就可以实现ScheduledThreadPoolExecutor周期性执行的目标。下面我们来看一下DelayedWorkQueue是如何做到延迟的吧,上文中提到一个方法:getTask,这个方法的作用是从workQueue中取出任务来执行,而在ScheduledThreadPoolExecutor里面,getTask方法是从DelayedWorkQueue中取任务的,而取任务无非两个方法:poll或者take,下面我们对DelayedWorkQueue的take方法来分析一下:
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
在for循环里面,首先从queue中获取第一个任务,然后从任务中取出延迟时间,而后使用available变量来实现延迟效果。这里面需要几个点需要探索一下:
- 这个queue是什么东西?
- 延迟时间的来龙去脉?
- available变量的来龙去脉?
对于第一个问题,看下面的代码:
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
它是一个RunnableScheduledFuture类型的数组,下面是RunnableScheduledFuture类的类关系图:
RunnableScheduledFuture类关系
数组里面保存了我们的RunnableScheduledFuture,对queue的操作,主要来看一下增加元素和消费元素的操作。首先,假设使用add方法来增加RunnableScheduledFuture到queue,调用的链路如下:
public boolean add(Runnable e) { return offer(e); } public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; }
解释一下,add方法直接转到了offer方法,该方法中,首先判断数组的容量是否足够,如果不够则grow,增长的策略如下:
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
每次增长50%,如此下去。增长完成后,如果这是第一个元素,则放在坐标为0的位置,否则,使用siftUp操作,下面是该方法的内容:
private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }
这个数组实现了堆这种数据结构,使用对象比较将最需要被调度执行的RunnableScheduledFuture放到数组的前面,而这得力于compareTo方法,下面是RunnableScheduledFuture类的compareTo方法的实现,主要是通过延迟时间来做比较。
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
上面是生产元素,下面来看一下消费数据。在上面我们提到的take方法中,使用了一个方法如下:
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { int s = --size; RunnableScheduledFuture<?> x = queue[s]; queue[s] = null; if (s != 0) siftDown(0, x); setIndex(f, -1); return f; }
这个方法中调用了一个方法siftDown,这个方法如下:
private void siftDown(int k, RunnableScheduledFuture<?> key) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; RunnableScheduledFuture<?> c = queue[child]; int right = child + 1; if (right < size && c.compareTo(queue[right]) > 0) c = queue[child = right]; if (key.compareTo(c) <= 0) break; queue[k] = c; setIndex(c, k); k = child; } queue[k] = key; setIndex(key, k); }
对其的解释就是:
Replaces first element with last and sifts it down. Call only when holding lock.
总结一下,当我们向queue插入任务的时候,会发生siftUp方法的执行,这个时候会把任务尽量往根部移动,而当我们完成任务调度之后,会发生siftDown方法的执行,与siftUp相反,siftDown方法会将任务尽量移动到queue的末尾。总之,大概的意思就是queue通过compareTo实现了类似于优先级队列的功能。
下面我们来看一下第二个问题:延迟时间的来龙去脉。在上面的take方法里面,首先获取了delay,然后再使用available来做延迟效果,那这个delay从哪里来的呢?通过上面的类图RunnableScheduledFuture的类图我们知道,RunnableScheduledFuture类实现了Delayed接口,而Delayed接口里面的唯一方法是getDelay,我们到RunnableScheduledFuture里面看一下这个方法的具体实现:
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); }
time是我们设定的下次执行的时间,所以延迟就是(time - now()),没毛病!
第三个问题:available变量的来龙去脉,至于这个问题,我们看下面的代码:
/** * Condition signalled when a newer task becomes available at the * head of the queue or a new thread may need to become leader. */ private final Condition available = lock.newCondition();
这是一个条件变量,take方法里面使用这个变量来做延迟效果。Condition可以在多个线程间做同步协调工作,更为具体细致的关于Condition的内容,可以参考更多的资料来学习,本文对此知识点点到为止。
到此为止,我们梳理了ScheduledThreadPoolExecutor是如何实现周期性调度的,首先分析了它的循环性,然后分析了它的延迟效果,对于线程池的学习现在才刚刚起步,需要更多更专业的知识类帮我理解更为底层的内容,当然,为了更进一步理解线程池的实现细节,首先需要对线程间通信有足够的把握,其次是要对各种数据结构有清晰的认识,比如队列、优先级队列、堆等高级的数据结构,以及java语言对于这些数据结构的实现,更为重要的是要结合实际情况分析问题,在工作和平时的学习中不断总结,不断迭代对于线程、线程池的认知
五、Executors工厂类详解
首先列出了Executors这个类提供的一些方法。
Executors方法
本文需要对以上12个类做一些区分,从其特点出发,然后分析其应用场景。
public static ExecutorService newFixedThreadPool(int nThreads)
使用这个方法会产生这样一个线程池:线程池最多会保持nThreads个线程处于活动状态,如果当前所有任务都处于活动状态,那么新提交的任务会被添加到任务阻塞队列中去。总结一下就是:使用固定大小的线程池,并发数是固定的。
* Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue. At any point, at most * {@code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks. The threads in the pool will exist * until it is explicitly {@link ExecutorService#shutdown shutdown}.
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
相比于newFixedThreadPool(int nThreads), 你可以使用这个方法来传递你自己的线程工厂,线程工厂是用来干嘛的?就是用来生成线程的,你可以使用线程工厂做一些个性化的线程特性定制。
public static ExecutorService newWorkStealingPool(int parallelism)
在了解或者使用这个方法之前,你你该对java的Fork/Join并行框架有一些了解,如果你想要快速了解一下该部分的内容,可以参考这篇文章:Java Fork/Join并行框架。
从名字上我们就知道这个方法生产出来的线程池具有某种“小偷”的行为,在Fork/Join里面,线程的工作模式为“盗窃算法”,也就是在自己的任务队列消费完了之后不是进入等到状态,而是会主动去偷窃别的线程的任务来做,其实是没有一种奖励机制来鼓励这些线程去帮助别的线程去消费任务的,所以可以认为这些线程都是好人,都为了快速完成任务协调作战。这种工作方式的重点在于,每个线程都将有一个任务队列,线程之间通过“偷窃”的方式互相帮助来完成任务的消费。
可以看下这个方法的实现:
return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
可以发现,这个方法不是使用我们在第一篇文章中分析了ThreadPoolExecutor来生成线程池。而是使用了ForkJoinPool,也就是Fork/Join里面的线程池,关于ForkJoinPool更为深入的分析不再本文的涉及范围内,你只要知道Fork/Join框架的一般运行原理就可以了,下面的描述可以帮助你决策你是否需要该方法提供的线程池来工作:
* Creates a thread pool that maintains enough threads to support * the given parallelism level, and may use multiple queues to * reduce contention. The parallelism level corresponds to the * maximum number of threads actively engaged in, or available to * engage in, task processing. The actual number of threads may * grow and shrink dynamically. A work-stealing pool makes no * guarantees about the order in which submitted tasks are * executed.
public static ExecutorService newWorkStealingPool()
参考newWorkStealingPool(int parallelism)。
public static ExecutorService newSingleThreadExecutor()
下面是对该方法的描述:
* Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * {@code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads.
可以从方法的名字上知道,该方法产生的线程池仅仅有一个Worker,任何时刻都将只有一个Worker在工作,添加的任务有很大概率被放在阻塞任务队列中等待执行。这些任务会被顺序执行,这个方法的返回值其实是对ThreadPoolExecutor的一层包装,下面的代码展示了最终执行任务的类:
static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future<?> submit(Runnable task) { return e.submit(task); } public <T> Future<T> submit(Callable<T> task) { return e.submit(task); } public <T> Future<T> submit(Runnable task, T result) { return e.submit(task, result); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return e.invokeAll(tasks); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } }
从上面的代码可以看出,这个类其实就是使用了构造时传递的参数e来完成,更像是代理。而e是什么?看下面的代码:
ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
其实就是一个只有一个线程的ThreadPoolExecutor。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
参考newSingleThreadExecutor(),多了一个线程工厂参数
public static ExecutorService newCachedThreadPool()
首先看它的方法体内容:
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
可以看到,核心线程数量为0,而上限为Integer.MAX_VALUE,而且keepAliveTime为60秒,那么这个线程池的工作模式为:只要有任务呗提交,而且当前没有空闲的线程可用,那么就会创建一个新的Worker来工作,一个线程工作完了之后会缓存(idle)60秒,如果60秒之内有新的任务提交,则会被唤醒进入工作模式,否则60秒后就会被回收。可以参考下面的描述:
* Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available. These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to {@code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using {@link ThreadPoolExecutor} constructors.
从描述上,我们可以想到,其实这种类型的线程池比较适合于短期高流量的场景,也就是我们所说的“秒杀”场景,在那样的场景下,需要的线程数量较多,那么使用该类型的线程池可以满足,而且该线程池还有自动收缩的功能,在不需要那么多线程的时候,会自动回收线程,释放资源。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
参考newCachedThreadPool()。
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
只有一个线程的调度线程池,类似于newSingleThreadExecutor,但是该方法生产的线程池具备调度功能,下面是对该方法的描述:
* Creates a single-threaded executor that can schedule commands * to run after a given delay, or to execute periodically. * (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * {@code newScheduledThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads.
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
参考newSingleThreadExecutor和newSingleThreadScheduledExecutor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
参考newFixedThreadPool,但是返回类型不一样。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
参考newFixedThreadPool。
通过上面的分析,我们应该对java线程池的理解更为深入,再次说明,第五节是对前面四节内容的补充,你应该首先前四节之后再来阅读第五节,那样内容上更完整,但是单独阅读本文一样具备独立性,但是收获肯定没有同时阅读那样多。
六、ScheduleExecutorService
如果在一个ScheduleExecutorService中提交一个任务,这个任务的调度周期设置
的时间比任务本身执行的时间短的话会出现什么情况?也就是在线程调度时间已经到了
但是上次的任务还没有做完的情况下,ScheduleExecutorService是怎么处理的?
这个问题曾经困扰了我很久,我们都知道,ScheduleExecutorService是一个支持周期调度的线程池,我们可以设置调度的周期period,ScheduleExecutorService会按照设定好的周期调度我们的任务,如果我们设定的调度周期小于任务运行时间,那么很好理解,比如说我们设置的调度周期为1秒,而任务实际只需要10毫秒就可以执行完成一次,那么执行完成之后放到调度队列即可,下次调度时间到了再次调度执行。那么,如果我们的任务执行时间大于我们设定的调度时间会怎么样?比如我们设定的调度周期为1秒,但是我们的任务每次需要执行2秒,这个情况是不是很奇怪呢?
对于ScheduleExecutorService来说,你给我设定的调度周期是1秒,那么我当然1秒就会去运行一次你,但是运行1秒后发现你还在运行,那我是再次运行你还是等你运行完成再调度你运行?
当然,这都是我的主观臆断来猜测ScheduleExecutorService的原理,ScheduleExecutorService的真正原理需要去阅读源码来理解,下面带着这个问题,以解决这个问题为目标去看一下ScheduleExecutorService的源码吧。
首先,我们使用下面的代码作为测试:
private static Runnable blockRunner = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("one round:" + new Date()); } catch (InterruptedException e) { e.printStackTrace(); } }; private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); public static void main(String ... args) { scheduledExecutorService .scheduleAtFixedRate(blockRunner, 0, 100, TimeUnit.MILLISECONDS); }
我们设定了调度周期为100毫秒,但是blockRunner实际上需要执行2秒才能返回。关于java的线程池,已经在前面写到了。
先来看一下scheduleAtFixedRate这个方法:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
我们的任务command被包装了两次,一次变成了一个ScheduledFutureTask类型的对象,然后又变成了RunnableScheduledFuture类型的对象。然后执行了一个方法delayedExecute,这个方法字面意思上看起来像是延时执行的意思,看一下它的代码:
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
它的执行逻辑是:如果线程池被关闭了,那么拒绝提交的任务,否则,将该任务添加队列中去。这个队列就是ThreadPoolExecutor中的workQueue,而这个workQueue是在ThreadPoolExecutor的构造函数中被初始化的,也就是下面这关键的一句:
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
也就是说,我们的任务被添加到了一个DelayedWorkQueue队列中去了,而DelayedWorkQueue我们在Java阻塞队列详解中已经分析过,它是一个可以延迟消费的阻塞队列。而延时的时间是通过接口Delayed的getDelay方法来获得的,我们最后找到ScheduledFutureTask实现了Delayed的getDelay方法。
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); }
time变量是什么?原来是delay,好像和period无关啊!!分析了这么久,发现这是第一次执行任务的逻辑啊,我想知道的是第二次、第三次以后和初始的delay无关之后的周期调度的情况啊,继续找吧!
然后发现了ScheduledFutureTask的run方法,很明显这就是任务调度被执行的关键所在,看下代码:
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } } }
最为关键的地方在于:
else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); }
首先是:runAndReset()这个方法,然后是setNextRunTime()这个方法,然后是reExecutePeriodic(outerTask)这个方法。
第一个方法runAndReset()貌似是执行我们的提交的任务的,我们看下代码:
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
关键的地方是c.call()这一句,这个c就是我们提交的任务。
第二个方法setNextRunTime()的意思是设置下次执行的时间,下面是他的代码细节:
private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }
我们只需要看p>0这个分支就可以了,其实这是两种策略。我们的示例对应了第一个分支的策略,所以很显然,time这个变量会被加p,而p则是我们设定好的period。下面我们找一下这个time是在哪里初始化的,回忆一下scheduleAtFixedRate这个方法的内,我们说我们的任务被包装了两次,而time就是在这里被初始化的:
/** * Returns the trigger time of a delayed action. */ private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } /** * Returns the trigger time of a delayed action. */ long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }
无论如何,我们知道一个任务会被运行完一次之后再次设置时间,然后线程池会获取任务来执行,而任务队列是一个延时阻塞队列,所以也就造成了周期性运行的假象。可以看下下面获取任务的take方法:
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
可以看到,如果delay小于等于0,那么就是说需要被立即调度,否则延时delay这样一段时间。也就是延时消费。
结论就是:一个任务会被重复添加到一个延时任务队列,所以同一时间任务队列中会有多个任务待调度,线程池会首先获取优先级高的任务执行。如果我们的任务运行时间大于设置的调度时间,那么效果就是任务运行多长时间,调度时间就会变为多久,因为添加到任务队列的任务的延时时间每次都是负数,所以会被立刻执行
上面列出了最近写的关于java线程池ScheduleExecutorService的内容,可以作为参考,本文是对ScheduleExecutorService学习和总结的一个收尾,对java线程池技术更为深入的学习和总结将在未来适宜的时候进行。
6.1 ScheduleExecutorService提交死循环任务
首先提出一个问题,如果向一个调度线程池提交一个死循环任务会发生什么?为了内容的完整性,本文会提到一些在上面列出的文章中已经涉及到的内容。
比如我们运行下面的代码:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); private static Runnable loopRunner = () -> { for(;;){ } }; scheduledExecutorService .scheduleAtFixedRate(loopRunner, 0, 100, TimeUnit.MILLISECONDS);
loopRunner里面只有一个死循环什么也不做,当然这是极端情况,更为一般的情况为在for(;;)里面做一些某种驱动类型的工作,比如Netty的EventLoop一样,那样的循环更有意义,但是本文只是为了学习当向一个调度线程池提交了一个死循环任务之后的运行情况。
下面我们就分析一下scheduleAtFixedRate方法的调用链路:
1、将loopRunner包装成一个ScheduledFutureTask对象,ScheduledFutureTask这个类对于调度线程池至关重要
2、再次包装变为RunnableScheduledFuture对象
3、delayedExecute方法运行,确保任务被正确处理,如果线程池已经被关闭了,那么拒绝任务的提交,否则将任务添加到一个延时队列(workQueue)中去,这是一个具有延时功能的阻塞队列,初始容量为16,每次扩容增加50%的容量,最大容量为Integer.MAX_VALUE
4、运行方法ensurePrestart,确保线程池已经开始工作了,如果线程池里面的线程数量还没有达到设定的corePoolSize,那么就添加一个新的Worker,然后让这个Worker去延时队列去获取任务来执行
5、方法addWorker执行,添加一个Worker,然后让他执行我们提交的任务,下面摘取一段addWorker的方法内容:
/** * 完整代码见源码,下面只是摘取了部分,去除了一些不影响阅读的部分 */ private boolean addWorker(Runnable firstTask, boolean core) { boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); //添加一个新的worker int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //如果添加新的Worker成功,那么就启动它来执行我们提交的任务 t.start(); workerStarted = true; } } } return workerStarted; }
6、第五步中最为重要的一句话就是t.start(),这句话的执行会发生什么?首先看这个t是什么东西:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
而this就是Worker自身,而Worker是实现了Runnable的,也就是说,t.start()这句话会执行worker自身的run方法
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
7、我们已经知道现在会执行Worker的run方法,下面是run方法的内容:
public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
首先从Worker中获取其负责的task,如果task为空,那么就去延时队列获取任务,如果没有获取到任务那么线程就可以休息了,如果获取到,那么继续执行下面的内容。主要的就是一句:task.run(),那这句话会发生什么呢?
8、想要知道task.run()之后会发生什么,就需要知道task是个什么东西,第二步的时候说过,也就是我们的任务,只是被包装成了一个RunnableScheduledFuture<Void>对象,那现在就去看RunnableScheduledFuture这个方法里面的run会发生什么,下面展示了其run方法的具体细节:
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
如果这不是一个周期性的任务,那么就执行super的run方法,否则执行runAndReset方法,介于本文是问题导向的文章,所以在此不对super的run方法和runAndReset方法做分析,只要知道这就是执行我们实际提交的任务就好了。也就是说,走到这一步,我们的任务开始运行起来了,也就是我们的那个loopRunner开始无限循环了,下面的代码将永远得不到执行。所以,到这一步就可以解决问题了,向一个调度线程池提交一个死循环的任务,那么这个任务会霸占一个线程一直不会释放,如果很不幸线程池里面只允许有一个线程的话,那么其他提交的任务都将得不到调度执行。
9、为了走通整个流程,我们假设我们提交的不是一个死循环任务,那么提交的任务总是会被执行完的,线程总是会被释放的,那么就会执行setNextRunTime这个方法,下面是这个方法的细节:
private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }
p > 0代表的是scheduleAtFixedRate,p < 0代表的是scheduleWithFixedDelay,两者的区别在于前者总是按照设定的轨迹来设定下次应该调度的时间,而后者总是在任务执行完成之后再根据周期设定下一次应该执行的时间。我们只分析前者。对于第一次提交的任务,time等于当前时间 + 首次延时执行的时间,对于delay等于0的情况下,首次提交任务的time就是当前时间,然后 + p代表的是下一次应该被调度的时间。
10、我们发现,每个任务都是在执行完一次之后再设定下次执行任务的时间的,这也特别关键。设定好下次调度的时间,那么就要开始去准备执行它吧,也就是reExecutePeriodic方法会执行,下面是reExecutePeriodic方法的内容:
void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }
这个方法只是将任务重新提交到了延时队列而已,一次完整的流程到底也就结束了,为了内容的完整性,再来分析一下一个Worker从延时队列获取任务时的情况。回到第七步,我们有一个方法没有提到,那就是getTask():
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
我们主要来看两个方法: poll/take,这两个方法都是从延时队列获取一个任务,下面是poll的代码,take会阻塞一直到获取到内容,而poll则不会阻塞,take的代码就不粘了:
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); if (nanos <= 0) return null; first = null; // don't retain ref while waiting if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
poll的代码最为核心的内容就是,获取队列首部的任务,然后获取其延时时间,这个时间是我们在完成一次调度之后设置的下次调度时间,如果任务的运行时间大于我们设定的周期的话,这个延时时间就是负数的,那么就会被立即执行,否则会等到设定的时间,时间到了再返回给Worker执行。
最后把getDelay方法的细节粘出来,这样内容就完整了,其中的time是我们设定的:
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); }
七、ThreadLocal
ThreadLocal从字面理解就是线程本地变量,貌似是一种线程私有的缓存变量的容器。为了说明ThreadLocal的特点,举个例子:比如有三个人,每个人比作一个线程,它们都需要一个袋子来装捡到的东西,也就是每个线程都希望自己有一个容器,当然,自己的捡到的东西肯定不希望和别人分享啊,也就是希望这个容器对其他人(线程)是不可见的,如果现在只有一个袋子,那怎么办?
- 每个人在捡东西之前一定会先抢到那个唯一的袋子,然后再捡东西,如果使用袋子的时间到了,就会马上把里面的东西消费掉,然后把袋子放到原来的地方,然后再次去抢袋子。这个方案是使用锁来避免线程竞争问题的,三个线程需要竞争同一个共享变量。
- 我们假设现在不是只有一个袋子了,而是有三个袋子,那么就可以给每个人安排一个袋子,然后每个人的袋子里面的对象是对其他人不可见的,这样的好处是解决了多个人竞争同一个袋子的问题。这个方案就是使用ThreadLocal来避免不必要的线程竞争的。
大概了解了ThreadLocal,下面来看看它的使用方法:
private static class UnsafeThreadClass { private int i; UnsafeThreadClass(int i) { this.i = i; } int getAndIncrement() { return ++ i; } @Override public String toString() { return "[" + Thread.currentThread().getName() + "]" + i; } } private static ThreadLocal<UnsafeThreadClass> threadLocal = new ThreadLocal<>(); static class ThreadLocalRunner extends Thread { @Override public void run() { UnsafeThreadClass unsafeThreadClass = threadLocal.get(); if (unsafeThreadClass == null) { unsafeThreadClass = new UnsafeThreadClass(0); threadLocal.set(unsafeThreadClass); } unsafeThreadClass.getAndIncrement(); System.out.println(unsafeThreadClass); } }
上面的例子仅仅是为了说明ThreadLocal可以为每个线程保存一个本地变量,这个变量不会受到其他线程的干扰,你可以使用多个ThreadLocal来让线程保存多个变量,下面我们分析一下ThreadLocal的具体实现细节,首先展示了ThreadLocal提供的一些方法,我们重点关注的是get、set、remove方法。
ThreadLocal方法
首先,我们需要new一个ThreadLocal对象,那么ThreadLocal的构造函数做了什么呢?
/** * Creates a thread local variable. * @see #withInitial(java.util.function.Supplier) */ public ThreadLocal() { }
很遗憾它什么都没做,那么初始化的过程势必是在首次set的时候做的,我们来看一下set方法的细节:
public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); }
看起来首先根据当前线程获取到了一个ThreadLocalMap,getMap方法是做了什么?
ThreadLocalMap getMap(Thread t) { return t.threadLocals; }
非常的简洁,是和Thread与生俱来的,我们看一下Thread中的相关定义:
/* ThreadLocal values pertaining to this thread. This map is maintained * by the ThreadLocal class. */ ThreadLocal.ThreadLocalMap threadLocals = null; /* * InheritableThreadLocal values pertaining to this thread. This map is * maintained by the InheritableThreadLocal class. */ ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
获得了线程的ThreadLocalMap之后,如果不为null,说明不是首次set,直接set就可以了,注意key是this,也就是当前的ThreadLocal啊不是Thread。如果为空呢?说明还没有初始化,那么就需要执行createMap这个方法:
void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); }
没什么特别的,就是初始化线程的threadLocals,然后设定key-value。
下面分析一下get的逻辑:
public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); }
和set一样,首先根据当前线程获取ThreadLocalMap,然后判断是否为null,如果为null,说明ThreadLocalMap还没有被初始化啊,那么就返回方法setInitialValue的结果,这个方法做了什么?
private T setInitialValue() { T value = initialValue(); Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); return value; } protected T initialValue() { return null; }
最后会返回null,但是会做一些初始化的工作,和set一样。在get里面,如果返回的ThreadLocalMap不为null,则说明ThreadLocalMap已经被初始化了,那么就可以正常根据ThreadLocal作为key获取了。
当线程退出时,会清理ThreadLocal,可以看下面的代码:
/** * This method is called by the system to give a Thread * a chance to clean up before it actually exits. */ private void exit() { if (group != null) { group.threadTerminated(this); group = null; } /* Aggressively null out all reference fields: see bug 4006245 */ target = null; /* Speed the release of some of these resources */ threadLocals = null; inheritableThreadLocals = null; inheritedAccessControlContext = null; blocker = null; uncaughtExceptionHandler = null; }
这里做了大量“Help GC”的工作。包括我们本节所讲的threadLocals和下一小节要讲的inheritableThreadLocals都会被清理。
如果我们想要显示的清理ThreadLocal,可以使用remove方法:
public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) m.remove(this); }
逻辑较为直接,很好理解。
八、InheritableThreadLocal
ThreadLocal固然很好,但是子线程并不能取到父线程的ThreadLocal的变量,比如下面的代码:
private static ThreadLocal<Integer> integerThreadLocal = new ThreadLocal<>(); private static InheritableThreadLocal<Integer> inheritableThreadLocal = new InheritableThreadLocal<>(); public static void main(String[] args) throws InterruptedException { integerThreadLocal.set(1001); // father inheritableThreadLocal.set(1002); // father new Thread(() -> System.out.println(Thread.currentThread().getName() + ":" + integerThreadLocal.get() + "/" + inheritableThreadLocal.get())).start(); } //output: Thread-0:null/1002
使用ThreadLocal不能继承父线程的ThreadLocal的内容,而使用InheritableThreadLocal时可以做到的,这就可以很好的在父子线程之间传递数据了。下面我们分析一下InheritableThreadLocal的实现细节,下面展示了InheritableThreadLocal提供的方法:
InheritableThreadLocal方法
InheritableThreadLocal继承了ThreadLocal,然后重写了上面三个方法,所以除了上面三个方法之外,其他所有对InheritableThreadLocal的调用都是对ThreadLocal的调用,没有什么特别的。我们上文中提到了Thread类,里面有我们本文关心的两个成员,我们来看一下再Thread中做了哪些工作,我们跟踪一下new一个Thread的调用路径:
new Thread() init(ThreadGroup g, Runnable target, String name, long stackSize) init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) -> if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); createInheritedMap(ThreadLocalMap parentMap) ThreadLocalMap(ThreadLocalMap parentMap)
上面列出了最为关键的代码,可以看到,最后会调用ThreadLocal的createInheritedMap方法,而该方法会新建一个ThreadLocalMap,看一下构造函数的内容:
private ThreadLocalMap(ThreadLocalMap parentMap) { Entry[] parentTable = parentMap.table; int len = parentTable.length; setThreshold(len); table = new Entry[len]; for (int j = 0; j < len; j++) { Entry e = parentTable[j]; if (e != null) { @SuppressWarnings("unchecked") ThreadLocal<Object> key = (ThreadLocal<Object>) e.get(); if (key != null) { Object value = key.childValue(e.value); Entry c = new Entry(key, value); int h = key.threadLocalHashCode & (len - 1); while (table[h] != null) h = nextIndex(h, len); table[h] = c; size++; } } } }
parentMap就是父线程的ThreadLocalMap,这个构造函数的意思大概就是将父线程的ThreadLocalMap复制到自己的ThreadLocalMap里面来,这样我们就可以使用InheritableThreadLocal访问到父线程中的变量了。
对ThreadLocal更为具体和深入的分析将在其他的篇章中进行,本文点到即可,为了深入理解ThreadLocal,可以阅读ThreadLocalMap的源码,以及可以在项目中多思考是否可以使用ThreadLocal来做一些事情,比如,如果我们具有这样一种线程模型,一个任务从始至终只会被一个线程执行,那么可以使用ThreadLocal来计算运行该任务的时间。