Java 线程池源码解析(2)

简介: Java 线程池源码解析

Java 线程池源码解析(1)https://developer.aliyun.com/article/1534179

因为 while 中的条件均为 false,所以 runWorker 会先执行下面的 completedAbruptly = false; 然后执行 finally 中的 processWorkerExit(w, completedAbruptly); 这里的 processWorkerExit 方法正是回收线程的关键所在,如果没有它,我们的所有空闲线程就都将被回收,所以这个方法中就定义了回收的逻辑:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
 
        tryTerminate();
 
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // 线程直接结束
            }
            addWorker(null, false); // 本线程结束,再新建一个线程
        }
    }

比如现在我们一共有 11 个线程(10个核心线程),那么 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; (暂不考虑 allowCoreThreadTimeOut)的结果就是 10,所以这个 11 个线程在执行 if (workerCountOf(c) >= min) 的时候,第一个执行该语句的线程会直接 return,之后执行该判断语句的 10 个线程由于不满足条件,但是自己又存活不了,所以在自己结束之前,会再创建一个没有初始任务的新线程,这样就保证了线程池中的线程数总是不低于核心线程数的。

2.2、线程池五大状态变化源码

下面是 ThreadPoolExecutor 源码中对于常量 ctl 的解释:

       主池控制状态ctl是一个原子整数,包含两个概念字段workerCount,表示线程的有效数量runState,表示是否正在运行、关闭等。

       为了将它们打包为一个整数,我们将workerCounts限制为(2^29)-1(约5亿)个线程,而不是(2^31)-1(20亿)个其他可表示的线程。如果这在未来是一个问题,可以将变量更改为AtomicLong,并调整下面的移位/掩码常数。但是,在需要之前,使用int,此代码会更快、更简单。workerCount是允许启动但不允许停止的工人数量。该值可能与实际活动线程数暂时不同,例如,当ThreadFactory在被请求时未能创建线程,以及当退出的线程在终止前仍在执行记账时。用户可见的池大小报告为工作者集的当前大小。

       runState提供了主要的生命周期控制,取值为:

  • RUNNING:正常接受新任务和处理排队的任务
  • SHUTDOWN:不接受新任务,但处理排队任务
  • STOP:不接受新建任务,不处理排队任务,并中断正在进行的任务
  • TIDING:所有任务都已终止,workerCount为零,转换到状态的线程TIDING将运行terminated()钩子方法terminated:terminated。

       runState随时间单调增加,但不需要达到每个状态。转换为:RUNNING->SHUTDOWN On invocation of SHUTDOWN()(RUNNING或SHUTDOWN)->STOP On invocationof shutdownNow()SHUTDOWN->TIDING当队列和池都为空时STOP->TIDIING当池为空时TIDIING->TERMINATED当TERMINATED()钩子方法完成时等待awaitTermination()的线程将在状态达到TERMINATED时返回。检测从SHUTDOWN到TIDING的转换并不像您希望的那样简单,因为队列在非空之后可能会变空,反之亦然,但我们只能在看到它是空的之后,看到workerCount为0时终止(这有时需要重新检查——见下文)

线程池的五大状态从源码中可以看到:

这里的 COUNT_MASK 代表的是线程池的最大数量,也就是 2 的 29 次方 -1 个。

Integer.SIZE为32,所以COUNT_BITS为29,最终各个状态对应的二级制为:

  1. RUNNING:11100000 00000000 00000000 00000000
  2. SHUTDOWN:00000000 00000000 00000000 00000000
  3. STOP:00100000 00000000 00000000 00000000
  4. TIDYING:01000000 00000000 00000000 00000000
  5. TERMINATED:01100000 00000000 00000000 00000000

       所以,只需要使用一个Integer数字的最高三个bit,就可以表示5种线程池的状态,而剩下的29个bit就可以用来表示工作线程数,比如,假如 ctl 为:11100000 00000000 00000000 00001010,就表示线程池的状态为RUNNING,线程池中目前在工作的线程有10个,这里说的“在工作”意思是线程活着,要么在执行任务,要么在阻塞等待任务。

       把线程状态放到高3位,把线程数量放到剩下的29位的好处就是方便之后频繁修改线程个数。

