[并发编程] - Executor框架#ThreadPoolExecutor源码解读03

简介: [并发编程] - Executor框架#ThreadPoolExecutor源码解读03

20200902001421975.png


Pre

[并发编程] - Executor框架#ThreadPoolExecutor源码解读02

说了一堆结论性的东西,作为开发人员着实是不过瘾,那这里我们就来剖根问底来看下线程池是如何工作的。


execute源码分析


        ThreadPoolExecutor te = new ThreadPoolExecutor(5,10,500,TimeUnit.SECONDS,new ArrayBlockingQueue(5));
        for (int i = 0; i < 6; i++) {
            te.submit(()->{
                System.out.println("i m task :"+Thread.currentThread().getName());
            });
        }

使用ThreadPoolExecutor 自定义了一个线程池

参数对应如下

int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue


调用了 AbstractExecutorService#submit

   public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }


最核心的方法 execute ,由子类ThredPoolExecutor实现



202009080027558.png

 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
  /*
   * clt记录着runState和workerCount
   */
    int c = ctl.get();
  /*
   * workerCountOf方法取出低29位的值,表示当前活动的线程数;
   * 如果当前活动线程数小于corePoolSize,则新建一个线程放从入线程池中;
   * 并把任务添加到该线程中。
   */
    if (workerCountOf(c) < corePoolSize) {
    /*
     * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
     * 如果为true,根据corePoolSize来判断;
     * 如果为false,则根据maximumPoolSize来判断
     */
        if (addWorker(command, true))
            return;
  /*
   * 如果添加失败,则重新获取ctl值
   */
        c = ctl.get();
    }
  /*
   * 如果当前线程池是运行状态并且任务添加到队列成功
   */
    if (isRunning(c) && workQueue.offer(command)) {
    // 重新获取ctl值
        int recheck = ctl.get();
     // 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,
    // 这时需要移除该command
    // 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
        if (! isRunning(recheck) && remove(command))
            reject(command);
    /*
     * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
     * 这里传入的参数表示:
     * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
     * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
     * 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。
     */
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
  /*
   * 如果执行到这里,有两种情况:
   * 1. 线程池已经不是RUNNING状态;
   * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
   * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
   * 如果失败则拒绝该任务
   */
    else if (!addWorker(command, false))
        reject(command);
}

主要的流程,注释中也写的很清楚了

     /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */


简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:


如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;

如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;

如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;

如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。


Note : 这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。


20200908005604488.png


addWorker()解读

private boolean addWorker(Runnable firstTask, boolean core) {
 }


addWorker方法的主要工作是在线程池中创建一个新的线程并执行,


firstTask参数 用于指定新增的线程执行的第一个任务,

core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
    // 获取运行状态
        int rs = runStateOf(c);
    /*
     * 这个if判断
     * 如果rs >= SHUTDOWN,则表示此时不再接收新任务;
     * 接着判断以下3个条件,只要有1个不满足,则返回false:
     * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
     * 2. firsTask为空
     * 3. 阻塞队列不为空
     * 
     * 首先考虑rs == SHUTDOWN的情况
     * 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
     * 然后,如果firstTask为空,并且workQueue也为空,则返回false,
     * 因为队列中已经没有任务了,不需要再添加线程了
     */
     // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                        firstTask == null &&
                        ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // 获取线程数
            int wc = workerCountOf(c);
            // 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
            // 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较,
            // 如果为false则根据maximumPoolSize来比较。
            // 
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 尝试增加workerCount,如果成功,则跳出第一个for循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 如果增加workerCount失败,则重新获取ctl的值
            c = ctl.get();  // Re-read ctl
            // 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
     // 根据firstTask来创建Worker对象
        w = new Worker(firstTask);
     // 每一个Worker对象都会创建一个线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN表示是RUNNING状态;
                // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
                // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers是一个HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize记录着线程池中出现过的最大线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}


Worker解读

addWorker中多次提到了这个Work这个类, 其实就是 线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象

ThreadPoolExector中内部类 Worker

20200909015219592.png

private final class Worker extends AbstractQueuedSynchronizer  implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }


继承了AQS,并实现了Runnable接口 ,

两个比较重要的属性

  1. firstTask用它来保存传入的任务;
  2. thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。


在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。


Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的

相关文章
|
6月前
|
缓存 Java 调度
Java并发编程学习10-任务执行与Executor框架
【4月更文挑战第12天】本篇 重点讲解任务执行和 Executor框架的基础知识
64 4
Java并发编程学习10-任务执行与Executor框架
|
5月前
|
缓存 并行计算 Java
重温JAVA线程池精髓:Executor、ExecutorService及Executors的源码剖析与应用指南
重温JAVA线程池精髓:Executor、ExecutorService及Executors的源码剖析与应用指南
|
4月前
|
Java 开发者
Java并发编程之Executor框架详解
【7月更文挑战第18天】本文旨在深入探讨Java中的Executor框架,揭示其对并发编程的优化作用。通过解析Executor接口、ThreadPoolExecutor和ScheduledExecutorService等关键组件,文章展示了如何有效管理和控制线程资源。同时,结合实例分析,本文阐释了Executor框架在提高程序性能、简化代码结构方面的实际应用价值。旨在为Java开发者提供并发编程的高级工具,帮助他们构建更加高效、稳定的多线程应用。
|
6月前
|
并行计算 算法 Java
Java线程池——Executor框架
Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)。 Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。
|
安全 Java
异步编程 - 03 线程池ThreadPoolExecutor原理剖析&源码详解2
异步编程 - 03 线程池ThreadPoolExecutor原理剖析&源码详解2
55 0
|
Java
异步编程 - 03 线程池ThreadPoolExecutor原理剖析&源码详解1
异步编程 - 03 线程池ThreadPoolExecutor原理剖析&源码详解
48 0
|
存储 Java C++
ThreadPoolExecutor 线程池执行流程及核心源码分析
ThreadPoolExecutor 线程池执行流程及核心源码分析
97 0
|
存储 监控 Java
[并发编程] - Executor框架#ThreadPoolExecutor源码解读02
[并发编程] - Executor框架#ThreadPoolExecutor源码解读02
111 0
|
监控 安全 Java
[并发编程] - Executor框架#ThreadPoolExecutor源码解读01
[并发编程] - Executor框架#ThreadPoolExecutor源码解读01
95 0
|
存储 缓存 Java
【并发编程】线程池及Executor框架
【并发编程】线程池及Executor框架