worker 对象
Worker 位于 ThreadPoolExecutor
内部,它继承了 AQS 类并且实现了 Runnable 接口。Worker 类主要维护了线程运行过程中的中断控制状态。它提供了锁的获取和释放操作。在 worker 的实现中,我们使用了非重入的互斥锁而不是使用重复锁,因为 Lea 觉得我们不应该在调用诸如 setCorePoolSize 之类的控制方法时能够重新获取锁。
worker 对象的源码比较简单和标准,这里我们只说一下 worker 对象的构造方法,也就是
Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
构造一个 worker 对象需要做三步操作:
- 初始 AQS 状态为 -1,此时不允许中断 interrupt(),只有在 worker 线程启动了,执行了 runWorker() 方法后,将 state 置为0,才能进行中断。
- 将 firstTask 赋值给为当前类的全局变量
- 通过
ThreadFactory
创建一个新的线程。
###任务运行
我们前面的流程主要分析了线程池的 execute 方法的执行过程,这个执行过程相当于是任务提交过程,而我们下面要说的是从队列中获取任务并运行的这个工作流程。
一般情况下,我们会从初始任务开始运行,所以我们不需要获取第一个任务。否则,只要线程池还处于 Running 状态,我们会调用 getTask
方法获取任务。getTask 方法可能会返回 null,此时可能是由于线程池状态改变或者是配置参数更改而导致的退出。还有一种情况可能是由于 异常
而引发的,这个我们后面会细说。
下面来看一下 runWorker
方法的源码:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 允许打断 // new Worker() 是 state==-1,此处是调用 Worker 类的 tryRelease() 方法, // 将 state 置为0 w.unlock(); boolean completedAbruptly = true; try { // 调用 getTask() 获取任务 while (task != null || (task = getTask()) != null) { // 获取全局锁 w.lock(); // 确保只有在线程 STOPING 时,才会被设置中断标志,否则清除中断标志。 // 如果一开始判断线程池状态 < STOPING,但 Thread.interrupted() 为 true, // 即线程已经被中断,又清除了中断标示,再次判断线程池状态是否 >= stop // 是,再次设置中断标示,wt.interrupt() // 否,不做操作,清除中断标示后进行后续步骤 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 执行前需要调用的方法,交给程序员自己来实现 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 执行后需要调用的方法,交给程序员自己来实现 afterExecute(task, thrown); } } finally { // 把 task 置为 null,完成任务数 + 1,并进行解锁 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; // 最后处理 worker 的退出 } finally { processWorkerExit(w, completedAbruptly); } }
下面是 runWorker 的执行流程图
这里需要注意一下最后的 processWorkerExit
方法,这里面其实也做了很多事情,包括判断 completedAbruptly
的布尔值来表示是否完成任务,获取锁,尝试从队列中移除 worker,然后尝试中断,接下来会判断一下中断状态,在线程池当前状态小于 STOP 的情况下会创建一个新的 worker 来替换被销毁的 worker。
任务获取
任务获取就是 getTask 方法的执行过程,这个环节主要用来获取任务和剔除任务。下面进入源码分析环节
private Runnable getTask() { // 判断最后一个 poll 是否超时。 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 必要时检查队列是否为空 // 对线程池状态的判断,两种情况会 workerCount-1,并且返回 null // 线程池状态为 shutdown,且 workQueue 为空(反映了 shutdown 状态的线程池还是要执行 workQueue 中剩余的任务的) // 线程池状态为 stop(shutdownNow() 会导致变成 STOP)(此时不用考虑 workQueue 的情况) if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? // 是否需要定时从 workQueue 中获取 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 如果工作线程的数量大于 maximumPoolSize 会进行线程剔除 // 如果使用了 allowCoreThreadTimeOut ,并且工作线程不为0或者队列有任务的话,会直接进行线程剔除 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
getTask 方法的执行流程图如下
工作线程退出
工作线程退出是 runWorker 的最后一步,这一步会判断工作线程是否突然终止,并且会尝试终止线程,以及是否需要增加线程来替换原工作线程。
private void processWorkerExit(Worker w, boolean completedAbruptly) { // worker数量 -1 // completedAbruptly 是 true,突然终止,说明是 task 执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的 worker 线程数量需要-1 // completedAbruptly 是 false 是突然终止,说明是 worker 线程没有 task 可执行了,不用-1,因为已经在 getTask() 方法中-1了 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); // 从 Workers Set 中移除 worker final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } // 尝试终止线程, tryTerminate(); // 是否需要增加 worker 线程 // 线程池状态是 running 或 shutdown // 如果当前线程是突然终止的,addWorker() // 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker() // 故如果调用线程池 shutdown(),直到workQueue为空前,线程池都会维持 corePoolSize 个线程, // 然后再逐渐销毁这 corePoolSize 个线程 int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
源码搞的有点头大了,可能一时半会无法理解上面这些源码,不过你可以先把注释粘过去,等有时间了需要反复刺激,加深印象!
其他线程池
下面我们来了解一下其他线程池的构造原理,主要涉及 FixedThreadPool、SingleThreadExecutor、CachedThreadPool。
newFixedThreadPool
newFixedThreadPool 被称为可重用固定线程数
的线程池,下面是 newFixedThreadPool 的源码
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
可以看到,newFixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被设置为创建 FixedThreadPool 时指定的参数 nThreads
,也就是说,在 newFiexedThreadPool 中,核心线程数就是最大线程数。
下面是 newFixedThreadPool 的执行示意图
newFixedThreadPool 的工作流程如下
- 如果当前运行的线程数少于 corePoolSize,则会创建新线程 addworker 来执行任务
- 如果当前线程的线程数等于 corePoolSize,会将任务直接加入到
LinkedBlockingQueue
无界阻塞队列中,LinkedBlockingQueue 的上限如果没有制定,默认为 Integer.MAX_VALUE 大小。 - 等到线程池中的任务执行完毕后,newFixedThreadPool 会反复从 LinkedBlockingQueue 中获取任务来执行。
相较于 ThreadPoolExecutor,newFixedThreadPool 主要做了以下改变
- 核心线程数等于最大线程数,因此 newFixedThreadPool 只有两个最大容量,一个是线程池的线程容量,还有一个是 LinkedBlockingQueue 无界阻塞队列的线程容量。
- 这里可以看到还有一个变化是 0L,也就是 keepAliveTime = 0L,keepAliveTime 就是到达工作线程最大容量后的线程等待时间,0L 就意味着当线程池中的线程数大于 corePoolsize 时,空余的线程会被立即终止。
- 由于使用无界队列,运行中的 newFixedThreadPool 不会拒绝任务,也就是不会调用 RejectedExecutionHandler.rejectedExecution 方法。
newSingleThreadExecutor
newSingleThreadExecutor 中只有单个工作线程,也就是说它是一个只有单个 worker 的 Executor。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
可以看到,在 newSingleThreadExecutor 中,corePoolSize 和 maximumPoolSize 都被设置为 1,也不存在超时情况,同样使用了 LinkedBlockingQueue 无界阻塞队列,除了 corePoolSize 和 maximumPoolSize 外,其他几乎和 newFixedThreadPool 一模一样。
下面是 newSingleThreadExecutor 的执行示意图
newSingleThreadExecutor 的执行过程和 newFixedThreadPool 相同,只是 newSingleThreadExecutor 的工作线程数为 1。
newCachedThreadPool
newCachedThreadPool 是一个根据需要创建工作线程的线程池,newCachedThreadPool 线程池最大数量是 Integer.MAX_VALUE,保活时间是 60
秒,使用的是SynchronousQueue
无缓冲阻塞队列。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
它的执行示意图如下
- 首先会先执行 SynchronousQueue.offer 方法,如果当前 maximumPool 中有空闲线程正在执行
SynchronousQueue.poll
,就会把任务交给空闲线程来执行,execute 方法执行完毕,否则的话,继续向下执行。 - 如果 maximumPool 中没有线程执行 SynchronousQueue.poll 方法,这种情况下 newCachedThreadPool 会创建一个新线程执行任务,execute 方法执行完成。
- 执行完成的线程将执行 poll 操作,这个 poll 操作会让空闲线程最多在 SynchronousQueue 中等待 60 秒钟。如果 60 秒钟内提交了一个新任务,那么空闲线程会执行这个新提交的任务,否则空闲线程将会终止。
这里的关键点在于 SynchronousQueue 队列,它是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程对应的移除操作。这其实就是一种任务传递,如下图所示
其实还有一个线程池 ScheduledThreadPoolExecutor
,就先不在此篇文章做详细赘述了。
线程池实践考量因素
下面介绍几种在实践过程中使用线程池需要考虑的几个点
- 避免任务堆积,比如我们上面提到的 newFixedThreadPool,它是创建指定数目的线程,但是工作队列是无界的,这就导致如果工作队列线程太少,导致处理速度跟不上入队速度,这种情况下很可能会导致 OOM,诊断时可以使用
jmap
检查是否有大量任务入队。 - 生产实践中很可能由于逻辑不严谨或者工作线程不能及时释放导致 线程泄漏,这个时候最好检查一下线程栈
- 避免死锁等同步问题
- 尽量避免在使用线程池时操作
ThreadLocal
,因为工作线程的生命周期可能会超过任务的生命周期。
线程池大小的设置
线程池大小的设置也是面试官经常会考到的一个点,一般需要根据任务类型
来配置线程池大小
- 如果是 CPU 密集型任务,那么就意味着 CPU 是稀缺资源,这个时候我们通常不能通过增加线程数来提高计算能力,因为线程数量太多,会导致频繁的上下文切换,一般这种情况下,建议合理的线程数值是
N(CPU)数 + 1
。 - 如果是 I/O 密集型任务,就说明需要较多的等待,这个时候可以参考 Brain Goetz 的推荐方法 线程数 = CPU核数 × (1 + 平均等待时间/平均工作时间)。参考值可以是 N(CPU) 核数 * 2。
当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
后记
这篇文章真的写了很久,因为之前对线程池认识不是很深,所以花了大力气来研究,希望这篇文章对你有所帮助。
这篇文章探讨了对 Executor 框架的主要组成、线程池结构与生命周期,线程池源码和线程池实现的细节等方面进行了讲解和分析,希望对你有所帮助。
如果这篇文章写的还不错,希望读者朋友们可以不吝给出四连:点赞、在看、留言、分享,记住这次一定哦!