Java Review - 并发编程_ThreadPoolExecutor原理&源码剖析(下)

简介: Java Review - 并发编程_ThreadPoolExecutor原理&源码剖析(下)

首先来分析第一部分的代码6

    // 6 检查队列是否只在必要的时候为空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

展开!运算后等价于


rs >= SHUTDOWN &&
               (rs != SHUTDOWN ||
             firstTask != null ||
             workQueue.isEmpty())


也就是说下面几种情况下会返回false:


当前线程池状态为STOP,TIDYING,TERMINATED

当前线程池状态为SHUTDOWN并且已经有了第一个任务

当前线程池状态为SHUTDOWN并且任务队列为空

内层循环的作用是使用CAS操作增加线程数,代码(7.1)判断如果线程个数超限则返回false,否则执行代码(7.2)CAS操作设置线程个数,CAS成功则退出双循环,CAS失败则执行代码(7.3)看当前线程池的状态是否变化了,如果变了,则再次进入外层循环重新获取线程池状态,否则进入内层循环继续进行CAS尝试。


执行到第二部分的代码(8)时说明使用CAS成功地增加了线程个数,但是现在任务还没开始执行。这里使用全局的独占锁来控制把新增的Worker添加到工作集workers中。代码(8.1)创建了一个工作线程Worker。


代码(8.2)获取了独占锁,代码(8.3)重新检查线程池状态,这是为了避免在获取锁前其他线程调用了shutdown关闭了线程池。如果线程池已经被关闭,则释放锁,新增线程失败,否则执行代码(8.4)添加工作线程到线程工作集,然后释放锁。代码(8.5)判断如果新增工作线程成功,则启动工作线程。


工作线程Worker的执行


用户线程提交任务到线程池后,由Worker来执行。先看下Worker的构造函数。

    /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //创建一个线程
            this.thread = getThreadFactory().newThread(this);
        }


在构造函数内首先设置Worker的状态为-1,这是为了避免当前Worker在调用runWorker方法前被中断(当其他线程调用了线程池的shutdownNow时,如果Worker状态>=0则会中断该线程)。这里设置了线程的状态为-1,所以该线程就不会被中断了。在如下runWorker代码中,运行代码(9)时会调用unlock方法,该方法把status设置为了0,所以这时候调用shutdownNow会中断Worker线程。

  /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
  final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 9 将state 置为0 ,允许终端
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
          // 10 
              while (task != null || (task = getTask()) != null) {
              // 10.1 
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                  // 10.2 执行任务前干一些事情
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                      // 10.3 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                      // 10.4 执行任务完成后干一些事情
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 10.5 统计当前Worker完成了多少任务
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
          // 11 执行清理工作 
            processWorkerExit(w, completedAbruptly);
        }
    }

在如上代码(10)中,如果当前task==null或者调用getTask从任务队列获取的任务返回null,则跳转到代码(11)执行。


如果task不为null则执行代码(10.1)获取工作线程内部持有的独占锁,然后执行扩展接口代码(10.2)在具体任务执行前做一些事情。代码(10.3)具体执行任务,代码(10.4)在任务执行完毕后做一些事情,代码(10.5)统计当前Worker完成了多少个任务,并释放锁。


这里在执行具体任务期间加锁,是为了避免在任务运行期间,其他线程调用了shutdown后正在执行的任务被中断(shutdown只会中断当前被阻塞挂起的线程)


getTask()


如果当前task为空,则直接执行,否者调用getTask从任务队列获取一个任务执行,如果任务队列为空,则worker退出。


private Runnable getTask() {
   boolean timedOut = false; // Did the last poll() time out?
   retry:
   for (;;) {
       int c = ctl.get();
       int rs = runStateOf(c);
       // 如果当前线程池状态>=STOP 或者线程池状态为shutdown并且工作队列为空则,减少工作线程个数
       if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
           decrementWorkerCount();
           return null;
       }
       boolean timed;      // Are workers subject to culling?
       for (;;) {
           int wc = workerCountOf(c);
           timed = allowCoreThreadTimeOut || wc > corePoolSize;
           if (wc <= maximumPoolSize && ! (timedOut && timed))
               break;
           if (compareAndDecrementWorkerCount(c))
               return null;
           c = ctl.get();  // Re-read ctl
           if (runStateOf(c) != rs)
               continue retry;
           // else CAS failed due to workerCount change; retry inner loop
       }
       try {
           //根据timed选择调用poll还是阻塞的take
           Runnable r = timed ?
               workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
               workQueue.take();
           if (r != null)
               return r;
           timedOut = true;
       } catch (InterruptedException retry) {
           timedOut = false;
       }
   }
}


