超硬核!ThreadPoolExecutor线程池源码解析(下)

本文涉及的产品
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
简介: ThreadPoolExecutor6 线程池的工作流程7 ThreadPoolExecutor 的执行方法8 线程的拒绝策略8.1 自定义拒绝策略

addWorker()中会取出当前队列中的第一个线程并调用start()方法开启


20200925160116759.png


其中线程 t 由以下代码获取


20200925160245239.png

观察Worker的构造方法,使用 getThreadFactory 工厂创建一个线程:


Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }


线程中的run方法调用runWorker方法,对应上图绿色部分


/** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }


5.5.2 tryTerminate*


20200925163438193.png


5.5.3 runWorker*


执行流程如下图所示:


20200925163438193.png


final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    /**
                     * 同时满足如下两个条件,则执行wt.interrupt()
                     * 1> 线程状态为STOP、TIDYING、TERMINATED或者(当前线程被中断(清除中断标记)并且线程状态为STOP、TIDYING、TERMINATED)
                     * 2> 当前线程wt是否被标记中断
                     */
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                            (Thread.interrupted() &&
                                    runStateAtLeast(ctl.get(), STOP))) &&
                            !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
} catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }


runWorker方法的核心是调用了getTask方法,即上图中绿色框部分。

5.5.4 getTask*


20200925164924934.png


private Runnable getTask() {
            // 表示上次从阻塞队列中获取任务是否超时
            boolean timedOut = false;
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
                /**
                 * 同时满足如下两点,则线程池中工作线程数减1,并返回null
                 * 1> rs >= SHUTDOWN,表示线程池不是RUNNING状态
                 * 2> rs >= STOP 表示STOP、TIDYING和TERMINATED这三个状态,它们共同点就是【不接收新任务】也【不处理workQueue里的线程任务】 or 阻塞队列workQueue为空
                 */
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount(); // 线程池中工作线程数 - 1
                    return null;
                }
                int wc = workerCountOf(c);
                // timed用于判断是否需要进行超时控制,当allowCoreThreadTimeOut被设置为ture或者活跃线程数大于核心线程数,则需要进行超时控制
                // allowCoreThreadTimeOut默认为false,则表明核心线程不允许超时
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                /**
                 * 同时满足以下两种情况,则线程池中工作线程数减1并返回nul1:
                 * case1:当前活动线程数workCount大于最大线程数,或者需要超时控制(timed = true)并且上次从阻塞队列中获取任务
 * case2:如果有效线程数大于1,或者阻塞队列为空。
                 */
                if ((wc > maximumPoolSize   // 因为在执行该方法的同时被执行了setMaximumPoolSize,导致最大线程数被缩小
                        || (timed && timedOut))
                        && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))  // 线程池中工作线程数 - 1
                        return null;
                    // 如果 - 1失败,则循环重试
                    continue;
                }
                try {
                    // 如果需要超时控制,则通过阻塞队列的pol1方法进行超时控制,
                    // 否则,直接获取,如果队列为空,task方法会阻塞直到队列不为空
                    Runnable r = timed ?
                            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :   // pol1-->若队列为空,返回null
                            workQueue.take();   // take-->若队列为空,发生阻塞,等待元素
                    if (r != null)
                        return r;
                    // 如果r=nul1,表示超时了,则timeOut设置为true,标记为上一次超时状态
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }



7 ThreadPoolExecutor 的执行方法


执行方法有两个,分别是 execute() 和 submit(),最主要的区别就是 submit() 方法可以接受线程池执行的返回值,而 execute() 不能接收返回值。


示例代码:


public static void main(String[] args) throws Exception{
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20));
        // execute 使用例子
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("我是一个 execute");
            }
        });
        // submit 使用例子
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "我是一个 submit";
            }
        });
        System.out.println(future.get());
    }


另一个区别是  execute() 方法 属于 顶级接口 Executor 的方法 ,而  submit() 属于 子类接口 ExecutorService 的方法。



8 线程的拒绝策略


当线程池中的任务队列已满,再有任务来添加时会先判断当前线程池中的线程数是否大于等于线程池的最大值,如果是,则会触发线程池的拒绝策略。



自带的拒绝策略有4种:


AbortPolicy :终止策略,线程池抛出一个异常并终止执行,是默认策略


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }


CallerRunsPolicy :把任务交给当前线程执行


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }


DiscardPolicy : 丢弃新进来的任务


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }


DiscardOldestPolicy :丢弃最早的任务(最先加入队列中的任务)


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }


8.1 自定义拒绝策略


自定义拒绝策略只需要新建一个 RejectedExecutionHandler 对象,然后重写其 rejectedExecution 方法即可。


ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1,
                3,
                10L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(2)
                , new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("我是自定义的拒绝策略");
            }
        });
        for (int i = 0; i < 6; i++) {
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }


9 面试题


1、如果线程池的当前大小还没有达到基本大小(poolSize < corePoolSize),那么就新增加一个线程处理新提交的任务;


2、如果当前大小已经达到了基本大小,就将新提交的任务提交到阻塞队列排队,等候处理workQueue.offer(command);


3、如果队列容量已达上限,并且当前大小poolSize没有达到maximumPoolSize,那么就新增线程来处理任务;


4、如果队列已满,并且当前线程数目也已经达到上限,那么意味着线程池的处理能力已经达到了极限,此时需要拒绝新增加的任务。至于如何拒绝处理新增的任务,取决于线程池的饱和策略RejectedExecutionHandler。




corePoolSize=10, maximumPoolSize=10,queueSize = 10



20个并发任务过来,有多少个活跃线程?


10个。corePoolSize  =  maximumPoolSize 定长线程池,corePoolSize先打满,queueSize也满



队列里面有几个线程?


10个。corePoolSize先打满,queueSize也满。


如果有21个并发队列过来呢?


corePoolSize先打满,queueSize也满还多了一个,这个时候如果是丢弃策略就丢弃。




corePoolSize=10, maximumPoolSize=20,queueSize = 10?


20个并发任务过来,有多少个活跃线程?


10个。corePoolSize打满,queueSize 也满


21个并发任务过来,有多少个活跃线程?


11个。corePoolSize打满,queueSize 也满还多一个,maximumPoolSize = 20,所以corePoolSize + 1此时活跃的为11个。


30个并发任务过来,有多少个活跃线程?


20个。corePoolSize打满,queueSize 也满,corePoolSize扩充至20,此时有20个活跃任务。


31个并发任务过来,有多少个活跃线程?


20个。corePoolSize打满,queueSize 也满,corePoolSize扩充至20还多一个,如果是丢弃策略,此时有20个活跃任务。

相关文章
|
5天前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
5天前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
|
9天前
|
开发工具
Flutter-AnimatedWidget组件源码解析
Flutter-AnimatedWidget组件源码解析
|
20天前
|
监控 网络协议 Java
Tomcat源码解析】整体架构组成及核心组件
Tomcat,原名Catalina,是一款优雅轻盈的Web服务器,自4.x版本起扩展了JSP、EL等功能,超越了单纯的Servlet容器范畴。Servlet是Sun公司为Java编程Web应用制定的规范,Tomcat作为Servlet容器,负责构建Request与Response对象,并执行业务逻辑。
Tomcat源码解析】整体架构组成及核心组件
|
1月前
|
存储 NoSQL Redis
redis 6源码解析之 object
redis 6源码解析之 object
53 6
|
3月前
|
XML Java 数据格式
深度解析 Spring 源码:从 BeanDefinition 源码探索 Bean 的本质
深度解析 Spring 源码:从 BeanDefinition 源码探索 Bean 的本质
76 3
|
27天前
|
测试技术 Python
python自动化测试中装饰器@ddt与@data源码深入解析
综上所述,使用 `@ddt`和 `@data`可以大大简化写作测试用例的过程,让我们能专注于测试逻辑的本身,而无需编写重复的测试方法。通过讲解了 `@ddt`和 `@data`源码的关键部分,我们可以更深入地理解其背后的工作原理。
23 1
|
1月前
|
开发者 Python
深入解析Python `httpx`源码,探索现代HTTP客户端的秘密!
深入解析Python `httpx`源码,探索现代HTTP客户端的秘密!
67 1
|
1月前
|
开发者 Python
深入解析Python `requests`库源码,揭开HTTP请求的神秘面纱!
深入解析Python `requests`库源码,揭开HTTP请求的神秘面纱!
121 1
|
2月前
|
负载均衡 Java Spring
@EnableFeignClients注解源码解析
@EnableFeignClients注解源码解析
58 14

推荐镜像

更多