【多线程】线程池如何复用,怎么才能让面试官听懂我说的?

简介: 今天来说一下面试中常问到问题,我们知道线程池是帮助我们对线程资源的管理,只有我们合理的使用使用线程池,他才能做到事倍功半,但是你知道线程池是如何复用的吗?

前言

大家好,我是小郭,今天来说一下面试中常问到问题,我们知道线程池是帮助我们对线程资源的管理,只有我们合理的使用使用线程池,他才能做到事倍功半,但是你知道线程池是如何复用的吗?

概要

  1. Worker执行任务模型
  2. Worker线程如何增加?
  3. Worker线程执行任务
  4. 原理总结

问题

  1. 线程如何复用?
  2. 线程如何回收? 等看完源码,我们再来回答

源码环节

1. Worker执行任务模型

图片来源网上

网络异常,图片无法展示
|

主要执行任务的三个角色 Worker-》WorkQueue-》Thread

2. addWorker源码

涉及参数:

firstTask:新线程首先要执行的任务,若没有则传null。

core:如果为true,则与 corePoolSize 绑定。为false,则与 maximumPoolSize 绑定。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
        //检测当前线程池状态
    for (;;) {
        //包含线程池状态和线程中有效线程数量
        int c = ctl.get();
        //通过高三位获取线程池状态
        int rs = runStateOf(c);
        // 线程池不处于RUNNABLE状态且现在状态为关系状态,未传入新任务,阻塞队列不为空 返回false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            //通过低29位计算线程池内有效线程的数量
            int wc = workerCountOf(c);
            //超过size,则返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //增加工作线程数    
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //再次获取线程池状态和线程中有效线程数量
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //创建一个Worker对象,将新任务放入Worker
        w = new Worker(firstTask);
        //实例化一个线程
        final Thread t = w.thread;
        if (t != null) {
            //加锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                //再次检查线程池运行状态
                int rs = runStateOf(ctl.get());
                //线程池运行状态为RUNNABLE 或者 为已关闭且无新任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) //检测是否处于活动
                        throw new IllegalThreadStateException();
                    //将新任务放入set数组中    
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //添加任务工程    
                    workerAdded = true;
                }
            } finally {
                //释放锁
                mainLock.unlock();
            }
            //启动任务
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //添加失败则加入失败队列
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

上面做了四件事:

  1. 检查线程池状态
  2. 新建线程,使用Worker进行包装,放入Hashet数组中,最终真正执行任务的线程就放在Worker,所以新增一个addWorker就是新增一个线程。主要实现复用就是Worker类中的runWorker(this)
  3. 启动线程Start()
  4. 添加失败操作,移除Worker,减少WorkerCount

2.1 worker源码

worker实际的实现也是Runnbale,通过继承AbstractQueuedSynchronizer,来获取独占锁,所以上面调用start(),实际上也是执行Worker中的run方法

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable{
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
    ...
}    

3. Worker线程执行任务

Worker线程执行任务流程图

网络异常,图片无法展示
|

3.1 runWorker源码

final void runWorker(Worker w) {
    //获取当前的线程
    Thread wt = Thread.currentThread();
    //从Worker中取出新任务
    Runnable task = w.firstTask;
    //释放Worker
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //任务不为空
        while (task != null || (task = getTask()) != null) {
            //上锁,避免被其他线程中断
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            //检查状态如果线程池状态处于中断,则中断线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                //中断当前线程
                wt.interrupt();
            try {
                //执行beforeExecute
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //worker回收
        processWorkerExit(w, completedAbruptly);
    }
}

使线程池复用的核心方法,getTask() 只要不返回null,就会一直执行

3.2 getTask源码

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        //获取线程池运行状态和线程池内中有效线程池
        int c = ctl.get();
        //获取运行状态
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // 线程不处于RUNNABLE 且(已关闭 或 worker队列为空)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //减少工作线程数
            decrementWorkerCount();
            return null;
        }
        //获取线程池内有效线程池数量
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        // 1.allowCoreThreadTimeOut变量默认是false,核心线程即使空闲也不会被销毁
        // 如果为true,核心线程在keepAliveTime内仍空闲则会被销毁。
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //如果运行线程数超过了最大线程数,但是缓存队列已经空了,这时递减worker数量。 
     // 如果有设置允许线程超时或者线程数量超过了核心线程数量,
        // 并且线程在规定时间内均未poll到任务且队列为空则递减worker数量
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            //如果为true,则调用poll方法获取任务,超过keepAliveTime,则会返回null
            //如果为false,则直接调用take方法获取任务,workQueue.offer(command) ,当任务加入时,再被唤醒,返回任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

