肝完这篇线程池,我咳血了(四)

简介: 我们知道,线程需要的时候要进行创建,不需要的时候需要进行销毁,但是线程的创建和销毁都是一个开销比较大的操作。

worker 对象

Worker 位于 ThreadPoolExecutor 内部,它继承了 AQS 类并且实现了 Runnable 接口。Worker 类主要维护了线程运行过程中的中断控制状态。它提供了锁的获取和释放操作。在 worker 的实现中,我们使用了非重入的互斥锁而不是使用重复锁,因为 Lea 觉得我们不应该在调用诸如 setCorePoolSize 之类的控制方法时能够重新获取锁。

worker 对象的源码比较简单和标准,这里我们只说一下 worker 对象的构造方法,也就是

Worker(Runnable firstTask) {
  setState(-1); 
  this.firstTask = firstTask;
  this.thread = getThreadFactory().newThread(this);
}

构造一个 worker 对象需要做三步操作:

  • 初始 AQS 状态为 -1,此时不允许中断 interrupt(),只有在 worker 线程启动了,执行了 runWorker() 方法后,将 state 置为0,才能进行中断。
  • 将 firstTask 赋值给为当前类的全局变量
  • 通过 ThreadFactory 创建一个新的线程。

###任务运行

我们前面的流程主要分析了线程池的 execute 方法的执行过程,这个执行过程相当于是任务提交过程,而我们下面要说的是从队列中获取任务并运行的这个工作流程。

一般情况下,我们会从初始任务开始运行,所以我们不需要获取第一个任务。否则,只要线程池还处于 Running 状态,我们会调用 getTask 方法获取任务。getTask 方法可能会返回 null,此时可能是由于线程池状态改变或者是配置参数更改而导致的退出。还有一种情况可能是由于 异常 而引发的,这个我们后面会细说。

下面来看一下 runWorker 方法的源码:

final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  // 允许打断
  //  new Worker() 是 state==-1,此处是调用 Worker 类的 tryRelease() 方法,
  //  将 state 置为0
  w.unlock();
  boolean completedAbruptly = true;
  try {
    // 调用 getTask() 获取任务
    while (task != null || (task = getTask()) != null) {
      // 获取全局锁
      w.lock();
      // 确保只有在线程 STOPING 时,才会被设置中断标志,否则清除中断标志。
      // 如果一开始判断线程池状态 < STOPING,但 Thread.interrupted() 为 true,
      // 即线程已经被中断,又清除了中断标示,再次判断线程池状态是否 >= stop
      // 是,再次设置中断标示,wt.interrupt()
      // 否,不做操作,清除中断标示后进行后续步骤
      if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() &&
            runStateAtLeast(ctl.get(), STOP))) &&
          !wt.isInterrupted())
        wt.interrupt();
      try {
        // 执行前需要调用的方法,交给程序员自己来实现
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
          task.run();
        } catch (RuntimeException x) {
          thrown = x; throw x;
        } catch (Error x) {
          thrown = x; throw x;
        } catch (Throwable x) {
          thrown = x; throw new Error(x);
        } finally {
          // 执行后需要调用的方法,交给程序员自己来实现
          afterExecute(task, thrown);
        }
      } finally {
        // 把 task 置为 null,完成任务数 + 1,并进行解锁
        task = null;
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
    // 最后处理 worker 的退出
  } finally {
    processWorkerExit(w, completedAbruptly);
  }
}

下面是 runWorker 的执行流程图

微信图片_20220416152154.png

这里需要注意一下最后的 processWorkerExit 方法,这里面其实也做了很多事情,包括判断 completedAbruptly 的布尔值来表示是否完成任务,获取锁,尝试从队列中移除 worker,然后尝试中断,接下来会判断一下中断状态,在线程池当前状态小于 STOP 的情况下会创建一个新的 worker 来替换被销毁的 worker。

任务获取

任务获取就是 getTask 方法的执行过程,这个环节主要用来获取任务和剔除任务。下面进入源码分析环节

private Runnable getTask() {
  // 判断最后一个 poll 是否超时。
  boolean timedOut = false; // Did the last poll() time out?
  for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);
    // Check if queue empty only if necessary.
    // 必要时检查队列是否为空
    // 对线程池状态的判断,两种情况会 workerCount-1,并且返回 null
    // 线程池状态为 shutdown,且 workQueue 为空(反映了 shutdown 状态的线程池还是要执行 workQueue 中剩余的任务的)
    // 线程池状态为 stop(shutdownNow() 会导致变成 STOP)(此时不用考虑 workQueue 的情况)
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      decrementWorkerCount();
      return null;
    }
    int wc = workerCountOf(c);
    // Are workers subject to culling?
    // 是否需要定时从 workQueue 中获取
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    // 如果工作线程的数量大于 maximumPoolSize 会进行线程剔除
    // 如果使用了 allowCoreThreadTimeOut ,并且工作线程不为0或者队列有任务的话,会直接进行线程剔除
    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
      if (compareAndDecrementWorkerCount(c))
        return null;
      continue;
    }
    try {
      Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
      workQueue.take();
      if (r != null)
        return r;
      timedOut = true;
    } catch (InterruptedException retry) {
      timedOut = false;
    }
  }
}

