前言
目前正在出一个Java多线程专题
长期系列教程,从入门到进阶含源码解读
, 篇幅会较多, 喜欢的话,给个关注❤️ ~
承接上节的问题,我们继续探讨ThreadPoolExecutor
,一起来看下吧~
ThreadPoolExecutor中是如何做到线程复用的❓
我们知道,一个线程在创建的时候会指定一个线程任务,当执行完这个线程任务之后,线程自动销毁。但是线程池却可以复用线程,一个线程执行完线程任务后不销毁,继续执行另外的线程任务。那么它是如何做到的❓这得从addWorker()
说起
addWorker()
- 先看上半部分
addWorker()
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 对边界设定的检查 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } 复制代码
retry:
可能有些同学没用过,它只是一个标记,它的下一个标记就是for循环
,在for循环里面调用continue/break再紧接着retry标记时,就表示从这个地方开始执行continue/break操作,但这不是我们关注的重点。
从上面的代码,我们可以看出,ThreadPoolExecutor在创建线程时,会将线程封装成工作线程worker,并放入工作线程组中,然后这个worker反复从阻塞队列中拿任务去执行。这个addWorker
是excute
方法中调用的
- 我们接着看下半部分
private boolean addWorker(Runnable firstTask, boolean core) { // 上半部分 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // core是ture,需要创建的线程为核心线程,则先判断当前线程是否大于核心线程 // 如果core是false,证明需要创建的是非核心线程,则先判断当前线程数是否大于总线程数 // 如果不小于,则返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 下半部分 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 创建worker对象 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 获取线程全局锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // 判断线程池状态 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 将当前线程添加到线程组 workers.add(w); int s = workers.size(); // 如果线程组中的线程数大于最大线程池数 largestPoolSize赋值s if (s > largestPoolSize) largestPoolSize = s; // 添加成功 workerAdded = true; } } finally { mainLock.unlock(); } // 添加成功后执行线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 添加失败后执行 addWorkerFailed if (! workerStarted) addWorkerFailed(w); } return workerStarted; } 复制代码
再看 addWorkerFailed()
,与上边相反,相当于一个回滚操作,会移除失败的工作线程
private void addWorkerFailed(Worker w) { // 同样需要全局锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } } 复制代码
Worker
我们接着看Worker
对象
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } //..... // 省略下边代码 } 复制代码
Worker
类实现了Runnable
接口,所以Worker
也是一个线程任务。在构造方法中,创建了一个线程,回过头想想addWorker()
里为啥可以t.start()
应该很清楚了吧, 并且在构造方法中调用了线程工厂
创建了一个线程实例,我们上节讲过线程工厂
。其实这也不是关注的重点,重点是这个runWorker()
final void runWorker(Worker w) { // 获取当前的线程实例 Thread wt = Thread.currentThread(); // 直接从第一个任务开始执行 Runnable task = w.firstTask; // 获取完之后把worker的firstTask置为null 防止下次获取到 w.firstTask = null; // 线程启动之后,通过unlock方法释放锁 w.unlock(); // allow interrupts // 线程异常退出时 为 true boolean completedAbruptly = true; try { // Worker执行firstTask或从workQueue中获取任务,直到任务为空 while (task != null || (task = getTask()) != null) { // 获取锁以防止在任务执行过程中发生中断 w.lock(); // 判断边界值 如果线程池中断 则中断线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 相当于钩子方法 beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } 复制代码
首先去执行创建这个worker时就有的任务,当执行完这个任务后,worker的生命周期并没有结束,在while
循环中,worker会不断地调用getTask
方法从阻塞队列中获取任务然后调用task.run()
执行任务,从而达到复用线程的目的。只要getTask
方法不返回null
,此线程就不会退出。
我们接着看getTask()
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 如果运行线程数超过了最大线程数,但是缓存队列已经空了,这时递减worker数量。 // 如果有设置允许线程超时或者线程数量超过了核心线程数量,并且线程在规定时间内均未poll到任务且队列为空则递减worker数量 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 如果timed为true,则会调用workQueue的poll方法获取任务. // 超时时间是keepAliveTime。如果超过keepAliveTime时长, // 如果timed为false, 则会调用workQueue的take方法阻塞在当前。 // 队列中有任务加入时,线程被唤醒,take方法返回任务,并执行。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } 复制代码
大家有没有想过这里为啥要用take
和poll
,它们都是出队的操作,这么做有什么好处?
take & poll
我们说take()
方法会将核心线程阻塞挂起,这样一来它就不会占用太多的cpu资源,直到拿到Runnable
然后返回。
如果allowCoreThreadTimeOut设置为true
,那么核心线程就会去调用poll
方法,因为poll
可能会返回null
,所以这时候核心线程满足超时条件也会被销毁
非核心线程会workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),如果超时还没有拿到,下一次循环判断compareAndDecrementWorkerCount就会返回null
,Worker对象的run()
方法循环体的判断为null
,任务结束,然后线程被系统回收 。
再回头看一下runWorker()
是不是设计的很巧妙~
结束语
本节内容不是很好理解,想继续探讨的同学可以继续阅读它的源码,这部分内容了解一下就好,其实我们从源码中可以看到大量的线程状态检查,代码写的很健壮
,可以从中学习一下。下一节, 带大家学习一下阻塞队列BlockingQueue
~