代码3执行清理任务,其代码如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) { ... //(3.1)统计整个线程池完成的任务个数,并从工作集里面删除当前woker final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } //(3.2)尝试设置线程池状态为TERMINATED,如果当前是shutdonw状态并且工作队列为空 //或者当前是stop状态且当前线程池里面没有活动线程 tryTerminate(); //(3.3)如果当前线程个数小于核心个数,则增加 int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
代码3.1统计线程池完成任务个数,可知在统计前加了全局锁,把当前工作线程中完成的任务累加到全局计数器,然后从工作集中删除当前Worker。
代码3.2判断如果当前线程池状态是shutdown状态并且工作队列为空,或者当前是stop状态并且当前线程池里面没有活动线程,则设置线程池状态为TERMINATED。
代码3.3判断当前线程中的线程个数是否小于核心线程个数,如果是则新增一个线程。
关闭线程池原理解析
线程池中有两种模式的线程池关闭方法
public void shutdown()
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //(1)权限检查 checkShutdownAccess(); //(2)设置当前线程池状态为SHUTDOWN,如果已经是SHUTDOWN则直接返回 advanceRunState(SHUTDOWN); //(3)设置中断标志 interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } //(4)尝试状态变为TERMINATED tryTerminate(); }
代码1检查如果设置了安全管理器,则看当前调用shutdown命令的线程是否有关闭线程的权限,如果有权限则还要看调用线程是否有中断工作线程的权限,如果没有权限则抛出SecurityException或者NullPointerException异常。
代码2的内容如下,如果当前状态>=SHUTDOWN则直接返回,否则设置当前状态为SHUTDOWN:
private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
代码3的内容如下,设置所有空闲线程的中断标志,这里首先加了全局锁,同时只有一个线程可以调用shutdown设置中断标志。然后尝试获取Worker本身的锁,获取成功则设置中断标识,由于正在执行的任务已经获取了锁,所以正在执行的任务没有被中断。这里中断的是阻塞到getTask()方法,企图从队列里获取任务的线程,也就是空闲线程。
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; //如果工作线程没有被中断,并且没有正在运行则设置中断 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
代码4尝试将线程池的状态变为TERMINATED,tryTerminate的代码如下:
final void tryTerminate() { for (;;) { ... int c = ctl.get(); ... final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {//设置当前线程池状态为TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { //设置当前线程池状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); //激活调用条件变量termination的await系列方法被阻塞的所有线程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } } }
如上述代码所示,首先使用CAS设置当前线程池状态为TIDYING,如果成功则执行扩展接口terminated在线程池状态变为TERMINATED前做一些事情,然后设置当前线程池状态为TERMINATED,最后调用termination.signalAll()来激活调用线程池的awaitTermination系列方法被阻塞的所有线程。
public void shutdownNow()
下面我们来看public void shutdownNow()方法的代码逻辑:
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess();//(5)权限检查 advanceRunState(STOP);//(6) 设置线程池状态为stop interruptWorkers();//(7)中断所有线程 tasks = drainQueue();//(8)移动队列任务到tasks } finally { mainLock.unlock(); } //(9)终止状态 tryTerminate(); return tasks; }
首先调用代码5检查权限,然后调用代码6设置当前线程池状态为STOP,接着执行代码7中断所有的工作线程,这里需要注意的是中断所有线程,包含空闲线程和正在执行任务的线程:
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
然后调用代码8将当前任务队列的任务移动到tasks列表,代码如下:
private List<Runnable> drainQueue() { //8.1获取任务队列 BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); //8.2 从任务队列移除任务到taskList列表 q.drainTo(taskList); //8.3 如果q还不为空,则说明drainTo接口调用失效,则循环移除 if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } //8.4返回异常的任务列表 return taskList; }
由上述代码可知,调用线程池队列的drainTo方法把队列中的任务移除到taskList里,如果发现线程池队列还不为空(比如DelayQueue或者其他类型的队列drainTo可能移除元素失败),则循环移除里面的元素,最后返回移除的任务列表。
线程池的拒绝策略解析
线程池是通过池化少量线程来提供线程复用的,当调用线程向线程池中投递大量任务后,线程池可能就处于饱和状态了。所谓饱和状态是指当前线程池队列已经满了,并且线程池中的线程已经达到了最大线程个数。当线程池处于饱和状态时,再向线程池投递任务,而对于投递的任务如何处理,是由线程池拒绝策略决定的。拒绝策略的执行是在execute方法,大家可以返回前面章节查看。
线程池中提供了RejectedExecutionHandler接口,用来提供对线程池拒绝策略的抽象,其定义如下:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
线程池中提供了一系列该接口的实现供我们使用
AbortPolicy
首先我们看下AbortPolicy策略的代码:
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } /** * 抛出RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
由上述代码可知,该拒绝策略执行时会直接向调用线程抛出RejectedExecutionException异常,并丢失提交的任务。
CallerRunsPolicy
然后我们看下CallerRunsPolicy策略的代码:
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } /** * 使用调用线程执行任务r * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
分析上述代码,该拒绝策略执行时,如果线程池没有被关闭,则会直接使用调用线程执行提交的任务r,否则默默丢弃该任务。
DiscardPolicy
然后我们看下DiscardPolicy策略的代码:
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } /** * 什么都不做,默默丢弃提交的任务 * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
该拒绝策略执行时,什么都不做,默默丢弃提交的任务。
DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } /** * 丢弃线程池队列里面最老的任务,并把当前任务提交到线程池 * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll();//移除队首元素 e.execute(r);//提交任务r到线程池执行 } } }
该拒绝策略首先会丢弃线程池队列里面最老的任务,然后把当前任务r提交到线程池。