getTask 方法的执行流程图如下

微信图片_20220416152159.png

工作线程退出

工作线程退出是 runWorker 的最后一步,这一步会判断工作线程是否突然终止,并且会尝试终止线程,以及是否需要增加线程来替换原工作线程。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
  // worker数量 -1
  // completedAbruptly 是 true,突然终止,说明是 task 执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的 worker 线程数量需要-1
  // completedAbruptly 是 false 是突然终止,说明是 worker 线程没有 task 可执行了,不用-1,因为已经在 getTask() 方法中-1了
  if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    decrementWorkerCount();
  // 从 Workers Set 中移除 worker
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    completedTaskCount += w.completedTasks;
    workers.remove(w);
  } finally {
    mainLock.unlock();
  }
  // 尝试终止线程,
  tryTerminate();
  // 是否需要增加 worker 线程
  // 线程池状态是 running 或 shutdown
  // 如果当前线程是突然终止的,addWorker()
  // 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
  // 故如果调用线程池 shutdown(),直到workQueue为空前,线程池都会维持 corePoolSize 个线程,
  // 然后再逐渐销毁这 corePoolSize 个线程
  int c = ctl.get();
  if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
      int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
      if (min == 0 && ! workQueue.isEmpty())
        min = 1;
      if (workerCountOf(c) >= min)
        return; // replacement not needed
    }
    addWorker(null, false);
  }
}

源码搞的有点头大了,可能一时半会无法理解上面这些源码,不过你可以先把注释粘过去,等有时间了需要反复刺激,加深印象!

其他线程池

下面我们来了解一下其他线程池的构造原理,主要涉及 FixedThreadPool、SingleThreadExecutor、CachedThreadPool

newFixedThreadPool

newFixedThreadPool 被称为可重用固定线程数的线程池,下面是 newFixedThreadPool 的源码

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}

可以看到,newFixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被设置为创建 FixedThreadPool 时指定的参数 nThreads,也就是说,在 newFiexedThreadPool 中,核心线程数就是最大线程数。

下面是 newFixedThreadPool 的执行示意图

微信图片_20220416152204.png

newFixedThreadPool 的工作流程如下

  • 如果当前运行的线程数少于 corePoolSize,则会创建新线程 addworker 来执行任务
  • 如果当前线程的线程数等于 corePoolSize,会将任务直接加入到 LinkedBlockingQueue 无界阻塞队列中,LinkedBlockingQueue 的上限如果没有制定,默认为 Integer.MAX_VALUE 大小。
  • 等到线程池中的任务执行完毕后,newFixedThreadPool 会反复从 LinkedBlockingQueue 中获取任务来执行。

相较于 ThreadPoolExecutor,newFixedThreadPool 主要做了以下改变

  • 核心线程数等于最大线程数,因此 newFixedThreadPool 只有两个最大容量,一个是线程池的线程容量,还有一个是 LinkedBlockingQueue 无界阻塞队列的线程容量。
  • 这里可以看到还有一个变化是 0L,也就是 keepAliveTime = 0L,keepAliveTime 就是到达工作线程最大容量后的线程等待时间,0L 就意味着当线程池中的线程数大于 corePoolsize 时,空余的线程会被立即终止。
  • 由于使用无界队列,运行中的 newFixedThreadPool 不会拒绝任务,也就是不会调用 RejectedExecutionHandler.rejectedExecution 方法。

newSingleThreadExecutor

newSingleThreadExecutor 中只有单个工作线程,也就是说它是一个只有单个 worker 的 Executor。

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>(),
                            threadFactory));
}

可以看到,在 newSingleThreadExecutor 中,corePoolSize 和 maximumPoolSize 都被设置为 1,也不存在超时情况,同样使用了 LinkedBlockingQueue 无界阻塞队列,除了 corePoolSize 和 maximumPoolSize 外,其他几乎和 newFixedThreadPool 一模一样。

下面是 newSingleThreadExecutor  的执行示意图

微信图片_20220416152208.png

newSingleThreadExecutor 的执行过程和 newFixedThreadPool 相同,只是 newSingleThreadExecutor 的工作线程数为 1。

newCachedThreadPool

