前言
大家好,我是小郭,今天来说一下面试中常问到问题,我们知道线程池是帮助我们对线程资源的管理,只有我们合理的使用使用线程池,他才能做到事倍功半,但是你知道线程池是如何复用的吗?
概要
- Worker执行任务模型
- Worker线程如何增加?
- Worker线程执行任务
- 原理总结
问题
- 线程如何复用?
- 线程如何回收? 等看完源码,我们再来回答
源码环节
1. Worker执行任务模型
图片来源网上
主要执行任务的三个角色 Worker-》WorkQueue-》Thread
2. addWorker源码
涉及参数:
firstTask:新线程首先要执行的任务,若没有则传null。
core:如果为true,则与 corePoolSize 绑定。为false,则与 maximumPoolSize 绑定。
private boolean addWorker(Runnable firstTask, boolean core) { retry: //检测当前线程池状态 for (;;) { //包含线程池状态和线程中有效线程数量 int c = ctl.get(); //通过高三位获取线程池状态 int rs = runStateOf(c); // 线程池不处于RUNNABLE状态且现在状态为关系状态,未传入新任务,阻塞队列不为空 返回false if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //通过低29位计算线程池内有效线程的数量 int wc = workerCountOf(c); //超过size,则返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //增加工作线程数 if (compareAndIncrementWorkerCount(c)) break retry; //再次获取线程池状态和线程中有效线程数量 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //创建一个Worker对象,将新任务放入Worker w = new Worker(firstTask); //实例化一个线程 final Thread t = w.thread; if (t != null) { //加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. //再次检查线程池运行状态 int rs = runStateOf(ctl.get()); //线程池运行状态为RUNNABLE 或者 为已关闭且无新任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) //检测是否处于活动 throw new IllegalThreadStateException(); //将新任务放入set数组中 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; //添加任务工程 workerAdded = true; } } finally { //释放锁 mainLock.unlock(); } //启动任务 if (workerAdded) { t.start(); workerStarted = true; } } } finally { //添加失败则加入失败队列 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
上面做了四件事:
- 检查线程池状态
- 新建线程,使用Worker进行包装,放入Hashet数组中,最终真正执行任务的线程就放在Worker,所以新增一个addWorker就是新增一个线程。主要实现复用就是Worker类中的runWorker(this)
- 启动线程Start()
- 添加失败操作,移除Worker,减少WorkerCount
2.1 worker源码
worker实际的实现也是Runnbale,通过继承AbstractQueuedSynchronizer,来获取独占锁,所以上面调用start(),实际上也是执行Worker中的run方法
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } ... }
3. Worker线程执行任务
Worker线程执行任务流程图
3.1 runWorker源码
final void runWorker(Worker w) { //获取当前的线程 Thread wt = Thread.currentThread(); //从Worker中取出新任务 Runnable task = w.firstTask; //释放Worker w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //任务不为空 while (task != null || (task = getTask()) != null) { //上锁,避免被其他线程中断 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt //检查状态如果线程池状态处于中断,则中断线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //中断当前线程 wt.interrupt(); try { //执行beforeExecute beforeExecute(wt, task); Throwable thrown = null; try { //执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //worker回收 processWorkerExit(w, completedAbruptly); } }
使线程池复用的核心方法,getTask() 只要不返回null,就会一直执行
3.2 getTask源码
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { //获取线程池运行状态和线程池内中有效线程池 int c = ctl.get(); //获取运行状态 int rs = runStateOf(c); // Check if queue empty only if necessary. // 线程不处于RUNNABLE 且(已关闭 或 worker队列为空) if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //减少工作线程数 decrementWorkerCount(); return null; } //获取线程池内有效线程池数量 int wc = workerCountOf(c); // Are workers subject to culling? // 1.allowCoreThreadTimeOut变量默认是false,核心线程即使空闲也不会被销毁 // 如果为true,核心线程在keepAliveTime内仍空闲则会被销毁。 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //如果运行线程数超过了最大线程数,但是缓存队列已经空了,这时递减worker数量。 // 如果有设置允许线程超时或者线程数量超过了核心线程数量, // 并且线程在规定时间内均未poll到任务且队列为空则递减worker数量 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //如果为true,则调用poll方法获取任务,超过keepAliveTime,则会返回null //如果为false,则直接调用take方法获取任务,workQueue.offer(command) ,当任务加入时,再被唤醒,返回任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
3.3 addWorkerFailed源码
//回滚创建 private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //新任务不为空 if (w != null) //移除新任务 workers.remove(w); //减少WorkerCount decrementWorkerCount(); //其他状态为Terminate tryTerminate(); } finally { mainLock.unlock(); } }
3.4 tryTerminate源码
final void tryTerminate() { for (;;) { //获取线程池状态和线程池中有效线程数量 int c = ctl.get(); // 正在运行 或 任务都已经终止 或 处于关闭状态且池和队列不为空 提前退出 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //工作线程数不为0,则中断空闲的worker if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } //获取对象锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
4. 原理总结
线程池的优点就是提高对线程的管理,提高资源的利用率,控制线程的数量。
在线程池中,线程可以从阻塞队列 中不断 getTask() 新任务来执行,其核心原理在于线程池用Worker对Thread进行了封装,每调用一个 addWorker 就是等于新开一个线程,并不是每次执行任务都会调用 Thread.start() 来创建新线程,而是让每个线程去轮询,在这个轮询中,不停地检查是否还有任务等待被执行,如果有则直接去执行这个任务,也就是调用任务的 run() 方法,把 run() 方法当作和普通方法一样的地位去调用,相当于把每个任务的 run() 方法串联了起来,所以线程数量并不增加。
通过上面的了解,我们可以来回答下面这两个问题了
4.1 线程如何复用?
ThreadPoolExecutor 在创建线程时,会将线程封装成工作线程 Worker ,并放入工作线程组中,然后这个 Worker 反复从阻塞队列中拿任务去执行。
- 通过取 Worker 的 firstTask 或者通过 getTask 方法从 workQueue 中获取待执行的任务。
- 直接调用 task 的 run 方法来执行具体的任务(而不是新建线程)
4.2 线程如何回收?
- 获取不到任务时,回收自己
- 将worker移出线程池
- 线程池状态置为TERMINATED
try { while (task != null || (task = getTask()) != null) { ... } } finally { processWorkerExit(w, completedAbruptly); } private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); ... //将worker移出线程池 completedTaskCount += w.completedTasks; workers.remove(w); //修改线程池状态 tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
以上不足之处,请大家多多指出~ 下一篇,一起来学习线程池的拒绝策略