101. 熟悉 Java 并发吗,谈谈对 JUC 线程池 ThreadPoolExecutor 的认识吧(二)

简介: 101. 熟悉 Java 并发吗,谈谈对 JUC 线程池 ThreadPoolExecutor 的认识吧(二)

101. 熟悉 Java 并发吗,谈谈对 JUC 线程池 ThreadPoolExecutor 的认识吧(二)


execute方法源码分析

线程池异步执行任务的方法实现是ThreadPoolExecutor#execute(),源码如下:

// 执行命令,其中命令(下面称任务)对象是Runnable的实例
public void execute(Runnable command) {
    // 判断命令(任务)对象非空
    if (command == null)
        throw new NullPointerException();
    // 获取ctl的值
    int c = ctl.get();
    // 判断如果当前工作线程数小于核心线程数,则创建新的核心线程并且执行传入的任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            // 如果创建新的核心线程成功则直接返回
            return;
        // 这里说明创建核心线程失败,需要更新ctl的临时变量c
        c = ctl.get();
    }
    // 走到这里说明创建新的核心线程失败,也就是当前工作线程数大于等于corePoolSize
    // 判断线程池是否处于运行中状态,同时尝试用非阻塞方法向任务队列放入任务(放入任务失败返回false)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 这里是向任务队列投放任务成功,对线程池的运行中状态做二次检查
        // 如果线程池二次检查状态是非运行中状态,则从任务队列移除当前的任务调用拒绝策略处理之(也就是移除前面成功入队的任务实例)
        if (! isRunning(recheck) && remove(command))
            // 调用拒绝策略处理任务 - 返回
            reject(command);
        // 走到下面的else if分支,说明有以下的前提:
        // 0、待执行的任务已经成功加入任务队列
        // 1、线程池可能是RUNNING状态
        // 2、传入的任务可能从任务队列中移除失败(移除失败的唯一可能就是任务已经被执行了)
        // 如果当前工作线程数量为0,则创建一个非核心线程并且传入的任务对象为null - 返回
        // 也就是创建的非核心线程不会马上运行,而是等待获取任务队列的任务去执行
        // 如果前工作线程数量不为0,原来应该是最后的else分支,但是可以什么也不做,因为任务已经成功入队列,总会有合适的时机分配其他空闲线程去执行它
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 走到这里说明有以下的前提:
    // 0、线程池中的工作线程总数已经大于等于corePoolSize(简单来说就是核心线程已经全部懒创建完毕)
    // 1、线程池可能不是RUNNING状态
    // 2、线程池可能是RUNNING状态同时任务队列已经满了
    // 如果向任务队列投放任务失败,则会尝试创建非核心线程传入任务执行
    // 创建非核心线程失败,此时需要拒绝执行任务
    else if (!addWorker(command, false))
        // 调用拒绝策略处理任务 - 返回
        reject(command);
}

这里简单分析一下整个流程:

如果当前工作线程总数小于corePoolSize,则直接创建核心线程执行任务(任务实例会传入直接用于构造工作线程实例)。

如果当前工作线程总数大于等于corePoolSize,判断线程池是否处于运行中状态,同时尝试用非阻塞方法向任务队列放入任务,这里会二次检查线程池运行状态,如果当前工作线程数量为0,则创建一个非核心线程并且传入的任务对象为null。

如果向任务队列投放任务失败(任务队列已经满了),则会尝试创建非核心线程传入任务实例执行。

如果创建非核心线程失败,此时需要拒绝执行任务,调用拒绝策略处理任务。

这里是一个疑惑点:为什么需要二次检查线程池的运行状态,当前工作线程数量为0,尝试创建一个非核心线程并且传入的任务对象为null?这个可以看API注释:

如果一个任务成功加入任务队列,我们依然需要二次检查是否需要添加一个工作线程(因为所有存活的工作线程有可能在最后一次检查之后已经终结)或者执行当前方法的时候线程池是否已经shutdown了。所以我们需要二次检查线程池的状态,必须时把任务从任务队列中移除或者在没有可用的工作线程的前提下新建一个工作线程。

