Java 线程池源码解析(1)

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析DNS,个人版 1个月
云解析 DNS,旗舰版 1个月
简介: Java 线程池源码解析

线程池

池化思想:线程池、数据连接池等,比如我们 Spark 的 Executor 就是典型的线程池,用户在启动 Spark 作业的同时启动线程池,这样 Spark 的 Task 就可以直接获取资源,而不用像 MR 程序那样等待容器上的进程开启了。

如果不使用线程池的话,我们需要:

  1. 手动创建线程对象
  2. 执行任务
  3. 执行完毕,回收资源

优点

  1. 提高线程的利用率
  2. 提高程序的响应速度(线程对象提前创建好的,节省了创建和销毁的开销)
  3. 便于统一管理线程对象
  4. 可以控制最大的并发数

1、Java 创建线程池示例

1.1、构造参数解释

我们先看看 ThreadPoolExecutor 的构造器的源码:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime); // 统一转为亚秒格式
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

参数大概意思就是:核心线程数、最大线程数、存活时间、存活时间单位、阻塞队列(存放任务)、线程创建工厂、拒绝策略。我们也可以通过下面的例子理解一下:

       线程池就好比我们的银行,它有很多个接待客人的柜台(一个柜台就是一个线程,用来执行任务)和用来供客人等待的座位(等待的执行的任务)。上面的图中,我们有 5 个柜台(对应第2个参数:maximumPoolSize) ;而其中绿色的三个柜台代表常驻柜台(也就是一直都有人,对应第1个参数:corePoolSize);红色的柜台代表预备柜台,也就是当业务繁忙的时候,最多还可以打电话摇两个人来(maximumPoolSize - corePoolSize);但是摇人来帮忙是有时效性的,如果帮完忙一段时间(取决于第3个和第4个参数)没有活干,这些线程就会被释放;摇人的方式取决于第6个参数(下面我们是通过默认的线程工厂来再创建线程的);蓝色的等候区区域代表座位(可允许等待的任务数量),当柜台前和座位都满了的时候,如果再有任务进来就会被拒绝,具体的拒绝方式取决于第7个参数,下面我们给出的拒绝策略是直接报错:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class TestThreadPool {
  public static void main(String[] args) {
    // 1.核心线程数 2.最大线程数 3.存活时间 4.时间单位 5.等待队列 6.线程工厂 7.拒绝策略(直接抛异常)
    ExecutorService executorService = new ThreadPoolExecutor(3, 5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
    for (int i = 0; i < 9; i++) {
      executorService.execute(()->{
        System.out.println(Thread.currentThread().getName()+"正在工作");
      });
    }
        // 关闭线程池
    executorService.shutdown();
  }
}
  • 使用线程池中执行任务很简单,我们不需要关心具体是哪个线程执行的,只需要把任务丢给它即可(通过 lambda 表达式来告诉线程池我们的任务执行逻辑)

       上面的案例中,我们的线程池的最大接收的任务量是 8 (最大线程数:5 + 等待队列容量:3),但并不是说只能跑8个任务,如果有任务释放资源仍然可以继续执行任务。

       所以,上面的案例同一时刻最多只能共存8个任务(其中最多只有五个任务会同时执行),如果,当我们的任务超过8时,会直接报错(因为我们设置了拒绝策略就是直接抛异常)。

2、ThreadExecutorPool 源码解析

创建一个包含 10 个核心线程、总线程数为 20、存活时间为 0 s、拒绝策略为直接抛异常的一个线程池对象:

public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                10, 20,
                0L, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()
        );
 
        executor.execute(new Runnable() {
            @Override
            public void run() {
 
            }
        });
    }

2.1、线程池保活与回收源码分析

2.1.1、execute 方法源码
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // 如果总线程数小于核心线程数
            if (addWorker(command, true))  // 添加一个核心线程
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // 如果当前全部线程都在工作就把任务添加到阻塞任务队列当中
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) 
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) // 如果上面没有成功把任务加到队列就新加一个线程
            reject(command); // 如果新加线程失败(比如超过最大线程数)就拒绝任务
    }

