线程池 ThreadPoolExecutor
介绍
线程池主要解决两个问题:一是当执行大量异步任务时线程池能够提供较好的性能。在不使用线程池时,每当需要执行任务时就需要 new 一个线程来执行,频繁的创建与销毁非常消耗性能。而线程池中的线程是可以复用的,不需要在每次需要执行任务时候都重新创建和销毁。二是线程池提供了资源限制和管理的手段,比如可以限制线程个数,动态增加线程等。
另外,线程池也提供了许多可调参数和可扩展性接口,以满足不同情况下的需要,我们可以使用更方便的 Executors 的工厂方法,来创建不同类型的线程池,也可以自己自定义线程池。
线程池的工作机制
- 线程池刚创建的时候没有任何线程,当来了新的请求的时候才会创建
核心线程
去处理对应的请求 - 当处理完成之后,核心线程并不会回收
- 在核心线程达到指定的数量之前,每一个请求都会在线程池中创建一个新的核心线程
- 当核心线程全都被占用的时候,新来的请求会放入工作队列中。工作队列本质上是一个
阻塞队列
- 当工作队列被占满,再来的新请求会交给临时线程来处理
- 临时线程在使用完成之后会继续存活一段时间,直到没有请求处理才会被销毁
类图介绍
如上类图所示,Executors 是一个工具类,提供了多种静态方法,根据我们选择的不同提供不同的线程池实例。
ThreadPoolExecutor 继承了 AbstractExecutorService 抽象类,在 ThreadPoolExecutor 中成员变量 ctl 是一个 Integer 的原子性变量,用来记录线程池的状态和线程中线程个数,类似于 ReentrantReadWriteLock 使用一个变量来保存两种信息一样。
假设 Integer 类型是 32 位二进制表示,则其中高 3 位表示线程池的状态,后 29 为表示线程池线程数量。
//默认RUNNING状态,线程个数为0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
获取高 3 位,运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
获取低 29 位,线程个数
private static int workerCountOf(int c) { return c & CAPACITY; }
线程池状态含义如下:
RUNNING
:接受新任务并且处理阻塞队列中的任务SHUTDOWN
:拒绝新任务但是处理阻塞队列中的任务STOP
:拒绝新任务并且抛弃阻塞队列里的任务,同时中断正在处理的任务TIDYING
:所有任务都执行完(包括阻塞队列中的任务)后当前线程池活动线程数量为 0,将调用 terminated 方法TERMINATED
:终止状态。terminated 调用完成方法后的状态
线程池状态转换如下:
- RUNNING -> SHUTDOWN:显式调用 shutdown 方法,或者隐式调用了 finalize 方法里面的 shutdown 方法
- RUNNING 或 SHUTDOWN -> STOP :显式调用 shutdownNow 方法时
- SHUTDOWN -> TIDYING:当线程池和任务队列都为空时
- STOP -> TIDYING:当线程池为空时
- TIDYING -> TERMINATED:当 terminated hook 方法执行完成时
线程池参数如下:
参数名 | 类型 | 含义 |
corePoolSize | int | 核心线程数 |
maxPoolSize | int | 最大线程数 |
keepAliveTime | long | 保持存活时间 |
workQueue | BlockingQueue | 任务存储队列 |
threadFactory | ThreadFactory | 当线程池需要新的线程时,使用 ThreadFactory 来创建新的线程 |
Handler | RejectedExecutionHandler | 由于线程池无法接受所提交的任务所给出的拒绝策略 |
corePoolSize
:指的是核心线程数,线程池初始化完成后,默认情况下,线程池并没有任何线程,线程池会等待任务到来时,再创建新的线程去执行任务。maxPoolSize
:线程池有可能会在核心线程数上,额外增加一些线程,但是这些新增加的线程有一个上限,最大不能超过 maxPoolSize。
- 如果线程数小于 corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新的线程来运行任务。
- 如果线程数大于等于 corePoolSize 但少于 maxPoolSize,则将任务放进工作队列中。
- 如果队列已满,并且线程数小于 maxPoolSize,则创建一个新线程来运行任务。
- 如果队列已满,并且线程数已经大于等于 maxPoolSize,则使用拒绝策略来拒绝该任务。
keepAliveTime
:一个线程如果处于空闲状态,并且当前的线程数量大于 corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由 keepAliveTime 来设定。workQueue
:新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk 中提供了四种工作队列:
ArrayBlockingQueue
:基于数组的有界阻塞队列
,按 FIFO 排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到 corePoolSize 后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到 maxPoolSize,则会执行拒绝策略。LinkedBlockingQueue
:基于链表的无界阻塞队列
(其实最大容量为 Interger.MAX),按照 FIFO 排序。由于该队列的近似无界性,当线程池中线程数量达到 corePoolSize 后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到 maxPoolSize,因此使用该工作队列时,参数 maxPoolSize 其实是不起作用的。SynchronousQueue
:一个不缓存任务的阻塞队列
,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到 maxPoolSize,则执行拒绝策略。PriorityBlockingQueue
:具有优先级的无界阻塞队列
,优先级通过参数 Comparator 实现。delayQueue
:具有优先级的延时无界阻塞队列
。LinkedTransferQueue
:基于链表的无界阻塞队列
。LinkedBlockingDeque
:基于链表的双端阻塞队列
。
threadFactory
:创建一个新线程时使用的工厂,可以用来设定线程名、是否为 daemon 线程等等handler
:当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,就会执行拒绝策略。
如上 ThreadPoolExecutor 类图所示,其中 mainLock 是独占锁,用来控制新增 Worker 线程操作的原子性。termination 是该锁对应的条件队列,在线程调用 awaitTermination 时用来存放阻塞的线程。
private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet<Worker> workers = new HashSet<Worker>(); private final Condition termination = mainLock.newCondition();
Worker 类继承 AQS 和 Runnable 接口,是具体承载任务的对象。Worker 继承了 AQS,实现了简单的不可重入的独占锁,state = 0
表示锁未被获取,state = 1
则表示锁已经被获取,state = -1
是创建 Worker 时默认的状态,创建时状态设置为-1 是为了避免该线程在运行 runWorker 方法前被中断。其中变量 firstTask 记录该工作线程执行的第一个任务,thread 是具体执行任务的线程。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ final Thread thread; Runnable firstTask; //...
DefaultThreadFactory 是线程工厂,newThread 方法是对线程的一个修饰。其中 poolNumber 是个静态的原子变量,用来统计线程工厂的个数,threadNumber 用来记录每个线程工厂创建了多少线程,这两个值也作为线程池和线程的名称的一部分。
源码解析
execute 方法
execute 方法主要作用就是提交任务 command 到线程池中进行执行。
该图可以看到,ThreadPoolExecutor 实现其实就是一个生产者消费者模型,当用户添加任务到线程池相当于生产者生产元素,workers 线程中的线程直接执行任务或者从任务队列里面获取任务则相当于是消费者消费元素。
具体代码如下:
public void execute(Runnable command) { //1. 校验任务是否为null if (command == null) throw new NullPointerException(); //2. 获取当前线程池的状态+线程个数的组合值 int c = ctl.get(); //3. 判断线程池中线程个数是否小于corePoolSize,小则开启新线程(core线程)运行 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //4. 如果线程池处于RUNNING状态,则添加任务到阻塞队列 if (isRunning(c) && workQueue.offer(command)) { //4.1 二次检查 int recheck = ctl.get(); //4.2 如果当前线程池状态不是RUNNING则从队列中删除任务,并执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); //4.3 如果当前线程池为空,则添加一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //5. 如果队列满,则新增线程,新增失败则执行拒绝策略 else if (!addWorker(command, false)) reject(command); }
如果当前线程池线程个数大于等于 corePoolSize 则执行代码(4),如果当前线程处于 RUNNING 则添加到任务队列。
需要注意的是这里判断线程池状态是因为有可能线程池已经处于非 RUNNING 状态,在非 RUNNING 状态下是要抛弃新任务的。
如果任务添加成功,则执行代码(4.2)进行二次校验,因为在执行 4.2 之前可能线程池的状态发送变化,如果线程池状态不是 RUNNING 则把任务从任务队列中移除,然后执行拒绝策略;如果二次校验通过,则重新判断线程池里是否还有线程,没有则新增一个线程。
如果代码(4)添加任务失败,则说明队列已满,随后执行代码(5)尝试新增线程也就是上图中的 thread3,thread4 线程来执行任务,如果当前线程池个数 > maximumPoolSize 则执行拒绝策略。
我们接下来看 addWorker 方法:
private boolean addWorker(Runnable firstTask, boolean core) { retry: // 每次for循环都需要获取最新的ctl值 for (;;) { int c = ctl.get(); int rs = runStateOf(c); //1. 检查队列是否只在必要时为空 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //2. 循环CAS增加线程个数 for (;;) { int wc = workerCountOf(c); //2.1 如果线程个数超限制则返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //2.2 通过CAS增加线程个数 if (compareAndIncrementWorkerCount(c)) break retry; //2.3 CAS失败后,查看线程状态是否发生变化,如果变化则跳转到外层循环重新尝试获取线程池状态,否则内层循环重新进行CAS c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } //3. 执行到这一步说明CAS成功 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //3.1 创建Worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //3.2 增加独占锁,实现同步,因为可能多个线程同时调用线程池的execute方法 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //3.3 重新检查线程池状态,避免在获取锁前调用了shutdown int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //3.4 添加任务 workers.add(w); // 更新当前最大线程数量 maximumPoolSize 和 corePoolSize可以在线程池创建之后动态修改的 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //3.5 添加成功后启动任务 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 如果没有执行过t.start() 就要把这个woker从workers里面删除,并且ctl里面worker数量减1 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
首先明确第一部分双重循环目的是通过 CAS 操作进行添加线程数,第二部分主要通过 ReentrantLock 安全的将任务添加到 workers 里,随后启动任务。
首先看第一部分代码(1)。
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && //(1) firstTask == null && //(2) ! workQueue.isEmpty())) //(3)
代码(1)中会在下面三种情况返回 false:
- (1)当前线程池状态为 STOP、TIDYING 或 TERMINATED
- (2)当前线程池状态为 SHUTDOWN 并且已经有了第一个任务
- (3)当前线程池状态为 SHUTDOWN 并且任务队列为空
代码(2)内层循环的作用是使用 CAS 操作增加线程数量。
执行到代码(8)时,说明已经通过 CAS 成功增加了线程个数,现在任务还没有开始执行,所以这部分代码通过全局锁控制来增加 Worker 到工作集合 workers 中。
工作线程 Worker 的执行
用户线程提交到线程池之后,由 Worker 来执行,下面是 Worker 的构造函数。
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this);//创建一个线程 }
在构造函数内首先设置 Worker 的状态为-1,为了避免当前 Worker 在调用 runWorker 方法前被中断(当其他线程调用了线程池的 shutdownNow 时,如果 Worker 状态>=0 则会中断该线程)。这里设置了线程的状态为-1,所以该线程就不会被中断了。
在 runWorker 代码中,运行代码(1)时会调用 unlock 方法,该方法把 status 设置为了 0,所以这时候调用 shutdownNow 会中断 Worker 线程。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); //1. 将state设置为0,允许中断 boolean completedAbruptly = true; try { //2. while (task != null || (task = getTask()) != null) { //2.1 w.lock(); ////如果状态值大于等于STOP且当前线程还没有被中断,则主动中断线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //2.2 执行任务前处理操作,默认是一个空实现;在子类中可以通过重写来改变任务执行前的处理行为 beforeExecute(wt, task); Throwable thrown = null; try { //2.3 执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //2.4 任务之后处理,同beforeExecute afterExecute(task, thrown); } } finally { task = null; //2.5 统计当前worker完成了多少个任务 w.completedTasks++; w.unlock(); } } //设置为false,表示任务正常处理完成 completedAbruptly = false; } finally { //3. 清理工作 processWorkerExit(w, completedAbruptly); } }
这里在执行具体任务期间加锁,是为了避免在任务运行期间,其他线程调用了 shutdown 后正在执行的任务被中断(shutdown 只会中断当前被阻塞挂起的线程)
清理工作代码如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果completedAbruptly为true则表示任务执行过程中抛出了未处理的异常 // 所以还没有正确地减少worker计数,这里需要减少一次worker计数 if (completedAbruptly) decrementWorkerCount(); //1. 统计线程池中完成任务的个数,并从工作集合里面删除当前Worker final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } //1.2 尝试设置线程池状态为TERMINATED,在关闭线程池时等到所有worker都被回收后再结束线程池 tryTerminate(); //1.3 如果线程池状态 < STOP,即RUNNING或SHUTDOWN,则需要考虑创建新线程来代替被销毁的线程 int c = ctl.get(); if (runStateLessThan(c, STOP)) { // 如果worker是正常执行完的,则要判断一下是否已经满足了最小线程数要求 // 否则直接创建替代线程 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } // 重新创建一个worker来代替被销毁的线程 addWorker(null, false); } }
在如上代码中,代码(1.1)统计线程池完成任务个数,并且在统计前加了全局锁。把在当前工作线程中完成的任务累加到全局计数器,然后从工作集中删除当前 Worker。
代码(1.2)判断如果当前线程池状态是 SHUTDOWN 并且工作队列为空,或者当前线程池状态是 STOP 并且当前线程池里面没有活动线程,则设置线程池状态为 TERMINATED。如果设置为了 TERMINATED 状态,则还需要调用条件变量 termination 的 signalAll 方法激活所有因为调用线程池的 awaitTermination 方法而被阻塞的线程。
代码(1.3)则判断当前线程池里面线程个数是否小于核心线程个数,如果是则新增一个线程。
shutdown 方法
调用 shutdown 方法后,线程池就不会再接受新的任务了,但是工作队列里面的任务还是要执行的。该方法会立刻返回,并不等待队列任务完成再返回。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //1. 权限检查 checkShutdownAccess(); //2. 设置当前线程池状态为SHUTDOWN,如果已经是SHUTDOWN则直接返回 advanceRunState(SHUTDOWN); //3. 设置中断标志 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } //4. 尝试将状态改为TERMINATED tryTerminate(); }
首先查看当前调用的线程是否有关闭线程的权限。
随后代码(2)的代码如下。如果当前线程池状态 >= SHUTDOWN 则直接返回,否则设置为 SHUTDOWN 状态。
private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
代码(3)的源码如下,其设置所有空闲线程的中断标志。这里首先加了全局锁,同时只有一个线程可以调用 shutdown 方法设置中断标志。然后尝试获取 Worker 自己的锁,获取成功则设置中断标志。由于正在执行的任务已经获取了锁,所以正在执行的任务没有被中断。这里中断的是阻塞到 getTask 方法并企图从队列里面获取任务的线程,也就是空闲线程。
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
最后尝试将状态改为 TERMINATED,首先使用 CAS 设置当前线程池状态为 TIDYING,如果设置成功则执行扩展接口 terminated 在线程池状态变为 TERMINATED 前做一些事情,然后设置当前线程池状态为 TERMINATED。最后调用 termination.signalAll 激活因调用条件变量 termination 的 await 系列方法而被阻塞的所有线程。
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
shutdownNow 方法
调用 shutdownNow 方法后,线程池就不会再接受新的任务了,并且会丢弃工作队列里面的任务,正在执行的任务会被中断,该方法会立刻返回,并不等待激活的任务执行完成。返回值为这时候队列里面被丢弃的任务列表。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //1. 权限检查 checkShutdownAccess(); //2. 设置线程池状态为STOP advanceRunState(STOP); //3. 中断所有线程 interruptWorkers(); //4. 将任务队列移动到tasks中 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
需要注意的是,中断的所有线程包含空闲线程和正在执行任务的线程。
awaitTermination 方法
当线程调用 awaitTermination 方法后,当前线程会被阻塞,直到线程池状态变为 TERMINATED 才返回,或者等待时间超时才返回。
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
首先获取独占锁,然后在无限循环内部判断当前线程池状态是否至少是 TERMINATED 状态,如果是则直接返回,否则说明当前线程池里面还有线程在执行,则看设置的超时时间 nanos 是否小于 0,小于 0 则说明不需要等待,那就直接返回,如果大于 0 则调用条件变量 termination 的 awaitNanos 方法等待 nanos 时间,期望在这段时间内线程池状态变为 TERMINATED。
总结
线程池巧妙地使用一个 Integer 类型的原子变量来记录线程池状态和线程池中的线程个数。通过线程池状态来控制任务的执行,每个 Worker 线程可以处理多个任务。线程池通过线程的复用减少了线程创建和销毁的开销。