Java 线程池源码解析(1)https://developer.aliyun.com/article/1534179
因为 while 中的条件均为 false,所以 runWorker 会先执行下面的 completedAbruptly = false; 然后执行 finally 中的 processWorkerExit(w, completedAbruptly); 这里的 processWorkerExit 方法正是回收线程的关键所在,如果没有它,我们的所有空闲线程就都将被回收,所以这个方法中就定义了回收的逻辑:
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // 线程直接结束 } addWorker(null, false); // 本线程结束,再新建一个线程 } }
比如现在我们一共有 11 个线程(10个核心线程),那么 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; (暂不考虑 allowCoreThreadTimeOut)的结果就是 10,所以这个 11 个线程在执行 if (workerCountOf(c) >= min) 的时候,第一个执行该语句的线程会直接 return,之后执行该判断语句的 10 个线程由于不满足条件,但是自己又存活不了,所以在自己结束之前,会再创建一个没有初始任务的新线程,这样就保证了线程池中的线程数总是不低于核心线程数的。
2.2、线程池五大状态变化源码
下面是 ThreadPoolExecutor 源码中对于常量 ctl 的解释:
主池控制状态ctl是一个原子整数,包含两个概念字段workerCount,表示线程的有效数量runState,表示是否正在运行、关闭等。
为了将它们打包为一个整数,我们将workerCounts限制为(2^29)-1(约5亿)个线程,而不是(2^31)-1(20亿)个其他可表示的线程。如果这在未来是一个问题,可以将变量更改为AtomicLong,并调整下面的移位/掩码常数。但是,在需要之前,使用int,此代码会更快、更简单。workerCount是允许启动但不允许停止的工人数量。该值可能与实际活动线程数暂时不同,例如,当ThreadFactory在被请求时未能创建线程,以及当退出的线程在终止前仍在执行记账时。用户可见的池大小报告为工作者集的当前大小。
runState提供了主要的生命周期控制,取值为:
- RUNNING:正常接受新任务和处理排队的任务
- SHUTDOWN:不接受新任务,但处理排队任务
- STOP:不接受新建任务,不处理排队任务,并中断正在进行的任务
- TIDING:所有任务都已终止,workerCount为零,转换到状态的线程TIDING将运行terminated()钩子方法terminated:terminated。
runState随时间单调增加,但不需要达到每个状态。转换为:RUNNING->SHUTDOWN On invocation of SHUTDOWN()(RUNNING或SHUTDOWN)->STOP On invocationof shutdownNow()SHUTDOWN->TIDING当队列和池都为空时STOP->TIDIING当池为空时TIDIING->TERMINATED当TERMINATED()钩子方法完成时等待awaitTermination()的线程将在状态达到TERMINATED时返回。检测从SHUTDOWN到TIDING的转换并不像您希望的那样简单,因为队列在非空之后可能会变空,反之亦然,但我们只能在看到它是空的之后,看到workerCount为0时终止(这有时需要重新检查——见下文)
线程池的五大状态从源码中可以看到:
这里的 COUNT_MASK 代表的是线程池的最大数量,也就是 2 的 29 次方 -1 个。
Integer.SIZE为32,所以COUNT_BITS为29,最终各个状态对应的二级制为:
- RUNNING:11100000 00000000 00000000 00000000
- SHUTDOWN:00000000 00000000 00000000 00000000
- STOP:00100000 00000000 00000000 00000000
- TIDYING:01000000 00000000 00000000 00000000
- TERMINATED:01100000 00000000 00000000 00000000
所以,只需要使用一个Integer数字的最高三个bit,就可以表示5种线程池的状态,而剩下的29个bit就可以用来表示工作线程数,比如,假如 ctl 为:11100000 00000000 00000000 00001010,就表示线程池的状态为RUNNING,线程池中目前在工作的线程有10个,这里说的“在工作”意思是线程活着,要么在执行任务,要么在阻塞等待任务。
把线程状态放到高3位,把线程数量放到剩下的29位的好处就是方便之后频繁修改线程个数。
下面是线程状态的转换关系:
2.2.1、Thread.interrupt 线程中断
public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(new Runnable() { @Override public void run() { while (true){ System.out.println("线程 t1 在执行任务"); } } }); t1.start(); Thread.sleep(3000); t1.interrupt(); System.out.println("线程 t1 被阻塞了"); }
事实上,上面的代码并不会真正中断线程 t1 ,因为 interrupt 只是一个信号,线程要不要停掉还是取决于它自己。所以要想真正打断线程,需要线程自己判断:
Thread t1 = new Thread(new Runnable() { @Override public void run() { while (!Thread.interrupted()){ // 如果线程未被中断 System.out.println("线程 t1 在执行任务"); } } });
这样才能真正的关闭线程,否则 interrupt 方法没有意义。
2.2.2、线程池的状态转换
线程池的 shutdown 方法会把线程池状态设置为 SHUTDOWN ,但是 SHUTDOWN 状态并不会立即关闭线程池中的所有线程,而是会等任务执行完毕再关闭线程,毕竟直接关闭线程可能会出现一些安全问题。
如果希望立即关闭线程,需要调用 shutdownNow方法,这个方法会把线程池的状态设置为 STOP。
这里的 onShutdown 方法是个空的(它是提供给子类用的),这里不需要关心。,而下面的 tryTerminate 才会对线程池的状态进行修改,这才是我们需要关心的:
同样,这里的 terminate 方法其实也只是一个空方法,它也是留给子类在需要时使用的,就相当于一个扩展机制,以后要是再 TIDYING 状态之后希望对线程池进行什么操作,就重写它就可以了。
上面代码中的 isRunning 方法也特别简单:
只要当前的线程池状态小于 SHUTDOWN 就是 RUNNING 状态。
2.2.3、addWorker 再解析
首先了解乐观锁:
- Java提供了如AtomicInteger、AtomicLong、AtomicReference等原子类来支持CAS操作,这些类位于java.util.concurrent.atomic包下。
- 下面的源码中就是在循环体内部使用CAS操作或版本号机制来实现了乐观锁的行为。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { // Check if queue empty only if necessary. if (runStateAtLeast(c, SHUTDOWN) // 如果当前线程池的状态 >= SHUTDOWN && (runStateAtLeast(c, STOP) // 并且当前线程池的状态 >= STOP || firstTask != null // execute(command) 的command为null代表不携带任务 || workQueue.isEmpty())) // 任务队列为空 return false; // 返回false代表创建线程失败 for (;;) { if (workerCountOf(c) // 线程池的总线程数 // 在execute方法源码中,当当前线程数<核心线程数是addWorker的core=true >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) // 是否大于核心线程数或者总线程数 return false; // 当有多个线程同时执行compareAndIncrementWorkerCount方法时,只会有一个返回true if (compareAndIncrementWorkerCount(c)) // 先把总线程数+1再去创建线程,ctl是原子类型的,所以这样不会出现线程安全的问题 break retry; // 如果新建线程成功,就会跳出去取真正新建线程 c = ctl.get(); // 新建线程失败的话刷新总线程数 if (runStateAtLeast(c, SHUTDOWN)) // 判断线程池状态是否>=SHUTDOWN continue retry; // else CAS failed due to workerCount change; retry inner loop // 否则继续执行该循环 } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.getState() != Thread.State.NEW) throw new IllegalThreadStateException(); workers.add(w); workerAdded = true; int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
比如我们设置核心线程数为10,但是现在只创建了9个核心线程,如果此时有 2 个以上线程同时执行了 executor.execute(task) 方法,那么在 execute 中会先判断当前线程总数是否大于核心线程数,因为这些线程是同时执行的,所以都返回 9 ,那么 addWorker 的 core 参数就是 true,进入 addWorker 方法,这些线程会都先给总线程数 +1 再去创建线程,而不是先去创建线程再给总线程数 + 1,因为 addWorker 的参数 core = true(代表创建一个核心线程),那么如果都去创建核心线程就会使得核心线程数超过设置的值。
所以第一个抢先给 ctl + 1 的线程会真正创建一个核心线程(ctl 是原子性的),而其它线程只能被跳出循环重新执行,这样当第二次执行循环时,就会发现此时的核心线程数已经达到了,就会返回 false,代表创建核心线程失败。
总结
所以我们需要知道,在 Java 的线程池中,它并不是上来就把所有核心线程都开启,而是需要的时候才会开启;此外,当核心线程都忙碌的时候,新来的任务并不会去创建非核心线程,而是优先放到一个任务队列当中去,只有当任务队列满了(这里用的 LinkedBlockingQueue 是链表结构,所以不会满)才会去创建一个新的非核心队列。