超硬核!ThreadPoolExecutor线程池源码解析(上)

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: Executor & 概述2 Executors中对线程池的实现2.1 CachedThreadPool2.2 FixedThreadPool2.3 SingleThreadExecutor2.3.1 newSingleThreadExecutor() 与newFixedThreadPool(1)2.4 SingleThreadScheduledExrcutor2.5 ScheduledThreadPool5 ThreadPoolExecutor6 线程池的工作流程7 ThreadPoolExecutor 的执行方法8 线程的拒绝策略

1 Executor & 概述


Executor是顶级接口。关于线程池的总览示意图如下图所示:


20200922203424107.png


申请线程实例时会先从核心线程corePool中获取,如果核心线程满了之后线程会先加入到工作队列中,工作队列也满了的话也允许继续申请,直至maxnumPoolSize。之后会执行拒绝策略RejectedExecutionHandler。


ThreadFactory是worker中构建线程实例的工厂。



使用线程池的好处如下:


可以复用线程、控制最大并发数。


实现任务线程队列缓存策略和拒绝机制。


实现如定时执行、周期执行等与时间相关的功能。


隔离线程环境。


比如,为交易服务和搜索服务分别开两个线程池,交易线程的资源消耗明显要大;因此,通过配置独立的线程池,将较慢的交易服务与搜索服务隔开,避免线程间互相影响。


Executor executor = new ExecutorSubClass(); //线程池实现类

executor.execute(new RunnableTask1());

executor.execute(new RunnableTask2());


Executors 类为 Executor 提供了工厂方法。ExecutorService 是 Executor 接口的默认实现,下面是使用 ExecutorService 创建线程的几种方式。

2 Executors中对线程池的实现

2.1 CachedThreadPool


public static void main(String[] args) {
    ExecutorService service = Executors.newCachedThreadPool();
    for(int i = 0;i < 5;i++){
        service.execute(new TestThread());
    }
    service.shutdown();
}


CachedThreadPool 会为每个任务都创建一个线程。


ExecutorService 对象是使用静态的 Executors 创建的,这个方法可以确定Executor类型。调用 shutDown 可以防止新任务提交给 ExecutorService,这个线程在 Executor 中所有任务完成后退出。



2.2 FixedThreadPool


FixedThreadPool 可以使用有限的线程集来启动多线程。


public static void main(String[] args) {
    ExecutorService service = Executors.newFixedThreadPool(5);
    for(int i = 0;i < 5;i++){
        service.execute(new TestThread());
    }
    service.shutdown();
}


FixedThreadPool 可以一次性的预先执行高昂的线程分配,因此也就可以限制线程的数量。因为不必为每个任务都固定的付出创建线程的时间开销,所以可以节省时间。



2.3 SingleThreadExecutor


SingleThreadExecutor 就是线程数量为1的 FixedThreadPool,如果向SingleThreadPool一次性提交了多个任务,那么这些任务将会排队,所有的任务都将使用相同的线程。SingleThreadPool 会序列化所有提交给他的任务,并会维护一个隐藏的挂起队列。


public static void main(String[] args) {
    ExecutorService service = Executors.newSingleThreadExecutor();
    for(int i = 0;i < 5;i++){
        service.execute(new TestThread());
    }
    service.shutdown();
}


可以用 SingleThreadExecutor 来确保任意时刻都只有唯一一个任务在运行。



2.3.1 newSingleThreadExecutor() 与newFixedThreadPool(1)


结合上面的介绍,自然会想到一个问题:既然已经有了newFixedThreadPool,为什么还要存在newSingleThreadExecutor这个方法?


结合jdk中的说明“Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.”。newSingleThreadExecutor和newFixedThreadPool(1)确实是有区别的,区别在于newSingleThreadExecutor返回的线程池保证不能被重新配置(重新调整线程池大小等)


对比源码:


public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue < Runnable > ());
}


public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue < Runnable > ());
}


二者及其相似,连ThreadPoolExecutor对象的参数值都一样的,只不过newFixedThreadPool返回了一个ThreadPoolExecutor对象,newSingleThreadExecutor返回了一个被FinalizableDelegatedExecutorService包装过的ThreadPoolExecutor对象,问题其实就出在FinalizableDelegatedExecutorService上。


容量为1的FixedThreadPool的属性(容量等)可以通过将其强转为ThreadPoolExecutor而被重新进行配置;

SingleThreadPool实际是一个FinalizableDelegatedExecutorService类的对象,把诸如setCorePoolSize的方法给去掉了,并且该类没有继承任何可以配置线程池的类,因此可以保证它不能被再次配置。


