【Java技术指南】「技术盲区」看看线程池是如何回收和维持运作线程的核心技术体系

简介: 【Java技术指南】「技术盲区」看看线程池是如何回收和维持运作线程的核心技术体系

线程池的背景介绍


线程池是很常用的并发框架,几乎所有需要异步和并发处理任务的程序都可用到线程池。使用线程池的好处如下:


  • 降低资源消耗:可重复利用已创建的线程池,降低创建和销毁带来的消耗;
  • 提高响应速度:任务到达时,可立即执行,无需等待线程创建;
  • 提高线程的可管理性:线程池可对线程统一分配、调优和监控。



线程池的难点和重点


让我们一起来看看线程池是如何回收和维持运作线程的核心技术体系。



线程池的前提和介绍


一般来讲JDK线程池就是ThreadPoolExecutor,大多数会对线程池执行任务的流程有了大体了解,实际上这个流程也十分通俗易懂,就不再赘述了,我之前的文章也介绍过了相关的技术点分析和介绍说明。


讲一讲线程池是如何回收线程的?

runWorker(Worker w)
复制代码



线程执行的基本流程


  1. 工作线程启动后,就进入runWorker(Worker w) 方法。
  2. 内部是一个while循环,循环判断任务是否为空,若不为空,执行任务;
  3. 若取不到任务,或发生异常,退出循环,执行processWorkerExit(w, completedAbruptly); 在这个方法里把工作线程移除掉。


读取任务的方式


主要有两种方式:一个是firstTask,这个是工作线程第一次跑的时候执行的任务,最多只能执行一次,后面得从getTask方法里取任务。


getTask是关键,在不考虑异常的场景下,返回,就表示退出循环,结束线程。

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
复制代码


重点关注的是getTask返回操作


一共有两种情况会返回


  • 第一种情况,线程池的状态已经是STOP,TIDYING, TERMINATED,或者是SHUTDOWN且工作队列为空;
if(runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
}
复制代码


  • 第二种情况,工作线程数已经大于最大线程数或当前工作线程已超时,且,还有其他工作线程或任务队列为空。
if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
}
复制代码



线程池回收工作线程


未调用shutdown ,RUNNING状态下全部任务执行完成的场景。


这种场景,会将工作线程的数量减少到核心线程数大小(如果本来就没有超过,则不需要回收)。


案例场景分析


  • 比如一个线程池,核心线程数为4,最大线程数为8。
  • 开始是4个工作线程,当任务把任务队列塞满,就得将工作线程增加到8。
  • 当后面任务执行到差不多了,线程取不到任务了,就会回收到4个工作线程的状态(取决于allowCoreThreadTimeOut的值,这里讨论默认值false的情况,即核心线程不会超时。如果为true,工作线程可以全部销毁)。
  • 可以先排除上面提到的条件1,线程池的状态已经是STOP,TIDYING, TERMINATED,或者是SHUTDOWN且工作队列为空。
  • 因为线程池一直是RUNNING,这条判断永远是false。在这个场景中,可以当条件1不存在。

下面分析取不出任务时线程是怎么运行的。

  1. 从任务队列取任务有两种方式,超时等待还是可以一直阻塞下去。决定因素是timed变量。该变量在前面赋值,如果当前线程数大于核心线程数,变量timed为true, 否则为false(当然是在:allowCoreThreadTimeOut为false的情况)。


现在讨论的是timed为true的情况。keepAliveTime一般不设置,默认值为0,所以基本上可以认为是不阻塞,马上返回取任务的结果,在线程超时等待唤醒之后,发现取不出任务,timeOut变为true,进入下一次循环。


  1. 来到1的判断,线程池一直RUNNING, 不进入代码块。
  2. 来到2的判断,这时任务队列为空,条件成立,CAS减少线程数,若成功,返回,否则,重复1。


