ThreadPoolExecutor解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 本文深入解析了Java中`ThreadPoolExecutor`的实现原理,帮助理解线程池的工作机制。核心内容包括任务队列`workQueue`、线程工厂`ThreadFactory`、拒绝策略`RejectedExecutionHandler`等关键成员的作用。通过`submit`和`execute`方法的执行流程,展示了线程池如何根据`corePoolSize`和`maximumPoolSize`动态调整线程数量,并结合`keepAliveTime`管理空闲线程。最后分析了`Worker`类的`run`方法,揭示线程池通过循环从队列中获取任务并执行的核心逻辑。

       相信大多数人都了解过Java中线程池相关的架构,了解了这些内容其实我们就可以使用java的线程池为我们工作了,使用其提供的线程池我们可以很方便的写出高质量的多线程代码,本文将分析ThreadPoolExecutor的实现,来探索线程池的运行原理。下面是几个比较关键的类成员:

// 任务队列,我们的任务会添加到该队列里面,线程将从该队列获取任务来执行
private final BlockingQueue<Runnable> workQueue;  

//任务的执行值集合,来消费workQueue里面的任务
private final HashSet<Worker> workers = new HashSet<Worker>();

//线程工厂
private volatile ThreadFactory threadFactory;

//拒绝策略,默认会抛出异异常,还要其他几种拒绝策略如下:
private volatile RejectedExecutionHandler handler;

1CallerRunsPolicy:在调用者线程里面运行该任务
2DiscardPolicy:丢弃任务
3DiscardOldestPolicy:丢弃workQueue的头部任务

//最下保活work数量
private volatile int corePoolSize;

//work上限
private volatile int maximumPoolSize;
AI 代码解读

       我们尝试执行submit方法,下面是执行的关键路径,总结起来就是:如果Worker数量还没达到上限则继续创建,否则提交任务到workQueue,然后让worker来调度运行任务

step 1: <ExecutorService>
    Future<?> submit(Runnable task);  

    step 2:<AbstractExecutorService>
        public Future<?> submit(Runnable task) {
   
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    step 3:<Executor>
    void execute(Runnable command);

    step 4:<ThreadPoolExecutor>
     public void execute(Runnable command) {
   
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
   
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
    //提交我们的额任务到workQueue
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) //使用maximumPoolSize作为边界
            reject(command); //还不行?拒绝提交的任务
    }

    step 5:<ThreadPoolExecutor>
    private boolean addWorker(Runnable firstTask, boolean core) 


    step 6:<ThreadPoolExecutor>
    w = new Worker(firstTask); //包装任务
    final Thread t = w.thread; //获取线程(包含任务)
    workers.add(w);   // 任务被放到works中
    t.start(); //执行任务
AI 代码解读

       上面的流程是高度概括的,实际情况远比这复杂得多,但是我们关心的是怎么打通整个流程,所以这样分析问题是没有太大的问题的。观察上面的流程,我们发现其实关键的地方在于Worker,如果弄明白它是如何工作的,那么我们也就大概明白了线程池是怎么工作的了。下面分析一下Worker类。

image.png


       上面的图片展示了Worker的类关系图,关键在于他实现了Runnable接口,所以问题的关键就在于run方法上。在这之前,我们来看一下Worker类里面的关键成员:
final Thread thread; 

Runnable firstTask; //我们提交的任务,可能被立刻执行,也可能被放到队列里面
AI 代码解读

       thread是Worker的工作线程,上面的分析我们也发现了在addWorker中会获取worker里面的thread然后start,也就是这个线程的执行,而Worker实现了Runnable接口,所以在构造thread的时候Worker将自己传递给了构造函数,thread.start执行的其实就是Worker的run方法。下面是run方法的内容:

public void run() {
   
    runWorker(this);
}

final void runWorker(Worker w) {
   
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
   
        while (task != null || (task = getTask()) != null) {
   
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
   
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
   
                    task.run();
                } catch (RuntimeException x) {
   
                    thrown = x; throw x;
                } catch (Error x) {
   
                    thrown = x; throw x;
                } catch (Throwable x) {
   
                    thrown = x; throw new Error(x);
                } finally {
   
                    afterExecute(task, thrown);
                }
            } finally {
   
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
   
        processWorkerExit(w, completedAbruptly);
    }
}
AI 代码解读

       我们来分析一下runWorker这个方法,这就是整个线程池的核心。首先获取到了我们刚提交的任务firstTask,然后会循环从workQueue里面获取任务来执行,获取任务的方法如下:

