Java 线程池源码解析(1)

简介: Java 线程池源码解析

线程池

池化思想:线程池、数据连接池等,比如我们 Spark 的 Executor 就是典型的线程池,用户在启动 Spark 作业的同时启动线程池,这样 Spark 的 Task 就可以直接获取资源,而不用像 MR 程序那样等待容器上的进程开启了。

如果不使用线程池的话,我们需要:

  1. 手动创建线程对象
  2. 执行任务
  3. 执行完毕,回收资源

优点

  1. 提高线程的利用率
  2. 提高程序的响应速度(线程对象提前创建好的,节省了创建和销毁的开销)
  3. 便于统一管理线程对象
  4. 可以控制最大的并发数

1、Java 创建线程池示例

1.1、构造参数解释

我们先看看 ThreadPoolExecutor 的构造器的源码:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime); // 统一转为亚秒格式
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

参数大概意思就是:核心线程数、最大线程数、存活时间、存活时间单位、阻塞队列(存放任务)、线程创建工厂、拒绝策略。我们也可以通过下面的例子理解一下:

       线程池就好比我们的银行,它有很多个接待客人的柜台(一个柜台就是一个线程,用来执行任务)和用来供客人等待的座位(等待的执行的任务)。上面的图中,我们有 5 个柜台(对应第2个参数:maximumPoolSize) ;而其中绿色的三个柜台代表常驻柜台(也就是一直都有人,对应第1个参数:corePoolSize);红色的柜台代表预备柜台,也就是当业务繁忙的时候,最多还可以打电话摇两个人来(maximumPoolSize - corePoolSize);但是摇人来帮忙是有时效性的,如果帮完忙一段时间(取决于第3个和第4个参数)没有活干,这些线程就会被释放;摇人的方式取决于第6个参数(下面我们是通过默认的线程工厂来再创建线程的);蓝色的等候区区域代表座位(可允许等待的任务数量),当柜台前和座位都满了的时候,如果再有任务进来就会被拒绝,具体的拒绝方式取决于第7个参数,下面我们给出的拒绝策略是直接报错:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class TestThreadPool {
  public static void main(String[] args) {
    // 1.核心线程数 2.最大线程数 3.存活时间 4.时间单位 5.等待队列 6.线程工厂 7.拒绝策略(直接抛异常)
    ExecutorService executorService = new ThreadPoolExecutor(3, 5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
    for (int i = 0; i < 9; i++) {
      executorService.execute(()->{
        System.out.println(Thread.currentThread().getName()+"正在工作");
      });
    }
        // 关闭线程池
    executorService.shutdown();
  }
}
  • 使用线程池中执行任务很简单,我们不需要关心具体是哪个线程执行的,只需要把任务丢给它即可(通过 lambda 表达式来告诉线程池我们的任务执行逻辑)

       上面的案例中,我们的线程池的最大接收的任务量是 8 (最大线程数:5 + 等待队列容量:3),但并不是说只能跑8个任务,如果有任务释放资源仍然可以继续执行任务。

       所以,上面的案例同一时刻最多只能共存8个任务(其中最多只有五个任务会同时执行),如果,当我们的任务超过8时,会直接报错(因为我们设置了拒绝策略就是直接抛异常)。

2、ThreadExecutorPool 源码解析

创建一个包含 10 个核心线程、总线程数为 20、存活时间为 0 s、拒绝策略为直接抛异常的一个线程池对象:

public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                10, 20,
                0L, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()
        );
 
        executor.execute(new Runnable() {
            @Override
            public void run() {
 
            }
        });
    }

2.1、线程池保活与回收源码分析

2.1.1、execute 方法源码
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // 如果总线程数小于核心线程数
            if (addWorker(command, true))  // 添加一个核心线程
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // 如果当前全部线程都在工作就把任务添加到阻塞任务队列当中
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) 
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) // 如果上面没有成功把任务加到队列就新加一个线程
            reject(command); // 如果新加线程失败(比如超过最大线程数)就拒绝任务
    }

