ThreadPoolExecutor是jdk自带的线程池实现。看到了"池"一定会想到对象池模式,它是单例模式的一个变种,主要思想是通过共享复用已有的空闲对象,达到限制开销和提高性能的目的。这里的对象可以理解为某种"资源",比如:数据库连接、线程、socket连接...创建这种资源的消耗比较大,如果每次使用都新建的话,会造成额外的开销。同时因为对资源的创建没有限制,可能会重复创建大量的对象,导致资源枯竭。如果能够提前创建好一批对象放到一个"池"中,每次使用时都从池中选取空闲的对象,用完后再还入池中,这样便提高了资源的使用效率,又因为限制了创建对象的数量,也可以避免大量创建对象导致资源枯竭。我们常用的线程池、DB连接池、socket连接池等都是用的这个思路。
适合使用线程池的场景是建立在如下基础上的:
0)串行执行的响应性和吞吐量无法满足需求:串行程序会在一个线程中逐处理所有任务,因此可能受任务执行时间的长度影响,具有较差的响应性。同时,因为单线程程序往往不能充分利用CPU的资源,无法达到最优的吞吐量。
1)线程的生命周期管理(时间)开销很高:因为创建、销毁线程开销很大,所以频繁的创建线程会造成较大的时间开销,从而减慢任务的处理速率,增加延迟。
2)物理资源的消耗:活跃的线程会消耗操作系统资源,包括:内存、文件句柄等。如果应用程序中存在大量活跃的线程,将会占用大量的资源,有可能导致系统资源枯竭。
3)稳定性:随意的创建线程而不加管控,不利于整个应用程序的稳定。
综上所述,使用多线程提升并发应该是使用合适的线程数达到最好的性能即可,并非线程越多越好。一些场景下(如任务规模很小或者资源需要串行访问的场景),单线程处理可能比多线程更简单且性能更好。
线程池的组成
抛开ThreadPoolExecutor的实现,我们先自己设想一下线程池的概念模型:
首先我们需要一个用于全局管控线程池运行的类,负责线程池的创建、任务的运行以及最后线程池关闭...完整流程的管理,这个类构成了线程池的主体,即线程管理类。线程池中包含多个用于执行任务的工作线程,工作线程由线程管理类创建和维护,能够执行外部提交的任务,并响应取消任务的操作。
由于不同的业务场景创建的线程具有不同的特征,比如:以相同的前缀命名,设置为守护线程,归属于相同的ThreadGroup等...这些参数的配置对于线程池而言是重复性工作,因此单独抽取一个线程工厂类用作工作线程的创建,由于不同的场景设置的线程参数不同,因而需要客户端自己根据需求定制,故抽象为接口。
线程池中使用阻塞队列作为任务提交和任务执行的"缓冲",当工作线程忙不过来时,线程池可以先将任务存放到一个任务队列中,待后续有工作线程空闲时,再从队列中获取新的任务执行。由于有了任务队列这个"缓冲",便增加了执行任务的灵活度,任务队列可以根据不同的策略对其中的任务进行编排,比如按优先级执行、延迟执行...存在这灵活性,就意味着要为客户端留有扩展的余地,因而任务队列也抽象为接口。
线程池作为生产者-消费者模式的实现,也要考虑生产速率和消费速率的平衡。当生产-消费失衡时,即任务的提交速率大于消费速率时,为了保障程序的稳定性,需要对任务做驳回处理,以便降低线程池的负载。同时,为了减少驳回任务造成的损失,需要对被驳回的任务提供必要的驳回处理策略。最简单快速且合理的驳回策略便是抛出异常,即我们在服务降级中常用的快速失败策略,为了避免处理驳回任务带来额外的负载压力,直接放弃任务执行,通过抛出异常告知客户端任务失败。当然,为了满足各式各样的业务场景,也需要留给客户端定制策略的余地,因此驳回策略也被抽象为接口。
线程池管理类、工作线程、任务队列、线程初始化类、任务驳回策略构成了一个线程池的基本组成。有了基本思路,下面对照着看看ThreadPoolExecutor的组成。
ThreadPoolExecutor,线程池管理类,继承AbstractExecutorService类,作为ExecutorService的一种实现,通过线程池调度任务;
Worker,工作线程类,属于ThreadPoolExecutor的私有内部类,实现Runnable接口,用作运行线程任务;继承AbstractQueuedSynchronizer接口,用作控制工作线程的空闲/运行状态的同转换;
ThreadFactory,线程工厂接口,实现该接口,定制线程创建参数;
BlockingQueue,阻塞队列接口,用于缓存客户端提交的任务,传入不同的实现类,实现不同的任务编排策略;
RejectedExecutionHandler,驳回处理策略接口,定制驳回任务的处理策略,ThreadPoolExecutor默认提供4种实现:
- CallerRunsPolicy:使用提交任务的线程处理被驳回任务;
- AbortPolicy::中止驳回任务,抛出RejectedExecutionException异常;
- DiscardPolicy:忽略驳回任务,不做任何处理;
- DiscardOldestPolicy:忽略阻塞队列header元素,然后将任务提交ThreadPoolExecutor重新执行,如果失败,则继续忽略header元素;
线程池的生命周期
线程池的生命周期,即线程池从创建、使用到最终销毁所经历的过程。ThreadPoolExecutor将线程池的生命周期划分为5个状态,分别是:running、shutdown、stop、tidying、terminated。状态图如下:
- running:线程池创建后的状态,能够接收新任务,同时处理任务队列中的任务;
- shutdown:执行shutdown()后,不接收新任务,但是会处理任务队列中的任务;
- stop:执行shutdownNow()后,不接收新任务,也不处理任务队列中的任务,同时会中断正在执行的任务;
- tidying:当所有任务都已被终止,工作线程数为0后,状态变为tidying状态,然后运行terminated()钩子方法;
- terminated:当terminated()钩子方法执行完毕后,变为此状态。此时ThreadPoolExecutor已被完全终止;
ThreadPoolExecutor将线程池状态runState和工作线程数workCount放入同一个字段。其中,1~29位存放workCount;30~32位存放runState,包括1位符号位。
// 使用一个原子整型存放runState和workCount两个字段;
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// workCount的最大位数,此处为32位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池的最大线程数:2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 存放runState的5个状态常量
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取runState值
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取workCount值
private static int workerCountOf(int c) { return c & CAPACITY; }
// 封装指定的runState和workCount到ctl字段中
private static int ctlOf(int runState, int workCount) { return runState | workCount; }
/**
* 控制runState字段
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 通过CAS方式增加/减少workerCount
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
线程池的初始化
ThreadPoolExecutor初始化至少需要指定4个参数:corePoolSize(核心工作线程数),maximumPoolSize(最大工作线程数),keepAliveTime(空闲线程存活时间),workQueue(任务队列);除此之外,可以选定2个参数:threadFactory(线程工厂,默认使用DefaultThreadFactory类),handler(驳回任务处理器,默认使用AbortPolicy类)。根据填写参数的不同,提供了4个构造函数,如下是最终的构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
提交任务
ThreadPoolExecutor继承了AbstractExecutorService抽象类。AbstractExecutorService实现了ExecutorService接口中提交任务的基本方法,包括:submit和invoke*等方法,用于提交任务,但没有实现execute方法。
submit包含3个重载方法,均是将Runnable、Callable对象包装成RunnableFuture对象,然后调用execute方法,可见执行任务的具体方式仍是依赖子类实现,有一些模板方法模式的味道。
invokeAll方法用于批量执行多个任务,并同步等待任务执行完成返回结果。invokeAny方法用于执行一批任务中的一个任务,并同步等待,直到有任意一个任务执行完成,便取消其它任务,并返回。invokeAny方法依赖于ExecutorCompletionService实现,ExecutorCompletionService作为CompletionService接口的实现类,可以理解为一个Executor对象+一个BlockingQueue,Executor用于执行提交的任务,可由调用方依赖注入,此处传入AbstractExecutorService本身,即AbstractExecutorService的实现类本身。BlockingQueue用于存放执行完成的任务。invokeAny方法调用doInvokeAny方法,doInvokeAny方法调用ExecutorCompletionService的submit方法,提交所有待执行的任务,然后调用ExecutorCompletionService的take方法,阻塞等待ExecutorCompletionService的BlockingQueue中有完成任务的Future放入。待收到第一个完成的任务结果后,调用Future的cancel方法取消其它还在执行的任务,随即返回。
还有一个newTaskFor方法,标志为protected,用于将Runnable、Callable接口封装成RunnableFuture对象,AbstractExecutorService的子类可根据包装Future对象的不同重写。
执行任务
所有Executor接口的实现类,均以execute方法为执行入口。ThreadPoolExecutor的execute方法执行如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// step1. 如果poolSize < corePoolSize,则新建线程;
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// step2. 如果线程池状态为running,且poolSize >= corePoolSize,则放入任务队列;
// 如果线程池状态不是running,则驳回任务,执行驳回策略;
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// step3. 如果放入任务队列失败,则创建一个新线程执行任务;
// 如果创建线程失败,则驳回任务,执行驳回策略;
else if (!addWorker(command, false))
reject(command);
}
从代码可以看出,线程池执行任务主要分为3步:
- 第一步:如果poolSize < corePoolSize,则新建线程;
- 第二步:如果线程池状态为running,且poolSize >= corePoolSize,则放入任务队列;如果线程池状态不是running,则驳回任务,执行驳回策略;
- 第三步:如果放入任务队列失败,则创建一个新线程执行任务; 如果创建线程失败,则驳回任务,执行驳回策略;
execute方法中调用addWorker方法用于创建一个工作线程,传入当前任务并启动;来看addWorker实现:
private boolean addWorker(Runnable firstTask, boolean core) {
// 通过CAS操作使workerCount++
... ...
// 标识工作线程是否正常启动
boolean workerStarted = false;
// 标识工作线程是否添加成功
boolean workerAdded = false;
Worker w = null;
try {
// 创建工作线程对象,传入任务,调用ThreadFactory创建线程;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁,保证如下操作的原子性:
// 1)添加工作线程到线程集合;
// 2)更新线程池运行数据;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动工作线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 线程启动失败,则终止线程池运行,尝试调用ThreadPoolExecutor提供的terninated()回调函数;
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
创建Worker线程后,执行t.start()方法会启动工作线程,工作线程的run方法调用ThreadPoolExecutor的runWorker方法。该方法用于执行线程任务,期间会调用ThreadPoolExecutor提供的beforeExecute和afterExecute回调函数,这两个函数由ThreadPoolExecutor的子类实现。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断
// 标识任务是否未被完整执行,true标识未被完整执行;
boolean completedAbruptly = true;
try {
// getTask阻塞等待任务,当线程池停止或者获取任务超时,则返回null;循环退出条件如下:
// 1)因为调用setMaximumPoolSize导致poolSize大于setMaximumPoolSize,需要回收掉部分工作线程;
// 2)线程池被stop;
// 3)线程池被shutdown并且任务队列为空;
// 4)线程池获取任务超时,即超过keepAliveTimeout时限,需要被回收时;
while (task != null || (task = getTask()) != null) {
// 标识当前工作线程为已使用状态
w.lock();
// 检查当前线程是否因为执行shutdownNow方法而被中断;
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // 在当前工作线程中执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
// 更新统计数据
w.completedTasks++;
// 标识工作线程为未使用状态
w.unlock();
}
}
// 如果执行到此处,表明任务被完整执行
completedAbruptly = false;
} finally {
// 此方法调用场景:
// 1)终止线程池时,用于从线程集合中移除工作线程;
// 2)当前线程因为异常而退出,重新创建线程替换之;
// 3)线程池中因为设置allowCoreThreadTimeOut=true,导致工作线程全部被回收时,任务队列仍然有任务,则新建线程;
processWorkerExit(w, completedAbruptly);
}
}
关闭线程池
关闭线程池有2个入口:shutdown、shutdownNow。shutdown是相对"温和"的关闭方式,它不接受新提交任务,中断所有空闲线程,但不会干预正在运行的线程,以及已经提交的任务。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查权限
checkShutdownAccess();
// 设置线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断所有等待任务的空闲线程
interruptIdleWorkers();
// 调用回调函数
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 执行线程池终止操作
tryTerminate();
}
shutdownNow是相对"激进"的关闭方法,它会尝试中断所有工作线程,同时干预正在运行的任务中止,清空任务队列,并将任务队列中未执行的任务返回给调用方。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查权限
checkShutdownAccess();
// 设置线程池状态为STOP
advanceRunState(STOP);
// 中断所有线程
interruptWorkers();
// 导出未执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 执行终止操作
tryTerminate();
return tasks;
}
shutdown和shutdownNow并不会同步等待线程池终止,而是提前异步返回,如果需要在调用shutdown或者shutdownNow后,同步等待线程池终止,则可以再调用awaitTermination方法。该方法会等到线程池状态变为terminated状态或者超时后返回。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
// 状态变为terminated,返回true
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
// 超时,返回false
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
以上是ThreadPoolExecutor的基本实现介绍,除了对主要流程的实现以外,ThreadPoolExecutor还提供了线程池监控、参数配置等方法,实现简单,详情可细读源码。
问题思考:为什么要把runState和workerCount放到同一个AtomicInteger中?