任务提交流程从调用者的角度来看如下:

addWorker方法源码分析

boolean addWorker(Runnable firstTask, boolean core)方法的第一的参数可以用于直接传入任务实例,第二个参数用于标识将要创建的工作线程是否核心线程。方法源码如下:

// 添加工作线程,如果返回false说明没有新创建工作线程,如果返回true说明创建和启动工作线程成功
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 注意这是一个死循环 - 最外层循环
    for (int c = ctl.get();;) {
        // 这个是十分复杂的条件,这里先拆分多个与(&&)条件:
        // 1. 线程池状态至少为SHUTDOWN状态,也就是rs >= SHUTDOWN(0)
        // 2. 线程池状态至少为STOP状态,也就是rs >= STOP(1),或者传入的任务实例firstTask不为null,或者任务队列为空
        // 其实这个判断的边界是线程池状态为shutdown状态下,不会再接受新的任务,在此前提下如果状态已经到了STOP、或者传入任务不为空、或者任务队列为空(已经没有积压任务)都不需要添加新的线程
        if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) {
            return false;
        }
        // 注意这也是一个死循环 - 二层循环
        for (;;) {
            // 这里每一轮循环都会重新获取工作线程数wc
            // 1. 如果传入的core为true,表示将要创建核心线程,通过wc和corePoolSize判断,如果wc >= corePoolSize,则返回false表示创建核心线程失败
            // 1. 如果传入的core为false,表示将要创非建核心线程,通过wc和maximumPoolSize判断,如果wc >= maximumPoolSize,则返回false表示创建非核心线程失败
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            // 成功通过CAS更新工作线程数wc,则break到最外层的循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 走到这里说明了通过CAS更新工作线程数wc失败,这个时候需要重新判断线程池的状态是否由RUNNING已经变为SHUTDOWN
            c = ctl.get();  // Re-read ctl
            // 如果线程池状态已经由RUNNING已经变为SHUTDOWN,则重新跳出到外层循环继续执行
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // 如果线程池状态依然是RUNNING,CAS更新工作线程数wc失败说明有可能是并发更新导致的失败,则在内层循环重试即可
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 标记工作线程是否启动成功
    boolean workerStarted = false;
    // 标记工作线程是否创建成功
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 传入任务实例firstTask创建Worker实例,Worker构造里面会通过线程工厂创建新的Thread对象,所以下面可以直接操作Thread t = w.thread
        // 这一步Worker实例已经创建,但是没有加入工作线程集合或者启动它持有的线程Thread实例
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 这里需要全局加锁,因为会改变一些指标值和非线程安全的集合
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                // 这里主要在加锁的前提下判断ThreadFactory创建的线程是否存活或者判断获取锁成功之后线程池状态是否已经更变为SHUTDOWN
                // 1. 如果线程池状态依然为RUNNING,则只需要判断线程实例是否存活,需要添加到工作线程集合和启动新的Worker
                // 2. 如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker
                // 对于2,换言之,如果线程池处于SHUTDOWN状态下,同时传入的任务实例firstTask不为null,则不会添加到工作线程集合和启动新的Worker
                // 这一步其实有可能创建了新的Worker实例但是并不启动(临时对象,没有任何强引用),这种Worker有可能成功下一轮GC被收集的垃圾对象
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 把创建的工作线程实例添加到工作线程集合
                    workers.add(w);
                    int s = workers.size();
                    // 尝试更新历史峰值工作线程数,也就是线程池峰值容量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 这里更新工作线程是否启动成功标识为true,后面才会调用Thread#start()方法启动真实的线程实例
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
            if (workerAdded) {
                t.start();
                // 标记线程启动成功
                workerStarted = true;
            }
        }
    } finally {
        // 线程启动失败,需要从工作线程集合移除对应的Worker
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
// 添加Worker失败
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 从工作线程集合移除之
        if (w != null)
            workers.remove(w);
        // wc数量减1
        decrementWorkerCount();
        // 基于状态判断尝试终结线程池
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

笔者发现了Doug Lea大神十分喜欢复杂的条件判断,而且单行复杂判断不喜欢加花括号,像下面这种代码在他编写的很多类库中都比较常见:

if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) {
    return false;
}
// ....
//  代码拆分一下如下
boolean atLeastShutdown = runStateAtLeast(c, SHUTDOWN);     # rs >= SHUTDOWN(0)
boolean atLeastStop = runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty();
if (atLeastShutdown && atLeastStop){
   return false;
}

