【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码

简介: 当我们创建一个`ThreadPoolExecutor`的时候,你是否会好奇🤔,它到底发生了什么?比如:我传的拒绝策略、线程工厂是啥时候被使用的? 核心线程数是个啥?最大线程数和它又有什么关系?线程池,它是怎么调度,我们传入的线程?...不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界...

👋hi,我不是一名外包公司的员工,也不会偷吃茶水间的零食,我的梦想是能写高端CRUD

🔥 2025本人正在沉淀中... 博客更新速度++

👍 欢迎点赞、收藏、关注,跟上我的更新节奏

📚欢迎订阅专栏,专栏别名《在2B工作中寻求并发是否搞错了什么》

前言

当我们创建一个ThreadPoolExecutor的时候,你是否会好奇🤔,它到底发生了什么?比如:

  • 我传的拒绝策略、线程工厂是啥时候被使用的?
  • 核心线程数是个啥?最大线程数和它又有什么关系?
  • 线程池,它是怎么调度,我们传入的线程?
  • ...

不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界。

public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
                          int maximumPoolSize,//线程池的最大线程数
                          long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                          TimeUnit unit,//时间单位
                          BlockingQueue<Runnable> workQueue,//阻塞队列,用来储存等待执行任务的队列
                          ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
                          RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
                           ) {
   

ThreadPoolExecutor构造方法

所有伟大的开始,源于构造方法

我们可以看到,构造方法里,只是对它做了数量的校验和赋值:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
   
    if (corePoolSize < 0 ||                    // 核心线程数是可以为0,但不能小于0
        maximumPoolSize <= 0 ||                // 最大线程数不能小于0
        maximumPoolSize < corePoolSize ||    // 核心线程数小于最大线程数
        keepAliveTime < 0)                    // 存活时间大于0
        throw new IllegalArgumentException();
    // 工作队列、线程工厂、拒绝策略不能为null
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

有没有好奇的uu?构造线程池,有哪些参数可以不传?
下面的构造方法,早已告诉了我们答案 --- 线程工厂拒绝策略

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
   
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

如果我们不传线程工厂和拒绝策略,那么就会有默认的线程工厂拒绝策略

  • 默认的线程工厂:DefaultThreadFactory
// 默认的线程工厂
static class DefaultThreadFactory implements ThreadFactory {
   
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
   
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
   
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
  • 默认的拒绝策略:AbortPolicy
// ThreadPoolExecutor默认的拒绝策略
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

// AbortPolicy拒绝策略
public static class AbortPolicy implements RejectedExecutionHandler {
   

    public AbortPolicy() {
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
   
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

提交任务(execute方法)

🤔提交任务的流程是什么勒?源码里,Doug Lea注释里写的很清楚了。
image.png

第一步

* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task.  The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
==========================我是分割线===========================
1、如果运行中的线程数少于corePoolSize,尝试用当前任务作为首个任务启动新线程。
addWorker方法会原子性地检查线程池状态和worker数量,避免非法创建线程。

第二步

* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
==========================我是分割线===========================
2、 如果任务成功入队,仍需二次检查:
   - 是否应补充新线程(例如之前检查后有线程死亡)
   - 线程池是否已关闭
   根据检查结果决定回滚入队操作或创建新线程

第三步

* 3. If we cannot queue task, then we try to add a new
* thread.  If it fails, we know we are shut down or saturated
* and so reject the task.
==========================我是分割线===========================
3、如果无法入队(队列已满),尝试创建新线程。
  若失败(线程池关闭或已达最大线程数),执行拒绝策略。

是不是长长的文字不想看,没有关系,一图顶千言👇:
image.png

从源码的角度的来看:

    public void execute(Runnable command) {
   
        if (command == null)
            throw new NullPointerException();
        // 获取ctl,ctl是什么?等下会详细的说说,它表示线程池的状态和工作线程数
        int c = ctl.get();
        // 工作线程数小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
   
            if (addWorker(command, true))        // 尝试添加核心线程
                return;
            c = ctl.get();
        }
        // 线程池状态为正在运行 且 成功放入阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
   
            int recheck = ctl.get();
            // 二次检查的线程状态不为正在运行 且 移除线程成功
            if (! isRunning(recheck) && remove(command))
                reject(command);                    // 触发拒绝策略
            else if (workerCountOf(recheck) == 0)   // 工作线程数为0
                addWorker(null, false);                // 尝试添加非核心线程
        }
        else if (!addWorker(command, false))         // 尝试添加非核心线程
            reject(command);                          // 触发拒绝策略
    }

二次检查状态是为了处理并发场景下的竞态条件,具体检查两个维度:

  • 状态检查:用isRunning(recheck)验证线程池是否被关闭(SHUTDOWN/STOP),若关闭则回滚已入队任务并触发拒绝策略。
  • 线程检查:当workerCount==0时(核心线程被回收/异常终止),需创建非核心线程保证队列任务能被消费。

竞态条件(race condition)指的是两个或者以上进程或者线程并发执行时,其最终的结果依赖于进程或者线程执行的精确时序。竞争条件会产生超出预期的情况。

ctl

刚刚看提交任务源码中埋下的伏笔 --- 什么是ctl?我们下面具体来说说。

Doug Lea用一个变量,来表示2种状态:

  1. 线程池当前状态(高3位)
  2. 当前线程池工作线程个数(低29位)
// 初始化ctl,线程池状态为正在运行,工作线程数为0
// RUNNING  = 111_00000_00000000_00000000_00000000 (-536870912)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// rs: 运行状态   wc:工作线程数量
private static int ctlOf(int rs, int wc) {
    return rs | wc; }


// 用来算线程池状态,左移27位 
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY = 000_11111_11111111_11111111_11111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

线程池状态

private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

相当于

// 状态值定义(高3位)
RUNNING    = 111_00000_00000000_00000000_00000000 (-536870912)
SHUTDOWN   = 000_00000_00000000_00000000_00000000 (0)
STOP       = 001_00000_00000000_00000000_00000000 (536870912)
TIDYING    = 010_00000_00000000_00000000_00000000 (1073741824)
TERMINATED = 011_00000_00000000_00000000_00000000 (1610612736)

计算出当前线程池的状态

CAPACITY = 000_11111_11111111_11111111_11111111

// 取高3位(屏蔽低29位)
private static int runStateOf(int c) {
    return c & ~CAPACITY; }

// 状态判断
private static boolean isRunning(int c) {
    return c < SHUTDOWN; }

线程池状态的转变
image.png

工作线程数量

CAPACITY = 000_11111_11111111_11111111_11111111

// 取低29位(屏蔽高3位)
private static int workerCountOf(int c) {
    return c & CAPACITY; }

// CAS更新线程数
private boolean compareAndIncrementWorkerCount(int expect) {
   
   return ctl.compareAndSet(expect, expect + 1);
}

尝试向线程池添加线程(addWorker方法)

  1. 状态检查与CAS计数更新:通过循环检查线程池状态(是否可接受新任务)和当前线程数(是否超过核心或最大线程数限制),使用CAS原子操作增加workerCount
// 入参:
// 1、firstTask: 新线程要执行的第一个任务(可能为null)
// 2、core:决定用corePoolSize(true)还是maximumPoolSize(false)作为线程数上限
private boolean addWorker(Runnable firstTask, boolean core:) {
   
    retry:
    for (;;) {
   
        int c = ctl.get();
        // 获取线程池状态
        int rs = runStateOf(c);

        // 通过ctl原子变量(高位存状态,低位存线程数)检查线程池状态,防止在非运行状态下创建新线程
        // 1、rs >= SHUTDOWN 
        //    判断线程池状态是否处于SHUTDOWN或更高状态(STOP, TIDYING, TERMINATED)
        // 2、!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())
        // 只有当同时满足以下3个条件时,才会取反为false:
        //   - 状态正好是SHUTDOWN(不是更高状态)
        //   - firstTask为空(表示不是新提交的任务)
        //   - 工作队列非空(还有未处理的任务)
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
   
            // 获取工作线程数
            int wc = workerCountOf(c);
            // 校验是否超过容量限制
            // 1、wc >= CAPACITY:判断工作线程数量是否大等于最大工作线程数量
            // 2、wc >= (core ? corePoolSize : maximumPoolSize)):
            // 判断工作线程数量是否大等于 核心线程数/最大线程数
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c)) // 通过CAS保证线程数增减的原子性,避免并发问题
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    ...
  1. 创建工作线程:创建Worker对象并加锁将其加入线程集合。若线程启动失败,则回滚计数和集合状态。
private boolean addWorker(Runnable firstTask, boolean core) {
   
    ...

    boolean workerStarted = false;    // worker线程是否被启动
    boolean workerAdded = false;    // worker是否被添加到workers集合中  private final HashSet<Worker> workers = new HashSet<Worker>();
    Worker w = null;
    try {
   
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
   
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
   

                int rs = runStateOf(ctl.get());

                // 1、rs < SHUTDOWN:线程池处于RUNNING状态
                // 目的:确保只有在线程池可用时才能创建新线程
                // 2、(rs == SHUTDOWN && firstTask == null):线程池处于SHUTDOWN状态且没有初始任务(firstTask为null)
                // SHUTDOWN 状态下不允许添加新任务(firstTask != null 会直接拒绝),但允许创建 无初始任务 的线程来消费队列中的残留任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
   
                    if (t.isAlive())  // 检查新创建的线程是否已处于活跃状态
                        throw new IllegalThreadStateException();
                    workers.add(w);        // 添加到Worker集合
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
   
                mainLock.unlock();
            }
            if (workerAdded) {
   
                t.start();        // 启动线程
                workerStarted = true;
            }
        }
    } finally {
   
        if (! workerStarted)
            addWorkerFailed(w);  // 失败时的回滚逻辑
    }
    return workerStarted;
}

