Java Review - 并发编程_ThreadPoolExecutor原理&源码剖析(下)

简介: Java Review - 并发编程_ThreadPoolExecutor原理&源码剖析(下)

首先来分析第一部分的代码6

    // 6 检查队列是否只在必要的时候为空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

展开!运算后等价于


rs >= SHUTDOWN &&
               (rs != SHUTDOWN ||
             firstTask != null ||
             workQueue.isEmpty())


也就是说下面几种情况下会返回false:


当前线程池状态为STOP,TIDYING,TERMINATED

当前线程池状态为SHUTDOWN并且已经有了第一个任务

当前线程池状态为SHUTDOWN并且任务队列为空

内层循环的作用是使用CAS操作增加线程数,代码(7.1)判断如果线程个数超限则返回false,否则执行代码(7.2)CAS操作设置线程个数,CAS成功则退出双循环,CAS失败则执行代码(7.3)看当前线程池的状态是否变化了,如果变了,则再次进入外层循环重新获取线程池状态,否则进入内层循环继续进行CAS尝试。


执行到第二部分的代码(8)时说明使用CAS成功地增加了线程个数,但是现在任务还没开始执行。这里使用全局的独占锁来控制把新增的Worker添加到工作集workers中。代码(8.1)创建了一个工作线程Worker。


代码(8.2)获取了独占锁,代码(8.3)重新检查线程池状态,这是为了避免在获取锁前其他线程调用了shutdown关闭了线程池。如果线程池已经被关闭,则释放锁,新增线程失败,否则执行代码(8.4)添加工作线程到线程工作集,然后释放锁。代码(8.5)判断如果新增工作线程成功,则启动工作线程。


工作线程Worker的执行


用户线程提交任务到线程池后,由Worker来执行。先看下Worker的构造函数。

    /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //创建一个线程
            this.thread = getThreadFactory().newThread(this);
        }


在构造函数内首先设置Worker的状态为-1,这是为了避免当前Worker在调用runWorker方法前被中断(当其他线程调用了线程池的shutdownNow时,如果Worker状态>=0则会中断该线程)。这里设置了线程的状态为-1,所以该线程就不会被中断了。在如下runWorker代码中,运行代码(9)时会调用unlock方法,该方法把status设置为了0,所以这时候调用shutdownNow会中断Worker线程。

  /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
  final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 9 将state 置为0 ,允许终端
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
          // 10 
              while (task != null || (task = getTask()) != null) {
              // 10.1 
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                  // 10.2 执行任务前干一些事情
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                      // 10.3 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                      // 10.4 执行任务完成后干一些事情
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 10.5 统计当前Worker完成了多少任务
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
          // 11 执行清理工作 
            processWorkerExit(w, completedAbruptly);
        }
    }

在如上代码(10)中,如果当前task==null或者调用getTask从任务队列获取的任务返回null,则跳转到代码(11)执行。


如果task不为null则执行代码(10.1)获取工作线程内部持有的独占锁,然后执行扩展接口代码(10.2)在具体任务执行前做一些事情。代码(10.3)具体执行任务,代码(10.4)在任务执行完毕后做一些事情,代码(10.5)统计当前Worker完成了多少个任务,并释放锁。


这里在执行具体任务期间加锁,是为了避免在任务运行期间,其他线程调用了shutdown后正在执行的任务被中断(shutdown只会中断当前被阻塞挂起的线程)


getTask()


如果当前task为空,则直接执行,否者调用getTask从任务队列获取一个任务执行,如果任务队列为空,则worker退出。


private Runnable getTask() {
   boolean timedOut = false; // Did the last poll() time out?
   retry:
   for (;;) {
       int c = ctl.get();
       int rs = runStateOf(c);
       // 如果当前线程池状态>=STOP 或者线程池状态为shutdown并且工作队列为空则,减少工作线程个数
       if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
           decrementWorkerCount();
           return null;
       }
       boolean timed;      // Are workers subject to culling?
       for (;;) {
           int wc = workerCountOf(c);
           timed = allowCoreThreadTimeOut || wc > corePoolSize;
           if (wc <= maximumPoolSize && ! (timedOut && timed))
               break;
           if (compareAndDecrementWorkerCount(c))
               return null;
           c = ctl.get();  // Re-read ctl
           if (runStateOf(c) != rs)
               continue retry;
           // else CAS failed due to workerCount change; retry inner loop
       }
       try {
           //根据timed选择调用poll还是阻塞的take
           Runnable r = timed ?
               workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
               workQueue.take();
           if (r != null)
               return r;
           timedOut = true;
       } catch (InterruptedException retry) {
           timedOut = false;
       }
   }
}


processWorkerExit

代码(11)执行清理任务,其代码如下。

  /**
     * Performs cleanup and bookkeeping for a dying worker. Called
     * only from worker threads. Unless completedAbruptly is set,
     * assumes that workerCount has already been adjusted to account
     * for exit.  This method removes thread from worker set, and
     * possibly terminates the pool or replaces the worker if either
     * it exited due to user task exception or if fewer than
     * corePoolSize workers are running or queue is non-empty but
     * there are no workers.
     *
     * @param w the worker
     * @param completedAbruptly if the worker died due to user exception
     */
     private void processWorkerExit(Worker w, boolean completedAbruptly) {
   if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
       decrementWorkerCount();
   //统计整个线程池完成的任务个数
   final ReentrantLock mainLock = this.mainLock;
   mainLock.lock();
   try {
       completedTaskCount += w.completedTasks;
       workers.remove(w);
   } finally {
       mainLock.unlock();
   }
   //尝试设置线程池状态为TERMINATED,如果当前是shutdonw状态并且工作队列为空
   //或者当前是stop状态当前线程池里面没有活动线程
   tryTerminate();
   //如果当前线程个数小于核心个数,则增加
   int c = ctl.get();
   if (runStateLessThan(c, STOP)) {
       if (!completedAbruptly) {
           int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
           if (min == 0 && ! workQueue.isEmpty())
               min = 1;
           if (workerCountOf(c) >= min)
               return; // replacement not needed
       }
       addWorker(null, false);
   }
}