上面的分析逻辑中需要注意一点,Worker实例创建的同时,在其构造函数中会通过ThreadFactory创建一个Java线程Thread实例,后面会加锁后二次检查是否需要把Worker实例添加到工作线程集合workers中和是否需要启动Worker中持有的Thread实例,只有启动了Thread实例实例,Worker才真正开始运作,否则只是一个无用的临时对象。Worker本身也实现了Runnable接口,它可以看成是一个Runnable的适配器。

工作线程内部类Worker源码分析

线程池中的每一个具体的工作线程被包装为内部类Worker实例,Worker继承于AbstractQueuedSynchronizer(AQS),实现了Runnable接口:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;
    // 保存ThreadFactory创建的线程实例,如果ThreadFactory创建线程失败则为null
    final Thread thread;
    // 保存传入的Runnable任务实例
    Runnable firstTask;
    // 记录每个线程完成的任务总数
    volatile long completedTasks;
    // 唯一的构造函数,传入任务实例firstTask,注意可以为null
    Worker(Runnable firstTask) {
        // 禁止线程中断,直到runWorker()方法执行
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 通过ThreadFactory创建线程实例,注意一下Worker实例自身作为Runnable用于创建新的线程实例
        this.thread = getThreadFactory().newThread(this);
    }
    // 委托到外部的runWorker()方法,注意runWorker()方法是线程池的方法,而不是Worker的方法
    public void run() {
        runWorker(this);
    }
    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    //  是否持有独占锁,state值为1的时候表示持有锁,state值为0的时候表示已经释放锁
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    // 独占模式下尝试获取资源,这里没有判断传入的变量,直接CAS判断0更新为1是否成功,成功则设置独占线程为当前线程
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    // 独占模式下尝试是否资源,这里没有判断传入的变量,直接把state设置为0
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    // 加锁
    public void lock() {
        acquire(1);
    }
    // 尝试加锁
    public boolean tryLock() { 
        return tryAcquire(1);
    }
    // 解锁
    public void unlock() {
        release(1); 
    }
    // 是否锁定
    public boolean isLocked() {
        return isHeldExclusively(); 
    }
    // 启动后进行线程中断,注意这里会判断线程实例的中断标志位是否为false,只有中断标志位为false才会中断
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker的构造函数里面的逻辑十分重要,通过ThreadFactory创建的Thread实例同时传入Worker实例,因为Worker本身实现了Runnable,所以可以作为任务提交到线程中执行。只要Worker持有的线程实例w调用Thread#start()方法就能在合适时机执行Worker#run()。简化一下逻辑如下:

// addWorker()方法中构造
Worker worker = createWorker();
// 通过线程池构造时候传入
ThreadFactory threadFactory = getThreadFactory();
// Worker构造函数中
Thread thread = threadFactory.newThread(worker);
// addWorker()方法中启动
thread.start();

Worker继承自AQS,这里使用了AQS的独占模式,有个技巧是构造Worker的时候,把AQS的资源(状态)通过setState(-1)设置为-1,这是因为Worker实例刚创建时AQS中state的默认值为0,此时线程尚未启动,不能在这个时候进行线程中断,见Worker#interruptIfStarted()方法。Worker中两个覆盖AQS的方法tryAcquire()和tryRelease()都没有判断外部传入的变量,前者直接CAS(0,1),后者直接setState(0)。接着看核心方法ThreadPoolExecutor#runWorker():

