肝完这篇线程池,我咳血了(三)

简介: 我们知道,线程需要的时候要进行创建,不需要的时候需要进行销毁,但是线程的创建和销毁都是一个开销比较大的操作。

深入理解线程池

上面我和你简单聊了一下线程池的基本构造,线程池有几个非常重要的参数可以细细品味,但是哥们醒醒,接下来才是刺激的地方。

线程池状态

首先我们先来聊聊线程池状态,线程池状态是一个非常有趣的设计点,ThreadPoolExecutor 使用 ctl 来存储线程池状态,这些状态也叫做线程池的生命周期。想想也是,线程池作为一个存储管理线程的资源池,它自己也要有这些状态,以及状态之间的变更才能更好的满足我们的需求。ctl 其实就是一个 AtomicInteger 类型的变量,保证原子性

ctl 除了存储线程池状态之外,它还存储 workerCount 这个概念,workerCount 指示的是有效线程数,workerCount 表示的是已经被允许启动但不允许停止的工作线程数量。workerCount 的值与实际活动线程的数量不同。

ctl 高低位来判断是线程池状态还是工作线程数量,线程池状态位于高位

这里有个设计点,为什么使用 AtomicInteger 而不是存储上线更大的 AtomicLong 之类的呢?

Lea 并非没有考虑过这个问题,为了表示 int 值,目前 workerCount 的大小是**(2 ^ 29)-1(约 5 亿个线程),而不是(2 ^ 31)-1(20亿个)可表示的线程**。如果将来有问题,可以将该变量更改为 AtomicLong。但是在需要之前,使用 int 可以使此代码更快,更简单,int 存储占用存储空间更小。

runState 具有如下几种状态

private static final int RUNNING    = -1 << COUNT_BITS; 
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

我们先上状态轮转图,然后根据状态轮转图做详细的解释。

微信图片_20220416151852.png

这几种状态的解释如下

  • RUNNING: 如果线程池处于 RUNNING 状态下的话,能够接收新任务,也能处理正在运行的任务。可以从 ctl 的初始化得知,线程池一旦创建出来就会处于 RUNNING 状态,并且线程池中的有效线程数为 0。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  • SHUTDOWN: 在调用 shutdown 方法后,线程池的状态会由 RUNNING -> SHUTDOWN 状态,位于 SHUTDOWN 状态的线程池能够处理正在运行的任务,但是不能接受新的任务,这和我们上面说的对于 shutdown 的描述一致。
  • STOP: 和 shutdown 方法类似,在调用 shutdownNow 方法时,程序会从 RUNNING/SHUTDOWN -> STOP 状态,处于 STOP 状态的线程池,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
  • TIDYING:TIDYING 状态有个前置条件,分为两种:一种是是当线程池位于 SHUTDOWN 状态下,阻塞队列和线程池中的线程数量为空时,会由 SHUTDOWN -> TIDYING;另一种是当线程池位于 STOP 状态下时,线程池中的数量为空时,会由 STOP -> TIDYING 状态。转换为 TIDYING 的线程池会调用 terminated这个钩子方法,terminated 在 ThreadPoolExecutor 类中是空实现,若用户想在线程池变为 TIDYING 时,进行相应的处理,可以通过重载 terminated 函数来实现。
  • TERMINATED:TERMINATED 状态是线程池的最后一个状态,线程池处在 TIDYING 状态时,执行完terminated 方法之后,就会由 TIDYING -> TERMINATED 状态。此时表示线程池的彻底终止。

重要变量

下面我们一起来了解一下线程池中的重要变量。

private final BlockingQueue<Runnable> workQueue;

阻塞队列,这个和我们上面说的阻塞队列的参数是一个意思,因为在构造 ThreadPoolExecutor 时,会把参数的值赋给 this.workQueue。

private final ReentrantLock mainLock = new ReentrantLock();

线程池的主要状态锁,对线程池的状态(比如线程池大小、运行状态)的改变都需要使用到这个锁

private final HashSet<Worker> workers = new HashSet<Worker>();

workers 持有线程池中所有线程的集合,只有持有上面 mainLock 的锁才能够访问。

private final Condition termination = mainLock.newCondition();

等待条件,用来支持 awaitTermination 方法。Condition 和 Lock 一起使用可以实现通知/等待机制。

private int largestPoolSize;

largestPoolSize 表示线程池中最大池的大小,只有持有 mainLock 才能访问

private long completedTaskCount;

completedTaskCount 表示任务完成的计数,它仅仅在任务终止时更新,需要持有 mainLock 才能访问。

private volatile ThreadFactory threadFactory;

threadFactory 是创建线程的工厂,所有的线程都会使用这个工厂,调用 addWorker 方法创建。

private volatile RejectedExecutionHandler handler;

handler 表示拒绝策略,handler 会在线程饱和或者将要关闭的时候调用。

private volatile long keepAliveTime;

保活时间,它指的是空闲线程等待工作的超时时间,当存在多个 corePoolSize 或 allowCoreThreadTimeOut 时,线程将使用这个超时时间。