这里简单说下,失败时候的回滚逻辑

  • 从集合中移除Worker
  • CAS递减线程数
  • 尝试终止线程池(后面会详细说说🤗)
private void addWorkerFailed(Worker w) {
   
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
   
        if (w != null)                
            workers.remove(w);        // 从集合中移除Worker
        decrementWorkerCount();        // CAS递减线程数
        tryTerminate();                // 尝试终止线程池,后面会详细说说
    } finally {
   
        mainLock.unlock();
    }
}

worker类

刚刚在addWorker方法里,我们创建了Worker类,🤔聪明的你,一定很想知道这个类到底是干什么的吧!
Worker类是ThreadPoolExecutor的核心内部类,它直接负责 封装工作线程 和 管理任务执行。其设计巧妙地将线程、任务和锁机制结合在一起。

private final class Worker
    extends AbstractQueuedSynchronizer // 继承AQS实现锁机制
    implements Runnable {
                // 自身作为线程的Runnable目标
    final Thread thread;              // 实际的工作线程
    Runnable firstTask;               // 初始任务(可能为null)
    volatile long completedTasks;     // 完成的任务计数器
}

构造方法,在addWorker的时候,就会调用到这个构造方法。
怎么样?你是否看到了,自己构造线程池时传入的线程工厂被调用😄?