processWorkerExit

代码(11)执行清理任务,其代码如下。

  /**
     * Performs cleanup and bookkeeping for a dying worker. Called
     * only from worker threads. Unless completedAbruptly is set,
     * assumes that workerCount has already been adjusted to account
     * for exit.  This method removes thread from worker set, and
     * possibly terminates the pool or replaces the worker if either
     * it exited due to user task exception or if fewer than
     * corePoolSize workers are running or queue is non-empty but
     * there are no workers.
     *
     * @param w the worker
     * @param completedAbruptly if the worker died due to user exception
     */
     private void processWorkerExit(Worker w, boolean completedAbruptly) {
   if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
       decrementWorkerCount();
   //统计整个线程池完成的任务个数
   final ReentrantLock mainLock = this.mainLock;
   mainLock.lock();
   try {
       completedTaskCount += w.completedTasks;
       workers.remove(w);
   } finally {
       mainLock.unlock();
   }
   //尝试设置线程池状态为TERMINATED,如果当前是shutdonw状态并且工作队列为空
   //或者当前是stop状态当前线程池里面没有活动线程
   tryTerminate();
   //如果当前线程个数小于核心个数,则增加
   int c = ctl.get();
   if (runStateLessThan(c, STOP)) {
       if (!completedAbruptly) {
           int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
           if (min == 0 && ! workQueue.isEmpty())
               min = 1;
           if (workerCountOf(c) >= min)
               return; // replacement not needed
       }
       addWorker(null, false);
   }
}


shutdown


调用shutdown后,线程池就不会在接受新的任务了,但是工作队列里面的任务还是要执行的,但是该方法立刻返回的,并不等待队列任务完成在返回。

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess(); // 12 
            advanceRunState(SHUTDOWN);// 13 
            interruptIdleWorkers();// 14 
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();// 15 
    }


代码(12)检查看是否设置了安全管理器,是则看当前调用shutdown命令的线程是否有关闭线程的权限,如果有权限则还要看调用线程是否有中断工作线程的权限,如果没有权限则抛出SecurityException或者NullPointerException异常。


其中代码(13)的内容如下,如果当前线程池状态>=SHUTDOWN则直接返回,否则设置为SHUTDOWN状态。

  private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }


代码(14)的内容如下,其设置所有空闲线程的中断标志。这里首先加了全局锁,同时只有一个线程可以调用shutdown方法设置中断标志。然后尝试获取Worker自己的锁,获取成功则设置中断标志。由于正在执行的任务已经获取了锁,所以正在执行的任务没有被中断。这里中断的是阻塞到getTask()方法并企图从队列里面获取任务的线程,也就是空闲线程。

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

在如上代码中,首先使用CAS设置当前线程池状态为TIDYING,如果设置成功则执行扩展接口terminated在线程池状态变为TERMINATED前做一些事情,然后设置当前线程池状态为TERMINATED。最后调用 termination.signalAll()激活因调用条件变量termination的await系列方法而被阻塞的所有线程


shutdownNow


调用shutdownNow方法后,线程池就不会再接受新的任务了,并且会丢弃工作队列里面的任务,正在执行的任务会被中断,该方法会立刻返回,并不等待激活的任务执行完成。返回值为这时候队列里面被丢弃的任务列表。

 public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess(); // 16
            advanceRunState(STOP);// 17 
            interruptWorkers();//18 
            tasks = drainQueue();//19 
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }


在如上代码中,首先调用代码(16)检查权限,然后调用代码(17)设置当前线程池状态为STOP,随后执行代码(18)中断所有的工作线程。这里需要注意的是,中断的所有线程包含空闲线程和正在执行任务的线程。

  private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }


然后代码(19)将当前任务队列里面的任务移动到tasks列表。


awaitTermination


等待线程池状态变为TERMINATED则返回,或者时间超时。由于整个过程独占锁,所以一般调用shutdown或者shutdownNow后使用。


    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }


如上代码首先获取独占锁,然后在无限循环内部判断当前线程池状态是否至少是TERMINATED状态,如果是则直接返回,否则说明当前线程池里面还有线程在执行 ,则看设置的超时时间nanos是否小于0,小于0则说明不需要等待,那就直接返回,如果大于0则调用条件变量termination的awaitNanos方法等待nanos时间,期望在这段时间内线程池状态变为TERMINATED。


在shutdown方法时提到过,当线程池状态变为TERMINATED时,会调用termination.signalAll()用来激活调用条件变量termination的await系列方法被阻塞的所有线程,所以如果在调用awaitTermination之后又调用了shutdown方法,并且在shutdown内部将线程池状态设置为TERMINATED,则termination.awaitNanos方法会返回。


另外在工作线程Worker的runWorker方法内,当工作线程运行结束后,会调用processWorkerExit方法,在processWorkerExit方法内部也会调用tryTerminate方法测试当前是否应该把线程池状态设置为TERMINATED,如果是,则也会调用termination.signalAll()用来激活调用线程池的awaitTermination方法而被阻塞的线程。


而且当等待时间超时后,termination.awaitNanos也会返回,这时候会重新检查当前线程池状态是否为TERMINATED,如果是则直接返回,否则继续阻塞挂起自己。


小结


线程池巧妙地使用一个Integer类型的原子变量来记录线程池状态和线程池中的线程个数。通过线程池状态来控制任务的执行,每个Worker线程可以处理多个任务。线程池通过线程的复用减少了线程创建和销毁的开销。

相关文章
|
14天前
|
XML Java 编译器
Java注解的底层源码剖析与技术认识
Java注解(Annotation)是Java 5引入的一种新特性,它提供了一种在代码中添加元数据(Metadata)的方式。注解本身并不是代码的一部分,它们不会直接影响代码的执行,但可以在编译、类加载和运行时被读取和处理。注解为开发者提供了一种以非侵入性的方式为代码提供额外信息的手段,这些信息可以用于生成文档、编译时检查、运行时处理等。
50 7
|
15天前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
21 0
|
7天前
|
存储 JavaScript 前端开发
基于 SpringBoot 和 Vue 开发校园点餐订餐外卖跑腿Java源码
一个非常实用的校园外卖系统,基于 SpringBoot 和 Vue 的开发。这一系统源于黑马的外卖案例项目 经过站长的进一步改进和优化,提供了更丰富的功能和更高的可用性。 这个项目的架构设计非常有趣。虽然它采用了SpringBoot和Vue的组合,但并不是一个完全分离的项目。 前端视图通过JS的方式引入了Vue和Element UI,既能利用Vue的快速开发优势,
52 13
|
20天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
50 12
|
15天前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。
|
16天前
|
人工智能 移动开发 安全
家政上门系统用户端、阿姨端源码,java家政管理平台源码
家政上门系统基于互联网技术,整合大数据分析、AI算法和现代通信技术,提供便捷高效的家政服务。涵盖保洁、月嫂、烹饪等多元化服务,支持多终端访问,具备智能匹配、在线支付、订单管理等功能,确保服务透明、安全,适用于家庭生活的各种需求场景,推动家政市场规范化发展。
|
6月前
|
Java C++
关于《Java并发编程之线程池十八问》的补充内容
【6月更文挑战第6天】关于《Java并发编程之线程池十八问》的补充内容
53 5
|
3月前
|
缓存 监控 Java
Java中的并发编程:理解并应用线程池
在Java的并发编程中,线程池是提高应用程序性能的关键工具。本文将深入探讨如何有效利用线程池来管理资源、提升效率和简化代码结构。我们将从基础概念出发,逐步介绍线程池的配置、使用场景以及最佳实践,帮助开发者更好地掌握并发编程的核心技巧。
|
5月前
|
安全 Java 开发者
Java中的并发编程:深入理解线程池
在Java的并发编程中,线程池是管理资源和任务执行的核心。本文将揭示线程池的内部机制,探讨如何高效利用这一工具来优化程序的性能与响应速度。通过具体案例分析,我们将学习如何根据不同的应用场景选择合适的线程池类型及其参数配置,以及如何避免常见的并发陷阱。
58 1
|
5月前
|
监控 Java
Java并发编程:深入理解线程池
在Java并发编程领域,线程池是提升应用性能和资源管理效率的关键工具。本文将深入探讨线程池的工作原理、核心参数配置以及使用场景,通过具体案例展示如何有效利用线程池优化多线程应用的性能。
下一篇
DataWorks