2.4 SingleThreadScheduledExrcutor


创建一个可以周期性执行任务的单线程线程池。


public class TestMain {
    //格式化
    static SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    //AtomicInteger用来计数
    static AtomicInteger number = new AtomicInteger();
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        for (int i = 0; i < 3; i++) {
            executorService.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println("第" + number.incrementAndGet() + "周期线程运行当前时间【" + sim.format(new Date()) + "】");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, 3L, TimeUnit.SECONDS);
        }
        System.out.println("主线程运行当前时间【" + sim.format(new Date()) + "】");
    }
}


2.5 ScheduledThreadPool


ScheduledThreadPoolExecutor,它可另行安排在给定的延迟后运行命令,或者定期执行命令。


ScheduledThreadPoolExecutor  scheduled = new ScheduledThreadPoolExecutor(2);
scheduled.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
            loge("time:");
      }
}, 0, 40, TimeUnit.MILLISECONDS);
//0表示首次执行任务的延迟时间,40表示每次执行任务的间隔时间,TimeUnit.MILLISECONDS执行的时间间隔数值单位


5 ThreadPoolExecutor


5.1 概述


上面说了那么多,其实都不是推荐的方法,阿里巴巴的编程手册中有这样的描述:


线程池不允许使用Executors去创建。


而是通过ThreadPoolExecutor的方式,


这样的处理方式让写的读者更加明确线程池的运行规则,规则资源耗尽的风险。


具体原因是:


fixedThreadPool 和 singleThreadExecutor 对于排队的队列没有数量限制,最大支持Integer.MAX_VALUE个;

cachedThreadPool 和 scheduledThreadPool 中最大线程数可以达到 Integer.MAX_VALUE 个;

当线程过多的时候,这些方法就容易造成OOM了


d6442d657f5e7b88bbbc9a65815dd5e0.png

20200922202849998.png


当我们去看 Executors 的源码就会发现,Executors.newFixedThreadPool 、Executors.newSingleThreadPool 、Executors.newCachedThreadPool 、Executors.newScheduledThreadPool等方法的底层都是  ThreadPoolExecutor 实现的,其中执行周期任务得益于DelayedWorkedQueue的使用,而这些线程池又不是被推荐使用的,所以有必要好好研究一下 ThreadPoolExecutor 以便自定义线程池,这样才可以更加明确的线程池的运行规则,规避资源耗尽的风险。



ThreadPoolExecutor 的核心参数值得是他在构造时需要传递的参数,其构造参数如下:


public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }


public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }


int corePoolSize: 线程池的常驻核心线程数


   如果设置为0,则表示在没有任何任务时,销毁线程池;

   如果大于0,即使没有任务时也会保证线程池的线程数量等于此值。

   需要注意的是,如果此值设置的比较小,则会频繁的创建和销毁线程。如果设置的比较大,则会浪费系统资源。


int maximumPoolSize:线程池最大可以创建的线程数


  官方规定此参数必须大于0,也必须大于等于 corePoolSize ,此值只有在任务比较多,且任务队列中已被存满时才会用到。

 


long keepAliveTime : 线程的存活时间


 当线程池空闲时并且超过了此时间,多余的线程就会销毁,直到线程池中的线程数量等于corePoolSize 。


 如果 maximumPoolSize 等于 corePoolSize,那么线程池在空闲的时候不会销毁任何线程。

 

TimeUnit unit:存活时间的单位,是配合 keepAliveTime  参数共同使用的。


BlockingQueue<Runnable> workQueue : 线程池执行的任务队列


 当线程池中的所有线程都在处理任务时,如果来了新任务就会缓存到次任务队列中排队等待执行。


ThreadFactory threadFactory:线程的创建工厂

   此参数一般用的较少,如果创建线程时不指定此参数,则会使用默认的现场创建工厂的方法来创建线程。



RejectedExecutionHandler handler :指定线程池的拒绝策略。


 当线程池的任务已经在缓存队列 workQueue 中存储满了之后,并且不能创建新的线程来执此任务时,就会用到此拒绝策略。


5.2 先从CAPACITY的初始化开始说起:


@Native public static final int SIZE = 32;
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
private static final int RUNNING    = -1 << COUNT_BITS;    // 1110 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;    // 0000 0000 0000 0000 0000 0000 0000
private static final int STOP       =  1 << COUNT_BITS;    // 0010 0000 0000 0000 0000 0000 0000
private static final int TIDYING    =  2 << COUNT_BITS;    // 0100 0000 0000 0000 0000 0000 0000
private static final int TERMINATED =  3 << COUNT_BITS;    // 0110 0000 0000 0000 0000 0000 0000


