深入理解Java线程池ThreadPoolExcutor实现原理、数据结构和算法(源码解析)

本文涉及的产品
应用实时监控服务-应用监控,每月50GB免费额度
可观测监控 Prometheus 版,每月50GB免费额度
函数计算FC,每月15万CU 3个月
简介: Java线程池的核心组件包括核心线程数、最大线程数、队列容量、拒绝策略等。核心线程数是线程池长期维持的线程数量,即使这些线程处于空闲状态也不会被销毁;最大线程数则是线程池允许的最大线程数量,当任务队列已满且当前线程数未达到最大线程数时,线程池会创建新线程执行任务;队列容量决定了任务队列的最大长度,当新任务到来时,如果当前线程数已达到核心线程数且队列未满,任务将被放入队列等待执行;拒绝策略则定义了当线程池无法处理新任务时的行为,如抛出异常、丢弃任务等。

💪🏻 制定明确可量化的目标,坚持默默的做事。



 什么是线程池?

       线程池主要是为了解决执行新任务执行时,应用程序为减少为任务创建一个新线程和任务执行完毕时销毁线程所带来的开销。通过线程池,可以在项目初始化时就创建一个线程集合,然后在需要执行新任务时重用这些线程而不是每次都新建一个线程,一旦任务已经完成了,线程回到线程池中并等待下一次分配任务,达到资源复用的效果。

线程池主要优势?

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

如何创建线程?

通过Executors

创建线程池

newSingleThreadExecutor:创建一个只有一个线程的线程池,串行执行所有任务,即使空闲时也不会被关闭。可以保证所有任务的执行顺序按照任务的提交顺序执行。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。

适用场景:需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程活动的应用场景。

newFixedThreadPool:创建一个固定线程数量的线程池(corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作为阻塞队列)。初始化时线程数量为零,之后每次提交一个任务就创建一个线程,直到线程达到线程池的最大容量。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

适用场景:为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。

newCachedThreadPool:创建一个可缓存的线程池,线程的最大数量为Integer.MAX_VALUE。空闲线程会临时缓存下来,线程会等待60s还是没有任务加入的话就会被关闭。

适用场景:适用于执行很多的短时间异步任务的小程序,或者是负载较轻的服务器。

newScheduledThreadPool:创建一个支持执行延迟任务或者周期性执行任务的线程池。

ThreadPoolExecutor

阿里巴巴开发手册并发编程有一条规定:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这是为什么呢?主要是因为这样的可以避免资源耗尽的风险,因为使用Executors返回线程池对象的弊端有:

FixedThreadPool 和 SingleThreadPool 允许的阻塞队列长度为 Integer.MAX_VALUE,这样会导致堆积大量的请求,从而导致OOM;

CachedThreadPool 允许创建的线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

所以创建线程池,最好是根据线程池的用途,然后自己创建线程池。




一、线程池系列相关文章

       1.1 Java线程池ThreadPoolExcutor01-参数说明

       1.2 Java线程池ThreadPoolExcutor02-阻塞队列之ArrayBlockingQueue

       1.3 Java线程池ThreadPoolExcutor03-阻塞队列之LinkedBlockingQueue

       1.4 Java线程池ThreadPoolExcutor04-阻塞队列之PriorityBlockingQueue原理及扩容机制详解

       1.5 Java线程池ThreadPoolExcutor05-阻塞队列之DelayQueue原理及扩容机制详解

       1.6 Java线程池ThreadPoolExcutor06-阻塞队列之SynchronousQueue

       1.7 Java线程池ThreadPoolExcutor07-阻塞队列之LinkedTransferQueue

       1.8 Java线程池ThreadPoolExcutor07-阻塞队列之LinkedBlockingDeque

       1.9 Java线程池ThreadPoolExcutor08-4种拒绝策略

二、继承实现关系图

image.gif image.png

三、低层数据存储结构

3.1 核心属性

public class ThreadPoolExecutor extends AbstractExecutorService {
...
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    private final BlockingQueue<Runnable> workQueue;
    private final ReentrantLock mainLock = new ReentrantLock();
    private final HashSet<Worker> workers = new HashSet<>();
    private final Condition termination = mainLock.newCondition();
    private int largestPoolSize;
    private long completedTaskCount;
    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler handler;
    private volatile long keepAliveTime;
    private volatile boolean allowCoreThreadTimeOut;
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
...
}