下面是一些其他变量,这些变量比较简单,我就直接给出注释了。

private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间
private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数
private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy(); // 默认的拒绝策略

任务提交

现在我们知道了 ThreadPoolExecutor 创建出来就会处于运行状态,此时线程数量为 0 ,等任务到来时,线程池就会创建线程来执行任务,而下面我们的关注点就会放在任务提交这个过程上。

通常情况下,我们会使用

executor.execute()

来执行任务,我在很多书和博客教程上都看到过这个执行过程,下面是一些书和博客教程所画的 ThreadPoolExecutor 的执行示意图和执行流程图

执行示意图

微信图片_20220416151900.png

处理流程图

微信图片_20220416151904.png

ThreadPoolExecutor 的执行 execute 的方法分为下面四种情况

  1. 如果当前运行的工作线程少于 corePoolSize 的话,那么会创建新线程来执行任务 ,这一步需要获取 mainLock 全局锁
  2. 如果运行线程不小于 corePoolSize,则将任务加入 BlockingQueue 阻塞队列。
  3. 如果无法将任务加入 BlockingQueue 中,此时的现象就是队列已满,此时需要创建新的线程来处理任务,这一步同样需要获取 mainLock 全局锁。
  4. 如果创建新线程会使当前运行的线程超过 maximumPoolSize 的话,任务将被拒绝,并且使用 RejectedExecutionHandler.rejectEExecution() 方法拒绝新的任务。

ThreadPoolExecutor 采取上面的整体设计思路,是为了在执行 execute 方法时,避免获取全局锁,因为频繁获取全局锁会是一个严重的可伸缩瓶颈,所以,几乎所有的 execute 方法调用都是通过执行步骤2。

上面指出了 execute 的运行过程,整体上来说这个执行过程把非常重要的点讲解出来了,但是不够细致,我查阅 ThreadPoolExecute 和部分源码分析文章后,发现这事其实没这么简单,先来看一下 execute 的源码,我已经给出了中文注释

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  // 获取 ctl 的值
  int c = ctl.get();
  // 判断 ctl 的值是否小于核心线程池的数量
  if (workerCountOf(c) < corePoolSize) {
    // 如果小于,增加工作队列,command 就是一个个的任务
    if (addWorker(command, true))
      // 线程创建成功,直接返回
      return;
    // 线程添加不成功,需要再次判断,每需要一次判断都会获取 ctl 的值
    c = ctl.get();
  }
  // 如果线程池处于运行状态并且能够成功的放入阻塞队列
  if (isRunning(c) && workQueue.offer(command)) {
    // 再次进行检查
    int recheck = ctl.get();
    // 如果不是运行态并且成功的从阻塞队列中删除
    if (! isRunning(recheck) && remove(command))
      // 执行拒绝策略
      reject(command);
    // worker 线程数量是否为 0
    else if (workerCountOf(recheck) == 0)
      // 增加工作线程
      addWorker(null, false);
  }
  // 如果不能增加工作线程的数量,就会直接执行拒绝策略
  else if (!addWorker(command, false))
    reject(command);
}

下面是我根据源码画出的执行流程图

微信图片_20220416151908.png

下面我们针对 execute 流程进行分析,可能有点啰嗦,因为几个核心流程上面已经提过了,不过为了流程的完整性,我们再在这里重新提一下。

  1. 如果线程池的核心数量少于 corePoolSize,那么就会使用 addWorker 创建新线程,addworker 的流程我们会在下面进行分析。如果创建成功,那么 execute 方法会直接返回。如果没创建成功,可能是由于线程池已经 shutdown,可能是由于并发情况下 workerCountOf(c) < corePoolSize ,别的线程先创建了 worker 线程,导致 workerCoun t>= corePoolSize。
  2. 如果线程池还在 Running 状态,会将 task 加入阻塞队列,加入成功后会进行 double-check 双重校验,继续下面的步骤,如果加入失败,可能是由于队列线程已满,此时会判断是否能够加入线程池中,如果线程池也满了的话,就会直接执行拒绝策略,如果线程池能加入,execute 方法结束。
  3. 步骤 2 中的 double-check 主要是为了判断进入 workQueue 中的 task 是否能被执行:如果线程池已经不是 Running 状态,则应该拒绝添加任务,从 workQueue 队列中删除任务。如果线程池是 Running,但是从 workQueue 中删除失败了,此时的原因可能是由于其他线程执行了这个任务,此时会直接执行拒绝策略。
  4. 如果线程是 Running 状态,并且不能把任务从队列中移除,进而判断工作线程是否为 0 ,如果不为 0 ,execute 执行完毕,如果工作线程是 0 ,则会使用 addWorker 增加工作线程,execute 执行完毕。

添加 worker 线程

从上面的执行流程可以看出,添加一个 worker 涉及的工作也非常多,这也是一个比价难啃的点,我们一起来分析下,这是 worker 的源码

