彻底弄懂 Java 线程池原理

简介: 概述这篇文章是我在阅读源码时整理的一些笔记,对源码的关键点进行了比较详细的注释,然后加上一些自己对线程池机制的理解。最终目的是要弄清楚下面这些问题:线程池有 execute() 和 submit() 方法,执行机制分别是什么? 如何新建线程? 任务如何执行? 线程如何销毁?超时机制如何实现?首先需要介绍一下线程池的两个重要成员:ctlAtomInteger 类型。

概述

这篇文章是我在阅读源码时整理的一些笔记,对源码的关键点进行了比较详细的注释,然后加上一些自己对线程池机制的理解。最终目的是要弄清楚下面这些问题:

  • 线程池有 execute() 和 submit() 方法,执行机制分别是什么?
  • 如何新建线程?
  • 任务如何执行?
  • 线程如何销毁?超时机制如何实现?

首先需要介绍一下线程池的两个重要成员:

ctl

AtomInteger 类型。高3位存储线程池状态,低29位存储当前线程数量。workerCountOf(c) 返回当前线程数量。runStateOf(c) 返回当前线程池状态。 线程池有如下状态:

  • RUNNING:接收新任务,处理队列任务。
  • SHUTDOWN:不接收新任务,但处理队列任务。
  • STOP:不接收新任务,也不处理队列任务,并且中断所有处理中的任务。
  • TIDYING:所有任务都被终结,有效线程为0。会触发terminated()方法。
  • TERMINATED:当terminated()方法执行结束。

Worker

这个线程在线程池中的包装类。一个 Worker 代表一个线程。线程池用一个 HashSet 管理这些线程。

需要注意的是,Worker 本身并不区分核心线程和非核心线程,核心线程只是概念模型上的叫法,特性是依靠对线程数量的判断来实现的 Worker 特性如下:

  • 继承自 AQS,本身实现了一个最简单的不公平的不可重入锁。
  • 构造方法传入 Runnable,代表第一个执行的任务,可以为空。构造方法中新建一个线程。
  • 实现了 Runnable 接口,在新建线程时传入 this。因此线程启动时,会执行 Worker 本身的 run 方法。
  • run 方法调用了 ThreadPoolExecutor 的 runWorker 方法,负责实际执行任务。

submit() 方法的执行机制

submit 返回一个 Future 对象,我们可以调用其 get 方法获取任务执行的结果。代码很简单,就是将 Runnable 包装成 FutureTask 而已。可以看到,最终还是调用 Execute 方法:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
复制代码

FutureTask 的代码就不贴了,简述一下原理:

  • FutureTask 实现了 RunnableFuture 接口,RunnableFuture 继承自Runnable。执行任务时会调用 FutureTask 的 run 方法,run 方法中执行真正的任务代码,执行完后调用 set 方法设置结果。
  • 如果任务执行完毕,get 方法会直接返回结果,如果没有,get 方法会阻塞并等待结果。
  • set 方法中设置结果后会取消阻塞,使 get 方法返回结果。

execute() 方法的执行机制

这个机制大家应该都很熟了,再简述一遍:

  1. 工作线程数小于核心线程数时,直接新建核心线程执行任务;
  2. 大于核心线程数时,将任务添加进等待队列;
  3. 队列满时,创建非核心线程执行任务;
  4. 工作线程数大于最大线程数时,拒绝任务

具体的代码分析如下:

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //小于核心线程数
    if (addWorker(command, true)) //启动核心线程并执行任务
        return;
    c = ctl.get(); //执行失败时重新获取值
}
if (isRunning(c) && workQueue.offer(command)) { //检查运行状态并将任务添加到队列
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command)) //重新检查,防止状态有变化。如果有,移出队列并拒绝任务
        reject(command);
    else if (workerCountOf(recheck) == 0) //如果线程数为0,创建非核心线程,第一个参数为空时会从队列中取任务执行
        addWorker(null, false);
}
else if (!addWorker(command, false)) //添加到队列失败,说明队列已满,创建非核心线程执行任务
    reject(command); //执行失败说明达到最大线程数,拒绝任务
复制代码

新任务如何添加进队列?

线程池使用 addWorker 方法新建线程,第一个参数代表要执行的任务,线程会将这个任务执行完毕后再从队列取任务执行。第二参数是核心线程的标志,它并不是 Worker 本身的属性,在这里只用来判断工作线程数量是否超标。