final void runWorker(Worker w) {
    // 获取当前线程,实际上和Worker持有的线程实例是相同的
    Thread wt = Thread.currentThread();
    // 获取Worker中持有的初始化时传入的任务对象,这里注意存放在临时变量task中
    Runnable task = w.firstTask;
    // 设置Worker中持有的初始化时传入的任务对象为null
    w.firstTask = null;
    // 由于Worker初始化时AQS中state设置为-1,这里要先做一次解锁把state更新为0,允许线程中断
    w.unlock(); // allow interrupts
    // 记录线程是否因为用户异常终结,默认是true
    boolean completedAbruptly = true;
    try {
        // 初始化任务对象不为null,或者从任务队列获取任务不为空(从任务队列获取到的任务会更新到临时变量task中)
        // getTask()由于使用了阻塞队列,这个while循环如果命中后半段会处于阻塞或者超时阻塞状态,getTask()返回为null会导致线程跳出死循环使线程终结
        while (task != null || (task = getTask()) != null) {
            // Worker加锁,本质是AQS获取资源并且尝试CAS更新state由0更变为1
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 如果线程池正在停止(也就是由RUNNING或者SHUTDOWN状态向STOP状态变更),那么要确保当前工作线程是中断状态
            // 否则,要保证当前线程不是中断状态
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 钩子方法,任务执行前
                beforeExecute(wt, task);
                try {
                    task.run();
                    // 钩子方法,任务执行后 - 正常情况
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    // 钩子方法,任务执行后 - 异常情况
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                // 清空task临时变量,这个很重要,否则while会死循环执行同一个task
                task = null;
                // 累加Worker完成的任务数
                w.completedTasks++;
                // Worker解锁,本质是AQS释放资源,设置state为0
                w.unlock();
            }
        }
        // 走到这里说明某一次getTask()返回为null,线程正常退出
        completedAbruptly = false;
    } finally {
        // 处理线程退出,completedAbruptly为true说明由于用户异常导致线程非正常退出
        processWorkerExit(w, completedAbruptly);
    }
}

这里重点拆解分析一下判断当前工作线程中断状态的代码:

if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
    wt.interrupt();
// 先简化一下判断逻辑,如下
// 判断线程池状态是否至少为STOP,rs >= STOP(1)
boolean atLeastStop = runStateAtLeast(ctl.get(), STOP);
// 判断线程池状态是否至少为STOP,同时判断当前线程的中断状态并且清空当前线程的中断状态
boolean interruptedAndAtLeastStop = Thread.interrupted() && runStateAtLeast(ctl.get(), STOP);
if (atLeastStop || interruptedAndAtLeastStop && !wt.isInterrupted()){
    wt.interrupt();
}

Thread.interrupted()方法获取线程的中断状态同时会清空该中断状态,这里之所以会调用这个方法是因为在执行上面这个if逻辑同时外部有可能调用shutdownNow()方法,shutdownNow()方法中也存在中断所有Worker线程的逻辑,但是由于shutdownNow()方法中会遍历所有Worker做线程中断,有可能无法及时在任务提交到Worker执行之前进行中断,所以这个中断逻辑会在Worker内部执行,就是if代码块的逻辑。这里还要注意的是:STOP状态下会拒绝所有新提交的任务,不会再执行任务队列中的任务,同时会中断所有Worker线程。也就是,即使任务Runnable已经runWorker()中前半段逻辑取出,只要还没走到调用其Runnable#run(),都有可能被中断。假设刚好发生了进入if代码块的逻辑同时外部调用了shutdownNow()方法,那么if逻辑内会判断线程中断状态并且重置,那么shutdownNow()方法中调用的interruptWorkers()就不会因为中断状态判断出现问题导致二次中断线程(会导致异常)。

小结一下上面runWorker()方法的核心流程:

Worker先执行一次解锁操作,用于解除不可中断状态。

通过while循环调用getTask()方法从任务队列中获取任务(当然,首轮循环也有可能是外部传入的firstTask任务实例)。