注意,有可能多条线程同时通过2的判断,那会不会减少后线程的数量反而比预想的核心线程数少呢?


  • 比如当前线程数已经只有5条了,此时有两条线程同时唤醒,通过2的判断,同时减少数量,那剩下的线程数反而只有3条,和预期不一致。
  • 实际上是不会的,为了防止这种情况,compareAndDecrementWorkerCount(c) 用的是CAS方法,如果CAS失败就continue,进入下一轮循环,重新判断。
  • 像上述例子,其中一条线程会CAS失败,然后重新进入循环,发现工作线程数已经只有4了,timed为false,这条线程就不会被销毁,可以一直阻塞了(workQueue.take)。
  • 从这里也可以看出,虽然有核心线程数,但线程并没有区分是核心还是非核心,并不是先创建的就是核心,超过核心线程数后创建的就是非核心,最终保留哪些线程,完全随机。


shutdown


  • 调用shutdown ,全部任务执行完成的场景
  • 这种场景,无论是核心线程还是非核心线程,所有工作线程都会被销毁。
  • 在调用shutdown之后,会向所有的空闲工作线程发送中断信号。
public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
}
复制代码


最终传入false,调用下面这个方法。

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
}
复制代码
  • 可以看出,在发出中断信号前,会判断是否已经中断,以及要获得工作线程的独占锁。
  • 发出中断信号的时候,工作线程要么在getTask里准备获取任务,要么在执行任务,那就得等它执行完当前任务才会发出,因为工作线程在执行任务的时候,也会工作线程加锁。
  • 工作线程执行完任务,又跑到getTask里面去了。
  • 所以我们只要看getTask里面怎么应对中断异常的就可以了。


工作线程在getTask里,有两种可能。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
复制代码


  • 任务已全部完成,线程在阻塞等待。
  • 中断信号将其唤醒,从而进入下一轮循环。
  • 到达1处,符合条件,减少工作线程数量,并返回,由外层结束这条线程。
  • 这里的decrementWorkerCount是自旋式的,一定会减1。



任务还没有完全执行完


调用shutdown之后,未执行完的任务要执行完毕,线程池才能结束。所以此时有可能线程还在工作。


分两个阶段讨论


阶段1 任务较多,工作线程都能获得任务


分析一下收到中断信号后线程的表现。


  • 假设有线程A,正通过getTask里获取任务。此时A被中断,在获取任务时,无论是poll还是take,都会抛出中断异常。
  • 异常被捕获,重新进入下一轮循环,只要队列不为空,就可以继续取任务。


workQueue是BlockingQueue类型,以常见的LinkedBlockingQueue和ArrayBlockingQueue为例,加锁时都是调用lockInterruptibly,是响应中断的。


该方法又调用了AQS的acquireInterruptibly(int arg)。


acquireInterruptibly(int arg),无论是在入口处判断中断异常,还是在parkAndCheckInterrupt方法阻塞,被中断唤醒并判断中断异常时,均使用了Thread.interrupted。


这个方法会返回线程的中断状态,并把中断状态重置!也就是说,线程不再是中断状态了,这样在再次取任务时,就不会报错了。


因此,这对于正在准备取任务的线程,只是相当于浪费了一次循环,这可能是线程中断带来的副作用吧,当然,对整体的运行不影响。



任务刚好要执行完了


这时任务已经快取完了,比如有4条工作线程,只剩下2个任务,那就可能出现2条线程获得任务,2条线程阻塞。


因为在获取任务前的判断,没有加锁,那么会不会出现,所有线程都通过了前面的校验,来到workQueue获取任务的地方,刚好任务队列已经空了,线程全部阻塞了呢?因为shutdown 已经执行完毕,无法再向线程发出中断信号,从而线程一直在阻塞,无法被回收。


假设有A,B,C,D四条工作线程,同时通过了条件1和条件2的判断,来到取任务的地方。那么,工作队列至少还有一个任务,至少会有一条线程能取到任务。


假设A,B获得了任务,C,D阻塞。


A, B接下来的步骤是:


  • 任务执行完成后,再次getTask,此时符合条件1,返回,线程准备被回收。
  • processWorkerExit(Worker w, boolean completedAbruptly) 将线程回收。