这里的注释已经说的很明白了:

  1. 如果正在运行的线程数少于核心线程数(corePoolSize),则尝试启动一个新线程,并将给定的命令参数作为其第一个任务。调用addWorker方法可以原子性地检查运行状态和工作线程数,从而防止在不应该添加线程时发出错误的警报,通过返回false来实现。
  2. 如果任务能够成功入队,我们仍然需要再次检查是否应该添加一个线程(因为自上次检查以来已有线程死亡)或者线程池在进入此方法后已经关闭。所以我们重新检查状态,如果有必要回滚入队操作(如果已停止),或者如果没有线程,则启动一个新线程。
  3. 如果我们无法将任务入队,则尝试添加一个新的线程。如果添加失败,我们知道线程池已经关闭或饱和,因此拒绝该任务。

       所以在上面我们创建线程池的代码中,并不是线程池创建好之后就会立马创建 10 个核心线程,而是真正有任务来的时候才会去新创建一个线程。

思考:如果线程的任务结束了,线程对象会怎么样呢?

       从创建线程池的构造器就不难想到,构造器中有一个阻塞队列的参数,其实当线程没有任务的时候,线程并不会关闭,而是一直阻塞,也叫保活

       保活线程的关键在于阻塞队列,即LinkedBlockingDeque。当队列为空时,如果线程尝试从队列中取元素,线程会被阻塞,直到队列中有元素可供取出。这样,线程就会在等待任务的过程中保持活跃状态。

2.1.2、getTask 方法源码

       getTask 方法是 runWorker 方法里的调用的,而 runWoker 又是 Worker 对象的方法,这个 Worker 实例又是上面的 execute 方法中的 addWorker 方法中实例化出来的。

       getTask 方法是所有线程池中的线程一直在不断调用检查的(在 runTask 方法中的 while 循环中被调用):

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
 
        for (;;) {
            int c = ctl.get();
 
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            
            // 总线程数
            int wc = workerCountOf(c);
 
            // 判断是否需要超时回收线程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
 
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 回收超时的线程
                    workQueue.take(); // 阻塞
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

方法解释

如果满足以下任一条件,该工作线程将返回null并退出:

  1. 当前线程数超过了最大线程数(由于调用了setMaximumPoolSize方法)。
  2. 线程池已停止。
  3. 线程池已关闭且队列为空。
  4. 该工作线程在等待任务时超时,并且超时的工作线程会受到终止处理(即allowCoreThreadTimeOut || workerCount > corePoolSize),无论是在超时等待之前还是之后。如果队列非空,则该工作线程不是线程池中的最后一个线程。

返回值

       如果满足上述条件之一,返回null,表示工作线程必须退出,此时workerCount会减1。

否则,返回task,表示成功获取到任务。

       从源码中可以看到,线程池中的工作线程会执行getTask()方法来获取任务。在这个方法中,线程会调用workQueue.poll()或workQueue.take()方法来尝试从LinkedBlockingDeque中获取任务。如果队列为空,这些方法会使线程阻塞,直到有新的任务添加到队列中。这就是线程在没有任务执行时仍然保持活跃(保活)的机制。

  • workQueue.take():当队列为空时,该方法会无限期地等待,直到有新的任务被添加到队列中。这意味着,如果所有核心线程都在执行任务并且队列为空,那么调用 take() 方法的线程会一直阻塞,直到其他线程向队列中添加了新的任务。
  • workQueue.poll():此方法在队列为空时会立即返回null,而不是等待。这通常用于非核心线程(也称为工作线程),当没有任务可做时,这些线程可以选择终止自己以减少资源占用。

思考:现在的核心线程数是 10,如果此时正在工作的线程有 11 个(10个核心线程,一个其它线程),那如果所有线程的任务都完成了,那么线程池又会执行怎样的逻辑呢?

2.1.3、runWorker 方法源码

在上面的 getTask 方法中,线程池中的每个线程在调用这个方法的时候都会判断是否需要回收线程:

        // 判断是否需要超时回收线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

现在我们有 11 个线程,所以明显总线程数 wc(11) > corePoolSize(10),所以自然会执行下面的逻辑:

        Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;

也就是 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):当前调用该方法的线程会阻塞一段时间(keppAliveTime 个单位),如果这段时间过后依然访问不到任务,那么下面的 timeOut = true ,getTask 方法返回 null。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }


Java 线程池源码解析(2)https://developer.aliyun.com/article/1534180

