首先来分析第一部分的代码6
// 6 检查队列是否只在必要的时候为空 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;
展开!运算后等价于
rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
也就是说下面几种情况下会返回false:
当前线程池状态为STOP,TIDYING,TERMINATED
当前线程池状态为SHUTDOWN并且已经有了第一个任务
当前线程池状态为SHUTDOWN并且任务队列为空
内层循环的作用是使用CAS操作增加线程数,代码(7.1)判断如果线程个数超限则返回false,否则执行代码(7.2)CAS操作设置线程个数,CAS成功则退出双循环,CAS失败则执行代码(7.3)看当前线程池的状态是否变化了,如果变了,则再次进入外层循环重新获取线程池状态,否则进入内层循环继续进行CAS尝试。
执行到第二部分的代码(8)时说明使用CAS成功地增加了线程个数,但是现在任务还没开始执行。这里使用全局的独占锁来控制把新增的Worker添加到工作集workers中。代码(8.1)创建了一个工作线程Worker。
代码(8.2)获取了独占锁,代码(8.3)重新检查线程池状态,这是为了避免在获取锁前其他线程调用了shutdown关闭了线程池。如果线程池已经被关闭,则释放锁,新增线程失败,否则执行代码(8.4)添加工作线程到线程工作集,然后释放锁。代码(8.5)判断如果新增工作线程成功,则启动工作线程。
工作线程Worker的执行
用户线程提交任务到线程池后,由Worker来执行。先看下Worker的构造函数。
/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //创建一个线程 this.thread = getThreadFactory().newThread(this); }
在构造函数内首先设置Worker的状态为-1,这是为了避免当前Worker在调用runWorker方法前被中断(当其他线程调用了线程池的shutdownNow时,如果Worker状态>=0则会中断该线程)。这里设置了线程的状态为-1,所以该线程就不会被中断了。在如下runWorker代码中,运行代码(9)时会调用unlock方法,该方法把status设置为了0,所以这时候调用shutdownNow会中断Worker线程。
/** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 9 将state 置为0 ,允许终端 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 10 while (task != null || (task = getTask()) != null) { // 10.1 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 10.2 执行任务前干一些事情 beforeExecute(wt, task); Throwable thrown = null; try { // 10.3 执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 10.4 执行任务完成后干一些事情 afterExecute(task, thrown); } } finally { task = null; // 10.5 统计当前Worker完成了多少任务 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 11 执行清理工作 processWorkerExit(w, completedAbruptly); } }
在如上代码(10)中,如果当前task==null或者调用getTask从任务队列获取的任务返回null,则跳转到代码(11)执行。
如果task不为null则执行代码(10.1)获取工作线程内部持有的独占锁,然后执行扩展接口代码(10.2)在具体任务执行前做一些事情。代码(10.3)具体执行任务,代码(10.4)在任务执行完毕后做一些事情,代码(10.5)统计当前Worker完成了多少个任务,并释放锁。
这里在执行具体任务期间加锁,是为了避免在任务运行期间,其他线程调用了shutdown后正在执行的任务被中断(shutdown只会中断当前被阻塞挂起的线程)
getTask()
如果当前task为空,则直接执行,否者调用getTask从任务队列获取一个任务执行,如果任务队列为空,则worker退出。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果当前线程池状态>=STOP 或者线程池状态为shutdown并且工作队列为空则,减少工作线程个数 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } try { //根据timed选择调用poll还是阻塞的take Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
processWorkerExit
代码(11)执行清理任务,其代码如下。
/** * Performs cleanup and bookkeeping for a dying worker. Called * only from worker threads. Unless completedAbruptly is set, * assumes that workerCount has already been adjusted to account * for exit. This method removes thread from worker set, and * possibly terminates the pool or replaces the worker if either * it exited due to user task exception or if fewer than * corePoolSize workers are running or queue is non-empty but * there are no workers. * * @param w the worker * @param completedAbruptly if the worker died due to user exception */ private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); //统计整个线程池完成的任务个数 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } //尝试设置线程池状态为TERMINATED,如果当前是shutdonw状态并且工作队列为空 //或者当前是stop状态当前线程池里面没有活动线程 tryTerminate(); //如果当前线程个数小于核心个数,则增加 int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
shutdown
调用shutdown后,线程池就不会在接受新的任务了,但是工作队列里面的任务还是要执行的,但是该方法立刻返回的,并不等待队列任务完成在返回。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 12 advanceRunState(SHUTDOWN);// 13 interruptIdleWorkers();// 14 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate();// 15 }
代码(12)检查看是否设置了安全管理器,是则看当前调用shutdown命令的线程是否有关闭线程的权限,如果有权限则还要看调用线程是否有中断工作线程的权限,如果没有权限则抛出SecurityException或者NullPointerException异常。
其中代码(13)的内容如下,如果当前线程池状态>=SHUTDOWN则直接返回,否则设置为SHUTDOWN状态。
private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
代码(14)的内容如下,其设置所有空闲线程的中断标志。这里首先加了全局锁,同时只有一个线程可以调用shutdown方法设置中断标志。然后尝试获取Worker自己的锁,获取成功则设置中断标志。由于正在执行的任务已经获取了锁,所以正在执行的任务没有被中断。这里中断的是阻塞到getTask()方法并企图从队列里面获取任务的线程,也就是空闲线程。
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
在如上代码中,首先使用CAS设置当前线程池状态为TIDYING,如果设置成功则执行扩展接口terminated在线程池状态变为TERMINATED前做一些事情,然后设置当前线程池状态为TERMINATED。最后调用 termination.signalAll()激活因调用条件变量termination的await系列方法而被阻塞的所有线程
shutdownNow
调用shutdownNow方法后,线程池就不会再接受新的任务了,并且会丢弃工作队列里面的任务,正在执行的任务会被中断,该方法会立刻返回,并不等待激活的任务执行完成。返回值为这时候队列里面被丢弃的任务列表。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 16 advanceRunState(STOP);// 17 interruptWorkers();//18 tasks = drainQueue();//19 } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
在如上代码中,首先调用代码(16)检查权限,然后调用代码(17)设置当前线程池状态为STOP,随后执行代码(18)中断所有的工作线程。这里需要注意的是,中断的所有线程包含空闲线程和正在执行任务的线程。
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
然后代码(19)将当前任务队列里面的任务移动到tasks列表。
awaitTermination
等待线程池状态变为TERMINATED则返回,或者时间超时。由于整个过程独占锁,所以一般调用shutdown或者shutdownNow后使用。
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
如上代码首先获取独占锁,然后在无限循环内部判断当前线程池状态是否至少是TERMINATED状态,如果是则直接返回,否则说明当前线程池里面还有线程在执行 ,则看设置的超时时间nanos是否小于0,小于0则说明不需要等待,那就直接返回,如果大于0则调用条件变量termination的awaitNanos方法等待nanos时间,期望在这段时间内线程池状态变为TERMINATED。
在shutdown方法时提到过,当线程池状态变为TERMINATED时,会调用termination.signalAll()用来激活调用条件变量termination的await系列方法被阻塞的所有线程,所以如果在调用awaitTermination之后又调用了shutdown方法,并且在shutdown内部将线程池状态设置为TERMINATED,则termination.awaitNanos方法会返回。
另外在工作线程Worker的runWorker方法内,当工作线程运行结束后,会调用processWorkerExit方法,在processWorkerExit方法内部也会调用tryTerminate方法测试当前是否应该把线程池状态设置为TERMINATED,如果是,则也会调用termination.signalAll()用来激活调用线程池的awaitTermination方法而被阻塞的线程。
而且当等待时间超时后,termination.awaitNanos也会返回,这时候会重新检查当前线程池状态是否为TERMINATED,如果是则直接返回,否则继续阻塞挂起自己。
小结
线程池巧妙地使用一个Integer类型的原子变量来记录线程池状态和线程池中的线程个数。通过线程池状态来控制任务的执行,每个Worker线程可以处理多个任务。线程池通过线程的复用减少了线程创建和销毁的开销。