Java 线程池源码解析(2)

简介: Java 线程池源码解析

Java 线程池源码解析(1)https://developer.aliyun.com/article/1534179

因为 while 中的条件均为 false,所以 runWorker 会先执行下面的 completedAbruptly = false; 然后执行 finally 中的 processWorkerExit(w, completedAbruptly); 这里的 processWorkerExit 方法正是回收线程的关键所在,如果没有它,我们的所有空闲线程就都将被回收,所以这个方法中就定义了回收的逻辑:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
 
        tryTerminate();
 
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // 线程直接结束
            }
            addWorker(null, false); // 本线程结束,再新建一个线程
        }
    }

比如现在我们一共有 11 个线程(10个核心线程),那么 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; (暂不考虑 allowCoreThreadTimeOut)的结果就是 10,所以这个 11 个线程在执行 if (workerCountOf(c) >= min) 的时候,第一个执行该语句的线程会直接 return,之后执行该判断语句的 10 个线程由于不满足条件,但是自己又存活不了,所以在自己结束之前,会再创建一个没有初始任务的新线程,这样就保证了线程池中的线程数总是不低于核心线程数的。

2.2、线程池五大状态变化源码

下面是 ThreadPoolExecutor 源码中对于常量 ctl 的解释:

       主池控制状态ctl是一个原子整数,包含两个概念字段workerCount,表示线程的有效数量runState,表示是否正在运行、关闭等。

       为了将它们打包为一个整数,我们将workerCounts限制为(2^29)-1(约5亿)个线程,而不是(2^31)-1(20亿)个其他可表示的线程。如果这在未来是一个问题,可以将变量更改为AtomicLong,并调整下面的移位/掩码常数。但是,在需要之前,使用int,此代码会更快、更简单。workerCount是允许启动但不允许停止的工人数量。该值可能与实际活动线程数暂时不同,例如,当ThreadFactory在被请求时未能创建线程,以及当退出的线程在终止前仍在执行记账时。用户可见的池大小报告为工作者集的当前大小。

       runState提供了主要的生命周期控制,取值为:

  • RUNNING:正常接受新任务和处理排队的任务
  • SHUTDOWN:不接受新任务,但处理排队任务
  • STOP:不接受新建任务,不处理排队任务,并中断正在进行的任务
  • TIDING:所有任务都已终止,workerCount为零,转换到状态的线程TIDING将运行terminated()钩子方法terminated:terminated。

       runState随时间单调增加,但不需要达到每个状态。转换为:RUNNING->SHUTDOWN On invocation of SHUTDOWN()(RUNNING或SHUTDOWN)->STOP On invocationof shutdownNow()SHUTDOWN->TIDING当队列和池都为空时STOP->TIDIING当池为空时TIDIING->TERMINATED当TERMINATED()钩子方法完成时等待awaitTermination()的线程将在状态达到TERMINATED时返回。检测从SHUTDOWN到TIDING的转换并不像您希望的那样简单,因为队列在非空之后可能会变空,反之亦然,但我们只能在看到它是空的之后,看到workerCount为0时终止(这有时需要重新检查——见下文)

线程池的五大状态从源码中可以看到:

这里的 COUNT_MASK 代表的是线程池的最大数量,也就是 2 的 29 次方 -1 个。

Integer.SIZE为32,所以COUNT_BITS为29,最终各个状态对应的二级制为:

  1. RUNNING:11100000 00000000 00000000 00000000
  2. SHUTDOWN:00000000 00000000 00000000 00000000
  3. STOP:00100000 00000000 00000000 00000000
  4. TIDYING:01000000 00000000 00000000 00000000
  5. TERMINATED:01100000 00000000 00000000 00000000

       所以,只需要使用一个Integer数字的最高三个bit,就可以表示5种线程池的状态,而剩下的29个bit就可以用来表示工作线程数,比如,假如 ctl 为:11100000 00000000 00000000 00001010,就表示线程池的状态为RUNNING,线程池中目前在工作的线程有10个,这里说的“在工作”意思是线程活着,要么在执行任务,要么在阻塞等待任务。

       把线程状态放到高3位,把线程数量放到剩下的29位的好处就是方便之后频繁修改线程个数。

下面是线程状态的转换关系:

2.2.1、Thread.interrupt 线程中断
public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    System.out.println("线程 t1 在执行任务");
                }
            }
        });
 
        t1.start();
 
        Thread.sleep(3000);
 
        t1.interrupt();
 
        System.out.println("线程 t1 被阻塞了");
    }

       事实上,上面的代码并不会真正中断线程 t1 ,因为 interrupt 只是一个信号,线程要不要停掉还是取决于它自己。所以要想真正打断线程,需要线程自己判断:

Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.interrupted()){ // 如果线程未被中断
                    System.out.println("线程 t1 在执行任务");
                }
            }
        });

