💥最详细的ThreadPoolExecutor
♨️本篇文章记录的为JUC知识中线程池相关内容,适合在学Java的小白,也适合复习中,面试中的大佬🙉🙉🙉。
♨️如果文章有什么需要改进的地方还请大佬不吝赐教❤️🧡💛
@[TOC]
🔥1.1 为什么要使用线程池
1.线程复用,降低资源消耗,提高响应速度
2.便于管理,比如可以控制最大并发数
线程池如何使用 (Java 中的线程池是通过 Executor 框架实现的,该框架中用到 Executor,Executors,ExecutorService,ThreadPoolExecutor 这几个类)。
🍋1.2 ThreadPoolExecutor 使用详解
其实 java 线程池的实现原理很简单,说白了就是一个线程集合 workerSet 和一个阻塞队列 workQueue。当用户向线程池提交一个任务 (也就是线程) 时,线程池会先将任务放入 workQueue 中。workerSet 中的线程会不断的从 workQueue 中获取线程然后执行。当 workQueue 中没有任务的时候,worker 就会阻塞,直到队列中有任务了就取出来继续执行
1.2.1 三种类型
🍓newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
线程池的线程数量达 corePoolSize 后,即使线程池没有可执行任务时,也不会释放线程。
FixedThreadPool 的工作队列为无界队列 LinkedBlockingQueue (队列容量为 Integer.MAX_VALUE), 这会导致以下问题:
- 线程池里的线程数量不超过 corePoolSize, 这导致了 maximumPoolSize 和 keepAliveTime 将会是个无用参数
- 由于使用了无界队列,所以 FixedThreadPool 永远不会拒绝,即饱和策略失效
🍓newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行.
- 由于使用了无界队列,所以 SingleThreadPool 永远不会拒绝,即饱和策略失效
🍓newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
线程池的线程数可达到 Integer.MAX_VALUE,即 2147483647,内部使用 SynchronousQueue 作为阻塞队列; 和 newFixedThreadPool 创建的线程池不同,newCachedThreadPool 在没有任务执行时,当线程的空闲时间超过 keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销; 执行过程与前两种稍微不同:
- 主线程调用 SynchronousQueue 的 offer () 方法放入 task, 倘若此时线程池中有空闲的线程尝试读取 SynchronousQueue 的 task, 即调用了SynchronousQueue 的 poll (), 那么主线程将该 task
交给空闲线程。否则执行 (2)- 当线程池为空或者没有空闲的线程,则创建新的线程执行任务.
- 执行完任务的线程倘若在 60s 内仍空闲,则会被终止。因此长时间空闲的 CachedThreadPool 不会持有任何线程资源.
1.2.2 关闭线程池
遍历线程池中的所有线程,然后逐个调用线程的 interrupt 方法来中断线程.
关闭方式 - shutdown
将线程池里的线程状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程.
关闭方式 - shutdownNow
将线程池里的线程状态设置成 STOP 状态,然后停止所有正在执行或暂停任务的线程。只要调用这两个关闭方法中的任意一个,isShutDown () 返回 true. 当所有任务都成功关闭了,isTerminated () 返回 true.
💖1.3 线程池的核心组件和核心类
Java 线程池主要由以下 4 个核心组件组成。
- 线程池管理器:用于创建并管理线程池。
- 工作线程:线程池中执行具体任务的线程。
- 任务接口:用于定义工作线程的调度和执行策略,只有线程实现了该接口,线程中的任务才能够被线程池调度。
- 任务队列:存放待处理的任务,新的任务将会不断被加入队列中,执行完成的任务将被从队列中移除。
Java 中的线程池是通过 Executor 框架实现的,在该框架中用到了 Executor、 Executors、ExecutorService、ThreadPoolExecutor、Callable、Future、FutureTask 这几个核心类,具体的继承关系如图 3-2 所示。
🌟1.4 线程池的七大参数
1. corePoolSize : 线程池中的常驻核心线程数
核心线程池大小,即在没有任务需要执行的时候线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。这里需要注意的是:在刚刚创建ThreadPoolExecutor的时候,线程并不会立即创建,而是要等到有任务提交时才会创建,除非调用了prestartCoreThread/prestartAllCoreThreads事先创建核心线程。再考虑到keepAliveTime和allowCoreThreadTimeOut超时参数(executor.allowCoreThreadTimeOut(true))的影响,所以没有任务需要执行的时候,线程池的大小不一定是corePoolSize。
2. maximumPoolSize :
线程池中允许创建的最大线程数,线程池中的当前线程数目不会超过该值。如果队列中任务已满,并且当前线程个数(poolSize)小于maximumPoolSize,那么会创建新的线程来执行任务。这里值得一提的是largestPoolSize,该变量记录了线程池在整个生命周期中曾经出现的最大线程个数。为什么说是曾经呢?因为线程池创建之后,可以调用setMaximumPoolSize()改变运行的最大线程的数目。
3. keepAliveTime :
多余的空闲线程存活时间,当空闲时间达到 keepAliveTime 值时,多余的线程会被销毁直到只剩下 corePoolSize 个线程为止 (非核心线程)
举个例子,如果线程池的核心大小corePoolSize=5,而当前大小poolSize =8,那么超出核心大小的线程,会按照keepAliveTime的值判断是否会超时退出。如果线程池的核心大小corePoolSize=5,而当前大小poolSize =5,那么线程池中所有线程都是核心线程,这个时候线程是否会退出,取决于allowCoreThreadTimeOut。
4. unit :
keepAliveTime 的单位
5. workQueue :
任务队列,被提交但尚未被执行的任务 (候客区)
6. threadFactory :
表示生成线程池中工作线程的线程工厂,用户创建新线程,一般用默认即可
7. handler :
拒绝策略,表示当线程队列满了并且工作线程大于等于线程池的最大显示 数 (maxnumPoolSize) 时如何来拒绝
🍀1.5 线程池用过吗?
⭐线程池的拒绝策略请你谈谈⭐
- 等待队列也已经排满了,再也塞不下新的任务了。同时,线程池的 maximumPoolSize 也到达了,无法接续为新任务服务,这时我们需要拒绝策略机制合理的处理这个问题
- JDK 内置的拒绝策略
- AbortPolicy (默认) : 当队列满了,正在执行任务的线程数也满的时候,又新进来了任务线程,此时会直接报异常 RejectException
- CallerRunPolicy : 将新进来的线程任务,返回给调用者,例如 main.
- DiscardOldestPolicy : 将最早进入队列的任务删除,之后再尝试加入队列
- DiscardPolicy: 直接丢弃任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的拒绝策略
以上内置策略均实现了 RejectExecutionHandler 接口
⭐工作中我们一般怎么用?⭐
答案是一个都不用,我们生产上只能使用自定义的。
参考阿里巴巴 java 开发手册
【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。 说明:使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者 “过度切换” 的问题。
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。说明:Executors 返回的线程池对象的弊端如下:
- FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE, 可能会堆积大量的请求,从而导致 OOM。
- CachedThreadPool 和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程,从而导致 OOM。
AbortPolicy : 最大不会抛出异常的值 = maximumPoolSize + new LinkedBlockingDeque(3) =8 个。如果超过 8 个,默认的拒绝策略会抛出异常
CallerRunPolicy : 如果超过 8 个,不会抛出异常,会返回给调用者去
DiscardOldestPolicy : 如果超过 8 个,将最早进入队列的任务删除,之后再尝试加入队列
DiscardPolicy : 直接丢弃任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的拒绝策略
🔒1.6 线程池的运行流程
- 如果正在运行的线程数量少于 corePoolSize(用户定义的核心线程数),线程池就会立刻创建线程并执行该线程任务。
- 如果正在运行的线程数量大于等于 corePoolSize,该任务就将被放入阻塞队列中。
- 在阻塞队列已满且正在运行的线程数量少于 maximumPoolSize 时,线程池会创建非核心线程立刻执行该线程任务。
- 在阻塞队列已满且正在运行的线程数量大于等于 maximumPoolSize 时,线程池将拒绝执行该线程任务并抛出 RejectExecutionException 异常。
- 在线程任务执行完毕后,该任务将被从线程池队列中移除,线程池将从队列中取下一个线程任务继续执行。
- 在线程处于空闲状态的时间超过 keepAliveTime 时间时,正在运行的线程数量超过 corePoolSize,该线程将会被认定为空闲线程并停止。因此在线程池中所有线程任务都执行完毕后,线程池会收缩到 corePoolSize 大小。
总结来说就是优先核心线程、阻塞队列次之,最后非核心线程。
🔑1.7 源码解析
🥝1.7.1 关键属性
//这个属性是用来存放 当前运行的worker数量以及线程池状态的
//int是32位的,这里把int的高3位拿来充当线程池状态的标志位,后29位拿来充当当前运行worker的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
//worker的集合,用set来存放
private final HashSet<Worker> workers = new HashSet<Worker>();
//历史达到的worker数最大值
private int largestPoolSize;
//当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略
private volatile RejectedExecutionHandler handler;
//超出coreSize的worker的生存时间
private volatile long keepAliveTime;
//常驻worker的数量
private volatile int corePoolSize;
//最大worker的数量,一般当workQueue满了才会用到这个参数
private volatile int maximumPoolSize;
🍅1.7.2 内部状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) {
return c & ~CAPACITY; }
private static int workerCountOf(int c) {
return c & CAPACITY; }
private static int ctlOf(int rs, int wc) {
return rs | wc; }
其中 AtomicInteger 变量 ctl 的功能非常强大:利用低 29 位表示线程池中线程数,通过高 3 位表示线程池的运行状态:
- RUNNING: -1 << COUNT_BITS,即高 3 位为 111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
- SHUTDOWN: 0 << COUNT_BITS,即高 3 位为 000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
- STOP : 1 << COUNT_BITS,即高 3 位为 001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
- TIDYING : 2 << COUNT_BITS,即高 3 位为 010, 所有的任务都已经终止;
- TERMINATED: 3 <<COUNT_BITS,即高 3 位为 011, terminated () 方法已经执行完成
🍒1.7.3 execute () 方法
ThreadPoolExecutor.execute (task) 实现了 Executor.execute (task)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();//获取线程池状态
/*
* 1. 如果运行的线程少于corePoolSize,则尝试用给定的命令作为第一个任务启动一个新线程。对addWorker的调用会自动检查runState和workerCount,从而通过返回false来防止在不应该添加
* 线程的情况下添加线程的错误警报。
*/
if (workerCountOf(c) < corePoolSize) {
//workerCountOf获取线程池的当前线程数;小于corePoolSize,执行addWorker创建新线程执行command任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 如果一个任务可以成功排队,那么我们仍然需要再次检查是否应该添加一个线程(因为自从上次检查以来已有线程已经死亡),或者在进入这个方法后线程池已经关闭。因此,我们重新检查状态,如有
// 必要,回滚排队停止,或启动一个新的线程,如果没有。
// double check: c, recheck
// 线程池处于RUNNING状态,把提交的任务成功放入阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// recheck and if necessary 回滚到入队操作前,即倘若线程池shutdown状态,就remove(command)
//如果线程池没有RUNNING,成功从阻塞队列中删除任务,执行reject方法处理任务
if (! isRunning(recheck) && remove(command))
reject(command);
//线程池处于running状态,但是没有线程,则创建线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3.如果不能将任务排队,则尝试添加一个新线程。如果失败了,我们就知道自己被关闭或饱和了,所以拒绝了任务。
// 往线程池中创建新的线程失败,则reject任务
else if (!addWorker(command, false))
reject(command);
}
为什么需要 double check 线程池的状态?
在多线程环境下,线程池的状态时刻在变化,而 ctl.get () 是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将 command 加入 workque 是线程池之前的状态。倘若没有 double check,万一线程池处于非 running 状态 (在多线程环境下很有可能发生),那么 command 永远不会执行。
🍑1.7.4 addWorker 方法
从方法 execute 的实现可以看出: addWorker 主要负责创建新的线程并执行任务线程池创建新线程执行任务时,需要 获取全局锁:
private final ReentrantLock mainLock = new ReentrantLock();
private boolean addWorker(Runnable firstTask, boolean core) {
// CAS更新线程池数量
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 线程池重入锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 线程启动,执行任务(Worker.thread(firstTask).start());
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
🍏1.7.5 Worker 类的 runworker 方法
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 创建线程
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// ...
}
- 继承了 AQS 类,可以方便的实现工作线程的中止操作;
- 实现了 Runnable 接口,可以将自身作为一个任务在工作线程中执行;
- 当前提交的任务 firstTask 作为参数传入 Worker 的构造方法;
一些属性还有构造方法:
//运行的线程,前面addWorker方法中就是直接通过启动这个线程来启动这个worker
final Thread thread;
//当一个worker刚创建的时候,就先尝试执行这个任务
Runnable firstTask;
//记录完成任务的数量
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//创建一个Thread,将自己设置给他,后面这个thread启动的时候,也就是执行worker的run方法
this.thread = getThreadFactory().newThread(this);
}
runWorker 方法是线程池的核心:
- 线程启动之后,通过 unlock 方法释放锁,设置 AQS 的 state 为 0,表示运行可中断;
- Worker 执行 firstTask 或从 workQueue 中获取任务:
- 进行加锁操作,保证 thread 不被其他线程中断 (除非线程池被中断)
- 检查线程池状态,倘若线程池处于中断状态,当前线程将中断。
- 执行 beforeExecute
- 执行任务的 run 方法
- 执行 afterExecute 方法
- 解锁操作
通过 getTask 方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask 方法会被阻塞并挂起,不会占用 cpu 资源;
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 先执行firstTask,再从workerQueue中取task(getTask())
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
🥭1.7.6 getTask 方法
下面来看一下 getTask () 方法,这里面涉及到 keepAliveTime 的使用,从这个方法我们可以看出线程池是怎么让超过 corePoolSize 的那部分 worker 销毁的。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
注意这里一段代码是 keepAliveTime 起作用的关键:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
allowCoreThreadTimeOut 为 false,线程即使空闲也不会被销毁;倘若为 ture,在 keepAliveTime内仍空闲则会被销毁。
如果线程允许空闲等待而不被销毁 timed == false,workQueue.take任务:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take 方法返回任务,并执行;
如果线程不允许无休止空闲 timed == true, workQueue.poll 任务:如果在 keepAliveTime 时间内,阻塞队列还是没有任务,则返回 null;
如果这篇【文章】有帮助到你💖,希望可以给我点个赞👍,创作不易,如果有对Java后端或者对redis感兴趣的朋友,请多多关注💖💖💖
💖个人主页
💖谱尼学java