Java 线程池源码解析(2)

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
云解析DNS,个人版 1个月
简介: Java 线程池源码解析

Java 线程池源码解析(1)https://developer.aliyun.com/article/1534179

因为 while 中的条件均为 false,所以 runWorker 会先执行下面的 completedAbruptly = false; 然后执行 finally 中的 processWorkerExit(w, completedAbruptly); 这里的 processWorkerExit 方法正是回收线程的关键所在,如果没有它,我们的所有空闲线程就都将被回收,所以这个方法中就定义了回收的逻辑:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
 
        tryTerminate();
 
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // 线程直接结束
            }
            addWorker(null, false); // 本线程结束,再新建一个线程
        }
    }

比如现在我们一共有 11 个线程(10个核心线程),那么 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; (暂不考虑 allowCoreThreadTimeOut)的结果就是 10,所以这个 11 个线程在执行 if (workerCountOf(c) >= min) 的时候,第一个执行该语句的线程会直接 return,之后执行该判断语句的 10 个线程由于不满足条件,但是自己又存活不了,所以在自己结束之前,会再创建一个没有初始任务的新线程,这样就保证了线程池中的线程数总是不低于核心线程数的。

2.2、线程池五大状态变化源码

下面是 ThreadPoolExecutor 源码中对于常量 ctl 的解释:

       主池控制状态ctl是一个原子整数,包含两个概念字段workerCount,表示线程的有效数量runState,表示是否正在运行、关闭等。

       为了将它们打包为一个整数,我们将workerCounts限制为(2^29)-1(约5亿)个线程,而不是(2^31)-1(20亿)个其他可表示的线程。如果这在未来是一个问题,可以将变量更改为AtomicLong,并调整下面的移位/掩码常数。但是,在需要之前,使用int,此代码会更快、更简单。workerCount是允许启动但不允许停止的工人数量。该值可能与实际活动线程数暂时不同,例如,当ThreadFactory在被请求时未能创建线程,以及当退出的线程在终止前仍在执行记账时。用户可见的池大小报告为工作者集的当前大小。

       runState提供了主要的生命周期控制,取值为:

  • RUNNING:正常接受新任务和处理排队的任务
  • SHUTDOWN:不接受新任务,但处理排队任务
  • STOP:不接受新建任务,不处理排队任务,并中断正在进行的任务
  • TIDING:所有任务都已终止,workerCount为零,转换到状态的线程TIDING将运行terminated()钩子方法terminated:terminated。

       runState随时间单调增加,但不需要达到每个状态。转换为:RUNNING->SHUTDOWN On invocation of SHUTDOWN()(RUNNING或SHUTDOWN)->STOP On invocationof shutdownNow()SHUTDOWN->TIDING当队列和池都为空时STOP->TIDIING当池为空时TIDIING->TERMINATED当TERMINATED()钩子方法完成时等待awaitTermination()的线程将在状态达到TERMINATED时返回。检测从SHUTDOWN到TIDING的转换并不像您希望的那样简单,因为队列在非空之后可能会变空,反之亦然,但我们只能在看到它是空的之后,看到workerCount为0时终止(这有时需要重新检查——见下文)

线程池的五大状态从源码中可以看到:

这里的 COUNT_MASK 代表的是线程池的最大数量,也就是 2 的 29 次方 -1 个。

Integer.SIZE为32,所以COUNT_BITS为29,最终各个状态对应的二级制为:

  1. RUNNING:11100000 00000000 00000000 00000000
  2. SHUTDOWN:00000000 00000000 00000000 00000000
  3. STOP:00100000 00000000 00000000 00000000
  4. TIDYING:01000000 00000000 00000000 00000000
  5. TERMINATED:01100000 00000000 00000000 00000000

       所以,只需要使用一个Integer数字的最高三个bit,就可以表示5种线程池的状态,而剩下的29个bit就可以用来表示工作线程数,比如,假如 ctl 为:11100000 00000000 00000000 00001010,就表示线程池的状态为RUNNING,线程池中目前在工作的线程有10个,这里说的“在工作”意思是线程活着,要么在执行任务,要么在阻塞等待任务。

       把线程状态放到高3位,把线程数量放到剩下的29位的好处就是方便之后频繁修改线程个数。