shutdown


调用shutdown后,线程池就不会在接受新的任务了,但是工作队列里面的任务还是要执行的,但是该方法立刻返回的,并不等待队列任务完成在返回。

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess(); // 12 
            advanceRunState(SHUTDOWN);// 13 
            interruptIdleWorkers();// 14 
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();// 15 
    }


代码(12)检查看是否设置了安全管理器,是则看当前调用shutdown命令的线程是否有关闭线程的权限,如果有权限则还要看调用线程是否有中断工作线程的权限,如果没有权限则抛出SecurityException或者NullPointerException异常。


其中代码(13)的内容如下,如果当前线程池状态>=SHUTDOWN则直接返回,否则设置为SHUTDOWN状态。

  private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }


代码(14)的内容如下,其设置所有空闲线程的中断标志。这里首先加了全局锁,同时只有一个线程可以调用shutdown方法设置中断标志。然后尝试获取Worker自己的锁,获取成功则设置中断标志。由于正在执行的任务已经获取了锁,所以正在执行的任务没有被中断。这里中断的是阻塞到getTask()方法并企图从队列里面获取任务的线程,也就是空闲线程。

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

在如上代码中,首先使用CAS设置当前线程池状态为TIDYING,如果设置成功则执行扩展接口terminated在线程池状态变为TERMINATED前做一些事情,然后设置当前线程池状态为TERMINATED。最后调用 termination.signalAll()激活因调用条件变量termination的await系列方法而被阻塞的所有线程


shutdownNow


调用shutdownNow方法后,线程池就不会再接受新的任务了,并且会丢弃工作队列里面的任务,正在执行的任务会被中断,该方法会立刻返回,并不等待激活的任务执行完成。返回值为这时候队列里面被丢弃的任务列表。

 public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess(); // 16
            advanceRunState(STOP);// 17 
            interruptWorkers();//18 
            tasks = drainQueue();//19 
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }


在如上代码中,首先调用代码(16)检查权限,然后调用代码(17)设置当前线程池状态为STOP,随后执行代码(18)中断所有的工作线程。这里需要注意的是,中断的所有线程包含空闲线程和正在执行任务的线程。

  private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }


然后代码(19)将当前任务队列里面的任务移动到tasks列表。


awaitTermination


等待线程池状态变为TERMINATED则返回,或者时间超时。由于整个过程独占锁,所以一般调用shutdown或者shutdownNow后使用。


    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }


如上代码首先获取独占锁,然后在无限循环内部判断当前线程池状态是否至少是TERMINATED状态,如果是则直接返回,否则说明当前线程池里面还有线程在执行 ,则看设置的超时时间nanos是否小于0,小于0则说明不需要等待,那就直接返回,如果大于0则调用条件变量termination的awaitNanos方法等待nanos时间,期望在这段时间内线程池状态变为TERMINATED。


在shutdown方法时提到过,当线程池状态变为TERMINATED时,会调用termination.signalAll()用来激活调用条件变量termination的await系列方法被阻塞的所有线程,所以如果在调用awaitTermination之后又调用了shutdown方法,并且在shutdown内部将线程池状态设置为TERMINATED,则termination.awaitNanos方法会返回。


另外在工作线程Worker的runWorker方法内,当工作线程运行结束后,会调用processWorkerExit方法,在processWorkerExit方法内部也会调用tryTerminate方法测试当前是否应该把线程池状态设置为TERMINATED,如果是,则也会调用termination.signalAll()用来激活调用线程池的awaitTermination方法而被阻塞的线程。


而且当等待时间超时后,termination.awaitNanos也会返回,这时候会重新检查当前线程池状态是否为TERMINATED,如果是则直接返回,否则继续阻塞挂起自己。


小结


线程池巧妙地使用一个Integer类型的原子变量来记录线程池状态和线程池中的线程个数。通过线程池状态来控制任务的执行,每个Worker线程可以处理多个任务。线程池通过线程的复用减少了线程创建和销毁的开销。

相关文章
|
5月前
|
监控 Java API
现代 Java IO 高性能实践从原理到落地的高效实现路径与实战指南
本文深入解析现代Java高性能IO实践,涵盖异步非阻塞IO、操作系统优化、大文件处理、响应式网络编程与数据库访问,结合Netty、Reactor等技术落地高并发应用,助力构建高效可扩展的IO系统。
153 0
|
5月前
|
存储 缓存 安全
深入讲解 Java 并发编程核心原理与应用案例
本教程全面讲解Java并发编程,涵盖并发基础、线程安全、同步机制、并发工具类、线程池及实际应用案例,助你掌握多线程开发核心技术,提升程序性能与响应能力。
231 0
|
5月前
|
人工智能 安全 Java
Go与Java泛型原理简介
本文介绍了Go与Java泛型的实现原理。Go通过单态化为不同类型生成函数副本,提升运行效率;而Java则采用类型擦除,将泛型转为Object类型处理,保持兼容性但牺牲部分类型安全。两种机制各有优劣,适用于不同场景。
180 24
|
6月前
|
存储 缓存 Java
我们来详细讲一讲 Java NIO 底层原理
我是小假 期待与你的下一次相遇 ~
214 2
|
2月前
|
JSON 网络协议 安全
【Java】(10)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
165 1
|
2月前
|
JSON 网络协议 安全
【Java基础】(1)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
191 1
|
3月前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案
Java 数据库 Spring
152 0
|
3月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
239 16