这里的注释已经说的很明白了:

  1. 如果正在运行的线程数少于核心线程数(corePoolSize),则尝试启动一个新线程,并将给定的命令参数作为其第一个任务。调用addWorker方法可以原子性地检查运行状态和工作线程数,从而防止在不应该添加线程时发出错误的警报,通过返回false来实现。
  2. 如果任务能够成功入队,我们仍然需要再次检查是否应该添加一个线程(因为自上次检查以来已有线程死亡)或者线程池在进入此方法后已经关闭。所以我们重新检查状态,如果有必要回滚入队操作(如果已停止),或者如果没有线程,则启动一个新线程。
  3. 如果我们无法将任务入队,则尝试添加一个新的线程。如果添加失败,我们知道线程池已经关闭或饱和,因此拒绝该任务。

       所以在上面我们创建线程池的代码中,并不是线程池创建好之后就会立马创建 10 个核心线程,而是真正有任务来的时候才会去新创建一个线程。

思考:如果线程的任务结束了,线程对象会怎么样呢?

       从创建线程池的构造器就不难想到,构造器中有一个阻塞队列的参数,其实当线程没有任务的时候,线程并不会关闭,而是一直阻塞,也叫保活

       保活线程的关键在于阻塞队列,即LinkedBlockingDeque。当队列为空时,如果线程尝试从队列中取元素,线程会被阻塞,直到队列中有元素可供取出。这样,线程就会在等待任务的过程中保持活跃状态。

2.1.2、getTask 方法源码

       getTask 方法是 runWorker 方法里的调用的,而 runWoker 又是 Worker 对象的方法,这个 Worker 实例又是上面的 execute 方法中的 addWorker 方法中实例化出来的。

       getTask 方法是所有线程池中的线程一直在不断调用检查的(在 runTask 方法中的 while 循环中被调用):

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
 
        for (;;) {
            int c = ctl.get();
 
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            
            // 总线程数
            int wc = workerCountOf(c);
 
            // 判断是否需要超时回收线程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
 
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 回收超时的线程
                    workQueue.take(); // 阻塞
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

方法解释

如果满足以下任一条件,该工作线程将返回null并退出:

  1. 当前线程数超过了最大线程数(由于调用了setMaximumPoolSize方法)。
  2. 线程池已停止。
  3. 线程池已关闭且队列为空。
  4. 该工作线程在等待任务时超时,并且超时的工作线程会受到终止处理(即allowCoreThreadTimeOut || workerCount > corePoolSize),无论是在超时等待之前还是之后。如果队列非空,则该工作线程不是线程池中的最后一个线程。

返回值

       如果满足上述条件之一,返回null,表示工作线程必须退出,此时workerCount会减1。

否则,返回task,表示成功获取到任务。

       从源码中可以看到,线程池中的工作线程会执行getTask()方法来获取任务。在这个方法中,线程会调用workQueue.poll()或workQueue.take()方法来尝试从LinkedBlockingDeque中获取任务。如果队列为空,这些方法会使线程阻塞,直到有新的任务添加到队列中。这就是线程在没有任务执行时仍然保持活跃(保活)的机制。

  • workQueue.take():当队列为空时,该方法会无限期地等待,直到有新的任务被添加到队列中。这意味着,如果所有核心线程都在执行任务并且队列为空,那么调用 take() 方法的线程会一直阻塞,直到其他线程向队列中添加了新的任务。
  • workQueue.poll():此方法在队列为空时会立即返回null,而不是等待。这通常用于非核心线程(也称为工作线程),当没有任务可做时,这些线程可以选择终止自己以减少资源占用。

思考:现在的核心线程数是 10,如果此时正在工作的线程有 11 个(10个核心线程,一个其它线程),那如果所有线程的任务都完成了,那么线程池又会执行怎样的逻辑呢?

2.1.3、runWorker 方法源码

在上面的 getTask 方法中,线程池中的每个线程在调用这个方法的时候都会判断是否需要回收线程:

        // 判断是否需要超时回收线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

现在我们有 11 个线程,所以明显总线程数 wc(11) > corePoolSize(10),所以自然会执行下面的逻辑:

        Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;

也就是 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):当前调用该方法的线程会阻塞一段时间(keppAliveTime 个单位),如果这段时间过后依然访问不到任务,那么下面的 timeOut = true ,getTask 方法返回 null。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }


Java 线程池源码解析(2)https://developer.aliyun.com/article/1534180

相关文章
|
3天前
|
存储 监控 算法
Java 内存管理与垃圾回收机制深度解析
本文深入探讨了Java的内存管理与垃圾回收(GC)机制,从JVM内存结构出发,详细分析了堆、栈、方法区的职能及交互。文章重点讨论了垃圾回收的核心概念、常见算法以及调优策略,旨在为Java开发者提供一套系统的内存管理和性能优化指南。 【7月更文挑战第17天】
|
3天前
|
Java 编译器 开发者
Java 内存模型深度解析
本文旨在深入探讨Java内存模型的复杂性及其对并发编程的影响。通过揭示内存模型的核心原理、JMM的结构,并结合具体案例和数据分析,本文将帮助读者理解Java内存模型如何确保多线程程序的正确性和性能,以及如何在实际应用中有效利用这一模型进行高效的并发编程。 【7月更文挑战第17天】
9 4
|
4天前
|
Java
Java中的异常处理机制深度解析
本文旨在深入探讨Java语言中异常处理的机制,从基础概念到高级应用,全面剖析try-catch-finally语句、自定义异常以及异常链追踪等核心内容。通过实例演示和代码分析,揭示异常处理在Java程序设计中的重要性和应用技巧,帮助读者构建更为健壮和易于维护的程序。
|
8天前
|
JavaScript Java 测试技术
基于Java的智慧医疗服务平台系统设计和实现(源码+LW+部署讲解)
基于Java的智慧医疗服务平台系统设计和实现(源码+LW+部署讲解)
28 8
|
6天前
|
监控 Java API
Java并发编程之线程池深度解析
【7月更文挑战第14天】在Java并发编程领域,线程池是提升性能、管理资源的关键工具。本文将深入探讨线程池的核心概念、内部工作原理以及如何有效使用线程池来处理并发任务,旨在为读者提供一套完整的线程池使用和优化策略。
|
1月前
|
XML Java 数据格式
深度解析 Spring 源码:从 BeanDefinition 源码探索 Bean 的本质
深度解析 Spring 源码:从 BeanDefinition 源码探索 Bean 的本质
32 3
|
16天前
|
存储 安全 Java
深度长文解析SpringWebFlux响应式框架15个核心组件源码
以上是Spring WebFlux 框架核心组件的全部介绍了,希望可以帮助你全面深入的理解 WebFlux的原理,关注【威哥爱编程】,主页里可查看V哥每天更新的原创技术内容,让我们一起成长。
|
17天前
|
关系型数据库 分布式数据库 数据库
PolarDB-X源码解析:揭秘分布式事务处理
【7月更文挑战第3天】**PolarDB-X源码解析:揭秘分布式事务处理** PolarDB-X,应对大规模分布式事务挑战,基于2PC协议确保ACID特性。通过预提交和提交阶段保证原子性与一致性,使用一致性快照隔离和乐观锁减少冲突,结合故障恢复机制确保高可用。源码中的事务管理逻辑展现了优化的分布式事务处理流程,为开发者提供了洞察分布式数据库核心技术的窗口。随着开源社区的发展,更多创新实践将促进数据库技术进步。
21 3
|
1月前
|
XML Java 数据格式
深度解析 Spring 源码:揭秘 BeanFactory 之谜
深度解析 Spring 源码:揭秘 BeanFactory 之谜
23 1
|
1月前
|
SQL 缓存 算法
【源码解析】Pandas PandasObject类详解的学习与实践
【源码解析】Pandas PandasObject类详解的学习与实践

推荐镜像

更多