private Runnable getTask() {
   
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
   
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
   
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
   
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
   
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
   
            timedOut = false;
        }
    }
}
AI 代码解读

       其实核心也就一句:

Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
AI 代码解读

       我们再回头看一下execute,其实我们上面只走了一条逻辑,在execute的时候,我们的worker的数量还没有到达我们设定的corePoolSize的时候,会走上面我们分析的逻辑,而如果达到了我们设定的阈值之后,execute中会尝试去提交任务,如果提交成功了就结束,否则会拒绝任务的提交。我们上面还提到一个成员:maximumPoolSize,其实线程池的最大的Worker数量应该是maximumPoolSize,但是我们上面的分析是corePoolSize,这是因为我们的private boolean addWorker(Runnable firstTask, boolean core)的参数core的值来控制的,core为true则使用corePoolSize来设定边界,否则使用maximumPoolSize来设定边界。
       直观的解释一下,当线程池里面的Worker数量还没有到corePoolSize,那么新添加的任务会伴随着产生一个新的worker,如果Worker的数量达到了corePoolSize,那么就将任务存放在阻塞队列中等待Worker来获取执行,如果没有办法再向阻塞队列放任务了,那么这个时候maximumPoolSize就变得有用了,新的任务将会伴随着产生一个新的Worker,如果线程池里面的Worker已经达到了maximumPoolSize,那么接下来提交的任务只能被拒绝策略拒绝了。可以参考下面的描述来理解:
       |---corePoolSize[创建]---||---workQueue[等待keepAliveTime]---||---maximumPoolSize[创建]---||---拒绝策略---|

 * When a new task is submitted in method {
   @link #execute(Runnable)},
 * and fewer than corePoolSize threads are running, a new thread is
 * created to handle the request, even if other worker threads are
 * idle.  If there are more than corePoolSize but less than
 * maximumPoolSize threads running, a new thread will be created only
 * if the queue is full.  By setting corePoolSize and maximumPoolSize
 * the same, you create a fixed-size thread pool. By setting
 * maximumPoolSize to an essentially unbounded value such as {
   @code
 * Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
 * number of concurrent tasks. Most typically, core and maximum pool
 * sizes are set only upon construction, but they may also be changed
 * dynamically using {
   @link #setCorePoolSize} and {
   @link
 * #setMaximumPoolSize}.

在方法{
   @link #execute(Runnable)}中提交新任务时,
如果运行的线程小于corePoolSize,则创建新线程处理请求,即使其他工作线程闲置。
如果运行的线程大于corePoolSize,但是小于maximumPoolSize,当线程运行时,如果队列已满则会创建一个新线程
同样通过设置corePoolSize和maximumPoolSize,创建一个固定大小的线程池。通过设置maximumPoolSize到一个
本质上无界的值,比如{
   @code Integer.MAX_VALUE},您允许池容纳任意的并发任务的数量。
最典型的是核心池和最大池尺寸只在构造时设置,但也可以更改动态使用{
   @link #setCorePoolSize}和{
   @link #setMaximumPoolSize}。
AI 代码解读

       在此需要说明一点,有一个重要的成员:keepAliveTime,当线程池里面的线程数量超过corePoolSize了,那么超出的线程将会在空闲keepAliveTime之后被terminated。

 * If the pool currently has more than corePoolSize threads,
 * excess threads will be terminated if they have been idle for more
 * than the keepAliveTime (see {
   @link #getKeepAliveTime(TimeUnit)}).
AI 代码解读
目录
打赏
0
0
0
0
72
分享
相关文章
多线程与高并发学习:ThreadPoolExecutor源码解析
多线程与高并发学习:ThreadPoolExecutor源码解析
96 0
超硬核!ThreadPoolExecutor线程池源码解析(下)
ThreadPoolExecutor 6 线程池的工作流程 7 ThreadPoolExecutor 的执行方法 8 线程的拒绝策略 8.1 自定义拒绝策略
超硬核!ThreadPoolExecutor线程池源码解析(下)
超硬核!ThreadPoolExecutor线程池源码解析(上)
Executor & 概述 2 Executors中对线程池的实现 2.1 CachedThreadPool 2.2 FixedThreadPool 2.3 SingleThreadExecutor 2.3.1 newSingleThreadExecutor() 与newFixedThreadPool(1) 2.4 SingleThreadScheduledExrcutor 2.5 ScheduledThreadPool 5 ThreadPoolExecutor 6 线程池的工作流程 7 ThreadPoolExecutor 的执行方法 8 线程的拒绝策略
超硬核!ThreadPoolExecutor线程池源码解析(上)
万字长文阿粉带你解析 ThreadPoolExecutor(一)
你有没有这样的疑惑,为什么要用线程池呢?可能你会说,我可以复用已经创建的线程呀;线程是个重量级对象,为了避免频繁创建和销毁,使用线程池来管理最好了
万字长文阿粉带你解析 ThreadPoolExecutor(一)
高并发之——通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程
打开你的IDE,踏下心来,跟着文章看代码,相信你定能收货满满!!!
179 0
高并发之——通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程
【高并发】通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程
ThreadPoolExecutor类中存在一个workers工作线程集合,用户可以向线程池中添加需要执行的任务,workers集合中的工作线程可以直接执行任务,或者从任务队列中获取任务后执行。ThreadPoolExecutor类中提供了整个线程池从创建到执行任务,再到消亡的整个流程方法。本文,就结合ThreadPoolExecutor类的源码深度分析线程池执行任务的整体流程。
233 0
【高并发】通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程
【高并发】通过源码深度解析ThreadPoolExecutor类是如何保证线程池正确运行的
对于线程池的核心类ThreadPoolExecutor来说,有哪些重要的属性和内部类为线程池的正确运行提供重要的保障呢?今天我们就一起来深入探讨下这些问题!!
266 0
【高并发】通过源码深度解析ThreadPoolExecutor类是如何保证线程池正确运行的
万字长文阿粉带你解析 ThreadPoolExecutor(二)
你有没有这样的疑惑,为什么要用线程池呢?可能你会说,我可以复用已经创建的线程呀;线程是个重量级对象,为了避免频繁创建和销毁,使用线程池来管理最好了
ThreadPoolExecutor源码解析(二)
1.ThreadPoolExcuter运行实例   首先我们先看如何新建一个ThreadPoolExecutor去运行线程。然后深入到源码中去看ThreadPoolExecutor里面使如何运作的。 public class Test { public static void main(S.
1012 0

推荐镜像

更多