下面是线程状态的转换关系:

2.2.1、Thread.interrupt 线程中断
public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    System.out.println("线程 t1 在执行任务");
                }
            }
        });
 
        t1.start();
 
        Thread.sleep(3000);
 
        t1.interrupt();
 
        System.out.println("线程 t1 被阻塞了");
    }

       事实上,上面的代码并不会真正中断线程 t1 ,因为 interrupt 只是一个信号,线程要不要停掉还是取决于它自己。所以要想真正打断线程,需要线程自己判断:

Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.interrupted()){ // 如果线程未被中断
                    System.out.println("线程 t1 在执行任务");
                }
            }
        });

这样才能真正的关闭线程,否则 interrupt 方法没有意义。

2.2.2、线程池的状态转换

       线程池的 shutdown 方法会把线程池状态设置为 SHUTDOWN ,但是 SHUTDOWN 状态并不会立即关闭线程池中的所有线程,而是会等任务执行完毕再关闭线程,毕竟直接关闭线程可能会出现一些安全问题。

       如果希望立即关闭线程,需要调用 shutdownNow方法,这个方法会把线程池的状态设置为 STOP

这里的 onShutdown 方法是个空的(它是提供给子类用的),这里不需要关心。,而下面的 tryTerminate 才会对线程池的状态进行修改,这才是我们需要关心的:

同样,这里的 terminate 方法其实也只是一个空方法,它也是留给子类在需要时使用的,就相当于一个扩展机制,以后要是再 TIDYING 状态之后希望对线程池进行什么操作,就重写它就可以了。

上面代码中的 isRunning 方法也特别简单:

只要当前的线程池状态小于 SHUTDOWN 就是 RUNNING 状态。

2.2.3、addWorker 再解析

首先了解乐观锁:

  • Java提供了如AtomicInteger、AtomicLong、AtomicReference等原子类来支持CAS操作,这些类位于java.util.concurrent.atomic包下。
  • 下面的源码中就是在循环体内部使用CAS操作或版本号机制来实现了乐观锁的行为。
 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN) // 如果当前线程池的状态 >= SHUTDOWN
                && (runStateAtLeast(c, STOP) // 并且当前线程池的状态 >= STOP
                    || firstTask != null // execute(command) 的command为null代表不携带任务
                    || workQueue.isEmpty())) // 任务队列为空
                return false; // 返回false代表创建线程失败
 
            for (;;) {
                if (workerCountOf(c) // 线程池的总线程数
                    // 在execute方法源码中,当当前线程数<核心线程数是addWorker的core=true
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) // 是否大于核心线程数或者总线程数
                    return false;
                // 当有多个线程同时执行compareAndIncrementWorkerCount方法时,只会有一个返回true
                if (compareAndIncrementWorkerCount(c)) // 先把总线程数+1再去创建线程,ctl是原子类型的,所以这样不会出现线程安全的问题
                    break retry; // 如果新建线程成功,就会跳出去取真正新建线程
                c = ctl.get();  // 新建线程失败的话刷新总线程数
                if (runStateAtLeast(c, SHUTDOWN)) // 判断线程池状态是否>=SHUTDOWN
                    continue retry; 
                // else CAS failed due to workerCount change; retry inner loop
                // 否则继续执行该循环
            }
        }
 
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
 
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        workerAdded = true;
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

       比如我们设置核心线程数为10,但是现在只创建了9个核心线程,如果此时有 2 个以上线程同时执行了 executor.execute(task) 方法,那么在 execute 中会先判断当前线程总数是否大于核心线程数,因为这些线程是同时执行的,所以都返回 9 ,那么 addWorker 的 core 参数就是 true,进入 addWorker 方法,这些线程会都先给总线程数 +1 再去创建线程,而不是先去创建线程再给总线程数 + 1,因为 addWorker 的参数 core = true(代表创建一个核心线程),那么如果都去创建核心线程就会使得核心线程数超过设置的值。

       所以第一个抢先给 ctl + 1 的线程会真正创建一个核心线程(ctl 是原子性的),而其它线程只能被跳出循环重新执行,这样当第二次执行循环时,就会发现此时的核心线程数已经达到了,就会返回 false,代表创建核心线程失败。

