Java 线程池源码解析(1)

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: Java 线程池源码解析

线程池

池化思想:线程池、数据连接池等,比如我们 Spark 的 Executor 就是典型的线程池,用户在启动 Spark 作业的同时启动线程池,这样 Spark 的 Task 就可以直接获取资源,而不用像 MR 程序那样等待容器上的进程开启了。

如果不使用线程池的话,我们需要:

  1. 手动创建线程对象
  2. 执行任务
  3. 执行完毕,回收资源

优点

  1. 提高线程的利用率
  2. 提高程序的响应速度(线程对象提前创建好的,节省了创建和销毁的开销)
  3. 便于统一管理线程对象
  4. 可以控制最大的并发数

1、Java 创建线程池示例

1.1、构造参数解释

我们先看看 ThreadPoolExecutor 的构造器的源码:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime); // 统一转为亚秒格式
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

参数大概意思就是:核心线程数、最大线程数、存活时间、存活时间单位、阻塞队列(存放任务)、线程创建工厂、拒绝策略。我们也可以通过下面的例子理解一下:

       线程池就好比我们的银行,它有很多个接待客人的柜台(一个柜台就是一个线程,用来执行任务)和用来供客人等待的座位(等待的执行的任务)。上面的图中,我们有 5 个柜台(对应第2个参数:maximumPoolSize) ;而其中绿色的三个柜台代表常驻柜台(也就是一直都有人,对应第1个参数:corePoolSize);红色的柜台代表预备柜台,也就是当业务繁忙的时候,最多还可以打电话摇两个人来(maximumPoolSize - corePoolSize);但是摇人来帮忙是有时效性的,如果帮完忙一段时间(取决于第3个和第4个参数)没有活干,这些线程就会被释放;摇人的方式取决于第6个参数(下面我们是通过默认的线程工厂来再创建线程的);蓝色的等候区区域代表座位(可允许等待的任务数量),当柜台前和座位都满了的时候,如果再有任务进来就会被拒绝,具体的拒绝方式取决于第7个参数,下面我们给出的拒绝策略是直接报错:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class TestThreadPool {
  public static void main(String[] args) {
    // 1.核心线程数 2.最大线程数 3.存活时间 4.时间单位 5.等待队列 6.线程工厂 7.拒绝策略(直接抛异常)
    ExecutorService executorService = new ThreadPoolExecutor(3, 5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
    for (int i = 0; i < 9; i++) {
      executorService.execute(()->{
        System.out.println(Thread.currentThread().getName()+"正在工作");
      });
    }
        // 关闭线程池
    executorService.shutdown();
  }
}
  • 使用线程池中执行任务很简单,我们不需要关心具体是哪个线程执行的,只需要把任务丢给它即可(通过 lambda 表达式来告诉线程池我们的任务执行逻辑)

       上面的案例中,我们的线程池的最大接收的任务量是 8 (最大线程数:5 + 等待队列容量:3),但并不是说只能跑8个任务,如果有任务释放资源仍然可以继续执行任务。

       所以,上面的案例同一时刻最多只能共存8个任务(其中最多只有五个任务会同时执行),如果,当我们的任务超过8时,会直接报错(因为我们设置了拒绝策略就是直接抛异常)。

2、ThreadExecutorPool 源码解析

创建一个包含 10 个核心线程、总线程数为 20、存活时间为 0 s、拒绝策略为直接抛异常的一个线程池对象:

public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                10, 20,
                0L, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()
        );
 
        executor.execute(new Runnable() {
            @Override
            public void run() {
 
            }
        });
    }

2.1、线程池保活与回收源码分析

2.1.1、execute 方法源码
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // 如果总线程数小于核心线程数
            if (addWorker(command, true))  // 添加一个核心线程
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // 如果当前全部线程都在工作就把任务添加到阻塞任务队列当中
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) 
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) // 如果上面没有成功把任务加到队列就新加一个线程
            reject(command); // 如果新加线程失败(比如超过最大线程数)就拒绝任务
    }

这里的注释已经说的很明白了:

  1. 如果正在运行的线程数少于核心线程数(corePoolSize),则尝试启动一个新线程,并将给定的命令参数作为其第一个任务。调用addWorker方法可以原子性地检查运行状态和工作线程数,从而防止在不应该添加线程时发出错误的警报,通过返回false来实现。
  2. 如果任务能够成功入队,我们仍然需要再次检查是否应该添加一个线程(因为自上次检查以来已有线程死亡)或者线程池在进入此方法后已经关闭。所以我们重新检查状态,如果有必要回滚入队操作(如果已停止),或者如果没有线程,则启动一个新线程。
  3. 如果我们无法将任务入队,则尝试添加一个新的线程。如果添加失败,我们知道线程池已经关闭或饱和,因此拒绝该任务。

       所以在上面我们创建线程池的代码中,并不是线程池创建好之后就会立马创建 10 个核心线程,而是真正有任务来的时候才会去新创建一个线程。

