引言
在《(十一)彻悟并发之JUC分治思想产物-ForkJoin分支合并框架原理剖析上篇》中,我们曾初步了解了ForkJoin分支合并框架的使用,也分析框架的成员构成以及任务提交和创建工作的原理实现,在本篇则会对框架的任务执行、任务扫描、线程挂起、结果合并以及任务窃取的源码实现进行分析。
一、工作线程执行任务/工作窃取实现过程
在上篇的最后,从signalWork() -> tryAddWorker() -> createWorker() -> newThread() -> ForkJoinWorkerThread() -> registerWorker() -> deregisterWorker()
这条路线分析完了工作线程的注册与销毁原理实现。下面接着继续来分析工作线程执行任务的过程,先回到之前的createWorker()
方法:
// ForkJoinPool类 → createWorker()方法
private boolean createWorker() {
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
if (fac != null && (wt = fac.newThread(this)) != null) {
// 创建成功则调用start()方法执行
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
// 如果创建过程出现异常则注销线程
deregisterWorker(wt, ex);
return false;
}
可以很明显的看到,创建线程成功后则会开始调用start()
方法执行任务,最终会找到run()
方法执行它:
// ForkJoinWorkerThread类 → run()方法
public void run() {
// 如果任务数组不为空
if (workQueue.array == null) {
Throwable exception = null;
try {
// 钩子函数,用于拓展,这里是空实现
onStart();
// 使用线程池的runWorker方法执行队列任务
pool.runWorker(workQueue);
} catch (Throwable ex) {
// 如果执行过程中出现异常则先记录
exception = ex;
} finally {
try {
// 钩子函数,报告异常
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
// 如果执行出现异常,注销线程
pool.deregisterWorker(this, exception);
}
}
}
}
// ForkJoinPool类 → runWorker()方法
final void runWorker(WorkQueue w) {
// 初始化任务数组,任务数组一开始是没有初始化的
// 这个方法是初始化或两倍扩容数组
w.growArray();
// 获取注册队列时记录的用于计算索引的随机种子
int seed = w.hint;
// 如果种子为0,那么则改为1,避免使用0
int r = (seed == 0) ? 1 : seed;
// 死循环
for (ForkJoinTask<?> t;;) {
// 扫描任务:在池的队列数组中随机选择工作队列,获取任务执行
if ((t = scan(w, r)) != null)
// 如果获取到任务则执行
w.runTask(t);
// 没有扫描到任务则尝试自旋或挂起阻塞
else if (!awaitWork(w, r))
break;
// 每次执行完后修改随机值,换个队列获取任务
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
// ForkJoinPool类 → scan()方法
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
// 如果队列数组不为空并且任务队列已经初始化且不为空
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
// 获取当前队列scanState,最开始为队列在数组中的下标
int ss = w.scanState;
// r&m:随机得到一个数组中的下标,oldSum/checkSum:比较效验和的标识
// 开启循环
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
int b, n; long c;
// 如果随机出的下标位置队列不为空
if ((q = ws[k]) != null) {
// 判断队列中有没有任务
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) {
// FIFO模式,通过内存偏移量计算出栈底/队头位置
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
// 获取栈底的任务
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null &&
q.base == b) {
// 如果工作线程处于活跃状态
if (ss >= 0) {
// 尝试利用CAS机制抢占线程(可能存在多个线程)
if (U.compareAndSwapObject(a, i, t, null)) {
// 抢占任务成功后将栈底挪一个位置
// 方便其他线程继续获取任务
q.base = b + 1;
// 如果队列中还剩有其他任务
if (n < -1) // signal others
// 新建或唤醒一条线程继续处理
signalWork(ws, q);
return t;
}
}
// 如果当前线程未激活,处于阻塞状态
else if (oldSum == 0 && // try to activate
w.scanState < 0)
// 唤醒线程
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
}
// 更新一次scanState值(因为前面可能唤醒了线程)
if (ss < 0) // refresh
ss = w.scanState;
// 获取一个新的随机值,用于随机下一个索引位置
r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
// 根据新的随机种子计算出一个新的下标索引
origin = k = r & m; // move and rescan
// 效验和的标识复位
oldSum = checkSum = 0;
// 结束本次循环,继续下次循环
continue;
}
// 如果没有获取到任务,checkSum+1,表示遍历完了一个位置
// 用于效验
checkSum += b;
}
// k=(k+1)&m代表去队列数组的下个位置继续查找下个坑位的队列,
// 如果 ==origin 了,代表已经遍历了所有的队列
if ((k = (k + 1) & m) == origin) {
// continue until stable
// 如果工作线程还处于活跃状态并且扫描完成整个队列后,
// 效验和 还未发生改变,那代表着没有新的任务提交到线程池
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
// 如果活跃状态变为了<0,代表已经处于不活跃状态
// 那么则退出扫描,返回null,回到runWorker()阻塞线程
if (ss < 0 || w.qlock < 0) // already inactive
break;
// 灭活操作(灭活后的线程被称为失活状态):
// 先将当前scanState变为负数
int ns = ss | INACTIVE; // try to inactivate
// 在ctl中减去一个活跃线程数,
// 并且将失活的ss保存到ctl的低三十二位
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));
// 用工作线程的stackPred成员保存上一个失活线程的
// scanState,从而形成一个阻塞栈,ctl的低32位保存栈顶
w.stackPred = (int)c; // hold prev stack top
// 更新当前工作线程的scanState
U.putInt(w, QSCANSTATE, ns);
// 使用cas机制更新ctl值
if (U.compareAndSwapLong(this, CTL, c, nc))
ss = ns;
else
// 如果更新失败则退出回滚,继续扫描任务,因为cas过程
// 中,导致失败的原因就一个:ctl值发生了
// 改变,这可能是有新任务提交进来了之后,唤醒或
// 添加了一条线程
w.scanState = ss; // back out
}
// 检查标识复位
checkSum = 0;
}
}
}
// 如果未扫描到任务则直接返回null,并在外边的runWorker()发生阻塞
return null;
}
ok,如上是整个线程工作的源码实现,重点在于任务扫描的实现过程,同时它也是理解比较困难的一个地方,下面来整体梳理一下整个线程工作以及扫描任务的流程:
- ①线程
start()
启动之后会找到run()
方法,然后会开始去竞争线程池中的共享任务 - ②初始化线程的工作队列,同时获取注册队列时计算索引的随机种子
- ③开启循环扫描,通过随机种子计算出池中队列数组中的一个下标索引
- ④判断随机出来的索引位置是否为空:
- 不为空:判断队列中是否存在任务:
- 存在:判断当前线程状态是否为活跃状态:
- 是:通过cas机制,以FIFO的模式取出栈底/队头的任务,如果还剩余任务则新建或唤醒一条新的线程继续处理,然后返回获取到的任务
- 否:先将ctl中记录的失活线程唤醒,随机计算一个新的位置,跳出本次循环,继续下次循环
- 不存在:更新scanState并随机计算一个新的位置,跳出本次循环,继续下次循环
- 如果存在任务但是没有获取到任务,代表没有有其他线程抢了任务,checkSum+1,随机计算一个新的位置,跳出本次循环,继续下次循环
- 存在:判断当前线程状态是否为活跃状态:
- 为空:代表该位置不存在队列,找到下一个位置,依次类推....
- 不为空:判断队列中是否存在任务:
- ④如果队列为空时会找到下一个位置,然后接着重复第④步
- ⑤如果遍历完所有队列还是没有获取到任务,并且扫描期间也没有新的任务提交到线程池时,先判断工作线程的活跃状态:
- 失活(不活跃)状态:直接退出循环回到
runWorker()
方法自旋或挂起阻塞 - 活跃状态:进行灭活操作,利用cas机制减去ctl中一个活跃线程数,同时将当前线程的scanState值记录到ctl的低32位做为栈顶,使用stackPred保存上一条失活线程的scanState值,从而形成一个阻塞栈
- 如果灭活操作失败,则代表ctl发生了改变,代表有新任务提交进了线程池,则取消灭活操作
- 线程如果是处于活跃状态,在扫描一圈没有获取到任务之后,会再重新扫描一次所有队列,在第二遍扫描中线程是有机会被重新“复活(唤醒)”的
- 失活(不活跃)状态:直接退出循环回到
- ⑥当线程第二圈扫描后,依旧未获取到任务,那么当前线程会退出循环,返回空
- ⑦扫描完毕后回到
runWorker()
方法,在该方法中会判断扫描结果是否为空:- 非空:调用
runTask()
执行扫描获取到的任务 - 为空:调用
awaitWork()
自旋或挂起阻塞线程,直至有新任务提交后唤醒
- 非空:调用
- ⑧如果获取任务成功,在执行过程中出现异常则报告异常信息并注销线程
线程工作以及扫描的整个流程会比较长,尤其是有些小伙伴在理解scan()
方法的多次扫描有些困难,线程在第一圈扫描时未获取到任务,会先灭活然后再扫描一圈,如果第二圈扫描到了任务则会“复活”灭活线程,然后再扫描一圈。如果第二圈扫描同样未扫描到任务,那么则直接退出循环。下面来个流程图加深理解:
在扫描的实现中,其实也是包含了任务窃取的实现的,因为在扫描的过程中是不会区分偶数队列和奇数队列,而且将所有队列都进行扫描,只要有任务就获取执行,而获取任务的方式是通过FIFO方式进行的,代表着共享队列中的任务获取以及工作窃取是通过获取队列头部/栈底的元素实现。而线程在执行自己工作队列中的任务时,是通过LIFO的模式进行的,是从队列尾部/栈顶获取任务执行,这样做的好处是可以避免工作窃取和本地执行时的CAS竞争。
ok,接着来看看任务执行以及线程挂起的实现:
// FrokJoinPool类 → runTask()方法
final void runTask(ForkJoinTask<?> task) {
// 如果任务不为空
if (task != null) {
// scanState&=~SCANNING会把scanState变成偶数,表示正在执行任务
scanState &= ~SCANNING; // mark as busy
// 执行任务
(currentSteal = task).doExec();
// 执行完任务后将维护偷取到的任务的成员置空
U.putOrderedObject(this, QCURRENTSTEAL, null);
// 执行本地任务:工作线程自身队列中的任务
execLocalTasks();
ForkJoinWorkerThread thread = owner;
// 窃取任务计数
if (++nsteals < 0)
// 叠加到ForkJoinPool的stealCounter成员中
transferStealCount(pool);
// 执行完成后,将状态从执行重新改为扫描状态
scanState |= SCANNING;
// 执行钩子函数
if (thread != null)
thread.afterTopLevelExec();
}
}
// FrokJoinPool类 → execLocalTasks()方法
final void execLocalTasks() {
int b = base, m, s;
ForkJoinTask<?>[] a = array;
// 如果自身工作队列中有任务
if (b - (s = top - 1) <= 0 && a != null &&
(m = a.length - 1) >= 0) {
// 如果自身队列被指定成了FIFO模式执行
if ((config & FIFO_QUEUE) == 0) {
for (ForkJoinTask<?> t;;) {
// 从栈顶/队列头部获取任务执行
if ((t = (ForkJoinTask<?>)U.getAndSetObject
(a, ((m & s) << ASHIFT) + ABASE, null)) == null)
break;
U.putOrderedInt(this, QTOP, s);
//执行任务
t.doExec();
if (base - (s = top - 1) > 0)
break;
}
}
else
// 如果没有则直接以LIFO模式从栈底获取任务执行
pollAndExecAll();
}
}
// FrokJoinPool类 → pollAndExecAll()方法
final void pollAndExecAll() {
// 栈底/队列尾部获取任务
for (ForkJoinTask<?> t; (t = poll()) != null;)
t.doExec();
}
// WorkerQueue类 → poll()方法
final ForkJoinTask<?> poll() {
ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
// 任务队列不为空
while ((b = base) - top < 0 && (a = array) != null) {
// 从栈底/队列尾部取值
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
// 检查是否被其他线程抢占
if (base == b) {
if (t != null) {
// 置空
if (U.compareAndSwapObject(a, j, t, null)) {
base = b + 1;
return t;
}
}
// 如果队列中没有了任务则退出
else if (b + 1 == top) // now empty
break;
}
}
return null;
}
// FrokJoinPool类 → awaitWork()方法
private boolean awaitWork(WorkQueue w, int r) {
// 如果队列已经被注销,直接返回
if (w == null || w.qlock < 0)
return false;
// 开启循环(w.stackPred:上个阻塞线程的scanState值)
for (int pred = w.stackPred, spins = SPINS, ss;;) {
// 如果当前线程被“复活/唤醒”则直接退出
if ((ss = w.scanState) >= 0)
break;
// 自旋操作:在挂起线程前会随机自旋一段时间
else if (spins > 0) {
// 通过随机种子以及自旋数实现随机自旋
r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
// 检查前一个失活挂起的工作线程是否已经复活
if (r >= 0 && --spins == 0) {
// randomize spins
WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
if (pred != 0 && (ws = workQueues) != null &&
(j = pred & SMASK) < ws.length &&
(v = ws[j]) != null && // see if pred parking
(v.parker == null || v.scanState >= 0))
spins = SPINS; // continue spinning
}
}
// 再次检测队列状态,是否被注销
else if (w.qlock < 0) // recheck after spins
return false;
// 如果线程没有被中断
else if (!Thread.interrupted()) {
long c, prevctl, parkTime, deadline;
// 获取活跃线程数
int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
// 如果活跃线程数<=0,可能是要关闭线程池,这里会去帮忙关闭
if ((ac <= 0 && tryTerminate(false, false)) ||
(runState & STOP) != 0) // pool terminating
return false;
// 如果活跃线程数<=0并且当前线程是最后挂起的线程
if (ac <= 0 && ss == (int)c) {
// is last waiter
// 计算出一个ctl值
prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
// 获取总线程数
int t = (short)(c >>> TC_SHIFT); // shrink excess spares
// 如果总线数大于2,说明挂起的线程已经超过两个了
// 当前线程会被抛弃
if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
// 返回false后,外面的runWorker()会直接break退出,
// 从而导致run()结束,线程死亡
return false;
// 如果挂起的线程数<=2或者cas失败(有线程被唤醒/复活)
// 那么则计算挂起时间,将当前线程挂起一段时间
// 计算挂起时间
parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
// 计算结束时间
deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
}
// 如果还存在活跃线程或当前线程不是最后被挂起的线程
else
// 将当前线程一直挂起(这类永久挂起的线程被唤醒后,如果对
// 应的scanState还是失活状态,这可能是线程池正在关闭了)
prevctl = parkTime = deadline = 0L;
// 获取当前线程
Thread wt = Thread.currentThread();
// 模仿LockSupport.park()挂起操作
U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport
w.parker = wt;
// 挂起之前会再检测一遍状态
if (w.scanState < 0 && ctl == c) // recheck before park
// 挂起操作
U.park(false, parkTime);
U.putOrderedObject(w, QPARKER, null);
U.putObject(wt, PARKBLOCKER, null);
// 如果被复活,则直接退出循环,返回true
if (w.scanState >= 0)
break;
// 如果阻塞时间不为零并且CTL值在期间没有发生改变,那么说明
// 在这段时间内外部并没有提交新的任务进来,当前线程则会被销毁
if (parkTime != 0L && ctl == c &&
deadline - System.nanoTime() <= 0L &&
U.compareAndSwapLong(this, CTL, c, prevctl))
return false; // shrink pool
}
}
return true;
}
先说说线程执行任务的runTask()
方法,在该方法中首先会修改状态为执行状态,执行完成窃取到的任务之后会再执行自身工作队列中的任务,在执行自身任务时,除非是指定成了FIFO模式,不然默认都是会以LIFO模式,执行完所有任务后,会将状态重新改为扫描状态。整体逻辑还算比较简单。
再来谈谈挂起/阻塞方法awaitWork
,当线程扫描不到任务时,会先检查自己是否需要自旋,如果需要则会使用随机种子配合实现随机自旋。自旋结束后,如果池中挂起的(空闲的)线程数过多,或者外部已经很久没有提交新的任务进来,都会直接销毁线程,从而达到缩减线程数的目的。
销毁线程的实现也比较有意思,在前面的《线程池分析》中得知,线程池中的线程复用原理实则是通过死循环的方式卡住了
run()
方法,不让run()
方法结束,这样线程就不会停止。而在该方法中,当需要缩减线程数时,则会直接返回false
,让外面的runWorker()
方法中的循环退出,从而导致run()
结束,让线程正常执行终止达到缩减线程数的目的。
二、任务的拆分与合并实现过程分析
在分析Fork/Join框架成员构成时,曾简单提到过fork/join()
方法,下面再来详细分解它两的实现过程,先引用一下《上篇》中的片段:
// ForkJoinTask类 → fork方法
public final ForkJoinTask<V> fork() {
Thread t;
// 判断当前执行的线程是否为池中的工作线程
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
// 如果是的则直接将任务压入当前线程的任务队列
((ForkJoinWorkerThread)t).workQueue.push(this);
else
// 如果不是则压入common池中的某个工作线程的任务队列中
ForkJoinPool.common.externalPush(this);
// 返回当前ForkJoinTask对象,方便递归拆分
return this;
}
// ForkJoinTask类 → join方法
public final V join() {
int s;
// 判断任务执行状态如果是非正常结束状态
if ((s = doJoin() & DONE_MASK) != NORMAL)
// 抛出相关的异常堆栈信息
reportException(s);
// 正常执行结束则返回执行结果
return getRawResult();
}
// ForkJoinTask类 → doJoin方法
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
// status<0则直接返回status值
return (s = status) < 0 ? s :
// 判断当前线程是否为池中的工作线程
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
// 是则取出线程任务队列中的当前task执行,执行完成返回status值
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
// 尝试将栈顶任务置空,然后执行任务
tryUnpush(this) && (s = doExec()) < 0 ? s :
// 执行未完成则调用awaitJoin方法等待执行完成
wt.pool.awaitJoin(w, this, 0L) :
// 不是则调用externalAwaitDone()方法阻塞挂起当前线程
// 将任务交由通用的common线程池执行
externalAwaitDone();
}
- fork方法逻辑:
- ①判断当前线程是否为池中的工作线程类型
- 是:将当前任务压入当前线程的任务队列中
- 不是:将当前任务压入common池中某个工作线程的任务队列中
- ②返回当前的ForkJoinTask任务对象,方便递归拆分
- ①判断当前线程是否为池中的工作线程类型
- doJoin&join方法逻辑:
- ①判断任务状态status是否小于0:
- 小于:代表任务已经结束,返回status值
- 不小于:判断当前线程是否为池中的工作线程:
- 是:尝试从栈顶/队尾取出当前task执行:
- 任务在栈顶:执行任务并返回执行结束的status值
- 不在栈顶:调用awaitJoin方法等待执行结束
- 不是:调用externalAwaitDone()方法阻塞挂起当前线程,等待任务执行结束
- 是:尝试从栈顶/队尾取出当前task执行:
- ②判断任务执行状态是否为非正常结束状态,是则抛出异常堆栈信息
- 任务状态为被取消,抛出CancellationException异常
- 任务状态为异常结束,抛出对应的执行异常信息
- ③如果status为正常结束状态,则直接返回执行结果
- ①判断任务状态status是否小于0:
关于doJoin方法的代码可能看起来有些难理解,还是和前面上篇中分析“工作线程注册的原理时,理解奇数位索引计算”的方式一样,自己写一遍理解,换个写法如下:
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt;
ForkJoinPool.WorkQueue w;
// 如果任务已经执行完成,直接返回任务状态
if ((s = status) < 0) {
return s;
}
t = Thread.currentThread();
boolean isForkJoinThread = t instanceof ForkJoinWorkerThread;
// 如果当前线程不是工作线程,即外部线程直接调用join方法合并
if (!isForkJoinThread) {
// 等待任务被线程池分配线程执行完,返回任务状态
return externalAwaitDone();
}
// 如果当前线程是工作线程
wt = (ForkJoinWorkerThread) t;
w = wt.workQueue;
// 如果当前任务在队列尾部/栈顶,直接弹出来
if (w.tryUnpush(this)) {
// 然后执行弹出来的任务
return this.doExec();
}
// 如果当前任务不在队列尾部/栈顶,那么调用awaitJoin等待
return wt.pool.awaitJoin(w, this, 0L);
}
经过这样就可以非常清晰的看明白doJoin
方法的逻辑啦。接着往下分析,其实fork的原理实现还算简单,下面重点分析join
的实现。先看看tryUnpush()
方法:
// ForkJoinTask类 → tryUnpush()方法
final boolean tryUnpush(ForkJoinTask<?> t) {
ForkJoinTask<?>[] a; int s;
// 尝试将栈顶/队尾任务置空,如果t就是队列中的栈顶任务,那尝试cas置空
if ((a = array) != null && (s = top) != base &&
U.compareAndSwapObject
(a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
U.putOrderedInt(this, QTOP, s);
return true;
}
return false;
}
工作线程在合并结果时,如果这个任务被fork到了栈顶/队尾,那么执行该任务返回即可。但如果不在栈顶,有可能是被其他fork出的任务压下去了或者其他线程被窃取了,那么则会进入awaitJoin()
方法。
2.1、awaitJoin方法
接着来看看awaitJoin()
方法,源码如下:
// ForkJoinTask类 → awaitJoin()方法
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
int s = 0;
if (task != null && w != null) {
// 记录前一个正在合并的任务
ForkJoinTask<?> prevJoin = w.currentJoin;
// 记录join合并当前任务
U.putOrderedObject(w, QCURRENTJOIN, task);
// CountedCompleter是ForkJoinTask的一个子类实现
CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
(CountedCompleter<?>)task : null;
// 自旋操作
for (;;) {
// 1.任务已经执行完毕,不需要再自旋了,直接返回
if ((s = task.status) < 0)
break;
// 如果任务是CountedCompleter类型,则获取它的派生子任务执行
if (cc != null)
helpComplete(w, cc, 0);
// 如果队列不为空,尝试从队列中获取当前需要join的任务执行。
// 如果当前队列任务为空,说明当前任务被其他工作线程给窃取了
// tryRemoveAndExec是用于尝试执行存到队列中的当前任务,
// 如果队列中没有找到当前join的任务,那代表被其他线程给偷走了
else if (w.base == w.top || w.tryRemoveAndExec(task))
// 找到窃取join任务的工作线程,帮助窃取者执行窃取者的任务
helpStealer(w, task);
// 3.再判断一次任务是否已经执行完毕,执行结束则退出
// 如果任务被窃取,能够执行到这一步,那么一定是前面的
// helpStealer方法退出了,原因有两个:
// 1.自己需要join合并的任务执行完了
// 2.窃取链断了或没有可窃取的任务了,准备阻塞
if ((s = task.status) < 0)
break;
long ms, ns;
if (deadline == 0L)
ms = 0L;
else if ((ns = deadline - System.nanoTime()) <= 0L)
break;
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
ms = 1L;
// 4.调用tryCompensate方法对线程池进行补偿
// 进入阻塞之前为了避免线程池所有线程都进入阻塞,
// 会为线程池补偿一个活跃线程(唤醒或新建)
if (tryCompensate(w)) {
// 自旋加阻塞,等待其他线程执行完成窃取的join任务
task.internalWait(ms);
// 唤醒后叠加活跃线程数
U.getAndAddLong(this, CTL, AC_UNIT);
}
}
// 当任务执行完成后,将currentJoin恢复成之前的currentJoin值
U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
}
return s;
}
// ForkJoinTask类 → tryRemoveAndExec()方法
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; int m, s, b, n;
// 如果队列中的任务数组不为空且已经初始化
if ((a = array) != null && (m = a.length - 1) >= 0 &&
task != null) {
// 队列中是否存在任务
while ((n = (s = top) - (b = base)) > 0) {
for (ForkJoinTask<?> t;;) {
// traverse from s to b
// 从栈顶开始往下取值
long j = ((--s & m) << ASHIFT) + ABASE;
// 因为存在并发,可能会被窃取者偷走任务
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
// 如果发生了任务窃取,那说明此时的s已经执行到了栈底
// 如果被偷走join任务是在栈顶被偷走的,那么将返回true
return s + 1 == top; // shorter than expected
// 如果找到了任务
else if (t == task) {
boolean removed = false;
// 当前join任务在栈顶,尝试将其弹出
// 如果cas失败,代表被其他线程偷走,此时队列已经空了
if (s + 1 == top) {
// pop
if (U.compareAndSwapObject(a, j, task, null)) {
U.putOrderedInt(this, QTOP, s);
removed = true;
}
}
// 当前join任务不在栈顶并且栈底没变,
// 将当前join任务的坑位替换成EmptyTask对象
else if (base == b) // replace with proxy
// 因为任务不在栈顶,不能直接替换成null,
// 替换成null就必须移动指针,显然这里不能移动指针
// 很多地方都是以null作为并发判断,
// 其他工作线程取到null时,
// 会认为任务被其他线程窃取了任务,
// 这样就永远获取不到任务了
removed = U.compareAndSwapObject(
a, j, task, new EmptyTask());
if (removed)
//执行任务
task.doExec();
break;
}
// 如果其他任务已经执行完成,并且是栈顶任务,那么置空
else if (t.status < 0 && s + 1 == top) {
if (U.compareAndSwapObject(a, j, t, null))
U.putOrderedInt(this, QTOP, s);
break; // was cancelled
}
// 从栈顶找到栈底都没有找到,返回false
// 虽然任务被偷了,但是也不去参与helpStealer了
if (--n == 0)
return false;
}
// 任务已经完成
if (task.status < 0)
return false;
}
}
return true;
}
awaitJoin
方法的总体逻辑还算简单,如下:
- ①检查当前线程的工作队列是否为空
- 为空:代表任务被窃取了
- 不为空:通过
tryRemoveAndExec
在整个队列中查找当前需要join的任务- 找到了:执行任务
- 没找到:代表任务还是被偷了(这种情况下不参与
helpStealer
方法)
- ②如果任务被偷了,那么通过
helpStealer
找到窃取者,帮助它执行任务 - ③如果从
helpStealer
方法中退出,会再检查一次任务是否已完成:- 已执行结束:退出循环,出去合并结果
- 未执行结束:准备进入阻塞,避免CPU资源浪费
- ④在进入阻塞之前,会先对线程池进行补偿,因为当前线程可能是线程池中的最后一个活跃线程,为了避免线程池所有线程都“死”掉,会先为线程池补偿一个活跃线程
ok~,再来看看tryRemoveAndExec
方法的逻辑,如下:
- 判断队列中是否有任务:
- 不存在:返回true,外部的
awaitJoin
方法进入helpStealer
逻辑 - 存在:判断任务是否在队列尾部/栈底:
- 在:尝试CAS弹出栈顶任务:
- 成功:执行任务
- 失败:代表CAS失败,任务被别的线程偷走了,进入
helpStealer
逻辑
- 不在:可能被其他任务压下去了,从栈顶开始查找整个队列:
- 找到了:将任务替换成
EmptyTask
对象,执行任务 - 没找到:代表任务被偷了,但虽然没找到,也不参与
helpStealer
了,不过在退出之前会再一次检测任务是否执行完成
- 找到了:将任务替换成
- 在:尝试CAS弹出栈顶任务:
- 不存在:返回true,外部的
tryRemoveAndExec
方法比较简单,该方法主要作用是遍历当前线程的WorkQueue
,在队列中查找要join合并的任务执行。而在执行过程中,如果队列为空或者任务在栈顶但cas失败以及遍历完整个队列都没找到要join的任务,这三种情况代表任务被偷了,对于前两种情况下,会进入helpStealer
帮助窃取者执行任务,而对于最后一种被窃取任务的情况,则会直接退出阻塞(个人猜测:可能是因为遍历完整个队列会导致一段时间的开销,被窃取走的任务很有可能在这段时间内已经执行完了或快执行完了。所以与其去帮助窃取者执行任务,还不如阻塞等待一会儿)。
2.2、helpStealer帮助窃取者执行方法
再来看看helpStealer
方法,源码如下:
// ForkJoinTask类 → helpStealer()方法
private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
WorkQueue[] ws = workQueues;
int oldSum = 0, checkSum, m;
// 如果队列数组和任务队列不为空
if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
task != null) {
do {
// restart point
checkSum = 0; // for stability check
ForkJoinTask<?> subtask;
WorkQueue j = w, v; // v is subtask stealer
descent: for (subtask = task; subtask.status >= 0; ) {
// j.hint开始是j队列在队列组中的用于计算下标的随机值,
// 如果找到了窃取者,这个值会变成对应窃取者的下标
// j.hint | 1=一个奇数,k += 2:步长为2,奇数+2=奇数
for (int h = j.hint | 1, k = 0, i; ; k += 2) {
// 查找完整个对应数组的所有奇数位,
// 如果还是没有找到任务,则直接退出(可能执行完成了)
if (k > m) // can't find stealer
break descent;
//(h + k) & m:计算出一个数组之内的奇数下标,
// 检查这个下标的队列是否偷走了自己的任务
if ((v = ws[i = (h + k) & m]) != null) {
// 判断currentSteal是否是当前的任务
if (v.currentSteal == subtask) {
// 是的,记录这个偷取者在队列组的下标
j.hint = i;
break;
}
// 检查了一个队列之后会计入校验和
checkSum += v.base;
}
}
// 当前线程帮窃取者线程执行任务
for (;;) {
// help v or descend
ForkJoinTask<?>[] a; int b;
// 将窃取者线程队列栈底也计入校验和,因为它窃取了任务
// ,很有可能fork出更小的任务然后被其他线程偷走
checkSum += (b = v.base);
// 获取窃取者当前正在join的任务
ForkJoinTask<?> next = v.currentJoin;
//subtask.status < 0 任务执行完成
// 如果任务执行结果
// 或者工作线程要合并的任务已经不是subtask了
// 或者窃取者窃取的任务已经不为当前join任务了
// 那么退出循环
if (subtask.status < 0 || j.currentJoin != subtask ||
v.currentSteal != subtask) // stale
break descent;
// 如果当前线程中没有任务,则会帮它join合并任务
// 在这里会对subtask重新赋值,如果为空则会回到descent
// 循环进行下一个迭代
if (b - v.top >= 0 || (a = v.array) == null) {
// 如果窃取者不需要join合并任务,
// 退出判断任务是否结束
if ((subtask = next) == null)
break descent;
// 如果窃取者有任务要join合并,
// 那将帮窃取者去找偷它任务的窃取者
j = v;
break;
}
// 如果窃取者的队列中有任务,从栈底/队头开始偷窃取者
// 线程的任务执行(可能窃取到自身被偷的任务
// fork出来的子任务),
int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
ForkJoinTask<?> t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i));
// 偷完之后看一下栈底/队头有没有发生变化,
// 如果变了,代表有其他线程也在偷窃取者线程的任务,
// 避免无效的cas,直接重新再偷一个新的任务
if (v.base == b) {
// ==null,代表任务被其他线程偷了,
// 然后赋值成了null,只是还没来得及将base更新
if (t == null) // stale
// 回到 descent 标志进行下一个迭代
break descent;
// 如果没变则cas置空栈底/队头的任务
// 这样可以告诉别的线程当前任务已被窃取
if (U.compareAndSwapObject(a, i, t, null)) {
// 更新栈底指针
v.base = b + 1;
// 记录自己前一个偷取的任务
ForkJoinTask<?> ps = w.currentSteal;
int top = w.top;
do {
// 将新偷到的任务更新到currentSteal中
U.putOrderedObject(w, QCURRENTSTEAL, t);
// 执行窃取到的任务
t.doExec(); // clear local tasks too
// 在join的任务还未执行完成的情况下,
// 并且刚才执行的任务发生了fork任务,
// 那么w.top !=top就会成立,
// 此时就得w.pop()执行本地任务
} while (task.status >= 0 &&
w.top != top &&
(t = w.pop()) != null);
// 执行结束后恢复原本的窃取记录
U.putOrderedObject(w, QCURRENTSTEAL, ps);
// 然后再看看自身队列中有没有任务
// 如果w.base != w.top成立,代表自身队列来了
// 任务,此时则直接结束,回去执行自己的任务,
// 没有必要帮别的线程执行任务了
if (w.base != w.top)
return; // can't further help
}
}
}
}
// 退出helpStealer的条件有两个:
// 1.自己需要合并的join任务执行完了,回去执行自己的合并任务;
// 2.自己的join任务没执行完,但已经窃取不到任务了,那退出阻塞
// 当前线程,因为继续找下去也是空跑,浪费CPU资源
} while (task.status >= 0 && oldSum != (oldSum = checkSum));
}
}
该方法是ForkJoin
框架实现“工作窃取思想”的核心体现。它与scan
扫描方法完成了整个框架“工作窃取”实现。在scan
方法之后的runTask
方法中,会对currentSteal
赋值,而helpStealer
方法就是依赖于该成员与currentJoin
成员形成的一条窃取链,实现了帮助窃取者执行任务,关于helpStealer
的具体逻辑则不再分析了,大家可以参考上述源码中的注释。
总而言之,
helpStealer
方法的核心思想是帮助执行,帮助窃取者执行它的任务,但它不仅仅只会帮助窃取者执行,还会基于currentSteal
与currentJoin
成员形成的窃取链帮助窃取者的窃取者执行、帮助窃取者的窃取者的窃取者执行、帮助窃取者.....的窃取者执行任务。上个例子理解,如下:
- ①线程T1需要join合并任务TaskA,但是TaskA被偷了,开始遍历查找所有奇数队列
- ②查找后发现TaskA==线程T2.currentSteal成员,此时T2为T1的窃取者
- ③T1从T2的队列栈底窃取一个任务执行,执行完再窃取一个执行,继续窃取....
- ④T1发现T2的队列中没有了任务,T1则会继续寻找窃取了T2.currentlJoin的线程
- ⑤经过遍历发现T2.currentlJoin==T5.currentSteal,T5为T2的窃取者
- ⑥然后T1继续从T5队列的栈底窃取一个任务执行,完成后继续窃取.....
- ⑦T1发现T5的队列中也没有了任务,T1会继续寻找窃取了T5.currentlJoin的....
- ⑧根据窃取链,一直这样循环下去.....
通过如上过程可发现:T1.currentlJoin → T2.currentSteal → T2.currentlJoin → T5.currentSteal → T5.currentlJoin....
,通过currentSteal
与currentJoin
两个成员构成了一条窃取链,如果理解了这条链路关系,那么也就理解了helpStealer
方法。不过值得注意的是:helpStealer
方法什么时候退出呢?答案是:窃取链断掉的时候会退出。总共有三种情况会导致窃取链断掉:
- ①任何一个工作线程的
currentSteal
或currentJoin
为空 - ②任何一个工作线程的
currentSteal
或currentJoin
已经执行完成 - ③当前线程的join任务已经执行完成
其实说到底,helpStealer
方法是ForkJoin
框架的一个优化性能的实现点,核心点在于减少线程因为合并而阻塞,在等待join任务执行期间帮其它线程执行一个任务,这样则保证了每个线程不停止工作,也能够加快整体框架的处理速度,同时在帮助执行的期间,被窃取的join任务就执行完了。
2.3、tryCompensate补偿活跃线程方法
再来看看为线程池补偿活跃线程的tryCompensate
方法:
// ForkJoinPool类 → tryCompensate()方法
private boolean tryCompensate(WorkQueue w) {
boolean canBlock;
WorkQueue[] ws; long c; int m, pc, sp;
// 如果线程池已经停止,处于terminate状态,不能阻塞,也不需要阻塞
if (w == null || w.qlock < 0 || // caller terminating
(ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
(pc = config & SMASK) == 0) // parallelism disabled
canBlock = false;
// 如果ctl的低32位中有挂起的空闲线程,那么尝试唤醒它,成功则阻塞自己
// 唤醒后在一定程度上也许会执行到自己被偷的任务fork出的子任务
// tryRelease第二个参数为0,当唤醒成功后,代表当前线程将被阻塞,
// 新的空闲线程被唤醒,所以没必要先减少活跃线程数,然后再加上
else if ((sp = (int)(c = ctl)) != 0) // release idle worker
canBlock = tryRelease(c, ws[sp & m], 0L);
// 如果没有空闲线程,就要创建新的线程
// 这里会导致线程池中的线程数,在一段时间内会超过创建时指定的并行数
else {
// 获取池中的活跃线程数
int ac = (int)(c >> AC_SHIFT) + pc;
// 获取池中的总线程数
int tc = (short)(c >> TC_SHIFT) + pc;
int nbusy = 0; // validate saturation
for (int i = 0; i <= m; ++i) {
// two passes of odd indices
WorkQueue v;
// 找奇数位置的队列,循环m次就是执行了两遍。
// 为什么执行两遍呢?主要是为了判断稳定性,有可能第二遍
// 的时候,正在执行任务的活跃线程会变少
if ((v = ws[((i << 1) | 1) & m]) != null) {
// 检查工作线程是否正在处理任务,
// 如果不在处理任务表示空闲,可以获取其他任务执行
if ((v.scanState & SCANNING) != 0)
break;
++nbusy;
}
}
// 如果线程池状态不稳定,那么则不能挂起当前线程
// 如果nbusy!=tc*2 说明还存在空闲或者还在扫描任务的工作线程
// 如果ctl!=c 代表ctl发生了改变,有可能线程执行完任务后,
// 没有扫描到新的任务被失活,这种情况下先不挂起,先自旋一段时间
if (nbusy != (tc << 1) || ctl != c)
canBlock = false; // unstable or stale
// tc:池内总线程数 pc:并行数 ac:池内活跃线程数
// tc>=pc 代表此时线程数已经够多了,当然并不代表不会创建新线程
// ac>1 代表除了自己外还有其他活跃线程
// w.isEmpty() 当前工作线程队列为空,其中没有任务需要执行
// 如果满足如上三个条件,那么则可以直接阻塞,不需要补偿
else if (tc >= pc && ac > 1 && w.isEmpty()) {
long nc = ((AC_MASK & (c - AC_UNIT)) |
(~AC_MASK & c)); // uncompensated
//cas ctl
canBlock = U.compareAndSwapLong(this, CTL, c, nc);
}
// 这是对于commonPool 公共线程池的特殊处理
// 如果总线程数超出MAX_CAP则会抛出异常
else if (tc >= MAX_CAP ||
(this == common && tc >= pc + commonMaxSpares))
throw new RejectedExecutionException(
"Thread limit exceeded replacing blocked worker");
else {
// similar to tryAddWorker
boolean add = false; int rs; // CAS within lock
// 准备创建新的工作线程(这里只加总线程数,不加活跃线程数)
// 因为当前工作线程将在创建补偿线程成功之后阻塞
// 但是这里会导致总线程数超出并行数
long nc = ((AC_MASK & c) |
(TC_MASK & (c + TC_UNIT)));
// 线程池没有停止的情况下才允许创建新的工作线程
if (((rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);
// 创建新的工作线程
canBlock = add && createWorker(); // throws on exception
}
}
return canBlock;
}
该方法内的逻辑也算比较简单:
- ①判断池内有没有挂起的空闲线程,如果有则唤醒它代替自己
- ②如果没有挂起的空闲线程,判断池内活跃线程数是否存在两个及以上、总线程数是否饱和、自己工作队列是否为空,如果这些都满足,那么则不需要补偿,直接挂起
- ③如果不满足上述三条件,判断线程数是否关闭,如果没有则创建新线程补偿
值得一提的是:tryCompensate
方法会导致一段时间内,池中总线程数超出创建线程池时指定的并行数。而且如果在用Fork/Join
框架时,如果在ForkJoinTask中调用提交任务的方法:sumbit()/invoke()/execute()
时,会导致线程池一直补偿线程,硬件允许的情况下会导致一直补偿创建出最大0x7fff = 32767
条线程。
2.4、externalAwaitDone方法
前面分析doJoin
逻辑提到过:如果是外部线程调用join
方法时,会调用externalAwaitDone
方法,接着再来看看这个方法:
// ForkJoinPool类 → externalAwaitDone()方法
private int externalAwaitDone() {
// 如果任务是CountedCompleter类型,尝试使用common池去外部帮助执行,
// 执行完成后并将完成任务状态返回
int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
// 当前task不是CountedCompleter,尝试从栈顶获取到当前
// join的任务交给common池执行,如果不在栈顶,s变为0
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
// 如果s>=0,那代表任务是未结束的状态,需要阻塞
if (s >= 0 && (s = status) >= 0) {
boolean interrupted = false;
do {
// 先设置SIGNAL信号标记,通知其他线程当前需要被唤醒
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
// 通过synchronized.wait()挂起线程
synchronized (this) {
if (status >= 0) {
// 双重检测
try {
wait(0L); // 挂起线程
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
// 如果发现已完成,则唤醒所有等待线程
notifyAll();
}
}
// task未完成会一直循环
} while ((s = status) >= 0);
// 响应中断操作
if (interrupted)
Thread.currentThread().interrupt();
}
// 执行完成后返回执行状态
return s;
}
externalAwaitDone
方法最简单,如果任务在栈顶,那么直接弹出执行,如果不在则挂起当前线程,直至任务执行结束,其他线程唤醒。
2.5、任务拆分合并原理总结
任务的fork操作比较简单,只需要将拆分好的任务push进入自己的工作队列即可。而对于任务结果的合并:join操作,实现就略显复杂了,大体思想是首先在自己队列中找需要join的任务,如果找到了则执行它并合并结果。如果没找到就是被偷了,需要去找窃取者线程,并且在join任务执行结束之前,会根据窃取链一直帮助窃取者执行任务,如果窃取链断了但是join任务还未执行完,那么挂起当前工作线程,不过在挂起之前会根据情况来决定是否为线程池补偿一条活跃线程代替自己工作,防止整个线程池所有的线程都阻塞,产生线程池“假死”状态。当然,如果是外部线程执行的join操作,如果要被join的任务还未执行完的情况下,那么则需要把这任务交给commonPool
公共池来处理。
三、ForkJoin中任务取消实现原理
任务取消的cancel
方法是实现于Future
接口的,逻辑比较简单,源码如下:
// ForkJoinTask类 → cancel()方法
public boolean cancel(boolean mayInterruptIfRunning) {
// 尝试将任务状态修改为CANCELLED,成功返回true,失败返回false
return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}
// ForkJoinTask类 → setCompletion()方法
private int setCompletion(int completion) {
// 开启自旋(死循环)
for (int s;;) {
// 如果任务已经完成,则直接返回执行后的状态
if ((s = status) < 0)
return s;
// 如果还未完成则尝试通过cas机制修改状态为入参:completion状态
if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
if ((s >>> 16) != 0)
synchronized (this) {
notifyAll(); }
return completion;
}
}
}
取消任务的逻辑比较简单,任务取消只能发生在任务还未被执行的情况下,如果任务已经完成则会直接返回执行状态。如果任务还未执行,则会尝试使用自旋+CAS机制修改任务状态为CANCELLED
状态,成功则代表任务取消成功。
四、ForkJoinPool线程池的关闭实现
一般在正常关闭线程池时,都会通过shundown
方法来停止线程池,接着再分析一下线程池关闭的实现:
// ForkJoinPool类 → shutdown()方法
public void shutdown() {
// 检查权限
checkPermission();
// 关闭线程池
tryTerminate(false, true);
}
// ForkJoinPool类 → checkPermission()方法
private static void checkPermission() {
// 获取权限管理器
SecurityManager security = System.getSecurityManager();
// 检测当前线程是否具备关闭线程池的权限
if (security != null)
security.checkPermission(modifyThreadPermission);
}
// ForkJoinPool类 → tryTerminate()方法
private boolean tryTerminate(boolean now, boolean enable) {
int rs;
// 如果是common公开池,不能关闭,common的关闭和Java程序绑定
if (this == common) // cannot shut down
return false;
// 如果线程池还在运行,那么检测enable是否为true,如果是false则退出
if ((rs = runState) >= 0) {
if (!enable)
return false;
rs = lockRunState(); // enter SHUTDOWN phase
// 如果线程池是要关闭,首先把运行状态改为 SHUTDOWN 标记
unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
}
// 如果线程池还不是stop停止状态(rs&stop==1表示处于stop状态)
if ((rs & STOP) == 0) {
// 如果now入参为false会进入如下逻辑
if (!now) {
// check quiescence
// 遍历整个工作队列数组
for (long oldSum = 0L;;) {
// repeat until stable
WorkQueue[] ws; WorkQueue w; int m, b; long c;
// 以目前的ctl值作为初始效验和
long checkSum = ctl;
// 检测池内活跃线程数,如果>0则不能直接置为stop状态
if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
return false; // still active workers
// 如果工作队列全部被注销了则可以设置为stop状态
if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
break; // check queues
// 开启循环
for (int i = 0; i <= m; ++i) {
// 循环每个工作队列
if ((w = ws[i]) != null) {
// 如果队列中还存在任务,且当前队列处于活跃状态
if ((b = w.base) != w.top || w.scanState >= 0 ||
w.currentSteal != null) {
// 唤醒空闲的线程帮助执行还未处理的任务
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
return false; // arrange for recheck
}
// 以栈底作为校验和
checkSum += b;
// 将偶数位队列中的任务全部取消(外部提交的任务)
if ((i & 1) == 0)
w.qlock = -1; // try to disable external
}
}
// 循环数组两次后,效验和都一致,代表任务都空了,
// 同时也没有新的线程被创建出来,那么可以设置stop状态了
if (oldSum == (oldSum = checkSum))
break;
}
}
// 如果线程池还未stop,那么则设置为stop状态
if ((runState & STOP) == 0) {
rs = lockRunState(); // enter STOP phase
unlockRunState(rs, (rs & ~RSLOCK) | STOP);
}
}
int pass = 0; // 3 passes to help terminate
for (long oldSum = 0L;;) {
// or until done or stable
WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m;
long checkSum = ctl;
// 所有队列全部已经空了或所有线程都注销了
if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 ||
(ws = workQueues) == null || (m = ws.length - 1) <= 0) {
// 如果线程池是还不是TERMINATED状态
if ((runState & TERMINATED) == 0) {
rs = lockRunState(); // done
// 先将线程池状态改为TERMINATED状态
unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED);
synchronized (this) {
notifyAll(); } // for awaitTermination
}
break;
}
// 开启循环
for (int i = 0; i <= m; ++i) {
// 处理每个队列
if ((w = ws[i]) != null) {
checkSum += w.base;
w.qlock = -1; // try to disable
if (pass > 0) {
// 取消每个队列中的所有任务
w.cancelAll(); // clear queue
// 中断执行线程,唤醒所有被挂起的线程
if (pass > 1 && (wt = w.owner) != null) {
if (!wt.isInterrupted()) {
try {
// unblock join
wt.interrupt();
} catch (Throwable ignore) {
}
}
if (w.scanState < 0)
U.unpark(wt); // wake up
}
}
}
}
// 如果两次效验和不一致,赋值上一次的效验和
if (checkSum != oldSum) {
// unstable
oldSum = checkSum;
pass = 0;
}
// 线程池状态稳定了
// 所有任务被取消,执行线程被中断,挂起线程被唤醒中断了
else if (pass > 3 && pass > m) // can't further help
break;
// 如果有线程因为失活被挂起
else if (++pass > 1) {
// try to dequeue
long c; int j = 0, sp; // bound attempts
// 根据ctl中记录的阻塞链唤醒所有线程
while (j++ <= m && (sp = (int)(c = ctl)) != 0)
tryRelease(c, ws[sp & m], AC_UNIT);
}
}
return true;
}
线程池关闭的实现逻辑也比较简单,首先会将线程池标记为SHUTDOWN
状态,然后根据情况进行下一步处理,如果线程池中没啥活跃线程了,同时任务也不多了,将状态改为STOP
状态,在STOP
状态中会处理四件事:
- ①将所有活跃的队列状态改为注销状态,
w.qlock=-1
- ②取消整个线程池中所有还未执行的任务
- ③唤醒所有因为失活挂起阻塞的线程
- ④尝试中断所有执行的活跃线程,唤醒scanState<0的线程,确保一些还没来得及挂起的线程也能被中断
最后当所有线程都被中断了,并且未执行的任务都被取消了,那么会把状态改为TERMINATED
状态,线程池关闭完成。
五、总结
ForkJoin分支合并框架几乎是整个JUC包源码中最难的部分,因为整个框架比较庞大,分析起来也比较复杂,到目前为止还剩下ManagedBlocker
与CoutedCompleter
没有分析。因为对于ForkJoin框架的分析篇幅比较长了,所以对于这两就不再进行赘述,不过对CoutedCompleter
比较感兴趣的可以参考一下:《CoutedCompleter分析》这篇文章,它的作用更多的是为Java8的Stream并行流提供服务。而ManagedBlocker
则是为ForkJoin框架提供处理阻塞型任务的支持。
总的来说,ForkJoin分支合并框架思想非常优秀,完全的落地了分治以及工作窃取思想,整个框架中的各个成员各司其职却有配合紧密,内部采用了一个队列数组以奇/偶位存储内外任务,双端队列的方式实现工作与窃取思想。但是其内部实现涉及了很多的位运算知识,所以半道出家以及工作多年的小伙伴会有些生疏,看其源码实现会有些吃劲,但理解大体思想即可,对于任何源码分析类的知识都无需拘泥其细节过程。
最后的总结:
- 创建池ForkJoinPool,初始化并行数=cpu逻辑核心数,池中没有队列,没有线程
- 外部向线程池提交一个任务:
pool.submit(task)
- 初始化队列数组,容量:
2 * Max { 并行数, 2 ^ n }
- 创建一个共享队列,容量为2^13,随机放在队列数组的某一个偶数索引位
- 外部提交的任务存入这个共享队列,位置值为2^12处
- 再创建一条线程,并为其分配一个队列,容量为2^13,随机放在数组中某个奇数索引位
- 线程启动执行
- 随机一个位置,线程从此位置开始遍历所有队列,最终扫描到前面提交的任务,并将其从所在的队列取出
- 线程执行处理任务,首先拆分出两个子任务
- 如果用invokeAll提交,一个子任务执行,另一个压入队列
- 如果用fork提交,则两个都压入工作队列
- 提交的子任务触发创建新的线程并分配新的工作队列,同样放在奇数位置
- 提交的子任务可能仍然被当前线程执行,但也有可能被其它线程窃取
- 线程在子任务处join合并,join期间会帮助窃取者处理任务,窃取它的任务执行
- 优先偷窃取者队列栈底的任务
- 如果窃取者队列为空,则会根据窃取链去找窃取者的窃取者偷任务.....
- 如果整个池内都没任务了,则进入阻塞,阻塞前会根据情况补偿活跃线程
- 提交的子任务不管被哪条线程执行,仍可能会重复上述拆分/提交/窃取/阻塞步骤
- 当任务被拆分的足够细,达到了拆分阈值时,才会真正的开始执行这些子任务
- 处理完成会和拆分任务时一样,递归一层一层返回结果
- 直至最终所有子任务全部都执行结束,从而合并所有子结果,得到最终结果
- 如果外部没有再提交任务,所有线程扫描不到会被灭活,会进入失活(inactive)状态
- 一直没有任务时,线程池会削减线程数,直至最终所有线程销毁,所有奇数索引位的队列被注销,ForkJoinPool中只剩下一个最初创建的在偶数索引位的队列,以便于再次接受外部提交的任务,然后再从头开始重复所有步骤....