Worker(Runnable firstTask) {
   
  this.firstTask = firstTask;
  this.thread = getThreadFactory().newThread(this); // this即Worker自身
}

worker类的锁状态
通过继承 AQS(AbstractQueuedSynchronizer) 实现了一个不可重入的独占锁,用于精确控制线程的中断行为和状态标识。

注释说了,0是未被锁定,1是被锁定了
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.

work类的加锁/解锁

// 最终调用AQS的acquire方法。将state从0改为1
public void lock()        {
    acquire(1); }  
public boolean tryLock()  {
    return tryAcquire(1); }

// AQS的tryAcquire实现(Worker内部)
protected boolean tryAcquire(int unused) {
   
    if (compareAndSetState(0, 1)) {
    // CAS操作保证原子性
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

解锁

// 最终调用AQS的tryRelease实现。将state从1改为0
public void unlock()      {
    release(1); }   

// AQS的tryRelease实现
protected boolean tryRelease(int unused) {
   
    setExclusiveOwnerThread(null);
    setState(0); // 无需CAS,因为只有持有锁的线程能调用此方法
    return true;
}

执行Worker(runWorker方法)

worker类,调用外部类ThreadPoolExecutorrunWorker方法

public void run() {
   
    runWorker(this);   // 调用外部类ThreadPoolExecutor的runWorker方法
}

让我们康康,线程池的runWorker方法吧!

final void runWorker(Worker w) {
   
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;        // 获取初始任务(可能为null)
    w.firstTask = null;                    // 清空初始任务引用
    w.unlock();                         // 允许中断(设置state=0,标记为初始可中断状态)
    boolean completedAbruptly = true;    // 是否因异常退出
    try {
   
        // ---- 核心循环:不断获取任务 ----
        // 执行初始任务,或者获取到的任务,getTask是怎么获取的,下文会介绍
        while (task != null || (task = getTask()) != null) {
   
            w.lock();   // 加锁标记线程为"工作中"
            // 检查线程池状态(若处于STOP需确保线程被中断)
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
   
                // 如果我们写了新类继承了ThreadPoolExector重写了beforeExecute方法,就执行
                // ThreadPoolExector这里没有任何实现: protected void beforeExecute(Thread t, Runnable r) { }
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
   
                    task.run();     // 实际执行任务(注意不是启动新线程!)
                } catch (RuntimeException x) {
   
                    thrown = x; throw x;
                } catch (Error x) {
   
                    thrown = x; throw x;
                } catch (Throwable x) {
   
                    thrown = x; throw new Error(x);
                } finally {
   
                    // 如果我们写了新类继承了ThreadPoolExector重写了afterExecute方法,就执行
                    // ThreadPoolExector这里没有任何实现:protected void afterExecute(Runnable r, Throwable t) { }
                    afterExecute(task, thrown);
                }
            } finally {
   
                task = null;                // 清空任务引用
                w.completedTasks++;            // 统计完成的任务数
                w.unlock();                    // 解锁标记线程为“空闲”
            }
        }
        completedAbruptly = false;            // 没有因为异常导致退出
    } finally {
   
        processWorkerExit(w, completedAbruptly);  // 线程退出处理
    }
}