思考:如果线程的任务结束了,线程对象会怎么样呢?

       从创建线程池的构造器就不难想到,构造器中有一个阻塞队列的参数,其实当线程没有任务的时候,线程并不会关闭,而是一直阻塞,也叫保活

       保活线程的关键在于阻塞队列,即LinkedBlockingDeque。当队列为空时,如果线程尝试从队列中取元素,线程会被阻塞,直到队列中有元素可供取出。这样,线程就会在等待任务的过程中保持活跃状态。

2.1.2、getTask 方法源码

       getTask 方法是 runWorker 方法里的调用的,而 runWoker 又是 Worker 对象的方法,这个 Worker 实例又是上面的 execute 方法中的 addWorker 方法中实例化出来的。

       getTask 方法是所有线程池中的线程一直在不断调用检查的(在 runTask 方法中的 while 循环中被调用):

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
 
        for (;;) {
            int c = ctl.get();
 
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            
            // 总线程数
            int wc = workerCountOf(c);
 
            // 判断是否需要超时回收线程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
 
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 回收超时的线程
                    workQueue.take(); // 阻塞
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

方法解释

如果满足以下任一条件,该工作线程将返回null并退出:

  1. 当前线程数超过了最大线程数(由于调用了setMaximumPoolSize方法)。
  2. 线程池已停止。
  3. 线程池已关闭且队列为空。
  4. 该工作线程在等待任务时超时,并且超时的工作线程会受到终止处理(即allowCoreThreadTimeOut || workerCount > corePoolSize),无论是在超时等待之前还是之后。如果队列非空,则该工作线程不是线程池中的最后一个线程。

返回值

       如果满足上述条件之一,返回null,表示工作线程必须退出,此时workerCount会减1。

否则,返回task,表示成功获取到任务。

       从源码中可以看到,线程池中的工作线程会执行getTask()方法来获取任务。在这个方法中,线程会调用workQueue.poll()或workQueue.take()方法来尝试从LinkedBlockingDeque中获取任务。如果队列为空,这些方法会使线程阻塞,直到有新的任务添加到队列中。这就是线程在没有任务执行时仍然保持活跃(保活)的机制。

  • workQueue.take():当队列为空时,该方法会无限期地等待,直到有新的任务被添加到队列中。这意味着,如果所有核心线程都在执行任务并且队列为空,那么调用 take() 方法的线程会一直阻塞,直到其他线程向队列中添加了新的任务。
  • workQueue.poll():此方法在队列为空时会立即返回null,而不是等待。这通常用于非核心线程(也称为工作线程),当没有任务可做时,这些线程可以选择终止自己以减少资源占用。

思考:现在的核心线程数是 10,如果此时正在工作的线程有 11 个(10个核心线程,一个其它线程),那如果所有线程的任务都完成了,那么线程池又会执行怎样的逻辑呢?

2.1.3、runWorker 方法源码

在上面的 getTask 方法中,线程池中的每个线程在调用这个方法的时候都会判断是否需要回收线程:

        // 判断是否需要超时回收线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

现在我们有 11 个线程,所以明显总线程数 wc(11) > corePoolSize(10),所以自然会执行下面的逻辑:

        Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;

也就是 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):当前调用该方法的线程会阻塞一段时间(keppAliveTime 个单位),如果这段时间过后依然访问不到任务,那么下面的 timeOut = true ,getTask 方法返回 null。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }


Java 线程池源码解析(2)https://developer.aliyun.com/article/1534180

目录
打赏
0
0
0
0
37
分享
相关文章
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
79 29
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
干货含源码!如何用Java后端操作Docker(命令行篇)
只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
Java并发编程笔记之CopyOnWriteArrayList源码分析
并发包中并发List只有CopyOnWriteArrayList这一个,CopyOnWriteArrayList是一个线程安全的ArrayList,对其进行修改操作和元素迭代操作都是在底层创建一个拷贝数组(快照)上进行的,也就是写时拷贝策略。
19579 0
Java并发编程笔记之读写锁 ReentrantReadWriteLock 源码分析
我们知道在解决线程安全问题上使用 ReentrantLock 就可以,但是 ReentrantLock 是独占锁,同时只有一个线程可以获取该锁,而实际情况下会有写少读多的场景,显然 ReentrantLock 满足不了需求,所以 ReentrantReadWriteLock 应运而生,ReentrantReadWriteLock 采用读写分离,多个线程可以同时获取读锁。
3163 0
Java并发编程笔记之FutureTask源码分析
FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
4317 0
Java并发编程笔记之Timer源码分析
timer在JDK里面,是很早的一个API了。具有延时的,并具有周期性的任务,在newScheduledThreadPool出来之前我们一般会用Timer和TimerTask来做,但是Timer存在一些缺陷,为什么这么说呢?   Timer只创建唯一的线程来执行所有Timer任务。
3034 0
Java并发编程笔记之Semaphore信号量源码分析
JUC 中 Semaphore 的使用与原理分析,Semaphore 也是 Java 中的一个同步器,与 CountDownLatch 和 CycleBarrier 不同在于它内部的计数器是递增的,那么,Semaphore 的内部实现是怎样的呢?   Semaphore 信号量也是Java 中一个同步容器,与CountDownLatch 和 CyclicBarrier 不同之处在于它内部的计数器是递增的。
4328 0

推荐镜像

更多