这样才能真正的关闭线程,否则 interrupt 方法没有意义。

2.2.2、线程池的状态转换

       线程池的 shutdown 方法会把线程池状态设置为 SHUTDOWN ,但是 SHUTDOWN 状态并不会立即关闭线程池中的所有线程,而是会等任务执行完毕再关闭线程,毕竟直接关闭线程可能会出现一些安全问题。

       如果希望立即关闭线程,需要调用 shutdownNow方法,这个方法会把线程池的状态设置为 STOP

这里的 onShutdown 方法是个空的(它是提供给子类用的),这里不需要关心。,而下面的 tryTerminate 才会对线程池的状态进行修改,这才是我们需要关心的:

同样,这里的 terminate 方法其实也只是一个空方法,它也是留给子类在需要时使用的,就相当于一个扩展机制,以后要是再 TIDYING 状态之后希望对线程池进行什么操作,就重写它就可以了。

上面代码中的 isRunning 方法也特别简单:

只要当前的线程池状态小于 SHUTDOWN 就是 RUNNING 状态。

2.2.3、addWorker 再解析

首先了解乐观锁:

  • Java提供了如AtomicInteger、AtomicLong、AtomicReference等原子类来支持CAS操作,这些类位于java.util.concurrent.atomic包下。
  • 下面的源码中就是在循环体内部使用CAS操作或版本号机制来实现了乐观锁的行为。
 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN) // 如果当前线程池的状态 >= SHUTDOWN
                && (runStateAtLeast(c, STOP) // 并且当前线程池的状态 >= STOP
                    || firstTask != null // execute(command) 的command为null代表不携带任务
                    || workQueue.isEmpty())) // 任务队列为空
                return false; // 返回false代表创建线程失败
 
            for (;;) {
                if (workerCountOf(c) // 线程池的总线程数
                    // 在execute方法源码中,当当前线程数<核心线程数是addWorker的core=true
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) // 是否大于核心线程数或者总线程数
                    return false;
                // 当有多个线程同时执行compareAndIncrementWorkerCount方法时,只会有一个返回true
                if (compareAndIncrementWorkerCount(c)) // 先把总线程数+1再去创建线程,ctl是原子类型的,所以这样不会出现线程安全的问题
                    break retry; // 如果新建线程成功,就会跳出去取真正新建线程
                c = ctl.get();  // 新建线程失败的话刷新总线程数
                if (runStateAtLeast(c, SHUTDOWN)) // 判断线程池状态是否>=SHUTDOWN
                    continue retry; 
                // else CAS failed due to workerCount change; retry inner loop
                // 否则继续执行该循环
            }
        }
 
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
 
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        workerAdded = true;
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

       比如我们设置核心线程数为10,但是现在只创建了9个核心线程,如果此时有 2 个以上线程同时执行了 executor.execute(task) 方法,那么在 execute 中会先判断当前线程总数是否大于核心线程数,因为这些线程是同时执行的,所以都返回 9 ,那么 addWorker 的 core 参数就是 true,进入 addWorker 方法,这些线程会都先给总线程数 +1 再去创建线程,而不是先去创建线程再给总线程数 + 1,因为 addWorker 的参数 core = true(代表创建一个核心线程),那么如果都去创建核心线程就会使得核心线程数超过设置的值。

       所以第一个抢先给 ctl + 1 的线程会真正创建一个核心线程(ctl 是原子性的),而其它线程只能被跳出循环重新执行,这样当第二次执行循环时,就会发现此时的核心线程数已经达到了,就会返回 false,代表创建核心线程失败。

总结

       所以我们需要知道,在 Java 的线程池中,它并不是上来就把所有核心线程都开启,而是需要的时候才会开启;此外,当核心线程都忙碌的时候,新来的任务并不会去创建非核心线程,而是优先放到一个任务队列当中去,只有当任务队列满了(这里用的 LinkedBlockingQueue 是链表结构,所以不会满)才会去创建一个新的非核心队列。

