超硬核!ThreadPoolExecutor线程池源码解析(下)

简介: ThreadPoolExecutor6 线程池的工作流程7 ThreadPoolExecutor 的执行方法8 线程的拒绝策略8.1 自定义拒绝策略

addWorker()中会取出当前队列中的第一个线程并调用start()方法开启


20200925160116759.png


其中线程 t 由以下代码获取


20200925160245239.png

观察Worker的构造方法,使用 getThreadFactory 工厂创建一个线程:


Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }


线程中的run方法调用runWorker方法,对应上图绿色部分


/** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }


5.5.2 tryTerminate*


20200925163438193.png


5.5.3 runWorker*


执行流程如下图所示:


20200925163438193.png


final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    /**
                     * 同时满足如下两个条件,则执行wt.interrupt()
                     * 1> 线程状态为STOP、TIDYING、TERMINATED或者(当前线程被中断(清除中断标记)并且线程状态为STOP、TIDYING、TERMINATED)
                     * 2> 当前线程wt是否被标记中断
                     */
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                            (Thread.interrupted() &&
                                    runStateAtLeast(ctl.get(), STOP))) &&
                            !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
} catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }


runWorker方法的核心是调用了getTask方法,即上图中绿色框部分。

5.5.4 getTask*


20200925164924934.png


private Runnable getTask() {
            // 表示上次从阻塞队列中获取任务是否超时
            boolean timedOut = false;
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
                /**
                 * 同时满足如下两点,则线程池中工作线程数减1,并返回null
                 * 1> rs >= SHUTDOWN,表示线程池不是RUNNING状态
                 * 2> rs >= STOP 表示STOP、TIDYING和TERMINATED这三个状态,它们共同点就是【不接收新任务】也【不处理workQueue里的线程任务】 or 阻塞队列workQueue为空
                 */
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount(); // 线程池中工作线程数 - 1
                    return null;
                }
                int wc = workerCountOf(c);
                // timed用于判断是否需要进行超时控制,当allowCoreThreadTimeOut被设置为ture或者活跃线程数大于核心线程数,则需要进行超时控制
                // allowCoreThreadTimeOut默认为false,则表明核心线程不允许超时
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                /**
                 * 同时满足以下两种情况,则线程池中工作线程数减1并返回nul1:
                 * case1:当前活动线程数workCount大于最大线程数,或者需要超时控制(timed = true)并且上次从阻塞队列中获取任务
 * case2:如果有效线程数大于1,或者阻塞队列为空。
                 */
                if ((wc > maximumPoolSize   // 因为在执行该方法的同时被执行了setMaximumPoolSize,导致最大线程数被缩小
                        || (timed && timedOut))
                        && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))  // 线程池中工作线程数 - 1
                        return null;
                    // 如果 - 1失败,则循环重试
                    continue;
                }
                try {
                    // 如果需要超时控制,则通过阻塞队列的pol1方法进行超时控制,
                    // 否则,直接获取,如果队列为空,task方法会阻塞直到队列不为空
                    Runnable r = timed ?
                            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :   // pol1-->若队列为空,返回null
                            workQueue.take();   // take-->若队列为空,发生阻塞,等待元素
                    if (r != null)
                        return r;
                    // 如果r=nul1,表示超时了,则timeOut设置为true,标记为上一次超时状态
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }



7 ThreadPoolExecutor 的执行方法


执行方法有两个,分别是 execute() 和 submit(),最主要的区别就是 submit() 方法可以接受线程池执行的返回值,而 execute() 不能接收返回值。


示例代码:


public static void main(String[] args) throws Exception{
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20));
        // execute 使用例子
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("我是一个 execute");
            }
        });
        // submit 使用例子
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "我是一个 submit";
            }
        });
        System.out.println(future.get());
    }


另一个区别是  execute() 方法 属于 顶级接口 Executor 的方法 ,而  submit() 属于 子类接口 ExecutorService 的方法。



8 线程的拒绝策略


当线程池中的任务队列已满,再有任务来添加时会先判断当前线程池中的线程数是否大于等于线程池的最大值,如果是,则会触发线程池的拒绝策略。



自带的拒绝策略有4种:


AbortPolicy :终止策略,线程池抛出一个异常并终止执行,是默认策略


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }


CallerRunsPolicy :把任务交给当前线程执行


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }


DiscardPolicy : 丢弃新进来的任务


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }


DiscardOldestPolicy :丢弃最早的任务(最先加入队列中的任务)


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }


8.1 自定义拒绝策略


自定义拒绝策略只需要新建一个 RejectedExecutionHandler 对象,然后重写其 rejectedExecution 方法即可。


ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1,
                3,
                10L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(2)
                , new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("我是自定义的拒绝策略");
            }
        });
        for (int i = 0; i < 6; i++) {
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }


9 面试题


1、如果线程池的当前大小还没有达到基本大小(poolSize < corePoolSize),那么就新增加一个线程处理新提交的任务;


2、如果当前大小已经达到了基本大小,就将新提交的任务提交到阻塞队列排队,等候处理workQueue.offer(command);


3、如果队列容量已达上限,并且当前大小poolSize没有达到maximumPoolSize,那么就新增线程来处理任务;


4、如果队列已满,并且当前线程数目也已经达到上限,那么意味着线程池的处理能力已经达到了极限,此时需要拒绝新增加的任务。至于如何拒绝处理新增的任务,取决于线程池的饱和策略RejectedExecutionHandler。




corePoolSize=10, maximumPoolSize=10,queueSize = 10



20个并发任务过来,有多少个活跃线程?


10个。corePoolSize  =  maximumPoolSize 定长线程池,corePoolSize先打满,queueSize也满



队列里面有几个线程?


10个。corePoolSize先打满,queueSize也满。


如果有21个并发队列过来呢?


corePoolSize先打满,queueSize也满还多了一个,这个时候如果是丢弃策略就丢弃。




corePoolSize=10, maximumPoolSize=20,queueSize = 10?


20个并发任务过来,有多少个活跃线程?


10个。corePoolSize打满,queueSize 也满


21个并发任务过来,有多少个活跃线程?


11个。corePoolSize打满,queueSize 也满还多一个,maximumPoolSize = 20,所以corePoolSize + 1此时活跃的为11个。


30个并发任务过来,有多少个活跃线程?


20个。corePoolSize打满,queueSize 也满,corePoolSize扩充至20,此时有20个活跃任务。


31个并发任务过来,有多少个活跃线程?


20个。corePoolSize打满,queueSize 也满,corePoolSize扩充至20还多一个,如果是丢弃策略,此时有20个活跃任务。

相关文章
|
8月前
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
252 4
|
8月前
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
8月前
|
Java 调度
ThreadPoolExecutor解析
本文深入解析了Java中`ThreadPoolExecutor`的实现原理,帮助理解线程池的工作机制。核心内容包括任务队列`workQueue`、线程工厂`ThreadFactory`、拒绝策略`RejectedExecutionHandler`等关键成员的作用。通过`submit`和`execute`方法的执行流程,展示了线程池如何根据`corePoolSize`和`maximumPoolSize`动态调整线程数量,并结合`keepAliveTime`管理空闲线程。最后分析了`Worker`类的`run`方法,揭示线程池通过循环从队列中获取任务并执行的核心逻辑。
232 0
|
8月前
|
负载均衡 JavaScript 前端开发
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
1月前
|
Java
如何在Java中进行多线程编程
Java多线程编程常用方式包括:继承Thread类、实现Runnable接口、Callable接口(可返回结果)及使用线程池。推荐线程池以提升性能,避免频繁创建线程。结合同步与通信机制,可有效管理并发任务。
127 6
|
4月前
|
Java API 微服务
为什么虚拟线程将改变Java并发编程?
为什么虚拟线程将改变Java并发编程?
288 83
|
21天前
|
Java 调度 数据库
Python threading模块:多线程编程的实战指南
本文深入讲解Python多线程编程,涵盖threading模块的核心用法:线程创建、生命周期、同步机制(锁、信号量、条件变量)、线程通信(队列)、守护线程与线程池应用。结合实战案例,如多线程下载器,帮助开发者提升程序并发性能,适用于I/O密集型任务处理。
179 0
|
2月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
189 16
|
6月前
|
机器学习/深度学习 消息中间件 存储
【高薪程序员必看】万字长文拆解Java并发编程!(9-2):并发工具-线程池
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
228 0
|
9月前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
通过本文,您可以了解如何在业务线程中注册和处理Linux信号。正确处理信号可以提高程序的健壮性和稳定性。希望这些内容能帮助您更好地理解和应用Linux信号处理机制。
156 26

热门文章

最新文章

推荐镜像

更多
  • DNS