【多线程】线程池如何复用,怎么才能让面试官听懂我说的?

简介: 今天来说一下面试中常问到问题,我们知道线程池是帮助我们对线程资源的管理,只有我们合理的使用使用线程池,他才能做到事倍功半,但是你知道线程池是如何复用的吗?

前言

大家好,我是小郭,今天来说一下面试中常问到问题,我们知道线程池是帮助我们对线程资源的管理,只有我们合理的使用使用线程池,他才能做到事倍功半,但是你知道线程池是如何复用的吗?

概要

  1. Worker执行任务模型
  2. Worker线程如何增加?
  3. Worker线程执行任务
  4. 原理总结

问题

  1. 线程如何复用?
  2. 线程如何回收? 等看完源码,我们再来回答

源码环节

1. Worker执行任务模型

图片来源网上

网络异常,图片无法展示
|

主要执行任务的三个角色 Worker-》WorkQueue-》Thread

2. addWorker源码

涉及参数:

firstTask:新线程首先要执行的任务,若没有则传null。

core:如果为true,则与 corePoolSize 绑定。为false,则与 maximumPoolSize 绑定。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
        //检测当前线程池状态
    for (;;) {
        //包含线程池状态和线程中有效线程数量
        int c = ctl.get();
        //通过高三位获取线程池状态
        int rs = runStateOf(c);
        // 线程池不处于RUNNABLE状态且现在状态为关系状态,未传入新任务,阻塞队列不为空 返回false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            //通过低29位计算线程池内有效线程的数量
            int wc = workerCountOf(c);
            //超过size,则返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //增加工作线程数    
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //再次获取线程池状态和线程中有效线程数量
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //创建一个Worker对象,将新任务放入Worker
        w = new Worker(firstTask);
        //实例化一个线程
        final Thread t = w.thread;
        if (t != null) {
            //加锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                //再次检查线程池运行状态
                int rs = runStateOf(ctl.get());
                //线程池运行状态为RUNNABLE 或者 为已关闭且无新任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) //检测是否处于活动
                        throw new IllegalThreadStateException();
                    //将新任务放入set数组中    
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //添加任务工程    
                    workerAdded = true;
                }
            } finally {
                //释放锁
                mainLock.unlock();
            }
            //启动任务
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //添加失败则加入失败队列
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

上面做了四件事:

  1. 检查线程池状态
  2. 新建线程,使用Worker进行包装,放入Hashet数组中,最终真正执行任务的线程就放在Worker,所以新增一个addWorker就是新增一个线程。主要实现复用就是Worker类中的runWorker(this)
  3. 启动线程Start()
  4. 添加失败操作,移除Worker,减少WorkerCount

2.1 worker源码

worker实际的实现也是Runnbale,通过继承AbstractQueuedSynchronizer,来获取独占锁,所以上面调用start(),实际上也是执行Worker中的run方法

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable{
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
    ...
}    

3. Worker线程执行任务

Worker线程执行任务流程图

网络异常,图片无法展示
|

3.1 runWorker源码

final void runWorker(Worker w) {
    //获取当前的线程
    Thread wt = Thread.currentThread();
    //从Worker中取出新任务
    Runnable task = w.firstTask;
    //释放Worker
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //任务不为空
        while (task != null || (task = getTask()) != null) {
            //上锁,避免被其他线程中断
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            //检查状态如果线程池状态处于中断,则中断线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                //中断当前线程
                wt.interrupt();
            try {
                //执行beforeExecute
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //worker回收
        processWorkerExit(w, completedAbruptly);
    }
}

使线程池复用的核心方法,getTask() 只要不返回null,就会一直执行

3.2 getTask源码

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        //获取线程池运行状态和线程池内中有效线程池
        int c = ctl.get();
        //获取运行状态
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // 线程不处于RUNNABLE 且(已关闭 或 worker队列为空)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //减少工作线程数
            decrementWorkerCount();
            return null;
        }
        //获取线程池内有效线程池数量
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        // 1.allowCoreThreadTimeOut变量默认是false,核心线程即使空闲也不会被销毁
        // 如果为true,核心线程在keepAliveTime内仍空闲则会被销毁。
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //如果运行线程数超过了最大线程数,但是缓存队列已经空了,这时递减worker数量。 
     // 如果有设置允许线程超时或者线程数量超过了核心线程数量,
        // 并且线程在规定时间内均未poll到任务且队列为空则递减worker数量
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            //如果为true,则调用poll方法获取任务,超过keepAliveTime,则会返回null
            //如果为false,则直接调用take方法获取任务,workQueue.offer(command) ,当任务加入时,再被唤醒,返回任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