相关文章
|
2月前
|
前端开发 Java 关系型数据库
基于Java+Springboot+Vue开发的鲜花商城管理系统源码+运行
基于Java+Springboot+Vue开发的鲜花商城管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的鲜花商城管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。技术学习共同进步
204 7
|
2月前
|
消息中间件 算法 安全
JUC并发—1.Java集合包底层源码剖析
本文主要对JDK中的集合包源码进行了剖析。
|
2月前
|
人工智能 安全 Java
智慧工地源码,Java语言开发,微服务架构,支持分布式和集群部署,多端覆盖
智慧工地是“互联网+建筑工地”的创新模式,基于物联网、移动互联网、BIM、大数据、人工智能等技术,实现对施工现场人员、设备、材料、安全等环节的智能化管理。其解决方案涵盖数据大屏、移动APP和PC管理端,采用高性能Java微服务架构,支持分布式与集群部署,结合Redis、消息队列等技术确保系统稳定高效。通过大数据驱动决策、物联网实时监测预警及AI智能视频监控,消除数据孤岛,提升项目可控性与安全性。智慧工地提供专家级远程管理服务,助力施工质量和安全管理升级,同时依托可扩展平台、多端应用和丰富设备接口,满足多样化需求,推动建筑行业数字化转型。
97 5
|
2月前
|
存储 Java
【源码】【Java并发】【ThreadLocal】适合中学者体质的ThreadLocal源码阅读
前言 下面,跟上主播的节奏,马上开始ThreadLocal源码的阅读( ̄▽ ̄)" 内部结构 如下图所示,我们可以知道,每个线程,都有自己的threadLocals字段,指向ThreadLocalMap
414 81
【源码】【Java并发】【ThreadLocal】适合中学者体质的ThreadLocal源码阅读
|
2月前
|
前端开发 Java 物联网
智慧班牌源码,采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署
智慧班牌系统是一款基于信息化与物联网技术的校园管理工具,集成电子屏显示、人脸识别及数据交互功能,实现班级信息展示、智能考勤与家校互通。系统采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署与私有化定制。核心功能涵盖信息发布、考勤管理、教务处理及数据分析,助力校园文化建设与教学优化。其综合性和可扩展性有效打破数据孤岛,提升交互体验并降低管理成本,适用于日常教学、考试管理和应急场景,为智慧校园建设提供全面解决方案。
269 70
|
28天前
|
JavaScript Java 关系型数据库
家政系统源码,java版本
这是一款基于SpringBoot后端框架、MySQL数据库及Uniapp移动端开发的家政预约上门服务系统。
家政系统源码,java版本
|
11天前
|
存储 安全 Java
Java 集合面试题从数据结构到 HashMap 源码剖析详解及长尾考点梳理
本文深入解析Java集合框架,涵盖基础概念、常见集合类型及HashMap的底层数据结构与源码实现。从Collection、Map到Iterator接口,逐一剖析其特性与应用场景。重点解读HashMap在JDK1.7与1.8中的数据结构演变,包括数组+链表+红黑树优化,以及put方法和扩容机制的实现细节。结合订单管理与用户权限管理等实际案例,展示集合框架的应用价值,助你全面掌握相关知识,轻松应对面试与开发需求。
68 3
|
1月前
|
供应链 JavaScript 前端开发
Java基于SaaS模式多租户ERP系统源码
ERP,全称 Enterprise Resource Planning 即企业资源计划。是一种集成化的管理软件系统,它通过信息技术手段,将企业的各个业务流程和资源管理进行整合,以提高企业的运营效率和管理水平,它是一种先进的企业管理理念和信息化管理系统。 适用于小微企业的 SaaS模式多租户ERP管理系统, 采用最新的技术栈开发, 让企业简单上云。专注于小微企业的应用需求,如企业基本的进销存、询价,报价, 采购、销售、MRP生产制造、品质管理、仓库库存管理、财务应收付款, OA办公单据、CRM等。
127 23
|
2月前
|
Java
【源码】【Java并发】【ReentrantLock】适合中学者体质的ReentrantLock源码阅读
因为本文说的是ReentrantLock源码,因此会默认,大家对AQS有基本的了解(比如同步队列、条件队列大概> 长啥样?)。 不懂AQS的小朋友们,你们好呀!也欢迎先看看这篇
89 13
【源码】【Java并发】【ReentrantLock】适合中学者体质的ReentrantLock源码阅读
|
2月前
|
Java
【源码】【Java并发】【ConcurrentHashMap】适合中学体质的ConcurrentHashMap
本文深入解析了ConcurrentHashMap的实现原理,涵盖JDK 7与JDK 8的区别、静态代码块、构造方法、put/get/remove核心方法等。JDK 8通过Node数组+链表/红黑树结构优化并发性能,采用CAS和synchronized实现高效锁机制。文章还详细讲解了hash计算、表初始化、扩容协助及计数更新等关键环节,帮助读者全面掌握ConcurrentHashMap的工作机制。
81 6
【源码】【Java并发】【ConcurrentHashMap】适合中学体质的ConcurrentHashMap

推荐镜像

更多
  • DNS