newCachedThreadPool 是一个根据需要创建工作线程的线程池,newCachedThreadPool 线程池最大数量是 Integer.MAX_VALUE,保活时间是 60 秒,使用的是SynchronousQueue 无缓冲阻塞队列。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>(),
                                threadFactory);
}

它的执行示意图如下

微信图片_20220416152212.png

  • 首先会先执行 SynchronousQueue.offer 方法,如果当前 maximumPool 中有空闲线程正在执行 SynchronousQueue.poll ,就会把任务交给空闲线程来执行,execute 方法执行完毕,否则的话,继续向下执行。
  • 如果 maximumPool 中没有线程执行 SynchronousQueue.poll 方法,这种情况下 newCachedThreadPool 会创建一个新线程执行任务,execute 方法执行完成。
  • 执行完成的线程将执行 poll 操作,这个 poll 操作会让空闲线程最多在 SynchronousQueue 中等待 60 秒钟。如果 60 秒钟内提交了一个新任务,那么空闲线程会执行这个新提交的任务,否则空闲线程将会终止。

这里的关键点在于 SynchronousQueue 队列,它是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程对应的移除操作。这其实就是一种任务传递,如下图所示

微信图片_20220416152216.png

其实还有一个线程池 ScheduledThreadPoolExecutor ,就先不在此篇文章做详细赘述了。

线程池实践考量因素

下面介绍几种在实践过程中使用线程池需要考虑的几个点

  • 避免任务堆积,比如我们上面提到的 newFixedThreadPool,它是创建指定数目的线程,但是工作队列是无界的,这就导致如果工作队列线程太少,导致处理速度跟不上入队速度,这种情况下很可能会导致 OOM,诊断时可以使用 jmap 检查是否有大量任务入队。
  • 生产实践中很可能由于逻辑不严谨或者工作线程不能及时释放导致 线程泄漏,这个时候最好检查一下线程栈
  • 避免死锁等同步问题
  • 尽量避免在使用线程池时操作 ThreadLocal,因为工作线程的生命周期可能会超过任务的生命周期。

线程池大小的设置

线程池大小的设置也是面试官经常会考到的一个点,一般需要根据任务类型来配置线程池大小

  • 如果是 CPU 密集型任务,那么就意味着 CPU 是稀缺资源,这个时候我们通常不能通过增加线程数来提高计算能力,因为线程数量太多,会导致频繁的上下文切换,一般这种情况下,建议合理的线程数值是 N(CPU)数 + 1
  • 如果是 I/O 密集型任务,就说明需要较多的等待,这个时候可以参考 Brain Goetz 的推荐方法 线程数 = CPU核数 × (1 + 平均等待时间/平均工作时间)。参考值可以是 N(CPU) 核数 * 2。

当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

后记

这篇文章真的写了很久,因为之前对线程池认识不是很深,所以花了大力气来研究,希望这篇文章对你有所帮助。

这篇文章探讨了对 Executor 框架的主要组成、线程池结构与生命周期,线程池源码和线程池实现的细节等方面进行了讲解和分析,希望对你有所帮助。

如果这篇文章写的还不错,希望读者朋友们可以不吝给出四连:点赞、在看、留言、分享,记住这次一定哦!

相关文章
|
6月前
|
Java
线程池笔记
线程池笔记
40 0
|
存储 安全 Java
并发编程系列教程(07) - 线程池原理分析(一)
并发编程系列教程(07) - 线程池原理分析(一)
32 0
|
存储 缓存 监控
并发编程系列教程(08) - 线程池原理分析(二)
并发编程系列教程(08) - 线程池原理分析(二)
41 0
|
Java
线程池的类型有哪些?适用场景?第一篇
线程池的类型有哪些?适用场景?第一篇
183 0
线程池详解(通俗易懂超级好)
这个参数的设计完全参考系统运行环境和硬件压力设定,没有固定的参考值,用户可以根据经验和系统产生任务的时间间隔合理设置一个值即可;
线程池详解(通俗易懂超级好)
线程池:第一章:线程池的底层原理
线程池:第一章:线程池的底层原理
线程池:第一章:线程池的底层原理
|
存储 监控 Java
Java线程池理解与学习
线程过多就容易引发内存溢出,因此我们有必要使用线程池的技术 线程池的好处 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的消耗 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行 提高线程管理性: 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控
67 0
Java线程池理解与学习
|
存储 缓存 Java
Java线程池笔记
总结一下Java线程池相关八股文
214 0
|
存储 Java 程序员
搞懂Java线程池
搞懂Java线程池
搞懂Java线程池
|
Java 程序员 API
肝完这篇线程池,我咳血了(一)
我们知道,线程需要的时候要进行创建,不需要的时候需要进行销毁,但是线程的创建和销毁都是一个开销比较大的操作。
肝完这篇线程池,我咳血了(一)