[并发编程] - Executor框架#ThreadPoolExecutor源码解读03

简介: [并发编程] - Executor框架#ThreadPoolExecutor源码解读03

20200902001421975.png


Pre

[并发编程] - Executor框架#ThreadPoolExecutor源码解读02

说了一堆结论性的东西,作为开发人员着实是不过瘾,那这里我们就来剖根问底来看下线程池是如何工作的。


execute源码分析


        ThreadPoolExecutor te = new ThreadPoolExecutor(5,10,500,TimeUnit.SECONDS,new ArrayBlockingQueue(5));
        for (int i = 0; i < 6; i++) {
            te.submit(()->{
                System.out.println("i m task :"+Thread.currentThread().getName());
            });
        }

使用ThreadPoolExecutor 自定义了一个线程池

参数对应如下

int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue


调用了 AbstractExecutorService#submit

   public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }


最核心的方法 execute ,由子类ThredPoolExecutor实现



202009080027558.png

 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
  /*
   * clt记录着runState和workerCount
   */
    int c = ctl.get();
  /*
   * workerCountOf方法取出低29位的值,表示当前活动的线程数;
   * 如果当前活动线程数小于corePoolSize,则新建一个线程放从入线程池中;
   * 并把任务添加到该线程中。
   */
    if (workerCountOf(c) < corePoolSize) {
    /*
     * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
     * 如果为true,根据corePoolSize来判断;
     * 如果为false,则根据maximumPoolSize来判断
     */
        if (addWorker(command, true))
            return;
  /*
   * 如果添加失败,则重新获取ctl值
   */
        c = ctl.get();
    }
  /*
   * 如果当前线程池是运行状态并且任务添加到队列成功
   */
    if (isRunning(c) && workQueue.offer(command)) {
    // 重新获取ctl值
        int recheck = ctl.get();
     // 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,
    // 这时需要移除该command
    // 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
        if (! isRunning(recheck) && remove(command))
            reject(command);
    /*
     * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
     * 这里传入的参数表示:
     * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
     * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
     * 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。
     */
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
  /*
   * 如果执行到这里,有两种情况:
   * 1. 线程池已经不是RUNNING状态;
   * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
   * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
   * 如果失败则拒绝该任务
   */
    else if (!addWorker(command, false))
        reject(command);
}

主要的流程,注释中也写的很清楚了

     /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */


简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:


如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;

如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;

如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;

如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。


Note : 这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。


20200908005604488.png


addWorker()解读

private boolean addWorker(Runnable firstTask, boolean core) {
 }


addWorker方法的主要工作是在线程池中创建一个新的线程并执行,


firstTask参数 用于指定新增的线程执行的第一个任务,

core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
    // 获取运行状态
        int rs = runStateOf(c);
    /*
     * 这个if判断
     * 如果rs >= SHUTDOWN,则表示此时不再接收新任务;
     * 接着判断以下3个条件,只要有1个不满足,则返回false:
     * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
     * 2. firsTask为空
     * 3. 阻塞队列不为空
     * 
     * 首先考虑rs == SHUTDOWN的情况
     * 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
     * 然后,如果firstTask为空,并且workQueue也为空,则返回false,
     * 因为队列中已经没有任务了,不需要再添加线程了
     */
     // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                        firstTask == null &&
                        ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // 获取线程数
            int wc = workerCountOf(c);
            // 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
            // 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较,
            // 如果为false则根据maximumPoolSize来比较。
            // 
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 尝试增加workerCount,如果成功,则跳出第一个for循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 如果增加workerCount失败,则重新获取ctl的值
            c = ctl.get();  // Re-read ctl
            // 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
     // 根据firstTask来创建Worker对象
        w = new Worker(firstTask);
     // 每一个Worker对象都会创建一个线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN表示是RUNNING状态;
                // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
                // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers是一个HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize记录着线程池中出现过的最大线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}


Worker解读

addWorker中多次提到了这个Work这个类, 其实就是 线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象

ThreadPoolExector中内部类 Worker

20200909015219592.png

private final class Worker extends AbstractQueuedSynchronizer  implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }


继承了AQS,并实现了Runnable接口 ,

两个比较重要的属性

  1. firstTask用它来保存传入的任务;
  2. thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。


在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。


Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的

相关文章
|
4天前
|
人工智能 运维 安全
|
2天前
|
人工智能 异构计算
敬请锁定《C位面对面》,洞察通用计算如何在AI时代持续赋能企业创新,助力业务发展!
敬请锁定《C位面对面》,洞察通用计算如何在AI时代持续赋能企业创新,助力业务发展!
|
10天前
|
人工智能 JavaScript 测试技术
Qwen3-Coder入门教程|10分钟搞定安装配置
Qwen3-Coder 挑战赛简介:无论你是编程小白还是办公达人,都能通过本教程快速上手 Qwen-Code CLI,利用 AI 轻松实现代码编写、文档处理等任务。内容涵盖 API 配置、CLI 安装及多种实用案例,助你提升效率,体验智能编码的乐趣。
833 109
|
4天前
|
机器学习/深度学习 人工智能 自然语言处理
B站开源IndexTTS2,用极致表现力颠覆听觉体验
在语音合成技术不断演进的背景下,早期版本的IndexTTS虽然在多场景应用中展现出良好的表现,但在情感表达的细腻度与时长控制的精准性方面仍存在提升空间。为了解决这些问题,并进一步推动零样本语音合成在实际场景中的落地能力,B站语音团队对模型架构与训练策略进行了深度优化,推出了全新一代语音合成模型——IndexTTS2 。
435 11
|
3天前
|
人工智能 测试技术 API
智能体(AI Agent)搭建全攻略:从概念到实践的终极指南
在人工智能浪潮中,智能体(AI Agent)正成为变革性技术。它们具备自主决策、环境感知、任务执行等能力,广泛应用于日常任务与商业流程。本文详解智能体概念、架构及七步搭建指南,助你打造专属智能体,迎接智能自动化新时代。
|
4天前
|
机器学习/深度学习 传感器 算法
Edge Impulse:面向微型机器学习的MLOps平台——论文解读
Edge Impulse 是一个面向微型机器学习(TinyML)的云端MLOps平台,致力于解决嵌入式与边缘设备上机器学习开发的碎片化与异构性难题。它提供端到端工具链,涵盖数据采集、信号处理、模型训练、优化压缩及部署全流程,支持资源受限设备的高效AI实现。平台集成AutoML、量化压缩与跨硬件编译技术,显著提升开发效率与模型性能,广泛应用于物联网、可穿戴设备与边缘智能场景。
188 127