这个方法可以分成两部分,第一部分进行一些前置判断,并使用循环 CAS 结构将线程数量加1。代码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry: //这个语法不常用,用于给外层 for 循环命名。方便嵌套 for 循环中,break 和 continue 指定是外层还是内层循环
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        // firstTask 不为空代表这个方法用于添加任务,为空代表新建线程。SHUTDOWN 状态下不接受新任务,但处理队列中的任务。这就是第二个判断的逻辑。
        if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;
        
        // 使用循环 CAS 自旋,增加线程数量直到成功为止
        for (;;) {
        int wc = workerCountOf(c);
        //判断是否超过线程容量
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        //使用 CAS 将线程数量加1
        if (compareAndIncrementWorkerCount(c))
            break retry;
        //修改不成功说明线程数量有变化
        //重新判断线程池状态,有变化时跳到外层循环重新获取线程池状态
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        //到这里说明状态没有变化,重新尝试增加线程数量
        }
    }
    ... ...
}
复制代码

第二部分负责新建并启动线程,并将 Worker 添加至 Hashset 中。代码很简单,没什么好注释的,用了 ReentrantLock 确保线程安全。

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s; //这个参数是测试用的,不用管它
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w); //添加失败时移除 Worker 并将线程数量减 1
}
return workerStarted;
}
复制代码

任务如何执行?

在 addWorker 方法中,线程会被启动。新建线程时,Worker 将自身传入,所以线程启动后会执行 Worker 的 run 方法,这个方法调用了 ThreadPoolExecutor 的 runWorker 方法执行任务,runWorker 中会循环取任务执行,执行逻辑如下:

  • 如果 firstTask 不为空,先执行 firstTask,执行完毕后置空;
  • firstTask 为空后调用 getTask() 从队列中取任务执行;
  • 一直执行到没有任务后,退出 while 循环
  • 调用 processWorkerExit() 方法,将 Worker 移除出 HashSet,此时线程执行完毕,也不再被引用,会自动销毁。

具体代码分析如下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    //task 为我们传给 execute 的任务。task 为空时从队列中取任务执行
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //这段逻辑非常绕。实际上它实现了以下逻辑:
            //1.如果线程池已停止且线程未中断,条件成立,中断线程
            //2.如果线程池未停止,线程为中断状态,将线程状态重置,并重新进行1的判断
            //3.如果线程池未停止,线程不为中断状态,条件不成立
            //Thread.interrupted() 会重置中断状态,保证
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            //beforeExecute 和 afterExecute 为空方法,交给子类实现
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run(); //执行任务
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //执行到这里时说明线程执行完毕,此方法将线程从 HashSet 中移出。线程终止且没有引用,会被自动回收。
        processWorkerExit(w, completedAbruptly);
    }
}
复制代码

线程如何销毁?超时机制如何实现?

在 runWorker 方法中 getTask 方法返回 null 之后会导致线程执行完毕,被移除出 HashSet,从而被系统销毁。 线程的超时机制也是在这个方法实现的,借助于 BlockingQueue 的 poll 和 take 方法。

  • poll 方法可以设置一个超时时间,当队列为空时,在此时间内阻塞等待,超时后返回 null
  • take 方法在队列为空时直接抛出异常

超时机制实现原理如下:

  • 当 allowCoreThreadTimeOut 为 true,所有线程都会超时,全部调用 poll 方法,传入 keepAliveTime 参数。
  • 当 allowCoreThreadTimeOut 为 false 时,如果工作线程数量大于核心线程数,将此线程当作非核心线程处理,调用 poll 方法
  • 当 allowCoreThreadTimeOut 为 false 且工作线程数量小于等于核心线程数时,将此线程当作核心线程处理,调用 take 方法,队列为空时抛出异常,进入下一次循环。如果队列一直为空,核心线程会一直在此循环等待任务进行处理。