如果线程池更变为STOP状态,则需要确保工作线程是中断状态并且进行中断处理,否则要保证工作线程必须不是中断状态。

执行任务实例Runnale#run()方法,任务实例执行之前和之后(包括正常执行完毕和异常执行情况)分别会调用钩子方法beforeExecute()和afterExecute()。

while循环跳出意味着runWorker()方法结束和工作线程生命周期结束(Worker#run()生命周期完结),会调用processWorkerExit()处理工作线程退出的后续工作。

接下来分析一下从任务队列中获取任务的getTask()方法和处理线程退出的后续工作的processWorkerExit()方法。

目录
相关文章
|
3天前
|
安全 算法 Java
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第20天】 在多核处理器日益普及的今天,并发编程成为了软件开发中不可忽视的重要话题。Java语言提供了丰富的并发工具和机制来帮助开发者构建高效且线程安全的应用程序。本文将探讨Java并发的核心概念,包括线程同步、锁机制、以及如何通过这些工具实现性能优化。我们将透过实例分析,揭示并发编程中的常见问题,并展示如何利用现代Java API来解决这些问题。
|
1天前
|
Java 调度
【JAVA学习之路 | 提高篇】线程的通信
【JAVA学习之路 | 提高篇】线程的通信
|
1天前
|
存储 Java
【JAVA学习之路 | 提高篇】线程安全问题及解决
【JAVA学习之路 | 提高篇】线程安全问题及解决
|
1天前
|
Java
【JAVA学习之路 | 提高篇】创建与启动线程之二(继承Thread类)(实现Runnable接口)
【JAVA学习之路 | 提高篇】创建与启动线程之二(继承Thread类)(实现Runnable接口)
|
1天前
|
Java 调度
【JAVA学习之路 | 提高篇】进程与线程(Thread)
【JAVA学习之路 | 提高篇】进程与线程(Thread)
|
1天前
|
Java 开发者
Java并发编程:理解线程同步和锁
【5月更文挑战第22天】本文将深入探讨Java并发编程的核心概念——线程同步和锁。我们将从基本的同步问题开始,逐步深入到更复杂的并发控制技术,包括可重入锁、读写锁以及Java并发工具库中的其他锁机制。通过理论与实例相结合的方式,读者将能够理解在多线程环境下如何保证数据的一致性和程序的正确性。
|
1天前
|
Java 程序员
Java中的多线程编程
本文将深入探讨Java中的多线程编程,包括线程的创建、启动、控制和同步等关键技术。我们将通过实例代码演示如何在Java中实现多线程,并讨论多线程编程的优势和挑战。
|
2天前
|
Java 容器
Java并发编程:深入理解线程池
【5月更文挑战第21天】 在多核处理器的普及下,并发编程成为了提高程序性能的重要手段。Java提供了丰富的并发工具,其中线程池是管理线程资源、提高系统响应速度和吞吐量的关键技术。本文将深入探讨线程池的核心原理、关键参数及其调优策略,并通过实例展示如何高效地使用线程池以优化Java应用的性能。
|
2天前
|
监控 算法 Java
Java并发编程:深入理解线程池
【5月更文挑战第21天】 在现代软件开发中,尤其是Java应用中,并发编程是一个不可忽视的重要领域。合理利用多线程可以显著提高程序的性能和响应速度。本文将深入探讨Java中的线程池机制,包括其工作原理、优势以及如何正确使用线程池来优化应用程序性能。通过分析线程池的核心参数配置,我们将了解如何根据不同的应用场景调整线程池策略,以期达到最佳的并发处理效果。
|
2天前
|
Java 调度 开发者
Java并发编程:深入理解线程池
【5月更文挑战第21天】本文旨在通过深入探讨Java并发编程的核心组件——线程池,为开发者提供对线程池的全面理解。我们将从线程池的基本概念、优势入手,逐步深入到线程池的核心原理、常用配置参数,以及如何合理地使用线程池来提高系统性能和稳定性。文章将结合实际案例,帮助读者掌握线程池的使用技巧,以及在面对不同场景时如何进行调优。