面试官: 线程池是如何做到线程复用的?有了解过吗,说说看

简介: 面试官: 线程池是如何做到线程复用的?有了解过吗,说说看

前言

目前正在出一个Java多线程专题长期系列教程,从入门到进阶含源码解读, 篇幅会较多, 喜欢的话,给个关注❤️ ~


承接上节的问题,我们继续探讨ThreadPoolExecutor,一起来看下吧~


ThreadPoolExecutor中是如何做到线程复用的❓

我们知道,一个线程在创建的时候会指定一个线程任务,当执行完这个线程任务之后,线程自动销毁。但是线程池却可以复用线程,一个线程执行完线程任务后不销毁,继续执行另外的线程任务。那么它是如何做到的❓这得从addWorker()说起


addWorker()

  • 先看上半部分addWorker()
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 对边界设定的检查
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
}
复制代码


retry:可能有些同学没用过,它只是一个标记,它的下一个标记就是for循环,在for循环里面调用continue/break再紧接着retry标记时,就表示从这个地方开始执行continue/break操作,但这不是我们关注的重点。


从上面的代码,我们可以看出,ThreadPoolExecutor在创建线程时,会将线程封装成工作线程worker,并放入工作线程组中,然后这个worker反复从阻塞队列中拿任务去执行。这个addWorkerexcute方法中调用的


  • 我们接着看下半部分