下面是线程状态的转换关系:

2.2.1、Thread.interrupt 线程中断
public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    System.out.println("线程 t1 在执行任务");
                }
            }
        });
 
        t1.start();
 
        Thread.sleep(3000);
 
        t1.interrupt();
 
        System.out.println("线程 t1 被阻塞了");
    }

       事实上,上面的代码并不会真正中断线程 t1 ,因为 interrupt 只是一个信号,线程要不要停掉还是取决于它自己。所以要想真正打断线程,需要线程自己判断:

Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.interrupted()){ // 如果线程未被中断
                    System.out.println("线程 t1 在执行任务");
                }
            }
        });

这样才能真正的关闭线程,否则 interrupt 方法没有意义。

2.2.2、线程池的状态转换

       线程池的 shutdown 方法会把线程池状态设置为 SHUTDOWN ,但是 SHUTDOWN 状态并不会立即关闭线程池中的所有线程,而是会等任务执行完毕再关闭线程,毕竟直接关闭线程可能会出现一些安全问题。

       如果希望立即关闭线程,需要调用 shutdownNow方法,这个方法会把线程池的状态设置为 STOP

这里的 onShutdown 方法是个空的(它是提供给子类用的),这里不需要关心。,而下面的 tryTerminate 才会对线程池的状态进行修改,这才是我们需要关心的:

同样,这里的 terminate 方法其实也只是一个空方法,它也是留给子类在需要时使用的,就相当于一个扩展机制,以后要是再 TIDYING 状态之后希望对线程池进行什么操作,就重写它就可以了。

上面代码中的 isRunning 方法也特别简单:

只要当前的线程池状态小于 SHUTDOWN 就是 RUNNING 状态。

2.2.3、addWorker 再解析

首先了解乐观锁:

  • Java提供了如AtomicInteger、AtomicLong、AtomicReference等原子类来支持CAS操作,这些类位于java.util.concurrent.atomic包下。
  • 下面的源码中就是在循环体内部使用CAS操作或版本号机制来实现了乐观锁的行为。
 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN) // 如果当前线程池的状态 >= SHUTDOWN
                && (runStateAtLeast(c, STOP) // 并且当前线程池的状态 >= STOP
                    || firstTask != null // execute(command) 的command为null代表不携带任务
                    || workQueue.isEmpty())) // 任务队列为空
                return false; // 返回false代表创建线程失败
 
            for (;;) {
                if (workerCountOf(c) // 线程池的总线程数
                    // 在execute方法源码中,当当前线程数<核心线程数是addWorker的core=true
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) // 是否大于核心线程数或者总线程数
                    return false;
                // 当有多个线程同时执行compareAndIncrementWorkerCount方法时,只会有一个返回true
                if (compareAndIncrementWorkerCount(c)) // 先把总线程数+1再去创建线程,ctl是原子类型的,所以这样不会出现线程安全的问题
                    break retry; // 如果新建线程成功,就会跳出去取真正新建线程
                c = ctl.get();  // 新建线程失败的话刷新总线程数
                if (runStateAtLeast(c, SHUTDOWN)) // 判断线程池状态是否>=SHUTDOWN
                    continue retry; 
                // else CAS failed due to workerCount change; retry inner loop
                // 否则继续执行该循环
            }
        }
 
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
 
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        workerAdded = true;
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

       比如我们设置核心线程数为10,但是现在只创建了9个核心线程,如果此时有 2 个以上线程同时执行了 executor.execute(task) 方法,那么在 execute 中会先判断当前线程总数是否大于核心线程数,因为这些线程是同时执行的,所以都返回 9 ,那么 addWorker 的 core 参数就是 true,进入 addWorker 方法,这些线程会都先给总线程数 +1 再去创建线程,而不是先去创建线程再给总线程数 + 1,因为 addWorker 的参数 core = true(代表创建一个核心线程),那么如果都去创建核心线程就会使得核心线程数超过设置的值。

       所以第一个抢先给 ctl + 1 的线程会真正创建一个核心线程(ctl 是原子性的),而其它线程只能被跳出循环重新执行,这样当第二次执行循环时,就会发现此时的核心线程数已经达到了,就会返回 false,代表创建核心线程失败。

