超硬核!ThreadPoolExecutor线程池源码解析(下)

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: ThreadPoolExecutor6 线程池的工作流程7 ThreadPoolExecutor 的执行方法8 线程的拒绝策略8.1 自定义拒绝策略

addWorker()中会取出当前队列中的第一个线程并调用start()方法开启


20200925160116759.png


其中线程 t 由以下代码获取


20200925160245239.png

观察Worker的构造方法,使用 getThreadFactory 工厂创建一个线程:


Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }


线程中的run方法调用runWorker方法,对应上图绿色部分


/** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }


5.5.2 tryTerminate*


20200925163438193.png


5.5.3 runWorker*


执行流程如下图所示:


20200925163438193.png


final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    /**
                     * 同时满足如下两个条件,则执行wt.interrupt()
                     * 1> 线程状态为STOP、TIDYING、TERMINATED或者(当前线程被中断(清除中断标记)并且线程状态为STOP、TIDYING、TERMINATED)
                     * 2> 当前线程wt是否被标记中断
                     */
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                            (Thread.interrupted() &&
                                    runStateAtLeast(ctl.get(), STOP))) &&
                            !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
} catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }


runWorker方法的核心是调用了getTask方法,即上图中绿色框部分。

5.5.4 getTask*


20200925164924934.png


private Runnable getTask() {
            // 表示上次从阻塞队列中获取任务是否超时
            boolean timedOut = false;
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
                /**
                 * 同时满足如下两点,则线程池中工作线程数减1,并返回null
                 * 1> rs >= SHUTDOWN,表示线程池不是RUNNING状态
                 * 2> rs >= STOP 表示STOP、TIDYING和TERMINATED这三个状态,它们共同点就是【不接收新任务】也【不处理workQueue里的线程任务】 or 阻塞队列workQueue为空
                 */
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount(); // 线程池中工作线程数 - 1
                    return null;
                }
                int wc = workerCountOf(c);
                // timed用于判断是否需要进行超时控制,当allowCoreThreadTimeOut被设置为ture或者活跃线程数大于核心线程数,则需要进行超时控制
                // allowCoreThreadTimeOut默认为false,则表明核心线程不允许超时
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                /**
                 * 同时满足以下两种情况,则线程池中工作线程数减1并返回nul1:
                 * case1:当前活动线程数workCount大于最大线程数,或者需要超时控制(timed = true)并且上次从阻塞队列中获取任务
 * case2:如果有效线程数大于1,或者阻塞队列为空。
                 */
                if ((wc > maximumPoolSize   // 因为在执行该方法的同时被执行了setMaximumPoolSize,导致最大线程数被缩小
                        || (timed && timedOut))
                        && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))  // 线程池中工作线程数 - 1
                        return null;
                    // 如果 - 1失败,则循环重试
                    continue;
                }
                try {
                    // 如果需要超时控制,则通过阻塞队列的pol1方法进行超时控制,
                    // 否则,直接获取,如果队列为空,task方法会阻塞直到队列不为空
                    Runnable r = timed ?
                            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :   // pol1-->若队列为空,返回null
                            workQueue.take();   // take-->若队列为空,发生阻塞,等待元素
                    if (r != null)
                        return r;
                    // 如果r=nul1,表示超时了,则timeOut设置为true,标记为上一次超时状态
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }



7 ThreadPoolExecutor 的执行方法


执行方法有两个,分别是 execute() 和 submit(),最主要的区别就是 submit() 方法可以接受线程池执行的返回值,而 execute() 不能接收返回值。


示例代码:


public static void main(String[] args) throws Exception{
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20));
        // execute 使用例子
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("我是一个 execute");
            }
        });
        // submit 使用例子
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "我是一个 submit";
            }
        });
        System.out.println(future.get());
    }


另一个区别是  execute() 方法 属于 顶级接口 Executor 的方法 ,而  submit() 属于 子类接口 ExecutorService 的方法。



8 线程的拒绝策略


当线程池中的任务队列已满,再有任务来添加时会先判断当前线程池中的线程数是否大于等于线程池的最大值,如果是,则会触发线程池的拒绝策略。



自带的拒绝策略有4种:


AbortPolicy :终止策略,线程池抛出一个异常并终止执行,是默认策略


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }


CallerRunsPolicy :把任务交给当前线程执行


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }


DiscardPolicy : 丢弃新进来的任务


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }


DiscardOldestPolicy :丢弃最早的任务(最先加入队列中的任务)


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }


8.1 自定义拒绝策略