3.3 addWorkerFailed源码

//回滚创建
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //新任务不为空
        if (w != null)
        //移除新任务
            workers.remove(w);
       //减少WorkerCount     
        decrementWorkerCount();
        //其他状态为Terminate
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

3.4 tryTerminate源码

final void tryTerminate() {
    for (;;) {
        //获取线程池状态和线程池中有效线程数量
        int c = ctl.get();
        // 正在运行 或 任务都已经终止 或 处于关闭状态且池和队列不为空 提前退出
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
            //工作线程数不为0,则中断空闲的worker
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
    //获取对象锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

4. 原理总结

线程池的优点就是提高对线程的管理,提高资源的利用率,控制线程的数量。

在线程池中,线程可以从阻塞队列 中不断  getTask()  新任务来执行,其核心原理在于线程池用Worker对Thread进行了封装,每调用一个 addWorker 就是等于新开一个线程,并不是每次执行任务都会调用  Thread.start()  来创建新线程,而是让每个线程去轮询,在这个轮询中,不停地检查是否还有任务等待被执行,如果有则直接去执行这个任务,也就是调用任务的 run() 方法,把 run() 方法当作和普通方法一样的地位去调用,相当于把每个任务的 run() 方法串联了起来,所以线程数量并不增加。

通过上面的了解,我们可以来回答下面这两个问题了

4.1 线程如何复用?

ThreadPoolExecutor 在创建线程时,会将线程封装成工作线程 Worker ,并放入工作线程组中,然后这个 Worker 反复从阻塞队列中拿任务去执行。

  1. 通过取 Worker 的 firstTask 或者通过 getTask 方法从 workQueue 中获取待执行的任务。
  2. 直接调用 task 的 run 方法来执行具体的任务(而不是新建线程)

4.2 线程如何回收?

  1. 获取不到任务时,回收自己
  2. 将worker移出线程池
  3. 线程池状态置为TERMINATED
try {
  while (task != null || (task = getTask()) != null) {
    ...
  }
} finally {
  processWorkerExit(w, completedAbruptly);
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
    ...
          //将worker移出线程池
            completedTaskCount += w.completedTasks;
            workers.remove(w);
    //修改线程池状态
        tryTerminate();
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

以上不足之处,请大家多多指出~ 下一篇,一起来学习线程池的拒绝策略

相关文章
|
30天前
|
监控 Kubernetes Java
阿里面试:5000qps访问一个500ms的接口,如何设计线程池的核心线程数、最大线程数? 需要多少台机器?
本文由40岁老架构师尼恩撰写,针对一线互联网企业的高频面试题“如何确定系统的最佳线程数”进行系统化梳理。文章详细介绍了线程池设计的三个核心步骤:理论预估、压测验证和监控调整,并结合实际案例(5000qps、500ms响应时间、4核8G机器)给出具体参数设置建议。此外,还提供了《尼恩Java面试宝典PDF》等资源,帮助读者提升技术能力,顺利通过大厂面试。关注【技术自由圈】公众号,回复“领电子书”获取更多学习资料。
|
6天前
|
数据采集 Java Linux
面试大神教你:如何巧妙回答线程优先级这个经典考题?
大家好,我是小米。本文通过故事讲解Java面试中常见的线程优先级问题。小明和小华的故事帮助理解线程优先级:高优先级线程更可能被调度执行,但并非越高越好。实际开发需权衡业务需求,合理设置优先级。掌握线程优先级不仅能写出高效代码,还能在面试中脱颖而出。最后,小张因深入分析成功拿下Offer。希望这篇文章能助你在面试中游刃有余!
29 4
面试大神教你:如何巧妙回答线程优先级这个经典考题?
|
3天前
|
Python
python3多线程中使用线程睡眠
本文详细介绍了Python3多线程编程中使用线程睡眠的基本方法和应用场景。通过 `time.sleep()`函数,可以使线程暂停执行一段指定的时间,从而控制线程的执行节奏。通过实际示例演示了如何在多线程中使用线程睡眠来实现计数器和下载器功能。希望本文能帮助您更好地理解和应用Python多线程编程,提高程序的并发能力和执行效率。
32 20
|
2天前
|
Java 程序员 开发者
Java社招面试题:一个线程运行时发生异常会怎样?
大家好,我是小米。今天分享一个经典的 Java 面试题:线程运行时发生异常,程序会怎样处理?此问题考察 Java 线程和异常处理机制的理解。线程发生异常,默认会导致线程终止,但可以通过 try-catch 捕获并处理,避免影响其他线程。未捕获的异常可通过 Thread.UncaughtExceptionHandler 处理。线程池中的异常会被自动处理,不影响任务执行。希望这篇文章能帮助你深入理解 Java 线程异常处理机制,为面试做好准备。如果你觉得有帮助,欢迎收藏、转发!
35 14
|
9天前
|
缓存 安全 Java
面试中的难题:线程异步执行后如何共享数据?
本文通过一个面试故事,详细讲解了Java中线程内部开启异步操作后如何安全地共享数据。介绍了异步操作的基本概念及常见实现方式(如CompletableFuture、ExecutorService),并重点探讨了volatile关键字、CountDownLatch和CompletableFuture等工具在线程间数据共享中的应用,帮助读者理解线程安全和内存可见性问题。通过这些方法,可以有效解决多线程环境下的数据共享挑战,提升编程效率和代码健壮性。
37 6
|
9天前
|
安全 Java C#
Unity多线程使用(线程池)
在C#中使用线程池需引用`System.Threading`。创建单个线程时,务必在Unity程序停止前关闭线程(如使用`Thread.Abort()`),否则可能导致崩溃。示例代码展示了如何创建和管理线程,确保在线程中执行任务并在主线程中处理结果。完整代码包括线程池队列、主线程检查及线程安全的操作队列管理,确保多线程操作的稳定性和安全性。
|
25天前
|
算法 安全 Java
Java线程调度揭秘:从算法到策略,让你面试稳赢!
在社招面试中,关于线程调度和同步的相关问题常常让人感到棘手。今天,我们将深入解析Java中的线程调度算法、调度策略,探讨线程调度器、时间分片的工作原理,并带你了解常见的线程同步方法。让我们一起破解这些面试难题,提升你的Java并发编程技能!
65 16
|
1月前
|
安全 Java 程序员
面试直击:并发编程三要素+线程安全全攻略!
并发编程三要素为原子性、可见性和有序性,确保多线程操作的一致性和安全性。Java 中通过 `synchronized`、`Lock`、`volatile`、原子类和线程安全集合等机制保障线程安全。掌握这些概念和工具,能有效解决并发问题,编写高效稳定的多线程程序。
66 11
|
1月前
|
Java Linux 调度
硬核揭秘:线程与进程的底层原理,面试高分必备!
嘿,大家好!我是小米,29岁的技术爱好者。今天来聊聊线程和进程的区别。进程是操作系统中运行的程序实例,有独立内存空间;线程是进程内的最小执行单元,共享内存。创建进程开销大但更安全,线程轻量高效但易引发数据竞争。面试时可强调:进程是资源分配单位,线程是CPU调度单位。根据不同场景选择合适的并发模型,如高并发用线程池。希望这篇文章能帮你更好地理解并回答面试中的相关问题,祝你早日拿下心仪的offer!
38 6
|
1月前
|
缓存 安全 算法
Java 多线程 面试题
Java 多线程 相关基础面试题