3.3 addWorkerFailed源码

//回滚创建
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //新任务不为空
        if (w != null)
        //移除新任务
            workers.remove(w);
       //减少WorkerCount     
        decrementWorkerCount();
        //其他状态为Terminate
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

3.4 tryTerminate源码

final void tryTerminate() {
    for (;;) {
        //获取线程池状态和线程池中有效线程数量
        int c = ctl.get();
        // 正在运行 或 任务都已经终止 或 处于关闭状态且池和队列不为空 提前退出
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
            //工作线程数不为0,则中断空闲的worker
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
    //获取对象锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

4. 原理总结

线程池的优点就是提高对线程的管理,提高资源的利用率,控制线程的数量。

在线程池中,线程可以从阻塞队列 中不断  getTask()  新任务来执行,其核心原理在于线程池用Worker对Thread进行了封装,每调用一个 addWorker 就是等于新开一个线程,并不是每次执行任务都会调用  Thread.start()  来创建新线程,而是让每个线程去轮询,在这个轮询中,不停地检查是否还有任务等待被执行,如果有则直接去执行这个任务,也就是调用任务的 run() 方法,把 run() 方法当作和普通方法一样的地位去调用,相当于把每个任务的 run() 方法串联了起来,所以线程数量并不增加。

通过上面的了解,我们可以来回答下面这两个问题了

4.1 线程如何复用?

ThreadPoolExecutor 在创建线程时,会将线程封装成工作线程 Worker ,并放入工作线程组中,然后这个 Worker 反复从阻塞队列中拿任务去执行。

  1. 通过取 Worker 的 firstTask 或者通过 getTask 方法从 workQueue 中获取待执行的任务。
  2. 直接调用 task 的 run 方法来执行具体的任务(而不是新建线程)

4.2 线程如何回收?

  1. 获取不到任务时,回收自己
  2. 将worker移出线程池
  3. 线程池状态置为TERMINATED
try {
  while (task != null || (task = getTask()) != null) {
    ...
  }
} finally {
  processWorkerExit(w, completedAbruptly);
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
    ...
          //将worker移出线程池
            completedTaskCount += w.completedTasks;
            workers.remove(w);
    //修改线程池状态
        tryTerminate();
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

以上不足之处,请大家多多指出~ 下一篇,一起来学习线程池的拒绝策略

相关文章
|
1月前
|
存储 缓存 算法
面试官:单核 CPU 支持 Java 多线程吗?为什么?被问懵了!
本文介绍了多线程环境下的几个关键概念,包括时间片、超线程、上下文切换及其影响因素,以及线程调度的两种方式——抢占式调度和协同式调度。文章还讨论了减少上下文切换次数以提高多线程程序效率的方法,如无锁并发编程、使用CAS算法等,并提出了合理的线程数量配置策略,以平衡CPU利用率和线程切换开销。
面试官:单核 CPU 支持 Java 多线程吗?为什么?被问懵了!
|
1月前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
112 38
|
28天前
|
Java
.如何根据 CPU 核心数设计线程池线程数量
IO 密集型:核心数*2 计算密集型: 核心数+1 为什么加 1?即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。
36 4
|
1月前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
87 2
|
1月前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
97 4
|
1月前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
258 2
|
2月前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
24 2
|
1月前
|
数据采集 Java Python
爬取小说资源的Python实践:从单线程到多线程的效率飞跃
本文介绍了一种使用Python从笔趣阁网站爬取小说内容的方法,并通过引入多线程技术大幅提高了下载效率。文章首先概述了环境准备,包括所需安装的库,然后详细描述了爬虫程序的设计与实现过程,包括发送HTTP请求、解析HTML文档、提取章节链接及多线程下载等步骤。最后,强调了性能优化的重要性,并提醒读者遵守相关法律法规。
62 0
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
58 1
C++ 多线程之初识多线程
|
2月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
27 3