image.gif

3.1.1 说明

属性名 说明
COUNT_BITS 用于计算线程池的状态值、容量
workQueue

阻塞队列。七个:

ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务。详见1.2链接

LinkedBlockingQueue: 基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene。详见1.3链接

PriorityBlockingQueue:具有优先级的无界阻塞队列。详见1.4链接

DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。详见1.5链接

SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。详见1.6链接

LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。详见1.7链接

LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。详见1.8链接。

threadFactory
线程工厂
handler

拒绝策略处理类。四种:AbortPolicy策略(默认)、DiscardPolicy策略、DiscardOldestPolicy策略 和 CallerRunsPolicy策略。详见1.9链接

keepAliveTime
为多余的空闲线程等待新任务的最长时间, 超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余 的空闲线程会被立即终止。
allowCoreThreadTimeOut

如果为true,核心线程使用keepAliveTime超时等待工作。

如果为false(默认),核心线程即使在空闲时也保持活动。

corePoolSize 核心线程数
maximumPoolSize 最大线程数

3.1.2 线程池五种状态

属性ctl 是ThreadPoolExecutor内部一个用来进行技术和状态控制的控制变量,它使用了一个原子整形字段来实现两个方面的管理:

  • 低29位记录线程池的线程数,
  • 高3位记录线程池的工作状态

五种状态:

  • RUNNING线程池一旦被创建处于RUNNING状态。线程池处于RUNNING状态时,能够接收新任务以及对已添加的任务进行处理。
  • SHUTDOWN线程池处于SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。调用线程池的shutdown()接口时,线程池由RUNNING状态转变为SHUTDOWN状态。
  • STOP: 线程池处于STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()接口时,线程池由RUNNING状态或者SHUTDOWN状态变为STOP状态。
  • TIDYING: 当所有的任务已终止,ctl记录的任务数为0,线程池的状态会变为TIDYING状态。当线程池状态为SHUTDOWN时,阻塞队列为空并且线程池中执行的任务也为空时,就会由SHUTDOWN状态变为 TIDYING状态;当线程池为STOP时,线程池中执行的任务为空时,就会又STOP状态变为 TIDYING状态。
  • TERMINATED: 线程池彻底终止,就会变成TERMINATED状态。线程池处于TIDYING状态时,调用terminated()就会由TIDYING状态变为TERMINATED状态。

状态转换如下图:

image.gif image.png

                                                               图3.1.2-1

3.2 构造器

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    ...
}

image.gif

参数名 说明
corePoolSize 核心线程数
maximumPoolSize 最大线程数
keepAliveTime 为多余的空闲线程等待新任务的最长时间, 超过这个时间后多余的线程将被终止。
unit keepAliveTime的单位
workQueue 阻塞队列
threadFactory 线程工厂
handler 拒绝策略处理类

详见1.1链接

四、工作原理

image.gif image.png

  • 如果任务null直接退出,否则执行步骤2;
  • 若工作线程数小于核心线程数,执行步骤3创建工作线程并执行任务,否则执行步骤4。
  • 若阻塞队列已满
  • 工作线程数小于最大线程数,执行步骤6创建工作线程并执行任务,否则执行步骤10 拒绝任务(执行拒绝策略)
  • 若阻塞队列未满,添加任务到阻塞队列,若线程状态不为运行中,则任务从队列中取出,并执行步骤10 拒绝任务(执行拒绝策略)

五、源码解析

5.1 核心方法execute()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
    
   
    int c = ctl.get();
    //通过位运算,获取当前线程数,如果线程数小于corePoolSize,则执行addWorker(),创建新线程执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // 若添加工作任务执行任务不成功,尝试添加到阻塞队列中
        c = ctl.get();
    }
    // 线程数超过核心线程数,任务添加到阻塞队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 添加到阻塞队列,但若线程池状态不是运行中,则从队列中取出
        if (! isRunning(recheck) && remove(command))
            // 并且执行拒绝策略
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 阻塞队列已满,则若工作线程数小于最大线程数 创建线程执行任务
    else if (!addWorker(command, false))
        // 若线程数已达最大线程数,否则执行拒绝策略
        reject(command);
}