COUNT_BITS 值为29,CAPACITY的计算如下图:


20200923210934181.png


为什么最后要 - 1,原因是和一下两个方法有关,获得运行状态和获得当前活动线程数:


// 获取运行状态 RUNNING/SHUTDOWN/STOP/TIDYING/TERMINATED
private static int runStateOf(int c)     { 
    // - CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000
    return c & ~CAPACITY; 
}
// 取出低29位的值,表示获得当前活动的线程数
private static int workerCountOf(int c)  { 
    // CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111
    return c & CAPACITY; 
}


前三位是0,后29位是1的序列可以非常方便的取出c的前三位和后29位,即,runState和workerCount,是存储在一个叫ctl的变量中的。


5.3 线程池运行状态和活动线程数


2020092321234223.png


RUNNING状态可以转换为SHUTDOWN或STOP状态,具体如下:


20200923212512504.png


SHUTDOWN和STOP相当于一个中间状态,最终所有任务都停止了时会进入TIDYING状态。


5.4 上述线程池的构造函数


image.png


5.5 execute


执行流程如下图:


20200923220346112.png


简化版的流程图:


20200923220346112.png


对应的带注释的源代码如下:


public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /** ctl记录着workCount和runState */
            int c = ctl.get();
            /** case1:如果线程池中的线程数量小于核心线程数,那么创建线程并执行 */
            if (workerCountOf(c) < corePoolSize) {  // workerCountof(c):获取当前活动线程数
                /**
                 * 在线程池中新建一个新的线程
                 * command:需要执行的Runnable线程
                 * true:新增线程时,【当前活动的线程数】是否 < corePoolSize
                 * false:新增线程时,【当前活动的线程数】是否 < maximumPoolSize
                 */
                if (addWorker(command, true))
                    return;
                // 添加新线程失败,则重新获取【当前活动的线程数】
                c = ctl.get();
            }
            /** 第二步:如果当前线程池是运行状态 且 任务添加到队列成功(即,case2:如果workCount >= corePoolSize) */
            if (isRunning(c) && workQueue.offer(command)) { // 添加command到workQueue队列中
                // 重新获取ctl
                int recheck = ctl.get();
                // 再次check一下,当前线程池是否是运行状态,如果不是运行时状态,则把刚刚放入到workQueue队列中的command移除掉
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0){  // 如果【当前活动的线程数】为0,则执行addWorker方法
 /**
                     * null:只创建线程,但不去启动
                     * false:添加线程时,根据maximumPoolSize来判断
                     *
                     * 如果workerCountOf(recheck) > 0,则直接返回,在队列中的command稍后会出队列并且执行
                     */
                    addWorker(null, false);
                }
                /**
                 * 第三步:满足以下两种条件之一,进入第三步判断语句
                 *  case1:线程池不是正在运行状态,即:isRunning(c)==false
                 *  case2:workCount>=corePoolSize 并且添加workQueue队列失败。即:workQueue.offer(command)== false
                 *  由于第二个参数传的是false,所以如果workCount < maximumPoolSize,则创建执行线程;否则进入方法体执行reject(command)
                 *  如果是true的话则和核心线程数进行比较
                 */
            }
            else if (!addWorker(command, false))
                reject(command);    // 执行线程创建失败的拒绝策略
        }

5.5.1 addWorker


execute中经常使用到的一个重要方法就是addWorker(),效果是在线程池中添加一个新的线程。


20200923221421824.png