private boolean addWorker(Runnable firstTask, boolean core) {
        // 上半部分
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                // core是ture,需要创建的线程为核心线程,则先判断当前线程是否大于核心线程
                // 如果core是false,证明需要创建的是非核心线程,则先判断当前线程数是否大于总线程数
                // 如果不小于,则返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 下半部分
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 创建worker对象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 获取线程全局锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    // 判断线程池状态
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 将当前线程添加到线程组    
                        workers.add(w);
                        int s = workers.size();
                        // 如果线程组中的线程数大于最大线程池数 largestPoolSize赋值s
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 添加成功    
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 添加成功后执行线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 添加失败后执行 addWorkerFailed
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
复制代码


再看 addWorkerFailed(),与上边相反,相当于一个回滚操作,会移除失败的工作线程

private void addWorkerFailed(Worker w) {
        // 同样需要全局锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
复制代码


Worker

我们接着看Worker对象

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
        //.....
        // 省略下边代码
    }
复制代码


Worker类实现了Runnable接口,所以Worker也是一个线程任务。在构造方法中,创建了一个线程,回过头想想addWorker()里为啥可以t.start()应该很清楚了吧, 并且在构造方法中调用了线程工厂创建了一个线程实例,我们上节讲过线程工厂。其实这也不是关注的重点,重点是这个runWorker()

final void runWorker(Worker w) {
        // 获取当前的线程实例
        Thread wt = Thread.currentThread();
        // 直接从第一个任务开始执行 
        Runnable task = w.firstTask;
        // 获取完之后把worker的firstTask置为null 防止下次获取到
        w.firstTask = null;
        // 线程启动之后,通过unlock方法释放锁
        w.unlock(); // allow interrupts
        // 线程异常退出时 为 true
        boolean completedAbruptly = true;
        try {
            // Worker执行firstTask或从workQueue中获取任务,直到任务为空
            while (task != null || (task = getTask()) != null) {
                // 获取锁以防止在任务执行过程中发生中断
                w.lock();
                // 判断边界值 如果线程池中断 则中断线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 相当于钩子方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
复制代码


首先去执行创建这个worker时就有的任务,当执行完这个任务后,worker的生命周期并没有结束,在while循环中,worker会不断地调用getTask方法从阻塞队列中获取任务然后调用task.run()执行任务,从而达到复用线程的目的。只要getTask方法不返回null,此线程就不会退出。


我们接着看getTask()

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 如果运行线程数超过了最大线程数,但是缓存队列已经空了,这时递减worker数量。
            // 如果有设置允许线程超时或者线程数量超过了核心线程数量,并且线程在规定时间内均未poll到任务且队列为空则递减worker数量
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
            // 如果timed为true,则会调用workQueue的poll方法获取任务.
            // 超时时间是keepAliveTime。如果超过keepAliveTime时长,
            // 如果timed为false, 则会调用workQueue的take方法阻塞在当前。
            // 队列中有任务加入时,线程被唤醒,take方法返回任务,并执行。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
复制代码

大家有没有想过这里为啥要用takepoll,它们都是出队的操作,这么做有什么好处?


take & poll

我们说take()方法会将核心线程阻塞挂起,这样一来它就不会占用太多的cpu资源,直到拿到Runnable 然后返回。


如果allowCoreThreadTimeOut设置为true,那么核心线程就会去调用poll方法,因为poll可能会返回null,所以这时候核心线程满足超时条件也会被销毁

非核心线程会workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),如果超时还没有拿到,下一次循环判断compareAndDecrementWorkerCount就会返回null,Worker对象的run()方法循环体的判断为null,任务结束,然后线程被系统回收 。


再回头看一下runWorker()是不是设计的很巧妙~


结束语

本节内容不是很好理解,想继续探讨的同学可以继续阅读它的源码,这部分内容了解一下就好,其实我们从源码中可以看到大量的线程状态检查,代码写的很健壮,可以从中学习一下。下一节, 带大家学习一下阻塞队列BlockingQueue ~

相关文章
|
18天前
|
并行计算 算法 安全
面试必问的多线程优化技巧与实战
多线程编程是现代软件开发中不可或缺的一部分,特别是在处理高并发场景和优化程序性能时。作为Java开发者,掌握多线程优化技巧不仅能够提升程序的执行效率,还能在面试中脱颖而出。本文将从多线程基础、线程与进程的区别、多线程的优势出发,深入探讨如何避免死锁与竞态条件、线程间的通信机制、线程池的使用优势、线程优化算法与数据结构的选择,以及硬件加速技术。通过多个Java示例,我们将揭示这些技术的底层原理与实现方法。
71 3
|
2月前
|
存储 缓存 算法
面试官:单核 CPU 支持 Java 多线程吗?为什么?被问懵了!
本文介绍了多线程环境下的几个关键概念,包括时间片、超线程、上下文切换及其影响因素,以及线程调度的两种方式——抢占式调度和协同式调度。文章还讨论了减少上下文切换次数以提高多线程程序效率的方法,如无锁并发编程、使用CAS算法等,并提出了合理的线程数量配置策略,以平衡CPU利用率和线程切换开销。
面试官:单核 CPU 支持 Java 多线程吗?为什么?被问懵了!
|
2月前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
122 38
|
2月前
|
Java
.如何根据 CPU 核心数设计线程池线程数量
IO 密集型:核心数*2 计算密集型: 核心数+1 为什么加 1?即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。
82 4
|
2月前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
119 2
|
2月前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
408 2
|
5月前
|
存储 Java
【IO面试题 四】、介绍一下Java的序列化与反序列化
Java的序列化与反序列化允许对象通过实现Serializable接口转换成字节序列并存储或传输,之后可以通过ObjectInputStream和ObjectOutputStream的方法将这些字节序列恢复成对象。
|
2月前
|
存储 算法 Java
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
本文详解自旋锁的概念、优缺点、使用场景及Java实现。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
|
2月前
|
存储 缓存 Java
大厂面试必看!Java基本数据类型和包装类的那些坑
本文介绍了Java中的基本数据类型和包装类,包括整数类型、浮点数类型、字符类型和布尔类型。详细讲解了每种类型的特性和应用场景,并探讨了包装类的引入原因、装箱与拆箱机制以及缓存机制。最后总结了面试中常见的相关考点,帮助读者更好地理解和应对面试中的问题。
81 4
|
3月前
|
算法 Java 数据中心
探讨面试常见问题雪花算法、时钟回拨问题,java中优雅的实现方式
【10月更文挑战第2天】在大数据量系统中,分布式ID生成是一个关键问题。为了保证在分布式环境下生成的ID唯一、有序且高效,业界提出了多种解决方案,其中雪花算法(Snowflake Algorithm)是一种广泛应用的分布式ID生成算法。本文将详细介绍雪花算法的原理、实现及其处理时钟回拨问题的方法,并提供Java代码示例。
111 2

相关实验场景

更多