总结

       所以我们需要知道,在 Java 的线程池中,它并不是上来就把所有核心线程都开启,而是需要的时候才会开启;此外,当核心线程都忙碌的时候,新来的任务并不会去创建非核心线程,而是优先放到一个任务队列当中去,只有当任务队列满了(这里用的 LinkedBlockingQueue 是链表结构,所以不会满)才会去创建一个新的非核心队列。

相关文章
|
3天前
|
存储 监控 算法
Java 内存管理与垃圾回收机制深度解析
本文深入探讨了Java的内存管理与垃圾回收(GC)机制,从JVM内存结构出发,详细分析了堆、栈、方法区的职能及交互。文章重点讨论了垃圾回收的核心概念、常见算法以及调优策略,旨在为Java开发者提供一套系统的内存管理和性能优化指南。 【7月更文挑战第17天】
|
3天前
|
Java 编译器 开发者
Java 内存模型深度解析
本文旨在深入探讨Java内存模型的复杂性及其对并发编程的影响。通过揭示内存模型的核心原理、JMM的结构,并结合具体案例和数据分析,本文将帮助读者理解Java内存模型如何确保多线程程序的正确性和性能,以及如何在实际应用中有效利用这一模型进行高效的并发编程。 【7月更文挑战第17天】
9 4
|
4天前
|
Java
Java中的异常处理机制深度解析
本文旨在深入探讨Java语言中异常处理的机制,从基础概念到高级应用,全面剖析try-catch-finally语句、自定义异常以及异常链追踪等核心内容。通过实例演示和代码分析,揭示异常处理在Java程序设计中的重要性和应用技巧,帮助读者构建更为健壮和易于维护的程序。
|
8天前
|
JavaScript Java 测试技术
基于Java的智慧医疗服务平台系统设计和实现(源码+LW+部署讲解)
基于Java的智慧医疗服务平台系统设计和实现(源码+LW+部署讲解)
28 8
|
8天前
|
JavaScript Java 测试技术
基于Java的人事管理系统设计和实现(源码+LW+部署讲解)
基于Java的人事管理系统设计和实现(源码+LW+部署讲解)
20 7
|
8天前
|
JavaScript Java 测试技术
基于Java的儿童福利院管理系统设计和实现(源码+LW+部署讲解)
基于Java的儿童福利院管理系统设计和实现(源码+LW+部署讲解)
20 7
|
6天前
|
监控 Java API
Java并发编程之线程池深度解析
【7月更文挑战第14天】在Java并发编程领域,线程池是提升性能、管理资源的关键工具。本文将深入探讨线程池的核心概念、内部工作原理以及如何有效使用线程池来处理并发任务,旨在为读者提供一套完整的线程池使用和优化策略。
|
9天前
|
缓存 安全 Java
Java中线程池如何管理?
【7月更文挑战第11天】Java中线程池如何管理?
17 2
|
9天前
|
监控 Java 调度
Java面试题:描述Java线程池的概念、用途及常见的线程池类型。介绍一下Java中的线程池有哪些优缺点
Java面试题:描述Java线程池的概念、用途及常见的线程池类型。介绍一下Java中的线程池有哪些优缺点
21 1
|
9天前
|
设计模式 存储 安全
Java面试题:设计一个线程安全的单例类并解释其内存占用情况?使用Java多线程工具类实现一个高效的线程池,并解释其背后的原理。结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
Java面试题:设计一个线程安全的单例类并解释其内存占用情况?使用Java多线程工具类实现一个高效的线程池,并解释其背后的原理。结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
21 1

推荐镜像

更多