获取任务(getTask方法)

你是否会好奇🤔, 刚刚我们在runWork中的task是从哪里获取的?while (task != null || (task = getTask()) != null) 。

private Runnable getTask() {
   
    boolean timedOut = false;         // 标记是否发生poll超时

    for (;;) {
   
        int c = ctl.get();
        int rs = runStateOf(c);        // 解析线程池状态

        // ===== 1. 状态检查:是否需要停止工作 =====
        // 条件1: 线程池状态 >= STOP(立刻停止)
        // 条件2: 线程池状态 >= SHUTDOWN 且 队列为空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
   
            decrementWorkerCount();        // 减少工作线程数
            return null;                // 触发线程回收
        }

        int wc = workerCountOf(c);        // 当前工作线程数

        // ===== 2. 判断是否允许超时回收 =====
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // ===== 3. 线程数溢出检查 =====
        // 情况1: 线程数超过maximumPoolSize(可能因配置修改导致)
        // 情况2: 允许超时回收 且 已发生超时
        //                 且
        // 情况1:工作线程数量大于1 或 工作队列为空
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
   
            if (compareAndDecrementWorkerCount(c))    // CAS减少线程数
                return null;
            continue;    // 竞争失败则重试
        }
        // ===== 4. 从队列获取任务 =====
        try {
   
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :  // 限时阻塞等待,超时后返回 null
                workQueue.take();                                       // 永久阻塞(核心线程默认行为)
            if (r != null)
                return r;
            timedOut = true;    // 标记超时
        } catch (InterruptedException retry) {
   
            timedOut = false;    // 中断重试(SHUTDOWN状态可能触发)
        }
    }
}

线程退出(processWorkerExit方法)

我们在runWork方法的时候,调用了processWorkerExit方法,好奇的你一定想看看他是怎么实现的吧!

final void runWorker(Worker w) {
   
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;        // 获取初始任务(可能为null)
    w.firstTask = null;                    // 清空初始任务引用
    w.unlock();                         // 允许中断(设置state=0,标记为初始可中断状态)
    boolean completedAbruptly = true;    // 是否因异常退出
    try {
   
        // ---- 核心循环:不断获取任务 ----
        while (task != null || (task = getTask()) != null) {
   ...}
        completedAbruptly = false;
    } finally {
   
        processWorkerExit(w, completedAbruptly);  // 线程退出处理
    }
}

processWorkerExit方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
   
    // ===== 1. 参数校验与状态记录 =====
    if (completedAbruptly) // 若因异常退出,需手动减少线程数
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
   
        // ===== 2. 统计完成任务数 =====
        completedTaskCount += w.completedTasks;
        workers.remove(w); // 从Worker集合中移除
    } finally {
   
        mainLock.unlock();
    }

    // ===== 3. 尝试终止线程池 =====
    tryTerminate();

    // ===== 4. 判断是否需要补充线程 =====
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
    // 线程池仍处于RUNNING/SHUTDOWN状态
        if (!completedAbruptly) {
    // 正常退出
            // 计算最小应保持的线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1; // 队列非空时至少保留1个线程处理任务
            if (workerCountOf(c) >= min)
                return; // 当前线程数足够,无需补充
        }
        addWorker(null, false); // 补充新线程(无初始任务)
    }
}

线程池的关闭

之前说了这么多线程池是怎么运行,到这里也该说说线程池是怎么关闭的了🤭之前看源码的时候,经常出现的tryTerminate方法,也会在这里展开说说

线程池对我们开放了下面的3个方法,让我们来关闭线程池。