回收就只是把线程干掉这么简单吗?来看看processWorkerExit(Worker w, boolean

completedAbruptly) 的方法。



可以看到,在里面除了workers.remove(w) 移除线,还调用了tryTerminate。


第一个判断条件没有一个子条件符合,跳过。第二个条件,工作线程还存在,那么随机中断一条空闲线程。


那么问题就来了,中断一条空闲线程,也没说是一定中断正在阻塞的线程啊。如果A, B同时退出,有没有可能出现A中断B, B中断A,AB互相中断,从而没有线程去中断唤醒阻塞的线程呢?


假设A能走到这里,说明A已经从工作线程的集合workers里面移除了(processWorkerExit(Worker w, boolean completedAbruptly) 在tryTerminate()之前,已经将其移除)。那么A中断B,B来到这里中断,就不会在workers里面找到A了。


也就是说,退出的线程不能互相中断,我从集合中退出后,中断了你,你不能中断我,因为我已经退出集合,你只能中断别人。那么,即使有N个线程同时退出,至少在最后,也会有一条线程,会中断剩余的阻塞线程。


== 阻塞的C,D中的任意一条被中断唤醒后,又会重复step1的动作,周而复始,直到所有阻塞线程都被中断,唤醒。


这也是为什么在tryTerminate里面,传入false,只需要中断任意一条空闲线程的原因。



总结


ThreadPoolExecutor回收工作线程,一条线程getTask返回,就会被回收。


两种场景。


  • 未调用shutdown ,RUNNING状态下全部任务执行完成的场景
  • 线程数量大于corePoolSize,线程超时阻塞,超时唤醒后CAS减少工作线程数,如果CAS成功,返回,线程回收。
  • 否则进入下一次循环。当工作者线程数量小于等于corePoolSize,就可以一直阻塞了。
  • 调用shutdown ,全部任务执行完成的场景
  • shutdown 会向所有线程发出中断信号,这时有两种可能。


所有线程都在阻塞


中断唤醒,进入循环,都符合第一个if判断条件,都返回,所有线程回收。



任务还没有完全执行完


至少会有一条线程被回收。在processWorkerExit(Worker w, boolean completedAbruptly)方法里会调用tryTerminate,向任意空闲线程发出中断信号。所有被阻塞的线程,最终都会被一个个唤醒,回收。





相关文章
|
1天前
|
Java 调度
Java中的线程池机制详解
Java中的线程池机制详解
|
1天前
|
存储 Java 数据库连接
Java中的数据持久化技术详解
Java中的数据持久化技术详解
|
2天前
|
安全 Java 测试技术
Java中的反射与元编程技术探秘
Java中的反射与元编程技术探秘
|
2天前
|
网络协议 Java
Java网络编程基础与Socket实现技术
Java网络编程基础与Socket实现技术
|
2天前
|
监控 Java 开发者
深入理解Java中的线程池实现及其优化
深入理解Java中的线程池实现及其优化
|
2天前
|
消息中间件 监控 Java
使用Java进行实时数据处理的工具和技术
使用Java进行实时数据处理的工具和技术
|
2天前
|
消息中间件 Java 开发者
Java中实现事件驱动架构的异步通信技术
Java中实现事件驱动架构的异步通信技术
|
2天前
|
存储 数据采集 搜索推荐
使用Java实现智能推荐系统的关键技术
使用Java实现智能推荐系统的关键技术
|
2月前
|
Java 调度
Java并发编程:深入理解线程池的原理与实践
【4月更文挑战第6天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将从线程池的基本原理入手,逐步解析其工作过程,以及如何在实际开发中合理使用线程池以提高程序性能。同时,我们还将关注线程池的一些高级特性,如自定义线程工厂、拒绝策略等,以帮助读者更好地掌握线程池的使用技巧。
|
10月前
|
算法 Java 调度
Java由浅入深理解线程池设计和原理1
Java由浅入深理解线程池设计和原理1
147 0