image.gif

从源码中可以清晰看出线程池执行任务的逻辑与“四、工作原理”所述一致。

5.2 添加任务addWork()

THreadPoolExecutor内部类Worker

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    private static final long serialVersionUID = 6138294804551838833L;
    //当前Worker所处于的线程
    final Thread thread;
    //待执行的任务
    Runnable firstTask;
    //任务计数器
    volatile long completedTasks;
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
}

image.gif

它是对Runnable进行了封装,主要功能是对待执行的任务进行中断处理和状态监控。Worker还继承了AQS,在每个任务执行时进行了加锁的处理。可以将Worker简单理解为可中断的、可进行锁处理的Runnable。

创建工作线程并执行是addWork()方法,源码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    // 自旋,判断线程池状态,并对线程数量执行原子+1操作
    retry:
    for (;;) {
        int c = ctl.get();
        // 获取线程池状态
        int rs = runStateOf(c);
        // 如果线程池已经关闭,则直接返回false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // 判断线程数是否已达上限,根据传入参数core的不同,判断corePoolSize或者maximumPoolSize。
            // 如果线程数已达上限,直接返回false
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 执行原子操作,对线程数+1
            if (compareAndIncrementWorkerCount(c))
                // 执行原子操作成功,则退出自旋 并 创建工作线程执行任务
                break retry;
            c = ctl.get();  
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    // ctl变量操作成功,执行Worker相关逻辑
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建一个新的Worker,传入待执行的任务
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 加锁后,再次判断线程池状态
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果创建了新的Worker,则调用其start方法立即执行
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

image.gif

addWork()方法首先在一个自旋中做三个判断:

  • 线程是否关闭,若关闭则直接返回false退出
  • 通过参数core来确定工作线程数与核心线程数比较 还是 与最大线程数比较,若工作线程数大,则返回false退出
  • CAS尝试将线程数加1,若成功则创建一个辨析的Worker并立即执行其start()方法执行该任务。

5.3 执行任务runWork()

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 指向线程池调execute添加的任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 首先释放锁,允许中断
    w.unlock(); 
    boolean completedAbruptly = true;
    
    try {
        // 从worker中取第1个任务,若任务为空则从阻塞队列中取任务,直到返回null,这里达到线程复用的效果,实现线程处理多个任务。
        while (task != null || (task = getTask()) != null) {
            // 执行任务前先加锁
            w.lock();
            // 如果线程池已经终止,则中断该线程。保存了线程池在STOP状态下线程中断的,非STOP状态下线程没有被中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 记录正在运行的任务
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务(调任务的run方法)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 执行完任务,清除记录信息
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 当前Worker计数器+1,统计worker执行了多少任务,最后累加进completedTaskCount变量,可以调用相应方法返回一些统计信息。
                w.completedTasks++;
                // 释放锁
                w.unlock();
            }
        }
        // 表示worker是否异常终止,执行到这里代表执行正常,后续的方法需要这个变量
        completedAbruptly = false;
    } finally {
        // completedTasks累加到completedTaskCount变量中
        processWorkerExit(w, completedAbruptly);
    }
}

image.gif

runWorker()的主要逻辑就是进行线程池的关闭检查,然后执行任务,并将计数器+1。

注意这行代码 while (task != null || (task = getTask()) != null) ,当task = w.firstTask 的值为null时执行task = getTask(), getTask是从任务列队是取任务。也就是说,Worker在执行完提交给自己的任务后,会执行任务队列中的任务。

5.4 阻塞队列取任务getTask()

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

image.gif

说明:

  • if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) 这行代码体现出了 SHUTDOWN状态和 STOP状态的区别。若线程池状态为 SHUDOWN 状态,则条件为 false,取任务执行;而如果线程池的状态为 STOP 状态,则条件为 true,不管队列是否还有任务,不再处理了。
  • timed后在的判断逻辑有点复杂,以下几种情况为true,CAS尝试将线程数减1
  • 工作线程数大于最大线程数(后面wc>1||workQueue.isEmpty()应该自然满足)(可能是在运行中调用setMaximumPoolSize)
  • 设置了allowCoreThreadTimeOut为true且队列中取的任务为null,说明没任务了
  • 工作线程数大于核心线程数 且队列中取的任务为null(后面wc>1||workQueue.isEmpty()应该自然满足)
  • try后面逻辑
  • 延时取任务:allowCoreThreadTimeOut为true 或者 wc > corePoolSize
  • 直接取任务(若没任务则阻塞等待):allowCoreThreadTimeOut为false 或者 wc <= corePoolSize