方法 行为特性 适用场景
shutdown() 温和关闭:停止接受新任务,执行完已提交任务 需要优雅关闭,确保任务不丢失
shutdownNow() 强制关闭:停止接受新任务,尝试中断所有线程,并返回未执行任务列表 需要立即释放资源,容忍任务丢弃
awaitTermination(long timeout, TimeUnit unit) 阻塞等待线程池完全终止(结合shutdown使用) 需要同步等待关闭完成

shutdown方法

shutdown方法的作用

  1. 停止接受新任务:调用 shutdown() 后,线程池不再接受新提交的任务。
  2. 执行已提交的任务:线程池会继续执行已经提交但尚未完成的任务。
  3. 关闭线程池:在所有任务执行完毕后,线程池会逐步关闭。
public void shutdown() {
   
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
   
        checkShutdownAccess();
        advanceRunState(SHUTDOWN); // CAS更新状态为SHUTDOWN
        interruptIdleWorkers();    // 仅中断空闲线程
    } finally {
   
        mainLock.unlock();
    }
    tryTerminate(); // 尝试推进终止流程
}

interruptIdleWorkers方法
中断那些当前没有执行任务(即空闲)的工作线程。具体来说:

  • 中断空闲线程:它会遍历所有工作线程,并尝试中断那些正在等待任务的线程(即处于空闲状态的线程)。
  • 加速关闭过程:通过中断空闲线程,可以更快地释放资源,从而加速线程池的关闭过程。
private void interruptIdleWorkers(boolean onlyOne) {
   
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
   
        for (Worker w : workers) {
   
            Thread t = w.thread;
            // 关键点:尝试获取Worker锁(判断是否空闲)
            if (!t.isInterrupted() && w.tryLock()) {
   
                try {
   
                    t.interrupt(); // 仅中断空闲线程
                } catch (SecurityException ignore) {
   
                } finally {
   
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
   
        mainLock.unlock();
    }
}

shutdownNow方法

public List<Runnable> shutdownNow() {
   
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
   
        checkShutdownAccess();
        advanceRunState(STOP); // 更新为STOP状态
        interruptWorkers();    // 强制中断所有线程
        tasks = drainQueue();  // 排出未执行任务
    } finally {
   
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

interruptWorkers方法,我们可以看到里面调用了workerinterruptIfStarted

private void interruptWorkers() {
   
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
   
        for (Worker w : workers)
            w.interruptIfStarted(); // 无论是否空闲都尝试中断
    } finally {
   
        mainLock.unlock();
    }
}

// worker类interruptIfStarted方法
void interruptIfStarted() {
   
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
   
        try {
   
            t.interrupt();
        } catch (SecurityException ignore) {
   
        }
    }
}

tryTerminate方法

tryTerminate() 的核心逻辑是检查线程池的状态和条件,决定是否可以将线程池的状态转换为 TERMINATED,并在适当的时候中断空闲的工作线程。

final void tryTerminate() {
   
    // 检查是否满足终止条件
    if ((runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()) ||
        runStateAtLeast(c, TIDYING) || 
        isRunning(c)) 
        return;

    // 中断最后一个空闲线程(如有)
    if (workerCountOf(c) != 0) {
   
        interruptIdleWorkers(ONLY_ONE);
        return;
    }

    // 推进到TIDYING状态
    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
   
        try {
   
            terminated(); // 空实现的钩子方法
        } finally {
   
            ctl.set(ctlOf(TERMINATED, 0));
            termination.signalAll(); // 唤醒awaitTermination()
        }
    }
}

后话

👍 线程池源码学习,只是刚刚开始,友友们,点上关注,跟上主播的学习节奏。

话说最近温度回暖了,感觉看源码的速度都变快了!哈哈哈哈哈,开玩笑的。

往期文章推荐

【Java并发】【线程池】带你从0-1入门线程池

