面试官: 有了解过线程池的工作原理吗?说说看

简介: 面试官: 有了解过线程池的工作原理吗?说说看

前言

目前正在出一个Java多线程专题长期系列教程,从入门到进阶含源码解读, 篇幅会较多, 喜欢的话,给个关注❤️ ~


本节主要带大家从ThreadPoolExecutor源码角度来了解一下线程池的工作原理,一起来看下吧~


Executor 接口

首先Executor这个接口是线程池实现的顶层接口类,我们上节遇到的

ExecutorService也是继承了Executor

public interface ExecutorService extends Executor {...}
复制代码


ExecutorService的上层AbstractExecutorService这个抽象类实现了接口ExecutorService

public abstract class AbstractExecutorService implements ExecutorService {...}
复制代码


ThreadPoolExecutor继承了AbstractExecutorService

public class ThreadPoolExecutor extends AbstractExecutorService {...}
复制代码


ThreadPoolExecutor这个类我们需要重点看一下,它是接口的实现类,我们以newCachedThreadPool为例

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
复制代码


可以看到内部其实还是调用了ThreadPoolExecutor,我们再看newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
复制代码


内部也是调了它, 下面我们就看下这个类


ThreadPoolExecutor

首先我们从构造函数看起,它主要有四个构造函数


构造函数一

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
复制代码


一共有五个参数:

  • corePoolSize - 保留在池中的线程数,即使是空闲的,除非设置allowCoreThreadTimeOut
  • maximumPoolSize – 池中允许的最大线程数
  • keepAliveTime – 当线程数大于核心时,这是多余的空闲线程在终止前等待新任务的最长时间。
  • unit – keepAliveTime参数的时间单位
  • workQueue – 用于在执行任务之前保存任务的队列。此队列将仅保存由execute方法提交的Runnable任务。


构造函数二

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
复制代码

其它参数同上

  • threadFactory 执行器创建新线程时使用的工厂


构造函数三

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
复制代码
  • handler 由于达到线程边界和队列容量而阻塞执行时使用的处理程序


构造函数四

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
复制代码

这个把之前都综合了一下,其实可以看到前几个内部都调用了this,调用自身,也就是调用这个构造函数,进行一些初始化


BlockingQueue 阻塞队列

有几个参数比较好理解,我们来看下这个参数workQueue, 它是一个阻塞队列,这里简要给大家提一下,这块内容也比较重要,后边会专门去讲


BlockingQueue本身是一个还接口,它有几个比较常用的阻塞队列

  • LinkedBlockingQueue  链式阻塞队列,底层数据结构是链表
  • ArrayBlockingQueue 数组阻塞队列,底层数据结构是数组,需要指定队列的大小。
  • SynchronousQueue 同步队列,内部容量为0,每个put操作必须等待一个take操作
  • DelayQueue 延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素


ThreadFactory 线程工厂

这个是线程工厂类,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等, 同样它也是一个接口,我们在ThreadPoolExecutor内部看到了 Executors.defaultThreadFactory(),这个是一个默认工厂

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
复制代码

没有指定参数,就会默认创建DefaultThreadFactory,还有其它的factory,大家可以自行看下,区别就在创建线程时指定的参数


RejectedExecutionHandler 拒绝策略

RejectedExecutionHandler同样是一个接口,这个处理器是用来专门处理拒绝的任务,也就是ThreadPoolExecutor无法处理的程序。同理,我们可以看到ThreadPoolExecutor内部有调了defaultHandler

private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
复制代码


这个是默认的拒绝策略, 可以看到它的默认处理是抛出拒绝的异常

public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
复制代码


再带大家看下另外的策略, DiscardPolicy,这个策略不会抛出异常,它会丢弃这个任务

public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
复制代码


DiscardOldestPolicy 该策略丢弃最旧的未处理请求,然后重试execute ,除非执行程序被关闭,在这种情况下任务被丢弃。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 判断是否关闭
            if (!e.isShutdown()) {
                e.getQueue().poll();
                // 任务重试
                e.execute(r);
            }
        }
    }
复制代码