结论:

  • allowCoreThreadTimeOut设置为true时,工作线程数达最大之后,因无新任务而线程减少,工作线程总数最小值可以为0
  • allowCoreThreadTimeOut设置为false时,只有wc大于核心线程数,才去做CAS减线程数操作,所以工作线程数达到最大之后,因无新任务而线程减少,工作线程总数最小为核心线程数

5.5 Worker无任务最后处理processWorkerExit()

private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
   
    tryTerminate();
    int c = ctl.get();
    // 判断状态是否小于STOP
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

image.gif

说明:

  • decrementWorkerCount():如果为异常结束,则工作线程数减1
  • try 逻辑:加锁累加完成任务数
  • tryTerminate(): 尝试终止线程池
  • 判断状态是否小于STOP为true
  • allowCoreThreadTimeOut设置为true
  • 若队列不为空:至少保留一个worker
  • 若队列为空:直接退出,线程池的worker数减少,最终可能为0
  • allowCoreThreadTimeOut设置为false: 则保持worker数不少于corePoolSize(若线程数小于corePoolSize,则添加 null任务的worker

总结worker:线程池启动后,worker在池内创建,包装了提交的Runnable任务并执行,执行完就等待下一个任务,不再需要时就结束。

5.6 关闭线程池

从图3.1.2-1看出有两种方法关闭线程池:

  • shutdown: 不能再提交任务,已经提交的任务可继续执行;
  • shutdownNow: 不能再提交任务,已经提交的任务未执行的任务不再执行,正在执行的任务可继续执行,但会中断,返回已提交未执行的任务

5.6.1 关闭线程池shutdown()

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

image.gif

说明:

  • checkShutdownAccess(): 安装策略机构
  • advanceRunState(SHUTDOWN): 线程池状态切换到SHUTDOWN状态
  • interruptIdleWorkers(): 中断所有空闲的worker
  • tryTerminate(): 尝试结束线程池

5.6.2 关闭线程池shutdownNow()

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

image.gif

说明:

  • checkShutdownAccess(): 安装策略机构
  • advanceRunState(STOP): 线程池状态切换到STOP状态
  • interruptWorkers(): 中断所有空闲的worker
  • drainQueue(): 取出等待队列里未执行的任务
  • tryTerminate(): 尝试结束线程池

5.6.3 尝试结束线程池tryTerminate()

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

image.gif

说明:

  • ctl.get():获取ctl值判断线程池状态,以下状态不处理直接return
  • RUNNING状态,正在运行状态肯定不能停
  • TIDYING或TERMINATED状态,已经没有正在运行的worker了
  • SHUTDOWN状态且阻塞队列不为空,执行完才能停
  • 工作线程数不为0。又调了一次interruptIdleWorkers(ONLY_ONE),可能疑惑在调tryTerminate之前时已经调用过了,为什么又调用,而且每次只中断一个空闲worker?我们需要知道,shutdown时worker可能在执行中,执行完阻塞在队列的take,不知道要结束,所有要补充调用interruptIdleWorkers。每次只中断一个是因为processWorkerExit时,还会执行tryTerminate,自动中断下一个空闲的worker。
  • try逻辑:加锁CAS尝试将线程池状态切换成TIDYING,再切换成TERMINATED状态,terminated是空方法供子类来实现。

5.6.4 判断线程池是否关闭awaitTermination()

public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

image.gif

说明:接收人timeout和TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用。

ps: 以上是研读源码加上翻阅许多文献理解的总结,如有错误或不足的地方,欢迎指出,欢迎留言交流。我会继续努力学习和分享更多有干货的内容。

相关文章
|
4天前
|
监控 算法 网络协议
Java 实现局域网电脑屏幕监控算法揭秘
在数字化办公环境中,局域网电脑屏幕监控至关重要。本文介绍用Java实现这一功能的算法,涵盖图像采集、数据传输和监控端显示三个关键环节。通过Java的AWT/Swing库和Robot类抓取屏幕图像,使用Socket进行TCP/IP通信传输图像数据,并利用ImageIO类在监控端展示图像。整个过程确保高效、实时和准确,为提升数字化管理提供了技术基础。
35 15
|
3月前
|
存储 Java
Java中的HashMap和TreeMap,通过具体示例展示了它们在处理复杂数据结构问题时的应用。
【10月更文挑战第19天】本文详细介绍了Java中的HashMap和TreeMap,通过具体示例展示了它们在处理复杂数据结构问题时的应用。HashMap以其高效的插入、查找和删除操作著称,而TreeMap则擅长于保持元素的自然排序或自定义排序,两者各具优势,适用于不同的开发场景。
55 1
|
3天前
|
存储 算法 安全
基于红黑树的局域网上网行为控制C++ 算法解析
在当今网络环境中,局域网上网行为控制对企业和学校至关重要。本文探讨了一种基于红黑树数据结构的高效算法,用于管理用户的上网行为,如IP地址、上网时长、访问网站类别和流量使用情况。通过红黑树的自平衡特性,确保了高效的查找、插入和删除操作。文中提供了C++代码示例,展示了如何实现该算法,并强调其在网络管理中的应用价值。
|
28天前
|
机器学习/深度学习 人工智能 算法
深入解析图神经网络:Graph Transformer的算法基础与工程实践
Graph Transformer是一种结合了Transformer自注意力机制与图神经网络(GNNs)特点的神经网络模型,专为处理图结构数据而设计。它通过改进的数据表示方法、自注意力机制、拉普拉斯位置编码、消息传递与聚合机制等核心技术,实现了对图中节点间关系信息的高效处理及长程依赖关系的捕捉,显著提升了图相关任务的性能。本文详细解析了Graph Transformer的技术原理、实现细节及应用场景,并通过图书推荐系统的实例,展示了其在实际问题解决中的强大能力。
149 30
|
10天前
|
缓存 算法 搜索推荐
Java中的算法优化与复杂度分析
在Java开发中,理解和优化算法的时间复杂度和空间复杂度是提升程序性能的关键。通过合理选择数据结构、避免重复计算、应用分治法等策略,可以显著提高算法效率。在实际开发中,应该根据具体需求和场景,选择合适的优化方法,从而编写出高效、可靠的代码。
25 6
|
7天前
|
存储 监控 算法
企业内网监控系统中基于哈希表的 C# 算法解析
在企业内网监控系统中,哈希表作为一种高效的数据结构,能够快速处理大量网络连接和用户操作记录,确保网络安全与效率。通过C#代码示例展示了如何使用哈希表存储和管理用户的登录时间、访问IP及操作行为等信息,实现快速的查找、插入和删除操作。哈希表的应用显著提升了系统的实时性和准确性,尽管存在哈希冲突等问题,但通过合理设计哈希函数和冲突解决策略,可以确保系统稳定运行,为企业提供有力的安全保障。
|
21天前
|
存储 缓存 安全
Java 集合江湖:底层数据结构的大揭秘!
小米是一位热爱技术分享的程序员,本文详细解析了Java面试中常见的List、Set、Map的区别。不仅介绍了它们的基本特性和实现类,还深入探讨了各自的使用场景和面试技巧,帮助读者更好地理解和应对相关问题。
37 5
|
1月前
|
存储 算法
深入解析PID控制算法:从理论到实践的完整指南
前言 大家好,今天我们介绍一下经典控制理论中的PID控制算法,并着重讲解该算法的编码实现,为实现后续的倒立摆样例内容做准备。 众所周知,掌握了 PID ,就相当于进入了控制工程的大门,也能为更高阶的控制理论学习打下基础。 在很多的自动化控制领域。都会遇到PID控制算法,这种算法具有很好的控制模式,可以让系统具有很好的鲁棒性。 基本介绍 PID 深入理解 (1)闭环控制系统:讲解 PID 之前,我们先解释什么是闭环控制系统。简单说就是一个有输入有输出的系统,输入能影响输出。一般情况下,人们也称输出为反馈,因此也叫闭环反馈控制系统。比如恒温水池,输入就是加热功率,输出就是水温度;比如冷库,
259 15
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
57 12
|
29天前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####

推荐镜像

更多