导航:
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
目录
2.3.5.2 CallerRunsPolicy:调用线程run()方法
2.3.5.3 DiscardOldestPolicy:丢弃最老任务
2.4.3 COUNT_BITS和CAPACITY:线程数上限
2.4.6 workers和mainLock:维护所有工作线程
2.4.7 Worker类:控制中断线程、执行runWorker()方法
2.4.8 allowCoreThreadTimeOut:是否允许核心线程超时
2.9.1.2 校验关闭权限:checkShutdownAccess()
2.9.1.3 修改线程池状态:advanceRunState()
2.9.1.4 中断空闲线程:interruptIdleWorkers()
2.9.1.5 尝试终止线程池:tryTerminate()
3.2 newFixedThreadPool:固定大小的线程池
3.3 newCachedThreadPool:缓存线程池(无限大)
3.4 newScheduledThreadPool:定时任务线程池
3.5 newSingleThreadExecutor:单线程化的线程池
一、什么是线程池?
1.1 基本介绍
为了对多线程进行统一的管理,Java引入了线程池,它通过限制并发线程的数量、将待执行的线程放入队列、销毁空闲线程,来控制资源消耗,使线程更合理地运行,避免系统因为创建过多线程而崩溃。
线程池作用:
- 管理线程数量:它可以管理线程的数量,可以避免无节制的销毁、创建线程,导致额外的性格损耗、或者线程数超出系统负荷直至崩溃。
- 提高性能:当有新的任务到来时,可以直接从线程池中取出一个空闲线程来执行任务,而不需要等待创建新线程,从而减少了响应时间。
- 让线程复用:它还可以让线程复用,可以大大地减少创建和销毁线程所带来的开销。
- 合理的拒绝策略:线程池提供了多种拒绝策略,当线程池队列满了时,可以采用不同的策略进行处理,如抛出异常、丢弃任务或调用者运行等。
1.2 创建线程的两种方式
线程池一共有两种,分别是:
- 自定义线程池ThreadPoolExecutor(推荐);
- 线程池工具类Executors.newXxxThreadPool
1.2.1 方式1:自定义线程池(推荐)
线程池执行器ThreadPoolExecutor创建自定义线程池:
ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor( 5, //核心线程数 200, //最大线程数量,控制资源并发 10, //存活时间 TimeUnit.SECONDS, //时间单位 new LinkedBlockingDeque<>( 100000), //任务队列,大小100000个 Executors.defaultThreadFactory(), //线程的创建工厂 new ThreadPoolExecutor.AbortPolicy()); //拒绝策略 CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { //开启异步编排,有返回值 return 1; }, threadPoolExecutor).thenApplyAsync(res -> { //串行化,接收参数并有返回值 return res+1; }, threadPoolExecutor); Integer integer = future.get(); //获取返回值
七个参数:
- corePoolSize:核心线程数。创建以后,会一直存活到线程池销毁,空闲时也不销毁。
- maximumPoolSize:最大线程数量。阻塞队列满了
- keepAliveTime: 存活时间。释放空闲时间超过“存活时间”的线程,仅留核心线程数量的线程。
- TimeUnitunit:时间单位
- workQueue: 任务队列。如果线程数超过核心数量,就把剩余的任务放到队列里。只要有线程空闲,就会去队列取出新的任务执行。new LinkedBlockingDeque()队列大小默认是Integer的最大值,内存不够,所以建议指定队列大小。
- SynchronousQueue是一个同步队列,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
- LinkedBlockingQueue是一个无界队列,可以缓存无限多的任务。由于其无界特性,因此需要合理地处理好任务的生产速率和线程池中线程的数量,以避免内存溢出等异常问题。无限缓存,拒绝策略就能随意了。
- ArrayBlockingQueue是一个有界(容量固定)队列,只能缓存固定数量的任务。通过固定队列容量,可以避免任务过多导致线程阻塞,保证线程池资源的可控性和稳定性。推荐,有界队列能增加系统的稳定性和预警能力。可以根据需要设大一点,比如几千,新任务丢弃后未来重新入队。
- PriorityBlockingQueue是一个优先级队列,能够对任务按照优先级进行排序,当任务数量超过队列容量时,会根据元素的Comparable或Comparator排序规则进行丢弃或抛异常。
new PriorityBlockingQueue<>((o1, o2) -> o1.length() - o2.length());
- threadFactory:线程的创建工厂。可以使用默认的线程工厂Executors.defaultThreadFactory(),也可以自定义线程工厂(实现ThreadFactory接口)
- RejectedExecutionHandler handler:拒绝策略。如果任务队列和最大线程数量满了,按照指定的拒绝策略执行任务。
- Abort(默认):直接抛异常(拒绝执行异常RejectedExecutionException)
- CallerRuns:直接同步调用线程run()方法,不创建线程了
- DiscardOldest:丢弃最老任务
- Discard:直接丢弃新任务
- 实现拒绝执行处理器接口(RejectedExecutionHandler),自定义拒绝策略。
1.2.2 方式2:线程池工具类
执行器工具类Executors创建线程池: 底层都是return new ThreadPoolExecutor(...)。一般不使用这种方式,参数配置死了不可控。
- newFixedThreadPool:固定大小的线程池。
- 核心线程数:所有线程都是核心线程(通过构造参数指定),最大线程数=核心线程数。
- 存活时间0s:因为所有线程都是核心线程,所以用不到存活时间,线程都会一直存活。keepAliveTime为0S。
- 链表阻塞队列:超出的线程会在LinkedBlockingQueue队列中等待。
- newCachedThreadPool:缓存线程池(无限大)。
- 核心线程数是0,最大线程数无限大:最大线程数Integer.MAX_VALUE。线程数量可以无限扩大,所有线程都是非核心线程。
- 空闲线程存活时间60s:keepAliveTime为60S,空闲线程超过60s会被杀死。
- 同步队列:因为最大线程数无限大,所以也用不到阻塞队列,所以设为没有存储空间的SynchronousQueue同步队列。这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
- newScheduledThreadPool:定时任务线程池。创建一个定长线程池, 支持定时及周期性任务执行。可指定核心线程数,最大线程数。
- newSingleThreadExecutor:单线程化的线程池。核心线程数与最大线程数都只有一个,不回收。后台从LinkedBlockingQueue队列中获取任务。创建一个单线程化的线程池, 它只会用唯一的工作线程来执行任务, 保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
// 源码 // FixedThreadPool: 固定大小线程池 // CachedTheadPool: 缓存线程池 new ThreadExcutor(0, max_valuem, 60L, s, new SynchronousQueue<Runnable>()); new ThreadExcutor(n, n, 0L, ms, new LinkedBlockingQueue<Runable>() // ScheduledThreadPoolExcutor: 定时任务线程池 ScheduledThreadPool, SingleThreadScheduledExecutor // SingleThreadExecutor: 单线程化线程池 new ThreadExcutor(1, 1, 0L, ms, new LinkedBlockingQueue<Runable>())
代码示例:
/** * @Author: vince * @CreateTime: 2024/09/10 * @Description: 测试类 * @Version: 1.0 */ public class Test { public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(5); for (int i = 0; i < 3; i++) { pool.execute(() -> System.out.println("Hello World")); } } }
二、ThreadPoolExecutor源码解析
2.1 基本介绍
ThreadPoolExecutor是线程池最核心的类,在JDK1.5引入,位于java.util.concurrent包下,是ExecutorService接口的实现类。
package java.util.concurrent; /** * 自定义线程池 * @since 1.5 * @author Doug Lea */ public class ThreadPoolExecutor extends AbstractExecutorService { // ... } public abstract class AbstractExecutorService implements ExecutorService { // ... }
2.2 扩展:ExecutorService接口
ExecutorService是Java提供的线程池接口,所有线程池都直接或间接实现ExecutorService接口。它有两个最核心的方法,即submit或execute方法,用于线程池提交任务。
submit或execute方法:
- execute:参数只能是Runnable,没有返回值
- submit:参数可以是Runnable、Callable,返回值是FutureTask
Executor接口:
只有一个execute方法,用来替代通常创建或启动线程的方法。
代码示例:
execute:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolExample { public static void main(String[] args) { // 创建一个具有固定线程数的线程池 ExecutorService executorService = Executors.newFixedThreadPool(5); // 提交任务给线程池执行 for (int i = 0; i < 10; i++) { final int taskId = i; executorService.execute(() -> { System.out.println("Task ID : " + taskId + " 执行中..."); // 模拟任务执行耗时 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task ID : " + taskId + " 完成"); }); } // 关闭线程池 executorService.shutdown(); } }
submit:
public class ThreadPoolExample { public static void main(String[] args) { // 创建一个具有固定线程数的线程池 ExecutorService executorService = Executors.newFixedThreadPool(5); // 提交任务给线程池执行 for (int i = 0; i < 10; i++) { int taskId = i; Future<String> future = executorService.submit(() -> { System.out.println("Task ID : " + taskId + " 执行中..."); // 模拟任务执行耗时 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task ID : " + taskId + " 完成"; }); // 获取任务的执行结果 try { String result = future.get(); System.out.println(result); } catch (Exception e) { e.printStackTrace(); } } // 关闭线程池 executorService.shutdown(); } }
2.3 线程池参数
2.3.1 ThreadPoolExecutor的七个参数
// 创建线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 5, // 核心线程数 200, // 最大线程数量 10, // 空闲线程存活时间 TimeUnit.SECONDS, // 时间单位 new LinkedBlockingDeque<>(100000), // 任务队列,大小100000 Executors.defaultThreadFactory(), // 线程工厂 new ThreadPoolExecutor.AbortPolicy() // 拒绝策略 );
ThreadPoolExecutor 的构造方法有7个重要的参数,分别是下面几个:
- corePoolSize:核心线程数。创建以后,会一直存活到线程池销毁,空闲时也不销毁。
- maximumPoolSize:最大线程数量。阻塞队列满了
- keepAliveTime: 存活时间。释放空闲时间超过“存活时间”的线程,仅留核心线程数量的线程。
- TimeUnitunit:时间单位
- workQueue: 任务队列。如果线程数超过核心数量,就把剩余的任务放到队列里。只要有线程空闲,就会去队列取出新的任务执行。new LinkedBlockingDeque()队列大小默认是Integer的最大值,内存不够,所以建议指定队列大小。
- LinkedBlockingQueue:链表阻塞队列。是一个无界队列,可以缓存无限多的任务。由于其无界特性,因此需要合理地处理好任务的生产速率和线程池中线程的数量,以避免内存溢出等异常问题。无限缓存,拒绝策略就能随意了。
- ArrayBlockingQueue:数组阻塞队列。是一个有界(容量固定)队列,只能缓存固定数量的任务。通过固定队列容量,可以避免任务过多导致线程阻塞,保证线程池资源的可控性和稳定性。推荐,有界队列能增加系统的稳定性和预警能力。可以根据需要设大一点,比如几千,新任务丢弃后未来重新入队。
- SynchronousQueue:同步队列。这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
- PriorityBlockingQueue:优先级队列。能够对任务按照优先级进行排序,当任务数量超过队列容量时,会根据元素的Comparable或Comparator排序规则进行丢弃或抛异常。
new PriorityBlockingQueue<>((o1, o2) -> o1.length() - o2.length());
- DelayedWorkQueue:延迟阻塞队列。无界的延迟+优先级队列,每个任务有个延时时间,按时长逆序排列在队列中。延时队列越短,则优先级最高,在队列中越靠前,优先被消费。
- threadFactory:线程的创建工厂。可以使用默认的线程工厂Executors.defaultThreadFactory(),也可以自定义线程工厂(实现ThreadFactory接口)
- RejectedExecutionHandler handler:拒绝策略。如果任务队列和最大线程数量满了,按照指定的拒绝策略执行任务。
- Abort(默认):直接抛异常(拒绝执行异常RejectedExecutionException)
- CallerRuns:直接同步调用线程run()方法,不创建线程了
- DiscardOldest:丢弃最老任务
- Discard:直接丢弃新任务
- 实现拒绝执行处理器接口(RejectedExecutionHandler),自定义拒绝策略。
2.3.2 如何为线程池设置合适的线程数
下面的参数只是一个预估值,适合初步设置,具体的线程数需要经过压测确定,压榨(更好的利用)CPU的性能。
CPU核心数为N;
核心线程数:
- CPU密集型:N+1。数量与CPU核数相近是为了不浪费CPU,并防止频繁的上下文切换,加1是为了有线程被阻塞后还能不浪费CPU的算力。
- I/O密集型:2N,或N/(1-阻塞系数)。I/O密集型任务CPU使用率并不是很高,可以让CPU在等待I/O操作的时去处理别的任务,充分利用CPU,所以数量就比CPU核心数高一倍。有些公司会考虑阻塞系数,阻塞系数是任务线程被阻塞的比例,一般是0.8~0.9。
- 实际开发中更适合的公式:N*((线程等待时间+线程计算时间)/线程计算时间)
最大线程数:设成核心线程数的2-4倍。数量主要由CPU和IO的密集性、处理的数据量等因素决定。
需要增加线程的情况:jstack打印线程快照,如果发现线程池中大部分线程都等待获取任务、则说明线程够用。如果大部分线程都处于运行状态,可以继续适当调高线程数量。
jstack:打印指定进程此刻的线程快照。定位线程长时间停顿的原因,例如死锁、等待资源、阻塞。如果有死锁会打印线程的互相占用资源情况。线程快照:该进程内每条线程正在执行的方法堆栈的集合。
2.3.3 代码示例
创建一个线程池,再创建10个线程打印文字:
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolExample { public static void main(String[] args) { // 创建线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 5, // 核心线程数 200, // 最大线程数量 10, // 空闲线程存活时间 TimeUnit.SECONDS, // 时间单位 new LinkedBlockingDeque<>(100000), // 任务队列,大小100000 Executors.defaultThreadFactory(), // 线程工厂 new ThreadPoolExecutor.AbortPolicy() // 拒绝策略 ); // 提交任务到线程池 for (int i = 0; i < 10; i++) { threadPoolExecutor.execute(() -> { System.out.println(Thread.currentThread().getName() + " 正在执行任务"); }); } // 关闭线程池 threadPoolExecutor.shutdown(); } }
可以看见只有5个核心线程执行任务,原因阻塞队列有100000的大小,所以用不到新增非核心线程:
2.3.4 线程池的初始化
2.3.4.1 不指定线程工厂的构造方法
ThreadPoolExecutor有多个构造方法,这些构造方法内部实际都调用了全参数的构造方法。
例如不指定线程的创建工厂,将使用默认的线程创建工厂:
/** * 不指定线程工厂的构造方法 * @param corePoolSize 核心线程数 * @param maximumPoolSize 最大线程数 * @param keepAliveTime 线程存活时间 * @param unit 线程存活时间单位 * @param workQueue 任务队列 * @param handler 拒绝策略 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { // 不指定线程工厂,将使用默认工厂,调用7参数的构造方法 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
2.3.4.2 全参数核心构造方法
核心流程:
- 参数校验;非空校验、大于0校验、最大线程数大于核心线程数;
- 参数赋值:7个参数分别赋值给对应的成员变量
具体代码:
/** * 构造方法初始化 * @param corePoolSize 核心线程数 * @param maximumPoolSize 最大线程数 * @param keepAliveTime 非核心空闲线程存活时间 * @param unit 非核心空闲线程存活时间单位 * @param workQueue 任务队列 * @param threadFactory 线程工厂(生产线程的地方) * @param handler 拒绝策略 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 1.参数校验: // 核心线程数不能为负 // 最大线程数不能为0且必须大于核心线程数 // 非核心空闲线程存活时间不能为负 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); // 任务队列、线程工厂和拒绝策略不能为空 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); // 2.将传入的参数赋值给线程池的成员变量 // 核心线程数 this.corePoolSize = corePoolSize; // 最大线程数 this.maximumPoolSize = maximumPoolSize; // 任务队列 this.workQueue = workQueue; // 将存活时间转换为纳秒存储 this.keepAliveTime = unit.toNanos(keepAliveTime); // 线程工厂,用于创建新线程 this.threadFactory = threadFactory; // 拒绝策略,当线程池无法执行任务时使用 this.handler = handler; }
2.3.5 拒绝策略详解
2.3.5.0 概述
四个拒绝策略:
- Abort(默认):直接抛异常(拒绝执行异常RejectedExecutionException)
- CallerRuns:直接同步调用线程run()方法,不创建线程了
- DiscardOldest:丢弃最老任务
- Discard:直接丢弃新任务
线程池共有以上4个拒绝策略,他们都属于ThreadPoolExecutor的内部类:
2.3.5.1 AbortPolicy:抛异常
核心流程:
- 直接抛出拒绝执行异常(RejectedExecutionException)
具体代码:
/** * 直接抛异常拒绝策略 * {@code RejectedExecutionException}. */ public static class AbortPolicy implements RejectedExecutionHandler { /** * 构造方法 */ public AbortPolicy() { } /** * 拒绝执行方法:总是抛出RejectedExecutionException异常 * * @param r 线程 * @param e 线程池 * @throws RejectedExecutionException 总是抛出该异常 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 代码只有一行,抛出异常 throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
2.3.5.2 CallerRunsPolicy:调用线程run()方法
核心流程:
- 如果线程池还活着,直接同步调用线程的run()方法
具体代码:
/** * 同步调用拒绝策略 */ public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * 创建一个 {@code CallerRunsPolicy} 实例。 */ public CallerRunsPolicy() { } /** * 拒绝执行方法 * 此时任务将被丢弃。 * * @param r 线程 * @param e 线程池 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 如果线程池还活着,直接同步调用线程的run()方法 if (!e.isShutdown()) { r.run(); } } }
2.3.5.3 DiscardOldestPolicy:丢弃最老任务
核心流程:
- 如果当前的阻塞队列满了,弹出时间最久的
具体代码:
/** * 丢弃最老任务策略 */ public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * 为给定的执行器创建一个 {@code DiscardOldestPolicy} 实例。 */ public DiscardOldestPolicy() { } /** * 丢弃最老任务 * * @param r 线程 * @param e 线程池 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 如果线程池还活着,就让最老任务出队,执行最新任务 if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
2.3.5.4 DiscardPolicy:丢弃新任务
核心流程:
- 丢弃新任务:方法内一行代码也没有,不执行任何操作,相当于丢弃任务
具体代码:
/** * 丢弃新任务策略 */ public static class DiscardPolicy implements RejectedExecutionHandler { /** * 创建一个 {@code DiscardPolicy} 实例。 */ public DiscardPolicy() { } /** * 不执行任何操作,相当于丢弃任务 r * * @param r 线程 * @param e 线程池 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
2.3.5.5 自定义拒绝策略
通过实现拒绝执行处理器接口(RejectedExecutionHandler),重写rejectedExection()方法,可以创建一个自定义的拒绝策略。
例如:创建一个丢弃策略,丢弃最老任务,并且把让新任务优先执行
/** * @Author: vince * @CreateTime: 2024/08/20 * @Description: 丢最老和新任务拒绝策略 * @Version: 1.0 */ public class DiscardOldestAndNewestPolicy implements RejectedExecutionHandler { /** * 自定义拒绝策略,丢弃最老的和最新的任务,并打印日志。 */ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { BlockingQueue<Runnable> queue = executor.getQueue(); // 检查队列是否为空,不为空则移除最老的任务 if (!queue.isEmpty()) { Runnable oldestTask = queue.poll(); System.out.println("丢弃最老的任务: " + oldestTask.toString()); } // 打印日志并丢弃最新的任务:不对线程对象执行操作,就相当于丢弃了它 System.out.println("丢弃最新的任务: " + r.toString()); } }
2.4 核心字段和内部类
2.4.1 基本介绍
ThreadPoolExecutor有以下一些核心字段和内部类:
/* * 状态和线程数:用来标记线程池状态(高3位),线程个数(低29位,所以线程个数最多是2^29-1) * 默认是RUNNING状态,线程个数为0 */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); /* * 线程个数掩码位数:线程池中最大可以容纳的线程数量。 * int:4字节(32位),数据范围是 `-2^31 ~ 2^31-1`。 * 所以准确说是具体平台下Integer的二进制位数-3后的剩余位数才是线程的个数, * 32-3=29 * 为什么COUNT_BITS是29而不是32?因为int类型是32位,我们需要将高3位保留用于表示线程池的状态。剩下的29位则用来表示线程个数。 */ private static final int COUNT_BITS = Integer.SIZE - 3; /* * 线程最大个数(低29位)00011111111111111111111111111111 * 默认容量:2^29-1 * 位运算1左移32位,相当于2的32次方 * (左移<<:是将二进制数的位左移若干位,低位补0,相当于乘以2的n次方。) */ private static final int CAPACITY = (1 << COUNT_BITS) - 1; /* * 线程池有5种状态,按大小排序如下: * RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED * RUNNING:运行。线程池处于正常状态,可以接受新的任务,同时会按照预设的策略来处理已有任务的执行。 * SHUTDOWN:关闭。线程池处于关闭状态,不再接受新的任务,但是会继续执行已有任务直到执行完成。执行线程池对象的shutdown()时进入该状态。 * STOP:停止。线程池处于关闭状态,不再接受新的任务,同时会中断正在执行的任务,清空线程队列。执行shutdownNow()时进入该状态。 * TIDYING:整理。所有任务已经执行完毕,线程池进入该状态会开始进行一些结尾工作,比如及时清理线程池的一些资源。 * TERMINATED:终止。线程池已经完全停止,所有的状态都已经结束了,线程池处于最终的状态。 */ private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; /** * 可重入锁,用于加锁往workers里加线程 */ private final ReentrantLock mainLock = new ReentrantLock(); /** * 一个HashSet,存有所有工作线程 */ private final HashSet<Worker> workers = new HashSet<Worker>(); /** * 是否允许核心线程超时:默认是false,即核心线程永不超时 * 如果是true,核心线程将根据参数的超时时间存活 */ private volatile boolean allowCoreThreadTimeOut;
2.4.2 ctl:线程池当前状态和数量
- ctl:标记线程池状态(高3位)和数量(低29位,所以线程个数最多是2^29-1)。
/* * 状态和线程数:用来标记线程池状态(高3位),线程个数(低29位,所以线程个数最多是2^29-1) * 默认是RUNNING状态,线程个数为0 */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
2.4.3 COUNT_BITS和CAPACITY:线程数上限
- COUNT_BITS:线程数上限的掩码,值是29
- CAPACITY:线程数上限,值是2^29-1
为什么COUNT_BITS是29而不是32?
因为int类型是32位,并且我们需要将高3位保留用于表示线程池的状态。剩下的29位则用来表示线程个数。
源码:
private static final int COUNT_BITS = Integer.SIZE - 3; /* * 线程最大个数(低29位)00011111111111111111111111111111 * 默认容量:2^29-1 * 位运算1左移32位,相当于2的32次方 * (左移<<:是将二进制数的位左移若干位,低位补0,相当于乘以2的n次方。) */ private static final int CAPACITY = (1 << COUNT_BITS) - 1;
2.4.4 五个状态字段:线程池的生命周期
通常线程池的生命周期包含5个状态,对应状态值分别是:-1、0、1、2、3,这些状态只能由小到大迁移,不可逆。
- RUNNING:运行。线程池处于正常状态,可以接受新的任务,同时会按照预设的策略来处理已有任务的执行。
- SHUTDOWN:关闭。线程池处于关闭状态,不再接受新的任务,但是会继续执行已有任务直到执行完成。执行线程池对象的shutdown()时进入该状态。
- STOP:停止。线程池处于关闭状态,不再接受新的任务,同时会中断正在执行的任务,清空线程队列。执行shutdownNow()时进入该状态。
- TIDYING:整理。所有任务已经执行完毕,线程池进入该状态会开始进行一些结尾工作,比如及时清理线程池的一些资源。
- TERMINATED:终止。线程池已经完全停止,所有的状态都已经结束了,线程池处于最终的状态。
/* * 线程池有5种状态,按大小排序如下: * RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED * RUNNING:运行。线程池处于正常状态,可以接受新的任务,同时会按照预设的策略来处理已有任务的执行。 * SHUTDOWN:关闭。线程池处于关闭状态,不再接受新的任务,但是会继续执行已有任务直到执行完成。执行线程池对象的shutdown()时进入该状态。 * STOP:停止。线程池处于关闭状态,不再接受新的任务,同时会中断正在执行的任务,清空线程队列。执行shutdownNow()时进入该状态。 * TIDYING:整理。所有任务已经执行完毕,线程池进入该状态会开始进行一些结尾工作,比如及时清理线程池的一些资源。 * TERMINATED:终止。线程池已经完全停止,所有的状态都已经结束了,线程池处于最终的状态。 */ private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
2.4.5 扩展:线程的生命周期
Java线程在运行的生命周期中,在任意给定的时刻,只能处于下列6种状态之一:
- NEW :初始状态,线程被创建,但是还没有调用start方法。
- RUNNABLE:可运行状态,等待调度或运行。线程正在JVM中执行,但是有可能在等待操作系统的调度。
- BLOCKED :阻塞状态,线程正在等待获取监视器锁。
- WAITING :等待状态,线程正在等待其他线程的通知或中断。线程等待状态不占用 CPU 资源,被唤醒后进入可运行状态(等待调度或运行)。
- TIMED_WAITING:超时等待状态,在WAITING的基础上增加了超时时间,即超出时间自动返回。Thread.sleep(1000);让线程超时等待1s。
- TERMINATED:终止状态,线程已经执行完毕。
线程的运行过程:
线程在创建之后默认为NEW(初始状态),在调用start方法之后进入RUNNABLE(可运行状态)。
注意:
可运行状态不代表线程正在运行,它有可能正在等待操作系统的调度。
WAITING (等待状态)的线程需要其他线程的通知才能返回到可运行状态,而TIMED_WAITING(超时等待状态)相当于在等待状态的基础上增加了超时限制,除了他线程的唤醒,在超时时间到达时也会返回运行状态。
此外,线程在执行同步方法时,在没有获取到锁的情况下,会进入到BLOCKED(阻塞状态)。线程在执行完run方法之后,会进入到TERMINATED(终止状态)。
等待状态如何被唤醒?
Object类:
- wait()方法让线程进入等待状态
- notify()唤醒该对象上的随机一个线程
- notifyAll()唤醒该对象上的所有线程。
这3个方法必须处于synchronized代码块或方法中,否则会抛出IllegalMonitorStateException异常。因为调用这三个方法之前必须拿要到当前锁对象的监视器(Monitor对象),synchronized基于对象头和Monitor对象。
也可以通过Condition类的 await/signal/signalAll方法实现线程的等待和唤醒,从而实现线程的通信,令线程之间协作处理任务。这两个方法依赖于Lock对象。
2.4.6 workers和mainLock:维护所有工作线程
这两个字段主要在addWorker()方法内用到,每次调用线程.start()方法启动线程前,都往wokers集合里放进一个线程,并给ctl加1
/** * 可重入锁,用于加锁往workers里加线程 */ private final ReentrantLock mainLock = new ReentrantLock(); /** * mainLock锁的Condition对象,用于线程通信 * Condition对象:用于线程间通信,通过await()和signal(),signalAll()让线程等待或唤醒。通常用lock锁创建Condition对象,即lock.newCondition(); */ private final Condition termination = mainLock.newCondition(); /** * 一个HashSet,存有所有工作线程 */ private final HashSet<Worker> workers = new HashSet<Worker>();
2.4.7 Worker类:控制中断线程、执行runWorker()方法
Worker类是线程池的内部类,它继承了AQS, 实现了Runnable接口,它
/** * @Author: vince * @CreateTime: 2024/09/09 * @Description: Worker继承了AQS,目的就是为了控制工作线程的中断。 * Worker实现了Runnable,它的run()方法调用runWorker()启动线程 * @Version: 1.0 */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ private static final long serialVersionUID = 6138294804551838833L; // 当前线程工厂创建的线程(也是执行任务使用的线程) final Thread thread; // 当前的第一个任务 Runnable firstTask; // 记录执行了多少个任务 volatile long completedTasks; // 构造方法 Worker(Runnable firstTask) { // 将State设置为-1,代表当前不允许中断线程 setState(-1); // 设置任务 this.firstTask = firstTask; // 设置线程 this.thread = getThreadFactory().newThread(this); } /** * 线程启动执行的方法,调用了runWorker()方法 */ public void run() { runWorker(this); } /** * 中断工作线程 */ void interruptIfStarted() { Thread t; // 只有Worker中的state >= 0的时候,可以中断工作线程 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { // 如果状态正常,并且线程未中断,这边就中断线程 t.interrupt(); } catch (SecurityException ignore) { } } } }
2.4.8 allowCoreThreadTimeOut:是否允许核心线程超时
allowCoreThreadTimeOut用于指定核心线程是否能超时,它在通过getTask()获取线程时调用,通过它判断是否允许核心线程超时。
/** * 是否允许核心线程超时:默认是false,即核心线程永不超时 * 如果是true,核心线程将根据参数的超时时间存活 */ private volatile boolean allowCoreThreadTimeOut;
volatile特性:
- 有序性:被volatile声明的变量之前的代码一定会比它先执行,而之后的代码一定会比它慢执行。底层是在生成字节码文件时,在指令序列中插入内存屏障防止指令重排序。
- 可见性:一旦修改变量则立即刷新到共享内存中,当其他线程要读取这个变量的时候,最终会去内存中读取,而不是从自己的工作空间中读取。每个线程自己的工作空间用于存放堆栈(存方法的参数和返回地址)和局部变量。
- 原子性:volatile变量不能保证完全的原子性,只能保证单次的读/写操作具有原子性(在同一时刻只能被一个线程访问和修改),自增减、复合操作(+=,/=等)则不具有原子性。这也是和synchronized的区别。
读写内存语义:
- 写内存语义:当写一个volatile变量时,JMM(Java内存模型)会把该线程本地内存中的共享变量的值刷新到主内存中。
- 读内存语义:当读一个volatile变量时,JMM会把该线程本地内存置为无效,使其从主内存中读取共享变量。
有序性实现机制:
volatile有序性是通过内存屏障来实现的。内存屏障就是在编译器生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。
机器指令:JVM包括类加载子系统、运行时数据区、执行引擎。 执行引擎负责将字节码指令转为操作系统能识别的本地机器指令。
指令重排序:处理器为了提高运算速度会对指令重排序,重排序分三种类型:编译器优化重排序、处理器指令级并行重排序、内存系统重排序。
- 编译器优化的重排序:编译器在不改变单线程程序的语义前提下,可以重新安排语句的执行顺序。
- 指令级并行的重排序:现在处理器采用了指令集并行技术,将多条指令重叠执行。如果不存在依赖性,处理器可以改变语句对应的机器指令的执行顺序。
- 内存系统的重排序:由于处理器使用缓存和读写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。
2.5 execute():启动线程
2.5.1 工作原理
任务加入线程池时,判断的顺序:
- 核心线程数
- 阻塞队列
- 最大线程数
- 拒绝策略
线程池工作原理:
- 判断核心线程数:新加入任务,判断corePoolSize是否到最大值;如果没到最大值就创建核心线程执行新任务,如果到最大值就判断是否有空闲的核心线程;
- 判断加入队列:如果有空闲的核心线程,则空闲核心线程执行新任务,如果没空闲的核心线程,则尝试加入FIFO阻塞队列;
- 判断队列容量:若加入成功,则等待空闲核心线程将队头任务取出并执行,若加入失败(例如队列满了),则判断是否到最大值;
- 判断丢弃策略:如果没到最大值就创建非核心线程执行新任务,如果到了最大值就执行丢弃策略,默认丢弃新任务;
- 非核心线程自动回收:线程数大于corePoolSize时,空闲线程将在keepAliveTime后回收,直到线程数等于核心线程数。这些核心线程也不会被回收。
注意:
实际上线程本身没有核心和非核心的概念,都是靠比较corePoolSize和当前线程数判断一个线程是不是能看作核心线程。
可能某个线程之前被看作是核心线程,等它空闲了,线程池又有corePoolSize个线程在执行任务,这个线程到keepAliveTime后还是会被回收。
验证:
1.阻塞队列大时只有核心线程数的线程工作
创建一个线程池,再创建10个线程打印文字:
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolExample { public static void main(String[] args) { // 创建线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 5, // 核心线程数 200, // 最大线程数量 10, // 空闲线程存活时间 TimeUnit.SECONDS, // 时间单位 new LinkedBlockingDeque<>(100000), // 任务队列,大小100000 Executors.defaultThreadFactory(), // 线程工厂 new ThreadPoolExecutor.AbortPolicy() // 拒绝策略 ); // 提交任务到线程池 for (int i = 0; i < 10; i++) { threadPoolExecutor.execute(() -> { System.out.println(Thread.currentThread().getName() + " 正在执行任务"); }); } // 关闭线程池 threadPoolExecutor.shutdown(); } }
可以看见只有5个核心线程执行任务,原因阻塞队列有100000的大小,所以用不到新增非核心线程:
2.阻塞队列小时,会创建非核心线程工作:
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolExample { public static void main(String[] args) { // 创建线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 5, // 核心线程数 200, // 最大线程数量 10, // 空闲线程存活时间 TimeUnit.SECONDS, // 时间单位 new LinkedBlockingDeque<>(3), // 任务队列,大小100000 Executors.defaultThreadFactory(), // 线程工厂 new ThreadPoolExecutor.AbortPolicy() // 拒绝策略 ); // 提交任务到线程池 for (int i = 0; i < 10; i++) { threadPoolExecutor.execute(() -> { System.out.println(Thread.currentThread().getName() + " 正在执行任务"); }); } // 关闭线程池 threadPoolExecutor.shutdown(); } }
可以看到,有7个线程在工作。流程如下:
- 5个任务创建核心线程工作;
- 3个任务因为核心线程数满了,所以进入阻塞队列,等待有空闲线程时执行;
- 2个任务因为阻塞队列满了,所以创建非核心线程工作;
2.5.2 源码解析
核心流程:
- 启动核心线程数:如果corePoolSize没到最大值就创建核心线程直接执行新任务
- 任务放至阻塞队列:判断当前的状态是不是Running的状态(RUNNING可以处理任务,并且处理阻塞队列中的任务)
- 启动非核心线程数:尝试添加非核心工作线程,若添加非核心工作线程失败,证明已经到达maximumPoolSize的限制,执行拒绝策略
具体代码:
/** * 执行任务 * @param command Runnable对象 */ public void execute(Runnable command) { // 0.判空:如果当前传过来的任务是null,直接抛出异常即可 if (command == null) throw new NullPointerException(); int c = ctl.get(); // 1.启动核心线程数:如果corePoolSize没到最大值就创建核心线程直接执行新任务 if (workerCountOf(c) < corePoolSize) { // Step2:添加一个核心线程 if (addWorker(command, true)){ return; } // 更新一下当前值 c = ctl.get(); } // 如果走到下面会有两种情况: // a、核心线程数满了,需要往阻塞队列里面扔任务 // b、核心线程数满了,阻塞队列也满了,执行拒绝策略 // 2.任务放至阻塞队列:判断当前的状态是不是Running的状态(RUNNING可以处理任务,并且处理阻塞队列中的任务) // 如果是Running的状态,则尝试将任务放至阻塞队列。队列的offer(E e):用于非阻塞的插入操作,插入失败会返回false而不是等待 if (isRunning(c) && workQueue.offer(command)) { // 再次更新数值 int recheck = ctl.get(); // 再次校验当前的线程池状态是不是Running // 如果线程池状态不是Running的话,需要删除掉刚刚放的任务 if (!isRunning(recheck) && remove(command)){ // 执行拒绝策略 reject(command); } // 如果到这里,说明上面阻塞队列中已经有数据了 // 如果线程池的个数为0的话,需要创建一个非核心工作线程去执行该任务 // 不能让人家堵塞着 else if (workerCountOf(recheck) == 0){ addWorker(null, false); } } // 此时会有两种情况: // a、任务成功放进阻塞队列,正在等待被执行; // b、放进阻塞队列失败,等待添加非核心工作线程 // 3.启动非核心线程数 // 尝试添加非核心工作线程,若添加非核心工作线程失败,证明已经到达maximumPoolSize的限制,执行拒绝策略 // 添加一个非核心工作线程 else if (!addWorker(command, false)) // 工作队列中添加任务失败,执行拒绝策略 reject(command); }
2.6 addWorker():添加工作线程
调用关系:
可以看到,上面execute()方法,启动核心线程、非核心线程时都要执行addWorker()方法,这个方法是往线程池添加工作线程。
核心流程:
- ctl自旋加1:在一个retry循环里自旋给ctl字段加1。ctl存放线程池当前状态和数量
- 校验线程池状态和队列:如果不是RUNNING,直接退出循环
- 校验线程数:如果超过上限直接失败
- 尝试给ctl加1:如果成功则结束循环,如果失败则重新循环
- workers存线程:加锁把线程并放进workers,workers是一个HashSet,用于保存所有工作线程
- 启动线程:如果成功加进workers,调用线程的.start()方法,启动线程。
- 注意这里调用的是Worker对象的start()方法,而不是原线程的start()方法。Worker对象封装了原线程。Worker类是Runnable的子类,它的run()方法只有一行,调用了runWorker()方法。
完整源码:
/** * 添加工作线程 * @param firstTask Runnable对象 * @param core 是否是核心线程 * @return boolean */ private boolean addWorker(Runnable firstTask, boolean core) { // retry主要是为了结束整个循环 // retry 是一个标签(label),用来标记循环的某个位置。 // 这个标签与 for (;;) 循环结合使用,调用continue retry时重新开始整个循环,调用break retry时结束整个循环 retry: for (;;) { // 获取当前线程池的ctl:包含线程池状态和数量信息 int c = ctl.get(); // 获取线程池状态:根据ctl,获取线程池状态 // runStateOf:基于&运算的特点,保证只会拿到ctl高三位的值 int rs = runStateOf(c); // 1.校验线程池状态和队列:如果不是运行时,直接退出循环 // rs >= SHUTDOWN:表示此时不再接收新任务。代表当前线程池状态为:SHUTDOWN、STOP、TIDYING、TERMINATED,线程池状态异常 // RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED // rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务 // 但这里SHUTDOWN状态稍许不同(不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完) if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())){ return false; } for (;;) { // 2.校验工作线程数量:如果超过线程数上限直接失败 // 获取线程数 int wc = workerCountOf(c); // CAPACITY:是ctl的低29位的最大值(二进制是29个1),返回false; // core:addWorker()方法的第二个参数,如果为true表示根据corePoolSize来比较, // 如果为false则根据maximumPoolSize来比较。 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 3.尝试给ctl加1:如果成功,则跳出第一个for循环;如果失败,则重新循环 // 尝试CAS方式自增workCount 其实就是类似锁,同时只有一个线程能成功 if (compareAndIncrementWorkerCount(c)) break retry; // 3.1 如果增加workerCount失败,则重新获取ctl的值 c = ctl.get(); // 3.2 如果当前的运行状态不等于第一个循环刚开始时的状态,说明状态已被改变,返回第一个for循环继续执行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 4.线程放进workers:加锁把线程放进workers,workers是一个HashSet,用于保存所有工作线程 // 代码执行到这里 只是通过不停的循环通过CAS获取到执行任务的“锁” boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 根据firstTask来创建Worker对象 w = new Worker(firstTask); // 每一个Worker对象都会创建一个线程 final Thread t = w.thread; if (t != null) { // 获取锁。mainLock是ThreadPoolExecutor的可重入锁字段,用于保证线程安全 final ReentrantLock mainLock = this.mainLock; // 加锁 mainLock.lock(); try { int rs = runStateOf(ctl.get()); /* * rs < SHUTDOWN表示是RUNNING状态; * 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。 * 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务 */ if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 校验线程是否存活,不存活直接抛出异常 if (t.isAlive()) throw new IllegalThreadStateException(); // workers是一个HashSet,用于保存所有工作线程 workers.add(w); int s = workers.size(); // largestPoolSize记录着线程池中出现过的最大线程数量 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { // 解锁 mainLock.unlock(); } // 5.启动线程:如果成功加进workers,调用线程的.start()方法,启动线程 if (workerAdded) { // 启动线程 这个t.start()实际是调用的Worker类中的run方法,Worker本身实现了Runnable接口,所以一个Worker类型的对象也是一个线程。 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
2.7 runWorker ():执行线程
调用关系:
上面addWorker()开启线程时,调用的Worker对象的start()方法。Worker对象封装了原线程。Worker类是Runnable的子类,它的run()方法只有一行,调用了runWorker()方法。
addWorker()调用Worker类的start()方法:
Worker类的run()方法
核心流程:
- 获取任务:调用getTask()方法,获取Worker中的Runnable任务,拿到后Worker置空、解锁
- 执行任务:如果任务不等于空,则加锁执行任务
- 加锁
- beforeExecute():这个方法为空,需要我们继承ThreadPoolExecutor重写这个方法
- 执行任务:调用Runnable的run()方法执行任务
- afterExecute():这个方法为空,需要我们继承ThreadPoolExecutor重写这个方法
- 解锁
代码解析:
/** * 执行任务:由Worker类的run()调用的唯一一行方法 * @param w Worker对象 */ final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 1.获取Worker中的runnable任务,拿到后Worker置空、解锁 // 拿到当前Worker的第一个任务(如果携带的话) Runnable task = w.firstTask; // 置空 w.firstTask = null; // 解锁 w.unlock(); boolean completedAbruptly = true; try { // 2.如果任务不等于空,则加锁、执行任务、解锁 while (task != null || (task = getTask()) != null) { // 加锁 w.lock(); // 如果线程池状态 >= STOP,确保线程中断了 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 在任务执行前做一些操作:这个方法内为空,需要我们继承ThreadPoolExecutor重写这个方法 beforeExecute(wt, task); Throwable thrown = null; try { // 任务执行:调用线程的run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 任务执行后一些操作:这个方法内为空,需要我们继承ThreadPoolExecutor重写这个方法 afterExecute(task, thrown); } } finally { // 任务置空 task = null; // 执行任务+1 w.completedTasks++; // 解锁 w.unlock(); } } completedAbruptly = false; } finally { // 删除线程的方法 processWorkerExit(w, completedAbruptly); } }
2.8 getTask():从队列获取任务
调用关系:
上面runWorker()方法,调用了getTask()方法获取线程,返回值是Runnable对象
核心流程:
- 进入死循环
- 校验当前线程池状态:如果状态是停止,或关闭+阻塞队列是空,则返回空
- 校验超时:如果是非核心线程,或者允许超时的核心线程,则尝试销毁线程;
- 从阻塞队列取出线程:当队列没线程时,如果线程允许超时则返回空,如果不允许则阻塞获取
源码解析:
/** * 获取任务 * @return {@link Runnable } */ private Runnable getTask() { // 声明超时变量:表示上次从阻塞队列中取任务时是否超时,先标记为未超时 boolean timedOut = false; // 1.死循环拿线程对象 for (;;) { // 1.1 校验当前线程池状态 // 拿到当前的ctl int c = ctl.get(); // 获取当前的线程池状态 int rs = runStateOf(c); // 如果线程池状态是STOP,没有必要处理阻塞队列任务,直接返回null // 如果线程池状态是SHUTDOWN,并且阻塞队列是空的,直接返回null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 线程池中的线程个数减一 decrementWorkerCount(); return null; } // 当前线程池中线程个数 int wc = workerCountOf(c); // 1.2 判断线程是否能超时:如果核心线程允许超时,或者当前线程池中的线程个数大于核心线程数,那么这个线程就有寿命 // allowCoreThreadTimeOut:是否允许核心线程数超时(开启这个之后),核心线程数也会执行下面超时的逻辑 // wc > corePoolSize:当前线程池中的线程个数大于核心线程数 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 1.3 如果线程超时,则销毁线程 // wc > maximumPoolSize:基本不存在 // timed && timedOut:第一次肯定是失败的(超时标记为false) if ((wc > maximumPoolSize || (timed && timedOut)) // 1、线程个数为1 // 2、阻塞队列是空的 && (wc > 1 || workQueue.isEmpty())) { // 线程池的线程个数减一 if (compareAndDecrementWorkerCount(c)){ return null; } continue; } // 运行到这里,说明线程没超时 try { // 1.4 从阻塞队列取出线程:当队列没线程时,如果线程允许超时则返回空,如果不允许则阻塞获取 // poll():从队列中获取并移除一个任务。如果队列为空,它会返回 null // take():从队列中获取并移除一个任务。如果队列为空,它会阻塞当前线程直到有任务可用为止。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null){ return r; } // 到这里,说明没从阻塞队列拿出线程,线程超时了 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
2.9 关闭
- SHUTDOWN:关闭。线程池处于关闭状态,不再接受新的任务,但是会继续执行已有任务直到执行完成。执行线程池对象的shutdown()时进入该状态。
- STOP:停止。线程池处于关闭状态,不再接受新的任务,同时会中断正在执行的任务,清空线程队列。执行shutdownNow()时进入该状态。
2.9.1 shutdown():关闭线程
2.9.1.1 源码解析
shutdown()方法用于关闭线程,线程池关闭后,将不再接受新的任务,旧任务将执行完成。
核心流程:
- 加锁
- 校验关闭权限:校验调用者是否有权限关闭线程池及其中的所有线程
- 修改线程池状态:将线程池状态修改为SHUTDOWN
- 中断空闲线程:遍历线程池workers中的工作线程,并尝试中断那些处于空闲状态的线程。
- 执行关闭时方法:钩子函数,实际内容为空,用户继承重写可以编写关闭线程池后要执行的逻辑
- 尝试终止:查看当前线程池是否可以变为TERMINATED状态
代码解析:
/** * 关闭线程池 */ public void shutdown() { // 1.加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 2.校验关闭权限:校验调用者是否有权限关闭线程池及其中的所有线程 checkShutdownAccess(); // 3.修改线程池状态:将线程池状态修改为SHUTDOWN advanceRunState(SHUTDOWN); // 4.中断空闲线程:遍历线程池workers中的工作线程,并尝试中断那些处于空闲状态的线程。 interruptIdleWorkers(); // 5.钩子函数,实际内容为空,用户继承重写可以编写关闭线程池后要执行的逻辑 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 6.查看当前线程池是否可以变为TERMINATED状态 // 从 SHUTDOWN 状态修改为 TIDYING,在修改为 TERMINATED tryTerminate(); }
2.9.1.2 校验关闭权限:checkShutdownAccess()
/** * 校验关闭权限:校验调用者是否有权限关闭线程池及其中的所有线程 */ private void checkShutdownAccess() { // 1. 获取系统的安全管理器 SecurityManager security = System.getSecurityManager(); // 2. 如果有安全管理器,需要进行权限检查 if (security != null) { // 3. 首先检查调用者是否有权限关闭线程池(shutdownPerm 是一种特殊的权限) security.checkPermission(shutdownPerm); // 4. 获取线程池的锁,确保操作的线程安全 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 5. 遍历线程池中的每一个工作线程,检查调用者是否有权限中断每个线程 for (Worker w : workers) security.checkAccess(w.thread); } finally { // 6. 最后解锁,确保锁的释放以防止死锁 mainLock.unlock(); } } }
2.9.1.3 修改线程池状态:advanceRunState()
/** * 修改线程池状态 * @param targetState 目标状态 */ private void advanceRunState(int targetState) { // 1.进入直接死循环 for (;;) { // 2.拿到当前的ctl int c = ctl.get(); // 3.修改状态:如果当前状态已经大于等于目标状态,则跳出循环,否则CAS修改线程池状态 if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
2.9.1.4 中断空闲线程:interruptIdleWorkers()
/** * 中断空闲线程:遍历线程池workers中的工作线程,并尝试中断那些处于空闲状态的线程。 * 不传参数时,默认参数是false,即中断所有线程,而不是只中断完一个线程就完事 */ private void interruptIdleWorkers() { interruptIdleWorkers(false); } /** * 中断空闲线程:遍历线程池workers中的工作线程,并尝试中断那些处于空闲状态的线程。 * @param onlyOne 是否只中断一个:如果参数 onlyOne 为 true,中断一个线程后就直接跳出循环。如果为 false,则继续尝试中断所有空闲线程。 */ private void interruptIdleWorkers(boolean onlyOne) { // 1.加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 2.遍历线程池workers中的工作线程,并尝试中断那些处于空闲状态的线程。 for (java.util.concurrent.ThreadPoolExecutor.Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } // 如果只中断一个,则中断后直接退出循环 if (onlyOne) break; } } finally { mainLock.unlock(); } }
2.9.1.5 尝试终止线程池:tryTerminate()
/** * 尝试设置线程池为TERMINATED状态 */ final void tryTerminate() { // 1.开始死循环; for (;;) { // 拿到ctl int c = ctl.get(); // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 2.尝试CAS将线程池状态设为TIDYING,如果失败则解锁再次循环,如果成功则继续执行: if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 2.1 执行销毁前方法:根据用户自定义的逻辑,在线程池销毁之前做最后的处理 // 该方法是一个钩子函数,是个空方法,允许我们通过继承重写的方式,自己定义该方法逻辑 terminated(); } finally { // 2.2 线程池状态设为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); // 2.3 唤醒所有等待线程池终止的线程 // termination是mainLock的 Condition 对象 ,这是与 ReentrantLock 相关联的条件变量,用于线程间的通信。 // Condition对象:用于线程间通信,通过await()和signal(),signalAll()让线程等待或唤醒。通常用lock锁创建Condition对象,即lock.newCondition(); termination.signalAll(); } return; } } finally { // 解锁 mainLock.unlock(); } } }
2.9.2 shutdownNow():停止线程
2.9.2.1 源码解析
shutdownNow()方法用于关闭线程,线程池关闭后,将不再接受新的任务,旧任务将直接中断。
核心流程:
- 加锁
- 校验关闭权限:校验调用者是否有权限关闭线程池及其中的所有线程
- 修改线程池状态:将线程池状态修改为STOP
- 中断所有线程:将线程池中的线程全部中断
- 删除所有线程:
- 删除当前所有的工作线程
- 删除阻塞队列中所有的工作线程
- 并将这些删除线程收集到tasks
- 解锁
- 尝试设置线程池为TERMINATED状态:
- CAS自旋修改状态
- 执行销毁前自定义方法
- 唤醒所有等待线程池终止的线程
代码解析:
/** * 终止线程池 * @return {@link List }<{@link Runnable }> 被删除的所有线程 */ public List<Runnable> shutdownNow() { // 声明tasks,用于收集所有被删除的线程 List<Runnable> tasks; // 1.加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 2.校验关闭权限:校验调用者是否有权限关闭线程池及其中的所有线程 checkShutdownAccess(); // 3.修改线程池状态:将线程池状态修改为STOP advanceRunState(STOP); // 4.中断所有线程:将线程池中的线程全部中断 interruptWorkers(); // 5.删除所有线程:删除阻塞队列中所有的工作线程,并将这些删除线程收集到tasks // drain以为排空 tasks = drainQueue(); } finally { // 6.解锁 mainLock.unlock(); } // 7.尝试设置线程池为TERMINATED状态 // 从 Stop 状态修改为 TIDYING,在修改为 TERMINATED tryTerminate(); return tasks; }
2.9.2.2 清空阻塞队列:drainQueue()
/** * 清空阻塞队列:删除阻塞队列中所有线程,并返回这些线程 * @return {@link List }<{@link Runnable }> */ private List<Runnable> drainQueue() { // 1.获取阻塞队列 BlockingQueue<Runnable> q = workQueue; // 返回的结果 ArrayList<Runnable> taskList = new ArrayList<Runnable>(); // 2.清空阻塞队列并将数据放入taskList中。 // drainTo 方法是 BlockingQueue 的批量删除方法,它可以将队列中的所有元素批量转移到另一个集合中。 q.drainTo(taskList); // 3.再次遍历清空:因为没加锁,所以需要再次遍历,防止依赖 if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } // 最终返回即可 return taskList; }
三、Executors源码解析
3.1 Executors工具类
执行器工具类Executors主要用于创建线程池,它主要有4个创建线程池的方法:
- newFixedThreadPool:固定大小的线程池
- newCachedThreadPool:缓存线程池(无限大)
- newScheduledThreadPool:定时任务线程池
- newSingleThreadExecutor:单线程化的线程池
一般不使用这种方式创建线程池,因为虽然它简单方便,但是它的参数配置写死了,不可控。
参数配置包括:
核心线程数、最大线程数、线程存活时间、阻塞队列、拒绝策略。详细看上文的“ThreadPoolExecutor”
这四个线程池底层都是return new ThreadPoolExecutor(...),返回自定义线程池,并根据各个线程池的特性,传入线程数等参数。
ExecutorService executorService = Executors.newFixedThreadPool(10); // 源码 // 固定大小线程池: new ThreadExcutor(n, n, 0L, ms, new LinkedBlockingQueue<Runable>() // 缓存线程池: new ThreadExcutor(0, max_valuem, 60L, s, new SynchronousQueue<Runnable>()); // 定时任务线程池: new ScheduledThreadPoolExecutor(corePoolSize); // super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); // 单线程线程池: new ThreadExcutor(1, 1, 0L, ms, new LinkedBlockingQueue<Runable>())
以固定大小的线程池为例:
源码如下
可以看到:
- 核心线程数和最大线程数都等于传入的参数
- 空闲线程存活时间是0L。这里其实设置多少都一个效果,因为这个线程池最大线程数=核心线程数,也就是说不会有非核心线程,而核心线程是不会被回收的。
- 阻塞队列是链表。无界,可以缓存无限多的任务,所以也用不到拒绝策略。
public class Executors { /** * 创建固定大小的线程池 * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } ... }
3.2 newFixedThreadPool:固定大小的线程池
特点:
- 核心线程数:所有线程都是核心线程(通过构造参数指定),最大线程数=核心线程数。
- 存活时间0s:因为所有线程都是核心线程,所以用不到存活时间,线程都会一直存活。keepAliveTime为0S。
- 链表阻塞队列:超出的线程会在LinkedBlockingQueue队列中等待。
源码:
public class Executors { /** * 创建固定大小的线程池 * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } // ... }
可以看到,底层是创建自定义线程池对象,并且传入参数进行了设置:
- 核心线程数和最大线程数:都等于传入的参数
- 空闲线程存活时间:是0L。这里其实设置多少都一个效果,因为这个线程池最大线程数=核心线程数,也就是说不会有非核心线程,而核心线程是不会被回收的。
- 阻塞队列:是链表。无界,可以缓存无限多的任务,所以也用不到拒绝策略。
3.3 newCachedThreadPool:缓存线程池(无限大)
特点:
- 所有线程都是非核心线程:核心线程数是0,最大线程数无限大
- 空闲线程一分钟后杀死:keepAliveTime为60S,空闲线程超过60s会被杀死。
- 阻塞队列没有空间:因为最大线程数无限大,所以也用不到阻塞队列,所以设为没有存储空间的SynchronousQueue同步队列。
源码:
public class Executors { /** * 缓存线程池 * @return the newly created thread pool */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } // ... }
可以看到,底层是创建自定义线程池对象,并且传入参数进行了设置:
- 核心线程数是0,最大线程数无限大:最大线程数Integer.MAX_VALUE,即2^31-1。线程数量可以无限扩大,所有线程都是非核心线程。
- 空闲线程存活时间60s:keepAliveTime为60S,空闲线程超过60s会被杀死。
- 同步队列:因为最大线程数无限大,所以也用不到阻塞队列,所以设为没有存储空间的SynchronousQueue同步队列。这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
3.4 newScheduledThreadPool:定时任务线程池
创建一个定长线程池, 支持定时及周期性任务执行。可指定核心线程数。
特点:
- 核心线程数:由传入参数决定;
- 最大线程数:无限大
- 存活时间:0
- 阻塞队列:延迟队列。无界的延迟+优先级队列,延迟时间短的任务排在队头,优先被执行。
源码:
public class Executors { /** * 定时任务线程池:可以定期或设定多久后执行任务 */ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } // ... }
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { /** * Creates a new {@code ScheduledThreadPoolExecutor} with the * given core pool size. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @throws IllegalArgumentException if {@code corePoolSize < 0} */ public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } // ... }
可以看到:
- 核心线程数:由传入参数corePoolSize决定;
- 最大线程数:无限大,2^31-1。这里其实设置无限大和设置corePoolSize是一个效果,因为它的阻塞队列是无界的,内存溢出前永远不会满,所以也不会有创建非核心线程的机会。至于为什么设置成无限大呢,可能是为了保证内存溢出等极端情况下,任务可以不被丢弃,也可能是心理安慰。
- 存活时间:0。因为阻塞队列无界,所以不会有非核心线程,所以存活时间也没意义,内存溢出时创建了非核心线程,空闲后直接销毁。
- 阻塞队列:延迟队列。无界的延迟+优先级队列,延迟时间短的任务排在队头,优先被执行。
3.5 newSingleThreadExecutor:单线程化的线程池
- 核心线程数和最大线程数:核心线程数与最大线程数都只有一个,不回收。创建一个单线程化的线程池, 它只会用唯一的工作线程来执行任务, 保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
- 阻塞队列:无界的链表队列。
- 使用场景:需要稳定串行化执行一个个线程的场景;
思考:单线程化的线程池存在的意义是什么?直接创建一个线程不就好了?
主要是为了利用线程池的优势。
- 让线程复用:如果是直接创建一个线程,这个线程创建后会直接销毁,而线程池的核心线程会一直存活,这就减少了创建、销毁的时间损耗。
- 管理线程数量:它可以管理线程的数量,保证只有一个线程,减少了创建、销毁的时间损耗。
源码:
public class Executors { /** * 单线程线程池:仅有一个核心线程和无界队列 */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } // ... }