具体代码如下:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
    
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
    
        int wc = workerCountOf(c);
    
        // 允许核心线程超时或者线程数大于核心线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
        // timed && timedOut 这两个参数结合起来控制超时机制
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
    
        try {
            // 队列为空时,poll 方法会阻塞等待,超过 keepAliveTime 时返回空值。take 方法会直接返回异常。
            // 当 allowCoreThreadTimeOut 为 true 时,核心线程和非核心线程没有区别,一律调用poll方法
            // 当 allowCoreThreadTimeOut 为 false 时,线程数量超过核心线程数才会进入超时机制,如果不超过,则将当前线程当作核心线程处理,调用 take,抛出异常后进入下一次循环。如果队列为空,此处会一直循环。
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
复制代码

 

 

java方面的基础是Android面试必问的。关于这些所有的原理和内核技术 我们这边也有系统的资料。如果有需要的同学可以加Android进阶群免费获取;701740775 请备注csdn

相关文章
|
17天前
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
142 60
【Java并发】【线程池】带你从0-1入门线程池
|
8天前
|
存储 缓存 安全
【原理】【Java并发】【volatile】适合初学者体质的volatile原理
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是写出高端的CRUD应用。2025年,我正在沉淀自己,博客更新速度也在加快。在这里,我会分享关于Java并发编程的深入理解,尤其是volatile关键字的底层原理。 本文将带你深入了解Java内存模型(JMM),解释volatile如何通过内存屏障和缓存一致性协议确保可见性和有序性,同时探讨其局限性及优化方案。欢迎订阅专栏《在2B工作中寻求并发是否搞错了什么》,一起探索并发编程的奥秘! 关注我,点赞、收藏、评论,跟上更新节奏,让我们共同进步!
80 8
【原理】【Java并发】【volatile】适合初学者体质的volatile原理
|
2天前
|
消息中间件 Java 应用服务中间件
JVM实战—1.Java代码的运行原理
本文介绍了Java代码的运行机制、JVM类加载机制、JVM内存区域及其作用、垃圾回收机制,并汇总了一些常见问题。
JVM实战—1.Java代码的运行原理
|
1天前
|
存储 缓存 人工智能
【原理】【Java并发】【synchronized】适合中学者体质的synchronized原理
本文深入解析了Java中`synchronized`关键字的底层原理,从代码块与方法修饰的区别到锁升级机制,内容详尽。通过`monitorenter`和`monitorexit`指令,阐述了`synchronized`实现原子性、有序性和可见性的原理。同时,详细分析了锁升级流程:无锁 → 偏向锁 → 轻量级锁 → 重量级锁,结合对象头`MarkWord`的变化,揭示JVM优化锁性能的策略。此外,还探讨了Monitor的内部结构及线程竞争锁的过程,并介绍了锁消除与锁粗化等优化手段。最后,结合实际案例,帮助读者全面理解`synchronized`在并发编程中的作用与细节。
19 8
|
4月前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
136 2
|
13天前
|
Java 调度
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
当我们创建一个`ThreadPoolExecutor`的时候,你是否会好奇🤔,它到底发生了什么?比如:我传的拒绝策略、线程工厂是啥时候被使用的? 核心线程数是个啥?最大线程数和它又有什么关系?线程池,它是怎么调度,我们传入的线程?...不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界...
81 0
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
|
1月前
|
安全 Java 开发者
【JAVA】封装多线程原理
Java 中的多线程封装旨在简化使用、提高安全性和增强可维护性。通过抽象和隐藏底层细节,提供简洁接口。常见封装方式包括基于 Runnable 和 Callable 接口的任务封装,以及线程池的封装。Runnable 适用于无返回值任务,Callable 支持有返回值任务。线程池(如 ExecutorService)则用于管理和复用线程,减少性能开销。示例代码展示了如何实现这些封装,使多线程编程更加高效和安全。
|
1月前
|
存储 算法 Java
【JAVA】生成accessToken原理
在Java中,生成accessToken用于身份验证和授权,确保合法用户访问受保护资源。流程包括:1. 身份验证(如用户名密码、OAuth 2.0);2. 生成唯一且安全的令牌;3. 设置令牌有效期并存储;4. 客户端传递令牌,服务器验证其有效性。常见场景为OAuth 2.0协议,涉及客户端注册、用户授权、获取授权码和换取accessToken。示例代码展示了使用Apache HttpClient库模拟OAuth 2.0获取accessToken的过程。
|
2月前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
128 17
|
2月前
|
Java Linux 调度
硬核揭秘:线程与进程的底层原理,面试高分必备!
嘿,大家好!我是小米,29岁的技术爱好者。今天来聊聊线程和进程的区别。进程是操作系统中运行的程序实例,有独立内存空间;线程是进程内的最小执行单元,共享内存。创建进程开销大但更安全,线程轻量高效但易引发数据竞争。面试时可强调:进程是资源分配单位,线程是CPU调度单位。根据不同场景选择合适的并发模型,如高并发用线程池。希望这篇文章能帮你更好地理解并回答面试中的相关问题,祝你早日拿下心仪的offer!
52 6

热门文章

最新文章