目录
相关文章
|
11天前
|
存储 Java
【源码】【Java并发】【ThreadLocal】适合中学者体质的ThreadLocal源码阅读
前言 下面,跟上主播的节奏,马上开始ThreadLocal源码的阅读( ̄▽ ̄)" 内部结构 如下图所示,我们可以知道,每个线程,都有自己的threadLocals字段,指向ThreadLocalMap
273 81
【源码】【Java并发】【ThreadLocal】适合中学者体质的ThreadLocal源码阅读
|
8天前
|
Java
【源码】【Java并发】【ReentrantLock】适合中学者体质的ReentrantLock源码阅读
因为本文说的是ReentrantLock源码,因此会默认,大家对AQS有基本的了解(比如同步队列、条件队列大概> 长啥样?)。 不懂AQS的小朋友们,你们好呀!也欢迎先看看这篇
56 13
【源码】【Java并发】【ReentrantLock】适合中学者体质的ReentrantLock源码阅读
|
12天前
|
Java
【源码】【Java并发】【AQS】从ReentrantLock、Semaphore、CutDownLunch、CyclicBarrier看AQS源码
前言 主播觉得,AQS的原理,就是通过这2个队列的协助,实现核心功能,同步队列(CLH队列)和条件队列(Condition队列)。 同步队列(CLH队列) 作用:管理需要获...
58 18
【源码】【Java并发】【AQS】从ReentrantLock、Semaphore、CutDownLunch、CyclicBarrier看AQS源码
|
3天前
|
Java
【源码】【Java并发】【LinkedBlockingQueue】适合中学体质的LinkedBlockingQueue入门
前言 有了前文对简单实用的学习 【Java并发】【LinkedBlockingQueue】适合初学体质的LinkedBlockingQueue入门 聪明的你,一定会想知道更多。哈哈哈哈哈,下面主播就...
30 6
【源码】【Java并发】【LinkedBlockingQueue】适合中学体质的LinkedBlockingQueue入门
|
4天前
|
安全 Java
【源码】【Java并发】【ArrayBlockingQueue】适合中学者体质的ArrayBlockingQueue
前言 通过之前的学习是不是学的不过瘾,没关系,马上和主播来挑战源码的阅读 【Java并发】【ArrayBlockingQueue】适合初学体质的ArrayBlockingQueue入门 还有一件事
39 5
【源码】【Java并发】【ArrayBlockingQueue】适合中学者体质的ArrayBlockingQueue
|
10天前
|
Java 中间件 调度
【源码】【Java并发】从InheritableThreadLocal和TTL源码的角度来看父子线程传递
本文涉及InheritableThreadLocal和TTL,从源码的角度,分别分析它们是怎么实现父子线程传递的。建议先了解ThreadLocal。
48 4
【源码】【Java并发】从InheritableThreadLocal和TTL源码的角度来看父子线程传递
|
4天前
|
前端开发 Java 物联网
智慧班牌源码,采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署
智慧班牌系统是一款基于信息化与物联网技术的校园管理工具,集成电子屏显示、人脸识别及数据交互功能,实现班级信息展示、智能考勤与家校互通。系统采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署与私有化定制。核心功能涵盖信息发布、考勤管理、教务处理及数据分析,助力校园文化建设与教学优化。其综合性和可扩展性有效打破数据孤岛,提升交互体验并降低管理成本,适用于日常教学、考试管理和应急场景,为智慧校园建设提供全面解决方案。
60 14
|
1天前
|
人工智能 安全 Java
智慧工地源码,Java语言开发,微服务架构,支持分布式和集群部署,多端覆盖
智慧工地是“互联网+建筑工地”的创新模式,基于物联网、移动互联网、BIM、大数据、人工智能等技术,实现对施工现场人员、设备、材料、安全等环节的智能化管理。其解决方案涵盖数据大屏、移动APP和PC管理端,采用高性能Java微服务架构,支持分布式与集群部署,结合Redis、消息队列等技术确保系统稳定高效。通过大数据驱动决策、物联网实时监测预警及AI智能视频监控,消除数据孤岛,提升项目可控性与安全性。智慧工地提供专家级远程管理服务,助力施工质量和安全管理升级,同时依托可扩展平台、多端应用和丰富设备接口,满足多样化需求,推动建筑行业数字化转型。
24 5
|
2月前
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
195 60
【Java并发】【线程池】带你从0-1入门线程池
|
1月前
|
存储 网络协议 安全
Java网络编程,多线程,IO流综合小项目一一ChatBoxes
**项目介绍**:本项目实现了一个基于TCP协议的C/S架构控制台聊天室,支持局域网内多客户端同时聊天。用户需注册并登录,用户名唯一,密码格式为字母开头加纯数字。登录后可实时聊天,服务端负责验证用户信息并转发消息。 **项目亮点**: - **C/S架构**:客户端与服务端通过TCP连接通信。 - **多线程**:采用多线程处理多个客户端的并发请求,确保实时交互。 - **IO流**:使用BufferedReader和BufferedWriter进行数据传输,确保高效稳定的通信。 - **线程安全**:通过同步代码块和锁机制保证共享数据的安全性。
83 23