相关文章
|
1月前
|
缓存 安全 Java
Java并发性能优化|读写锁与互斥锁解析
本文深入解析Java中两种核心锁机制——互斥锁与读写锁,通过概念对比、代码示例及性能测试,揭示其适用场景。互斥锁适用于写多或强一致性场景,读写锁则在读多写少时显著提升并发性能。结合锁降级、公平模式等高级特性,助你编写高效稳定的并发程序。
108 0
|
1月前
|
安全 Oracle Java
JAVA高级开发必备·卓伊凡详细JDK、JRE、JVM与Java生态深度解析-形象比喻系统理解-优雅草卓伊凡
JAVA高级开发必备·卓伊凡详细JDK、JRE、JVM与Java生态深度解析-形象比喻系统理解-优雅草卓伊凡
159 0
JAVA高级开发必备·卓伊凡详细JDK、JRE、JVM与Java生态深度解析-形象比喻系统理解-优雅草卓伊凡
|
28天前
|
算法 Java 测试技术
零基础学 Java: 从语法入门到企业级项目实战的详细学习路线解析
本文为零基础学习者提供完整的Java学习路线,涵盖语法基础、面向对象编程、数据结构与算法、多线程、JVM原理、Spring框架、Spring Boot及项目实战,助你从入门到进阶,系统掌握Java编程技能,提升实战开发能力。
78 0
|
2月前
|
存储 Java Linux
操作系统层面视角下 Java IO 的演进路径及核心技术变革解析
本文从操作系统层面深入解析Java IO的演进历程,涵盖BIO、NIO、多路复用器及Netty等核心技术。分析各阶段IO模型的原理、优缺点及系统调用机制,探讨Java如何通过底层优化提升并发性能与数据处理效率,全面呈现IO技术的变革路径与发展趋势。
53 1
|
2月前
|
并行计算 Java API
Java List 集合结合 Java 17 新特性与现代开发实践的深度解析及实战指南 Java List 集合
本文深入解析Java 17中List集合的现代用法,结合函数式编程、Stream API、密封类、模式匹配等新特性,通过实操案例讲解数据处理、并行计算、响应式编程等场景下的高级应用,帮助开发者提升集合操作效率与代码质量。
128 1
|
2月前
|
安全 Java API
Java 集合高级应用与实战技巧之高效运用方法及实战案例解析
本课程深入讲解Java集合的高级应用与实战技巧,涵盖Stream API、并行处理、Optional类、现代化Map操作、不可变集合、异步处理及高级排序等核心内容,结合丰富示例,助你掌握Java集合的高效运用,提升代码质量与开发效率。
191 0
|
2月前
|
安全 JavaScript Java
java Web 项目完整案例实操指南包含从搭建到部署的详细步骤及热门长尾关键词解析的实操指南
本项目为一个完整的JavaWeb应用案例,采用Spring Boot 3、Vue 3、MySQL、Redis等最新技术栈,涵盖前后端分离架构设计、RESTful API开发、JWT安全认证、Docker容器化部署等内容,适合掌握企业级Web项目全流程开发与部署。
133 0
|
2月前
|
安全 Java
Java编程探究:深入解析final关键字
1. **使用限制**: 对于 `final` 方法和类,可以限制其他开发人员对代码的使用,确保其按设计的方式工作而不会被子类意外改变。
91 0
|
2月前
|
存储 安全 算法
Java 核心知识与技术全景解析
本文涵盖 Java 多方面核心知识,包括基础语法中重载与重写、== 与 equals 的区别,String 等类的特性及异常体系;集合类中常见数据结构、各集合实现类的特点,以及 HashMap 的底层结构和扩容机制;网络编程中 BIO、NIO、AIO 的差异;IO 流的分类及用途。 线程与并发部分详解了 ThreadLocal、悲观锁与乐观锁、synchronized 的原理及锁升级、线程池核心参数;JVM 部分涉及堆内存结构、垃圾回收算法及伊甸园等区域的细节;还包括 Lambda 表达式、反射与泛型的概念,以及 Tomcat 的优化配置。内容全面覆盖 Java 开发中的关键技术点,适用于深
|
2月前
|
缓存 安全 前端开发
Java 核心知识点与实战应用解析
我梳理的这些内容涵盖了 Java 众多核心知识点。包括 final 关键字的作用(修饰类、方法、变量的特性);重载与重写的区别;反射机制的定义、优缺点及项目中的应用(如结合自定义注解处理数据、框架底层实现)。 还涉及 String、StringBuffer、StringBuilder 的差异;常见集合类及线程安全类,ArrayList 与 LinkedList 的区别;HashMap 的实现原理、put 流程、扩容机制,以及 ConcurrentHashMap 的底层实现。 线程相关知识中,创建线程的四种方式,Runnable 与 Callable 的区别,加锁方式(synchronize

热门文章

最新文章

推荐镜像

更多
  • DNS