自定义拒绝策略只需要新建一个 RejectedExecutionHandler 对象,然后重写其 rejectedExecution 方法即可。


ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1,
                3,
                10L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(2)
                , new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("我是自定义的拒绝策略");
            }
        });
        for (int i = 0; i < 6; i++) {
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }


9 面试题


1、如果线程池的当前大小还没有达到基本大小(poolSize < corePoolSize),那么就新增加一个线程处理新提交的任务;


2、如果当前大小已经达到了基本大小,就将新提交的任务提交到阻塞队列排队,等候处理workQueue.offer(command);


3、如果队列容量已达上限,并且当前大小poolSize没有达到maximumPoolSize,那么就新增线程来处理任务;


4、如果队列已满,并且当前线程数目也已经达到上限,那么意味着线程池的处理能力已经达到了极限,此时需要拒绝新增加的任务。至于如何拒绝处理新增的任务,取决于线程池的饱和策略RejectedExecutionHandler。




corePoolSize=10, maximumPoolSize=10,queueSize = 10



20个并发任务过来,有多少个活跃线程?


10个。corePoolSize  =  maximumPoolSize 定长线程池,corePoolSize先打满,queueSize也满



队列里面有几个线程?


10个。corePoolSize先打满,queueSize也满。


如果有21个并发队列过来呢?


corePoolSize先打满,queueSize也满还多了一个,这个时候如果是丢弃策略就丢弃。




corePoolSize=10, maximumPoolSize=20,queueSize = 10?


20个并发任务过来,有多少个活跃线程?


10个。corePoolSize打满,queueSize 也满


21个并发任务过来,有多少个活跃线程?


11个。corePoolSize打满,queueSize 也满还多一个,maximumPoolSize = 20,所以corePoolSize + 1此时活跃的为11个。


30个并发任务过来,有多少个活跃线程?


20个。corePoolSize打满,queueSize 也满,corePoolSize扩充至20,此时有20个活跃任务。


31个并发任务过来,有多少个活跃线程?


20个。corePoolSize打满,queueSize 也满,corePoolSize扩充至20还多一个,如果是丢弃策略,此时有20个活跃任务。

相关文章
|
16天前
|
缓存 Java 调度
多线程编程核心:上下文切换深度解析
在现代计算机系统中,多线程编程已成为提高程序性能和响应速度的关键技术。然而,多线程编程中一个不可避免的概念就是上下文切换(Context Switching)。本文将深入探讨上下文切换的概念、原因、影响以及优化策略,帮助你在工作和学习中深入理解这一技术干货。
36 10
|
13天前
|
PyTorch Shell API
Ascend Extension for PyTorch的源码解析
本文介绍了Ascend对PyTorch代码的适配过程,包括源码下载、编译步骤及常见问题,详细解析了torch-npu编译后的文件结构和三种实现昇腾NPU算子调用的方式:通过torch的register方式、定义算子方式和API重定向映射方式。这对于开发者理解和使用Ascend平台上的PyTorch具有重要指导意义。
|
17天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
47 12
|
16天前
|
调度 开发者
核心概念解析:进程与线程的对比分析
在操作系统和计算机编程领域,进程和线程是两个基本而核心的概念。它们是程序执行和资源管理的基础,但它们之间存在显著的差异。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
34 4
|
16天前
|
算法 调度 开发者
多线程编程核心:上下文切换深度解析
在多线程编程中,上下文切换是一个至关重要的概念,它直接影响到程序的性能和响应速度。本文将深入探讨上下文切换的含义、原因、影响以及如何优化,帮助你在工作和学习中更好地理解和应用多线程技术。
25 4
|
16天前
|
Java 调度 Android开发
安卓与iOS开发中的线程管理差异解析
在移动应用开发的广阔天地中,安卓和iOS两大平台各自拥有独特的魅力。如同东西方文化的差异,它们在处理多线程任务时也展现出不同的哲学。本文将带你穿梭于这两个平台之间,比较它们在线程管理上的核心理念、实现方式及性能考量,助你成为跨平台的编程高手。
|
19天前
|
存储 缓存 监控
Java中的线程池深度解析####
本文深入探讨了Java并发编程中的核心组件——线程池,从其基本概念、工作原理、核心参数解析到应用场景与最佳实践,全方位剖析了线程池在提升应用性能、资源管理和任务调度方面的重要作用。通过实例演示和性能对比,揭示合理配置线程池对于构建高效Java应用的关键意义。 ####
|
29天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
67 2
|
2月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
75 0
|
2月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
57 0

推荐镜像

更多