CallerRunsPolicy 直接在execute方法的调用线程中运行被拒绝的任务,除非执行程序已关闭,在这种情况下,任务将被丢弃。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 直接判断是否关闭 未关闭就执行
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
复制代码


线程调度策略

看完构造函数,下面看下它的一些常量

private static final int COUNT_BITS = Integer.SIZE - 3;
// runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
复制代码


通过变量名,我们大致知道是用来表示线程池的状态。线程池本身有一个调度线程,这个线程就是用于管理整个线程池的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等,所以它本身也有上面的状态值。


当线程池被创建后就会处于RUNNING状态, 主池控制状态ctl是一个原子整数

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
复制代码


调用shutdown()方法后处于SHUTDOWN状态,线程池不能接受新的任务,清除一些空闲worker,不会等待阻塞队列的任务完成。

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
复制代码


另外,还有一个shutdownNow,调用后处于STOP状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            // 中断所有线程
            interruptWorkers();
            // 将任务队列排空到一个新列表中 这里要注意下
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
复制代码


当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。接着会执行terminated()函数。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
             // 不等于0时  中断任务线程
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 将状态设置为 TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 终止 
                        terminated();
                    } finally {
                        // 执行完  terminated 转为 TERMINATED状态
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
复制代码


execute

这个是执行任务的核心方法,我们一起看一下

public void execute(Runnable command) {
        // 如果任务不存在 抛空异常
        if (command == null)
            throw new NullPointerException();
        // 获取当前状态值
        int c = ctl.get();
        // 当前线程数小于corePoolSize,则调用addWorker创建核心线程执行任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果不小于corePoolSize,则将任务添加到workQueue队列。
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 如果isRunning返回false(状态检查),则remove这个任务,然后执行拒绝策略。
            if (! isRunning(recheck) && remove(command))
                reject(command);
                // 线程池处于running状态,但是没有线程,则创建线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果放入workQueue失败,则创建非核心线程执行任务,
        else if (!addWorker(command, false))
            // 如果这时创建失败,就会执行拒绝策略。
            reject(command);
    }
复制代码

在源码中,我们可以看到,多次进行了isRunning判断。在多线程的环境下,线程池的状态是多变的。很有可能刚获取线程池状态后线程池状态就改变了


总结

下面给大家简要的总结一下线程池的处理流程

  1. 线程总数量小于线程池中保留的线程数量(corePoolSize),无论线程是否空闲,都会新建一个核心线程执行任务,这一步需要获取全局锁
  2. 线程总数量大于corePoolSize时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行,从而达到线程的复用
  3. 当缓存队列满了,会创建非核心线程去执行这个任务。
  4. 缓存队列满了, 且总线程数达到了maximumPoolSize,则会采取拒绝策略进行处理。


结束语

它的源码还是比较长的,一篇文章说不清楚,有兴趣的同学可以通过本篇文章的理解继续阅读它的源码。

下一节, 继续带大家详探讨ThreadPoolExecutor中是如何进行线程复用 ~

相关文章
|
23天前
|
存储 监控 算法
美团面试:说说 G1垃圾回收 底层原理?说说你 JVM 调优的过程 ?
尼恩提示: G1垃圾回收 原理非常重要, 是面试的重点, 大家一定要好好掌握
美团面试:说说 G1垃圾回收 底层原理?说说你 JVM 调优的过程  ?
|
23天前
|
SQL 存储 关系型数据库
美团面试:binlog、redo log、undo log的底层原理是什么?它们分别实现ACID的哪个特性?
老架构师尼恩在其读者交流群中分享了关于 MySQL 中 redo log、undo log 和 binlog 的面试题及其答案。这些问题涵盖了事务的 ACID 特性、日志的一致性问题、SQL 语句的执行流程等。尼恩详细解释了这些日志的作用、所在架构层级、日志形式、缓存机制以及写文件方式等内容。他还提供了多个面试题的详细解答,帮助读者系统化地掌握这些知识点,提升面试表现。此外,尼恩还推荐了《尼恩Java面试宝典PDF》和其他技术圣经系列PDF,帮助读者进一步巩固知识,实现“offer自由”。
美团面试:binlog、redo log、undo log的底层原理是什么?它们分别实现ACID的哪个特性?
|
23天前
|
负载均衡 算法 Java
蚂蚁面试:Nacos、Sentinel了解吗?Springcloud 核心底层原理,你知道多少?
40岁老架构师尼恩分享了关于SpringCloud核心组件的底层原理,特别是针对蚂蚁集团面试中常见的面试题进行了详细解析。内容涵盖了Nacos注册中心的AP/CP模式、Distro和Raft分布式协议、Sentinel的高可用组件、负载均衡组件的实现原理等。尼恩强调了系统化学习的重要性,推荐了《尼恩Java面试宝典PDF》等资料,帮助读者更好地准备面试,提高技术实力,最终实现“offer自由”。更多技术资料和指导,可关注公众号【技术自由圈】获取。
蚂蚁面试:Nacos、Sentinel了解吗?Springcloud 核心底层原理,你知道多少?
|
23天前
|
SQL 关系型数据库 MySQL
阿里面试:MYSQL 事务ACID,底层原理是什么? 具体是如何实现的?
尼恩,一位40岁的资深架构师,通过其丰富的经验和深厚的技術功底,为众多读者提供了宝贵的面试指导和技术分享。在他的读者交流群中,许多小伙伴获得了来自一线互联网企业的面试机会,并成功应对了诸如事务ACID特性实现、MVCC等相关面试题。尼恩特别整理了这些常见面试题的系统化解答,形成了《MVCC 学习圣经:一次穿透MYSQL MVCC》PDF文档,旨在帮助大家在面试中展示出扎实的技术功底,提高面试成功率。此外,他还编写了《尼恩Java面试宝典》等资料,涵盖了大量面试题和答案,帮助读者全面提升技术面试的表现。这些资料不仅内容详实,而且持续更新,是求职者备战技术面试的宝贵资源。
阿里面试:MYSQL 事务ACID,底层原理是什么? 具体是如何实现的?
|
23天前
|
消息中间件 Java Linux
得物面试:什么是零复制?说说 零复制 底层原理?(吊打面试官)
尼恩,40岁老架构师,专注于技术分享与面试辅导。近期,尼恩的读者群中有小伙伴在面试一线互联网企业如得物、阿里、滴滴等时,遇到了关于零复制技术的重要问题。为此,尼恩系统化地整理了零复制的底层原理,包括RocketMQ和Kafka的零复制实现,以及DMA、mmap、sendfile等技术的应用。尼恩还计划推出一系列文章,深入探讨Netty、Kafka、RocketMQ等框架的零复制技术,帮助大家在面试中脱颖而出,顺利拿到高薪Offer。此外,尼恩还提供了《尼恩Java面试宝典》PDF等资源,助力大家提升技术水平。更多内容请关注尼恩的公众号【技术自由圈】。
得物面试:什么是零复制?说说 零复制 底层原理?(吊打面试官)
|
2月前
|
ARouter 测试技术 API
Android经典面试题之组件化原理、优缺点、实现方法?
本文介绍了组件化在Android开发中的应用,详细阐述了其原理、优缺点及实现方式,包括模块化、接口编程、依赖注入、路由机制等内容,并提供了具体代码示例。
44 2
|
1月前
|
Java 调度 Android开发
Android面试题之Kotlin中async 和 await实现并发的原理和面试总结
本文首发于公众号“AntDream”,详细解析了Kotlin协程中`async`与`await`的原理及其非阻塞特性,并提供了相关面试题及答案。协程作为轻量级线程,由Kotlin运行时库管理,`async`用于启动协程并返回`Deferred`对象,`await`则用于等待该对象完成并获取结果。文章还探讨了协程与传统线程的区别,并展示了如何取消协程任务及正确释放资源。
20 0
|
1月前
|
Java 编译器 程序员
【多线程】synchronized原理
【多线程】synchronized原理
51 0
|
1月前
|
Java 应用服务中间件 API
nginx线程池原理
nginx线程池原理
26 0
|
2月前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。