深入理解线程池
上面我和你简单聊了一下线程池的基本构造,线程池有几个非常重要的参数可以细细品味,但是哥们醒醒,接下来才是刺激的地方。
线程池状态
首先我们先来聊聊线程池状态,线程池状态是一个非常有趣的设计点,ThreadPoolExecutor 使用 ctl
来存储线程池状态,这些状态也叫做线程池的生命周期
。想想也是,线程池作为一个存储管理线程的资源池,它自己也要有这些状态,以及状态之间的变更才能更好的满足我们的需求。ctl 其实就是一个 AtomicInteger
类型的变量,保证原子性
。
ctl 除了存储线程池状态之外,它还存储 workerCount
这个概念,workerCount 指示的是有效线程数,workerCount 表示的是已经被允许启动但不允许停止的工作线程数量。workerCount 的值与实际活动线程的数量不同。
ctl 高低位来判断是线程池状态还是工作线程数量,线程池状态位于高位。
这里有个设计点,为什么使用 AtomicInteger 而不是存储上线更大的 AtomicLong 之类的呢?
Lea 并非没有考虑过这个问题,为了表示 int 值,目前 workerCount 的大小是**(2 ^ 29)-1(约 5 亿个线程),而不是(2 ^ 31)-1(20亿个)可表示的线程**。如果将来有问题,可以将该变量更改为 AtomicLong。但是在需要之前,使用 int 可以使此代码更快,更简单,int 存储占用存储空间更小。
runState 具有如下几种状态
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
我们先上状态轮转图,然后根据状态轮转图做详细的解释。
这几种状态的解释如下
RUNNING
: 如果线程池处于 RUNNING 状态下的话,能够接收新任务,也能处理正在运行的任务。可以从 ctl 的初始化得知,线程池一旦创建出来就会处于 RUNNING 状态,并且线程池中的有效线程数为 0。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
SHUTDOWN
: 在调用 shutdown 方法后,线程池的状态会由 RUNNING -> SHUTDOWN 状态,位于 SHUTDOWN 状态的线程池能够处理正在运行的任务,但是不能接受新的任务,这和我们上面说的对于 shutdown 的描述一致。STOP
: 和 shutdown 方法类似,在调用 shutdownNow 方法时,程序会从 RUNNING/SHUTDOWN -> STOP 状态,处于 STOP 状态的线程池,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。TIDYING
:TIDYING 状态有个前置条件,分为两种:一种是是当线程池位于 SHUTDOWN 状态下,阻塞队列和线程池中的线程数量为空时,会由 SHUTDOWN -> TIDYING;另一种是当线程池位于 STOP 状态下时,线程池中的数量为空时,会由 STOP -> TIDYING 状态。转换为 TIDYING 的线程池会调用terminated
这个钩子方法,terminated 在 ThreadPoolExecutor 类中是空实现,若用户想在线程池变为 TIDYING 时,进行相应的处理,可以通过重载 terminated 函数来实现。TERMINATED
:TERMINATED 状态是线程池的最后一个状态,线程池处在 TIDYING 状态时,执行完terminated 方法之后,就会由 TIDYING -> TERMINATED 状态。此时表示线程池的彻底终止。
重要变量
下面我们一起来了解一下线程池中的重要变量。
private final BlockingQueue<Runnable> workQueue;
阻塞队列,这个和我们上面说的阻塞队列的参数是一个意思,因为在构造 ThreadPoolExecutor 时,会把参数的值赋给 this.workQueue。
private final ReentrantLock mainLock = new ReentrantLock();
线程池的主要状态锁
,对线程池的状态(比如线程池大小、运行状态)的改变都需要使用到这个锁
private final HashSet<Worker> workers = new HashSet<Worker>();
workers 持有线程池中所有线程的集合,只有持有上面 mainLock
的锁才能够访问。
private final Condition termination = mainLock.newCondition();
等待条件,用来支持 awaitTermination 方法。Condition 和 Lock 一起使用可以实现通知/等待机制。
private int largestPoolSize;
largestPoolSize 表示线程池中最大池的大小,只有持有 mainLock 才能访问
private long completedTaskCount;
completedTaskCount 表示任务完成的计数,它仅仅在任务终止时更新,需要持有 mainLock 才能访问。
private volatile ThreadFactory threadFactory;
threadFactory 是创建线程的工厂,所有的线程都会使用这个工厂,调用 addWorker
方法创建。
private volatile RejectedExecutionHandler handler;
handler 表示拒绝策略,handler 会在线程饱和或者将要关闭的时候调用。
private volatile long keepAliveTime;
保活时间,它指的是空闲线程等待工作的超时时间,当存在多个 corePoolSize 或 allowCoreThreadTimeOut 时,线程将使用这个超时时间。
下面是一些其他变量,这些变量比较简单,我就直接给出注释了。
private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间 private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列) private volatile int maximumPoolSize; //线程池最大能容忍的线程数 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); // 默认的拒绝策略
任务提交
现在我们知道了 ThreadPoolExecutor 创建出来就会处于运行状态,此时线程数量为 0 ,等任务到来时,线程池就会创建线程来执行任务,而下面我们的关注点就会放在任务提交这个过程上。
通常情况下,我们会使用
executor.execute()
来执行任务,我在很多书和博客教程上都看到过这个执行过程,下面是一些书和博客教程所画的 ThreadPoolExecutor 的执行示意图和执行流程图
执行示意图
处理流程图
ThreadPoolExecutor 的执行 execute 的方法分为下面四种情况
- 如果当前运行的工作线程少于 corePoolSize 的话,那么会创建新线程来执行任务 ,这一步需要获取 mainLock
全局锁
。 - 如果运行线程不小于 corePoolSize,则将任务加入 BlockingQueue 阻塞队列。
- 如果无法将任务加入 BlockingQueue 中,此时的现象就是队列已满,此时需要创建新的线程来处理任务,这一步同样需要获取 mainLock 全局锁。
- 如果创建新线程会使当前运行的线程超过
maximumPoolSize
的话,任务将被拒绝,并且使用RejectedExecutionHandler.rejectEExecution()
方法拒绝新的任务。
ThreadPoolExecutor 采取上面的整体设计思路,是为了在执行 execute 方法时,避免获取全局锁,因为频繁获取全局锁会是一个严重的可伸缩瓶颈
,所以,几乎所有的 execute 方法调用都是通过执行步骤2。
上面指出了 execute 的运行过程,整体上来说这个执行过程把非常重要的点讲解出来了,但是不够细致,我查阅 ThreadPoolExecute 和部分源码分析文章后,发现这事其实没这么简单,先来看一下 execute 的源码,我已经给出了中文注释
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 获取 ctl 的值 int c = ctl.get(); // 判断 ctl 的值是否小于核心线程池的数量 if (workerCountOf(c) < corePoolSize) { // 如果小于,增加工作队列,command 就是一个个的任务 if (addWorker(command, true)) // 线程创建成功,直接返回 return; // 线程添加不成功,需要再次判断,每需要一次判断都会获取 ctl 的值 c = ctl.get(); } // 如果线程池处于运行状态并且能够成功的放入阻塞队列 if (isRunning(c) && workQueue.offer(command)) { // 再次进行检查 int recheck = ctl.get(); // 如果不是运行态并且成功的从阻塞队列中删除 if (! isRunning(recheck) && remove(command)) // 执行拒绝策略 reject(command); // worker 线程数量是否为 0 else if (workerCountOf(recheck) == 0) // 增加工作线程 addWorker(null, false); } // 如果不能增加工作线程的数量,就会直接执行拒绝策略 else if (!addWorker(command, false)) reject(command); }
下面是我根据源码画出的执行流程图
下面我们针对 execute 流程进行分析,可能有点啰嗦,因为几个核心流程上面已经提过了,不过为了流程的完整性,我们再在这里重新提一下。
- 如果线程池的核心数量少于
corePoolSize
,那么就会使用 addWorker 创建新线程,addworker 的流程我们会在下面进行分析。如果创建成功,那么 execute 方法会直接返回。如果没创建成功,可能是由于线程池已经 shutdown,可能是由于并发情况下 workerCountOf(c) < corePoolSize ,别的线程先创建了 worker 线程,导致 workerCoun t>= corePoolSize。 - 如果线程池还在 Running 状态,会将 task 加入阻塞队列,加入成功后会进行
double-check
双重校验,继续下面的步骤,如果加入失败,可能是由于队列线程已满,此时会判断是否能够加入线程池中,如果线程池也满了的话,就会直接执行拒绝策略,如果线程池能加入,execute 方法结束。 - 步骤 2 中的 double-check 主要是为了判断进入 workQueue 中的 task 是否能被执行:如果线程池已经不是 Running 状态,则应该拒绝添加任务,从 workQueue 队列中删除任务。如果线程池是 Running,但是从 workQueue 中删除失败了,此时的原因可能是由于其他线程执行了这个任务,此时会直接执行拒绝策略。
- 如果线程是 Running 状态,并且不能把任务从队列中移除,进而判断工作线程是否为 0 ,如果不为 0 ,execute 执行完毕,如果工作线程是 0 ,则会使用 addWorker 增加工作线程,execute 执行完毕。
添加 worker 线程
从上面的执行流程可以看出,添加一个 worker 涉及的工作也非常多,这也是一个比价难啃的点,我们一起来分析下,这是 worker 的源码
private boolean addWorker(Runnable firstTask, boolean core) { // retry 的用法相当于 goto retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 仅在必要时检查队列是否为空。 // 线程池状态有五种,state 越小越是运行状态 // rs >= SHUTDOWN,表示此时线程池状态可能是 SHUTDOWN、STOP、TIDYING、TERMINATED // 默认 rs >= SHUTDOWN,如果 rs = SHUTDOWN,直接返回 false // 默认 rs < SHUTDOWN,是 RUNNING,如果任务不是空,返回 false // 默认 RUNNING,任务是空,如果工作队列为空,返回 false // if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 执行循环 for (;;) { // 统计工作线程数量 int wc = workerCountOf(c); // 如果 worker 数量>线程池最大上限 CAPACITY(即使用int低29位可以容纳的最大值) // 或者 worker数量 > corePoolSize 或 worker数量>maximumPoolSize ),即已经超过了给定的边界 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 使用 CAS 增加 worker 数量,增加成功,跳出循环。 if (compareAndIncrementWorkerCount(c)) break retry; // 检查 ctl c = ctl.get(); // Re-read ctl // 如果状态不等于之前获取的 state,跳出内层循环,继续去外层循环判断 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } /* worker数量+1成功的后续操作 * 添加到 workers Set 集合,并启动 worker 线程 */ boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 包装 Runnable 对象 // 设置 firstTask 的值为 -1 // 赋值给当前任务 // 使用 worker 自身这个 runnable,调用 ThreadFactory 创建一个线程,并设置给worker的成员变量thread w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 在持有锁的时候重新检查 // 如果 ThreadFactory 失败或在获得锁之前关闭,请回退。 int rs = runStateOf(ctl.get()); //如果线程池在运行 running<shutdown 或者 线程池已经 shutdown,且firstTask==null // (可能是 workQueue 中仍有未执行完成的任务,创建没有初始任务的 worker 线程执行) //worker 数量 -1 的操作在 addWorkerFailed() if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // workers 就是一个 HashSet 集合 workers.add(w); // 设置最大的池大小 largestPoolSize,workerAdded 设置为true int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } //如果启动线程失败 // worker 数量 -1 } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
妈的真长的一个方法,有点想吐血,其实我肝到现在已经肝不动了,但我一想到看这篇文章的读者们能给我一个关注,就算咳出一口老血也值了。
这个方法的执行流程图如下
这里我们就不再文字描述了,但是上面流程图中有一个对象引起了我的注意,那就是 worker
对象,这个对象就代表了线程池中的工作线程,那么这个 worker 对象到底是啥呢?