总结

       所以我们需要知道,在 Java 的线程池中,它并不是上来就把所有核心线程都开启,而是需要的时候才会开启;此外,当核心线程都忙碌的时候,新来的任务并不会去创建非核心线程,而是优先放到一个任务队列当中去,只有当任务队列满了(这里用的 LinkedBlockingQueue 是链表结构,所以不会满)才会去创建一个新的非核心队列。

相关文章
|
4天前
|
Java API 调度
深入解析Java线程状态与生命周期
深入解析Java线程状态与生命周期
10 1
|
4天前
|
Java API
深入探讨 Java 8 集合操作:全面解析 Stream API 的强大功能
深入探讨 Java 8 集合操作:全面解析 Stream API 的强大功能
14 2
|
1天前
|
存储 算法 Java
Java Set深度解析:为何它能成为“无重复”的代名词?
【6月更文挑战第17天】Java Set实现无重复元素原理:HashSet利用哈希表(HashMap基础),通过hashCode()和equals()检查元素唯一性;TreeSet基于红黑树保持元素排序和唯一。选择合适的Set类(HashSet、TreeSet、LinkedHashSet)并正确实现对象的hashCode()和equals()是关键。示例代码展示了HashSet的去重功能。
|
1天前
|
Java
【Java】Object类简单解析
【Java】Object类简单解析
4 1
|
1天前
|
Java 开发者 C++
Java面向对象的终极挑战:抽象类与接口的深度解析!
【6月更文挑战第17天】在Java OOP中,抽象类和接口助力代码复用与扩展。抽象类不可实例化,提供通用框架,适合继承;接口包含纯抽象方法,支持多态与松耦合。选择抽象类用于继承已有方法和状态,接口则适用于不相关类共享行为。Java 8后接口能含默认方法,增加设计灵活性。抽象类与接口常结合使用,以实现最佳设计,如`Shape`抽象类实现`Drawable`和`Selectable`接口,展现两者协同优势。理解和熟练运用这对概念是提升代码质量的关键。
|
2天前
|
存储 缓存 算法
滚雪球学Java(62):HashSet的底层实现原理解析
【6月更文挑战第16天】🏆本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
11 3
滚雪球学Java(62):HashSet的底层实现原理解析
|
3天前
|
Java 开发者 UED
Java中的异常处理机制深度解析
在Java编程中,异常处理是确保软件健壮性的关键因素。本文将深入探讨Java的异常处理机制,包括异常的类型、如何捕获和处理异常,以及最佳实践。我们将通过实例学习如何优雅地处理异常,避免常见的陷阱,并提升代码的可维护性和用户体验。
|
3天前
|
存储 Java 测试技术
滚雪球学Java(61):从源码角度解读Java Set接口底层实现原理
【6月更文挑战第15天】🏆本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
13 1
滚雪球学Java(61):从源码角度解读Java Set接口底层实现原理
|
4天前
|
搜索推荐 Java 程序员
【案例解析】从菜鸟到高手,如何优雅地使用Java条件语句?
【6月更文挑战第14天】本文介绍了Java编程中条件语句的重要性,特别是if-else和switch的使用。通过四个案例,展示了如何优雅地运用这些语句:案例一演示了基础if-else用于用户登录响应;案例二利用switch处理枚举类型,如订单状态;案例三展示了条件语句的嵌套与组合,用于游戏评分系统;案例四探讨了代码优化与重构,减少冗长的if-else结构。文章强调,掌握条件语句能提升编码效率和代码质量,鼓励开发者不断实践以写出高效优雅的代码。
|
4天前
|
Java 开发者
别再傻傻分不清!Java if-else与switch的性能对比全解析!
【6月更文挑战第14天】本文探讨了Java中if-else与switch语句的性能异同。虽然现代JVM的优化使得两者性能差异不大,但特定情况下仍有区别。switch通过跳转表提供高效执行,尤其适用于枚举和固定值,而if-else依赖条件顺序,JVM可能优化常量条件。实验显示,处理大量重复case时,switch性能更优。选择时还需考虑可读性和维护性,灵活运用以实现高效优雅的代码。

热门文章

最新文章

推荐镜像

更多