JDK 线程池如何保证核心线程不被销毁

简介: JDK 线程池如何保证核心线程不被销毁

前言

很早之前那个时候练习线程池, 就是感觉线程池类似于 ArrayList 这种集合类结构, 将 Thread 类存储, 来任务了就进行消费, 然鹅...

线程包装类

线程池并不是对 Thread 直接存储, 而是对 Thread 进行了一层包装, 包装类叫做 Worker

线程在线程池中的存储结构如下:

private final HashSet<Worker> workers = new HashSet<Worker>();

先看一下 Worker 类中的变量及方法

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {

    /**
     * 此线程为线程池中的工作线程
     */
    final Thread thread;

    /**
     * 指定线程运行的第一项任务
     * 第一项任务没有则为空
     */
    Worker(Runnable firstTask) {
        ...
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /**
     * 运行传入的 Runnable 任务
     */
    @Override
    public void run() {
        runWorker(this);
    }
}

通过 Worker 的构造方法和重写的 run 得知:

线程池提交的任务, 会由 Worker 中的 thread 进行执行调用

addWorker

这里还是要先放一下线程池的执行流程代码, 具体流程如下:

public void execute(Runnable command) {
    ...
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
          // 🌟【重点】关注方法
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    } else if (!addWorker(command, false))
        reject(command);
}

我们具体看一下 ThreadPoolExecutor#addWorker

和大纲没关系的代码和知识点不作讲解
private boolean addWorker(Runnable firstTask, boolean core) {
    ...
    // worker运行标识
    boolean workerStarted = false;  
      // worker添加标识
    boolean workerAdded = false;  
    Worker w = null;
    try {
          // 调用Worker构造方法
        w = new Worker(firstTask);
          // 获取Worker中工作线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                      // 如无异常, 将w添加至workers
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                          // 更新池内最大线程
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }

            if (workerAdded) {
                  // 启动Woker中thread工作线程
                t.start();
                  // 设置启动成功标识成功
                workerStarted = true;
            }
        }
    } finally {
          // 如果启动失败抛出异常终止, 将Worker从workers中移除
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

这里需要着重关心下 t.start(), t 是我们 Worker 中的工作线程

上文说到 Worker 实现了 Runnable 接口, 并重写了 run 方法, 所以 t.start()

最终还是会调用 Worker 中的 run()

@Override
public void run() {
    runWorker(this);
}

runWorker

ThreadPoolExecutor#runWorker 是具体执行线程池提交任务的方法, 大致思路如下:

1、获取 Worker 中的第一个任务

2、如果第一个任务不为空则执行具体流程

3、第一个任务为空则从阻塞队列中获取任务, 这一点也是核心线程不被回收的关键

runWorker() 中有两个扩展方法, beforeExecute、afterExecute, 在任务执行前后输出一些重要信息, 可用作与监控等...

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
          // 🌟【重点】getTask() 是核心线程不被回收的精髓
        while (task != null || (task = getTask()) != null) {
            w.lock();
            ...
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } ...
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                      // 🌟【重点】执行完任务后, 将Task置空
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
              // 退出Worker
        processWorkerExit(w, completedAbruptly);
    }
}

关键关注下 while 循环, 内部会在执行完流程后将 task 设置为空, 这样就会跳出循环

可以看到 processWorkerExit 是在 finally 语句块中, 相当于 获取不到阻塞队列任务就会去关闭 Worker

线程池是如何保证核心线程获取不到任务时不被销毁呢?

我们继续看一下 getTask() 中是如何获取任务

getTask

ThreadPoolExecutor#getTask 只做了一件事情, 就是从线程池的阻塞队列中获取任务返回

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

timed

重点代码从上面截出来, 首先是判断是否需要获取阻塞队列任务时在规定时间返回

/**
 * 【重点】判断线程是否超时, 这里会针对两种情况判断
 * 1. 设置allowCoreThreadTimeOut参数默认false
 *    如果为true表示核心线程也会进行超时回收
 * 2. 判断当前线程池的数量是否大于核心线程数
 * 
 * 这里参与了或运算符, 只要其中一个判断符合即为True
 */
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

根据 timed 属性, 判断获取阻塞队列中任务的方式

/**
 * 【重点】根据timed判断两种不同方式的任务获取
 * 1. 如果为True, 表示线程会根据规定时间调用阻塞队列任务
 * 2. 如果为False, 表示线程会进行阻塞调用
 */
Runnable r = timed ? 
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 
        workQueue.take();