private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 步骤1:试图将workerCount + 1
        for (;;) {
            int c = ctl.get();
            // 获得运行状态runState
            int rs = runStateOf(c);
            /**
             * 只有如下两种情况可以新增worker,继续执行下去:
             * case one:rs==RUNNING
             * case two:rs==SHUTDOWN && firstTask ==null&&!workQueue.isEmpty()
             */
            if (rs >= SHUTDOWN &&   // 非RUNNING状态。线程池异常,表示不再去接收新的线程任务了,返回false
                    /**
                     * 当线程池是SHUTDOWN状态时,表示不再接收新的任务了,所以:
                     * case1:如果firstTask!=nul1,表示要添加新任务,则:新增worker失败,返回false。
                     * case2:如果firstTask==null并且workQueue为空,表示队列中的任务已经处理完毕,不需要添加新任务了。则:新增worker失败,返回false
                     */
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;
            /**
/**
             * 试图将workerCount + 1
             */
            for (;;) {
                // 获取当前线程池里的线程数
                int wc = workerCountOf(c);
                /**
                 * 满足如下任意情况,则新增worker失败,返回false
                 * case1:大于等于最大线程容量,即:536870911
                 * case2:当core是true时;>=  核心线程数
                 *        当core是false时:>= 最大线程数
                 */
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 当前工作线程数加1
                if (compareAndIncrementWorkerCount(c))
                    break retry;    // 成功加1,则跳出retry标识的这两层for循环
                c = ctl.get();  // Re-read ctl
                // 如果线程数加1操作失败,则获取当前最新的线程池运行状态,来判断与rs是否相同,如果不同,则说明方法处理期间线程池运行状态发生了变化,里新获取最新runState
                if (runStateOf(c) != rs)
                    continue retry; // 跳出内层for循环,继续从第一个for执行
            }
        }
            /**
             * 步骤二:创建Worker,加入集合workers中,并启动Worker线程
             */
        boolean workerStarted = false;  // 用于判断新的worker实例是否已经开始执行Thread.start
 boolean workerAdded = false;    // 用于判断新的worker实例是否已经被添加到线程池
        Worker w = null;    // AQS.Worker
        try {
            // 创建Worker实例,每个Worker对象都会创建一个线程
            w = new Worker(firstTask);
            // 获取包含work的线程
            final Thread t = w.thread;
            if (t != null) {
                // 重入锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();    // 如果能获得全局mainlock锁,则执行,否则阻塞
                try {
                    // 获得线程池当前的运行状态runStatus
                    int rs = runStateOf(ctl.get());
                    /**
                     * 满足如下任意条件,即可向线程池中添加线程:
                     * case1:线程池状态为RUNNING。
                     * case2:线程池状态为SHUTDOWN并且firstTask为空。
                     */
                    if (rs < SHUTDOWN ||    // 只有rs=RUNNING才满足
                            (rs == SHUTDOWN && firstTask == null)) {    //  线程池关闭,传入线程任务为null
                        if (t.isAlive()) // 因为t是新构建的线程,还没有启动,所以如果是alive状态,说明已经被启动
                            throw new IllegalThreadStateException();
                        workers.add(w); // workers中保存线程池中存在的所有work实例集合
                        int s = workers.size();
                         if (s > largestPoolSize)    // largestPoolSize用于记录线程池中曾经存在的最大的线程数量
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();  // 开启线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)    // 如果没有开启线程
                addWorkerFailed(w);
        }
        return workerStarted;
    }


相关文章
|
16天前
|
缓存 Java 调度
多线程编程核心:上下文切换深度解析
在现代计算机系统中,多线程编程已成为提高程序性能和响应速度的关键技术。然而,多线程编程中一个不可避免的概念就是上下文切换(Context Switching)。本文将深入探讨上下文切换的概念、原因、影响以及优化策略,帮助你在工作和学习中深入理解这一技术干货。
36 10
|
13天前
|
PyTorch Shell API
Ascend Extension for PyTorch的源码解析
本文介绍了Ascend对PyTorch代码的适配过程,包括源码下载、编译步骤及常见问题,详细解析了torch-npu编译后的文件结构和三种实现昇腾NPU算子调用的方式:通过torch的register方式、定义算子方式和API重定向映射方式。这对于开发者理解和使用Ascend平台上的PyTorch具有重要指导意义。
|
17天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
47 12
|
16天前
|
调度 开发者
核心概念解析:进程与线程的对比分析
在操作系统和计算机编程领域,进程和线程是两个基本而核心的概念。它们是程序执行和资源管理的基础,但它们之间存在显著的差异。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
34 4
|
16天前
|
算法 调度 开发者
多线程编程核心:上下文切换深度解析
在多线程编程中,上下文切换是一个至关重要的概念,它直接影响到程序的性能和响应速度。本文将深入探讨上下文切换的含义、原因、影响以及如何优化,帮助你在工作和学习中更好地理解和应用多线程技术。
25 4
|
16天前
|
Java 调度 Android开发
安卓与iOS开发中的线程管理差异解析
在移动应用开发的广阔天地中,安卓和iOS两大平台各自拥有独特的魅力。如同东西方文化的差异,它们在处理多线程任务时也展现出不同的哲学。本文将带你穿梭于这两个平台之间,比较它们在线程管理上的核心理念、实现方式及性能考量,助你成为跨平台的编程高手。
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
55 1
C++ 多线程之初识多线程
|
2月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
26 3
|
2月前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
22 2
|
2月前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
35 2

推荐镜像

更多