private boolean addWorker(Runnable firstTask, boolean core) {
  // retry 的用法相当于 goto
  retry:
  for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);
    // Check if queue empty only if necessary.
    // 仅在必要时检查队列是否为空。
    // 线程池状态有五种,state 越小越是运行状态
    // rs >= SHUTDOWN,表示此时线程池状态可能是 SHUTDOWN、STOP、TIDYING、TERMINATED
    // 默认 rs >= SHUTDOWN,如果 rs = SHUTDOWN,直接返回 false
    // 默认 rs < SHUTDOWN,是 RUNNING,如果任务不是空,返回 false
    // 默认 RUNNING,任务是空,如果工作队列为空,返回 false
    //
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
      return false;
    // 执行循环
    for (;;) {
      // 统计工作线程数量
      int wc = workerCountOf(c);
      // 如果 worker 数量>线程池最大上限 CAPACITY(即使用int低29位可以容纳的最大值)
      // 或者 worker数量 > corePoolSize 或 worker数量>maximumPoolSize ),即已经超过了给定的边界
      if (wc >= CAPACITY ||
          wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
      // 使用 CAS 增加 worker 数量,增加成功,跳出循环。
      if (compareAndIncrementWorkerCount(c))
        break retry;
      // 检查 ctl
      c = ctl.get();  // Re-read ctl
      // 如果状态不等于之前获取的 state,跳出内层循环,继续去外层循环判断
      if (runStateOf(c) != rs)
        continue retry;
      // else CAS failed due to workerCount change; retry inner loop
    }
  }
  /*
          worker数量+1成功的后续操作
        * 添加到 workers Set 集合,并启动 worker 线程
         */
  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    // 包装 Runnable 对象
    // 设置 firstTask 的值为 -1
    // 赋值给当前任务
    // 使用 worker 自身这个 runnable,调用 ThreadFactory 创建一个线程,并设置给worker的成员变量thread
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        // 在持有锁的时候重新检查
        // 如果 ThreadFactory 失败或在获得锁之前关闭,请回退。
        int rs = runStateOf(ctl.get());
        //如果线程池在运行 running<shutdown 或者 线程池已经 shutdown,且firstTask==null
        // (可能是 workQueue 中仍有未执行完成的任务,创建没有初始任务的 worker 线程执行)
        //worker 数量 -1 的操作在 addWorkerFailed()
        if (rs < SHUTDOWN ||
            (rs == SHUTDOWN && firstTask == null)) {
          if (t.isAlive()) // precheck that t is startable
            throw new IllegalThreadStateException();
          // workers 就是一个 HashSet 集合
          workers.add(w);
          // 设置最大的池大小 largestPoolSize,workerAdded 设置为true
          int s = workers.size();
          if (s > largestPoolSize)
            largestPoolSize = s;
          workerAdded = true;
        }
      } finally {
        mainLock.unlock();
      }
      if (workerAdded) {
        t.start();
        workerStarted = true;
      }
    }
    //如果启动线程失败
    // worker 数量 -1
  } finally {
    if (! workerStarted)
      addWorkerFailed(w);
  }
  return workerStarted;
}

妈的真长的一个方法,有点想吐血,其实我肝到现在已经肝不动了,但我一想到看这篇文章的读者们能给我一个关注,就算咳出一口老血也值了。

这个方法的执行流程图如下

微信图片_20220416151916.png

这里我们就不再文字描述了,但是上面流程图中有一个对象引起了我的注意,那就是 worker 对象,这个对象就代表了线程池中的工作线程,那么这个 worker 对象到底是啥呢?

相关文章
|
6月前
|
Java
线程池笔记
线程池笔记
40 0
|
存储 安全 Java
并发编程系列教程(07) - 线程池原理分析(一)
并发编程系列教程(07) - 线程池原理分析(一)
32 0
|
存储 缓存 监控
并发编程系列教程(08) - 线程池原理分析(二)
并发编程系列教程(08) - 线程池原理分析(二)
41 0
|
Java
线程池的类型有哪些?适用场景?第一篇
线程池的类型有哪些?适用场景?第一篇
183 0
线程池详解(通俗易懂超级好)
这个参数的设计完全参考系统运行环境和硬件压力设定,没有固定的参考值,用户可以根据经验和系统产生任务的时间间隔合理设置一个值即可;
线程池详解(通俗易懂超级好)
线程池:第一章:线程池的底层原理
线程池:第一章:线程池的底层原理
线程池:第一章:线程池的底层原理
|
存储 监控 Java
Java线程池理解与学习
线程过多就容易引发内存溢出,因此我们有必要使用线程池的技术 线程池的好处 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的消耗 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行 提高线程管理性: 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控
67 0
Java线程池理解与学习
|
存储 缓存 Java
Java线程池笔记
总结一下Java线程池相关八股文
214 0
|
存储 Java 程序员
搞懂Java线程池
搞懂Java线程池
搞懂Java线程池
|
Java 程序员 API
肝完这篇线程池,我咳血了(一)
我们知道,线程需要的时候要进行创建,不需要的时候需要进行销毁,但是线程的创建和销毁都是一个开销比较大的操作。
肝完这篇线程池,我咳血了(一)