这里就比较清楚了, 如果 timed 为 True, 线程经过 非核心线程过期时间后还没有获取到任务, 则方法结束, 后续会将 Worker 进行回收

如果没有设置 allowCoreThreadTimeOut 为 True, 以及当前线程池内线程数量不大于核心线程

那么从阻塞队列获取的话是 take(), take() 会 一直阻塞, 等待任务的添加返回

这样也就间接达到了核心线程数不会被回收的效果

getTask流程

图片本来是要放上面的, 但是画的实在有点不忍直视 🙃️

核心线程与非核心线程区别

核心线程只是一个叫法, 核心线程与非核心线程的区别是:

创建核心线程时会携带一个任务, 而非核心线程没有, 如果核心线程执行完第一个任务, 线程池内线程无区别

线程池是期望达到 corePoolSize 的并发状态, 不关心最先添加到线程池的核心线程是否会被销毁

相关文章
|
20天前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
46 1
|
1天前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
2天前
|
监控 数据可视化 Java
使用JDK自带的监控工具JConsole来监控线程池的内存使用情况
使用JDK自带的监控工具JConsole来监控线程池的内存使用情况
|
12天前
|
监控 Java
线程池中线程异常后:销毁还是复用?技术深度剖析
在并发编程中,线程池作为一种高效利用系统资源的工具,被广泛用于处理大量并发任务。然而,当线程池中的线程在执行任务时遇到异常,如何妥善处理这些异常线程成为了一个值得深入探讨的话题。本文将围绕“线程池中线程异常后:销毁还是复用?”这一主题,分享一些实践经验和理论思考。
26 3
|
22天前
|
缓存 Java 调度
【Java 并发秘籍】线程池大作战:揭秘 JDK 中的线程池家族!
【8月更文挑战第24天】Java的并发库提供多种线程池以应对不同的多线程编程需求。本文通过实例介绍了四种主要线程池:固定大小线程池、可缓存线程池、单一线程线程池及定时任务线程池。固定大小线程池通过预设线程数管理任务队列;可缓存线程池能根据需要动态调整线程数量;单一线程线程池确保任务顺序执行;定时任务线程池支持周期性或延时任务调度。了解并正确选用这些线程池有助于提高程序效率和资源利用率。
31 2
|
18天前
|
数据采集 Java Python
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
|
19天前
|
Java
线程池中线程抛了异常,该如何处理?
【8月更文挑战第27天】在Java多线程编程中,线程池(ThreadPool)是一种常用的并发处理工具,它能够有效地管理线程的生命周期,提高资源利用率,并简化并发编程的复杂性。然而,当线程池中的线程在执行任务时抛出异常,如果不妥善处理,这些异常可能会导致程序出现未预料的行为,甚至崩溃。因此,了解并掌握线程池异常处理机制至关重要。
93 0
|
3天前
|
Java 数据库 Android开发
一个Android App最少有几个线程?实现多线程的方式有哪些?
本文介绍了Android多线程编程的重要性及其实现方法,涵盖了基本概念、常见线程类型(如主线程、工作线程)以及多种多线程实现方式(如`Thread`、`HandlerThread`、`Executors`、Kotlin协程等)。通过合理的多线程管理,可大幅提升应用性能和用户体验。
23 15
一个Android App最少有几个线程?实现多线程的方式有哪些?
|
5天前
|
Java 数据库 Android开发
一个Android App最少有几个线程?实现多线程的方式有哪些?
本文介绍了Android应用开发中的多线程编程,涵盖基本概念、常见实现方式及最佳实践。主要内容包括主线程与工作线程的作用、多线程的多种实现方法(如 `Thread`、`HandlerThread`、`Executors` 和 Kotlin 协程),以及如何避免内存泄漏和合理使用线程池。通过有效的多线程管理,可以显著提升应用性能和用户体验。
23 10
|
12天前
|
存储 Ubuntu Linux
C语言 多线程编程(1) 初识线程和条件变量
本文档详细介绍了多线程的概念、相关命令及线程的操作方法。首先解释了线程的定义及其与进程的关系,接着对比了线程与进程的区别。随后介绍了如何在 Linux 系统中使用 `pidstat`、`top` 和 `ps` 命令查看线程信息。文档还探讨了多进程和多线程模式各自的优缺点及适用场景,并详细讲解了如何使用 POSIX 线程库创建、退出、等待和取消线程。此外,还介绍了线程分离的概念和方法,并提供了多个示例代码帮助理解。最后,深入探讨了线程间的通讯机制、互斥锁和条件变量的使用,通过具体示例展示了如何实现生产者与消费者的同步模型。