除了构造方法所构建的成员变量,ForkJoinPool 还有一个非常重要的成员变量 runState
,和你之前了解的知识一样,线程池也需要状态来进行管理
volatile int runState; // lockable status // runState bits: SHUTDOWN must be negative, others arbitrary powers of two private static final int RSLOCK = 1; //线程池被锁定 private static final int RSIGNAL = 1 << 1; //线程池有线程需要唤醒 private static final int STARTED = 1 << 2; //线程池已经初始化 private static final int STOP = 1 << 29; //线程池停止 private static final int TERMINATED = 1 << 30; //线程池终止 private static final int SHUTDOWN = 1 << 31; //线程池关闭
runState
有上面 6 种状态切换,按注释所言,只有 SHUTDOWN
状态是负数,其他都是整数,在并发环境更改状态必然要用到锁,ForkJoinPool 对线程池加锁和解锁分别由 lockRunState
和 unlockRunState
来实现 (这两个方法可以暂且不用深入理解,可以暂时跳过,只需要理解它们是帮助安全更改线程池状态的锁即可)
不深入了解可以,但是我不能不写啊...... 你待会不是得回看吗?
lockRunState
/** * Acquires the runState lock; returns current (locked) runState. */ // 从方法注释中看到,该方法一定会返回 locked 的 runState,也就是说一定会加锁成功 private int lockRunState() { int rs; return ((((rs = runState) & RSLOCK) != 0 || !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ? awaitRunStateLock() : rs); }
- 因为 RSLOCK = 1,如果 runState & RSLOCK == 0,则说明目前没有加锁,进入
或运算
的下半段 CAS
- 先通过 CAS 尝试加锁,尝试成功直接返回,尝试失败则要调用
awaitRunStateLock
方法
/** * Spins and/or blocks until runstate lock is available. See * above for explanation. */ private int awaitRunStateLock() { Object lock; boolean wasInterrupted = false; for (int spins = SPINS, r = 0, rs, ns;;) { //判断是否加锁(==0表示未加锁) if (((rs = runState) & RSLOCK) == 0) { // 通过CAS加锁 if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) { if (wasInterrupted) { try { // 重置线程终端标记 Thread.currentThread().interrupt(); } catch (SecurityException ignore) { // 这里竟然 catch 了个寂寞 } } // 加锁成功返回最新的 runState,for 循环的唯一正常出口 return ns; } } else if (r == 0) r = ThreadLocalRandom.nextSecondarySeed(); else if (spins > 0) { r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift if (r >= 0) --spins; } // Flag1 如果是其他线程正在初始化占用锁,则调用 yield 方法让出 CPU,让其快速初始化 else if ((rs & STARTED) == 0 || (lock = stealCounter) == null) Thread.yield(); // initialization race // Flag2 如果其它线程持有锁,并且线程池已经初始化,则将唤醒位标记为1 else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) { // 进入互斥锁 synchronized (lock) { // 再次判断,如果等于0,说明进入互斥锁前刚好有线程进行了唤醒,就不用等待,直接进行唤醒操作即可,否则就进入等待 if ((runState & RSIGNAL) != 0) { try { lock.wait(); } catch (InterruptedException ie) { if (!(Thread.currentThread() instanceof ForkJoinWorkerThread)) wasInterrupted = true; } } else lock.notifyAll(); } } } }
上面代码 33 ~ 34 (Flag1)行以及 36 ~ 50 (Flag2) 行,如果你没看后续代码,现在来理解是有些困难的,我这里先提前说明一下:
Flag1: 当完整的初始化 ForkJoinPool 时,直接利用了 stealCounter 这个原子变量,因为初始化时(调用 externalSubmit 时),才会对 StealCounter 赋值。所以,这里的逻辑是,当状态不是 STARTED 或者 stealCounter 为空,让出线程等待,也就是说,别的线程还没初始化完全,让其继续占用锁初始化即可
Flag2: 我们在讲等待/通知模型时就说,不要让无限自旋尝试,如果资源不满足就等待,如果资源满足了就通知,所以,如果 (runState & RSIGNAL) == 0
成立,说明有线程需要唤醒,直接唤醒就好,否则也别浪费资源,主动等待一会
当阅读到这的代码时,马上就抛出来两个问题:
Q1: 既然是加锁,为什么不用已有的轮子 ReentrantLock 呢?
PS:如果你读过并发系列 Java AQS队列同步器以及ReentrantLock的应用 ,你会知道 ReentrantLock 是用一个完整字段 state 来控制同步状态。但这里在竞争锁的时候还会判断线程池的状态,如果是初始化状态主动 yield 放弃 CPU 来减少竞争;另外,用一个完整的 runState 不同位来表示状态也体现出更细的粒度吧
Q2: synchronized 大法虽好,但是我们都知道这是比较重量级的锁,为什么还在这里应用了呢?
PS: 首先 synchronized 经过不断优化,没有它刚诞生时那么重,另外按照 Flag 2 的代码含义,进入 synchronized 同步块的概率还是很低的,可以用最简单的方式稳稳兜底(奥卡姆剃刀了原理?)
有加锁自然要解锁,向下看 unlockRunState
unlockRunState
解锁的逻辑相对简单多了,总体目标是清除锁标记位。如果顺利将状态修改为目标状态,自然解锁成功;否则表示有别的线程进入了wait,需要调用notifyAll唤醒,重新尝试竞争
/** * Unlocks and sets runState to newRunState. * * @param oldRunState a value returned from lockRunState * @param newRunState the next value (must have lock bit clear). */ private void unlockRunState(int oldRunState, int newRunState) { if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) { Object lock = stealCounter; runState = newRunState; // clears RSIGNAL bit if (lock != null) synchronized (lock) { lock.notifyAll(); } } }
这两个方法贯穿着后续代码分析的始终,多注意 unlockRunState
的入参即可,另外你也看到了通知都是用的 notifyAll,而不是 notify,这个问题我们之前重点说明过,你还记得为什么吗?如果不记得,打开并发编程之等待通知机制 回忆一下吧
第一层知识铺垫已经差不多了,前进
invoke/submit/execute
回到本文最开始带有 main 函数的 demo,我们向 ForkJoinPool 提交任务调用的是 invoke 方法, 其实 ForkJoinPool 还支持 submit 和 execute 两种方式来提交任务。并发的玩法非常类似,这三类方法的作业也很好区分:
- invoke:提交任务,并等待返回执行结果
- submit:提交并立刻返回任务,ForkJoinTask实现了Future,可以充分利用 Future 的特性
- execute:只提交任务
在这三大类基础上又重载了几个更细粒度的方法,这里不一一列举:
public <T> T invoke(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task.join(); } public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task; } public void execute(ForkJoinTask<?> task) { if (task == null) throw new NullPointerException(); externalPush(task); }
相信你已经发现了,提交任务的方法都会调用 externalPush(task) 这个用法,源码的主角终于要登场了
但是......
如果你看 externalPush 代码,第一行就是声明一个 WorkQueue 数组变量,为了后续流程更加丝滑,咱还得铺垫一点 WorkQueue 的知识(又要铺垫)
WorkQueue
一看这么多成员变量,还是很慌的,不过,我们只需要把我几个主要的就足够了
//初始队列容量 static final int INITIAL_QUEUE_CAPACITY = 1 << 13; //最大队列容量 static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M // Instance fields volatile int scanState; // versioned, <0: inactive; odd:scanning int stackPred; // pool stack (ctl) predecessor 前任池(WorkQueue[])索引,由此构成一个栈 int nsteals; // number of steals 偷取的任务个数 int hint; // randomization and stealer index hint 记录偷取者的索引,方便后面顺藤摸瓜 int config; // pool index and mode volatile int qlock; // 1: locked, < 0: terminate; else 0 volatile int base; // index of next slot for poll int top; // index of next slot for push ForkJoinTask<?>[] array; // the elements (initially unallocated) 任务数组 final ForkJoinPool pool; // the containing pool (may be null) final ForkJoinWorkerThread owner; // owning thread or null if shared 当前工作队列的工作线程,共享模式下为null volatile Thread parker; // == owner during call to park; else null 调用park阻塞期间为owner,其他情况为null volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin 记录当前join来的任务 volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer 记录从其他工作队列偷取过来的任务
我们上面说了,WorkQueue 是一个双端队列,线程池有 runState,WorkQueue 有 scanState
- 小于零:inactive (未激活状态)
- 奇数:scanning (扫描状态)
- 偶数:running (运行状态)
操作线程池需要锁,操作队列也是需要锁的,qlock 就派上用场了
- 1: 锁定
- 0:未锁定
- 小于零:终止状态
WorkQueue 中也有个 config,但是和 ForkJoinPool 中的是不一样的,WorkQueue 中的config 记录了该 WorkQueue 在 WorkQueue[] 数组的下标以及 mode
其他字段的含义我们就写在代码注释中吧,主角重新登场,这次是真的
externalPush
文章前面说过,task 会细分成 submission task
和 worker task
,worker task
是 fork
出来的,那从这个入口进入的,自然也就是 submission task
了,也就是说:
- 通过
invoke()
|submit()
|execute()
等方法提交的 task, 是submission task
,会放到 WorkQueue 数组的偶数索引位置- 调用
fork()
方法生成出的任务,叫 worker task,会放到 WorkQueue 数组的奇数索引位置
该方法上的注释也写的很清楚,具体请参考代码注释
/** * Tries to add the given task to a submission queue at * submitter's current queue. Only the (vastly) most common path * is directly handled in this method, while screening for need * for externalSubmit. * * @param task the task. Caller must ensure non-null. */ final void externalPush(ForkJoinTask<?> task) { WorkQueue[] ws; WorkQueue q; int m; //Flag1: 通过ThreadLocalRandom产生随机数,用于下面计算槽位索引 int r = ThreadLocalRandom.getProbe(); int rs = runState; //初始状态为0 //Flag2: 如果ws,即ForkJoinPool中的WorkQueue数组已经完成初始化,且根据随机数定位的index存在workQueue,且cas的方式加锁成功 if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && //对WorkQueue操作加锁 U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a; int am, n, s; //WorkQueue中的任务数组不为空 if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) { //组长度大于任务个数,不需要扩容 int j = ((am & s) << ASHIFT) + ABASE; //WorkQueue中的任务数组不为空 U.putOrderedObject(a, j, task); //向Queue中放入任务 U.putOrderedInt(q, QTOP, s + 1);//top值加一 U.putIntVolatile(q, QLOCK, 0); //对WorkQueue操作解锁 //任务个数小于等于1,那么此槽位上的线程有可能等待,如果大家都没任务,可能都在等待,新任务来了,唤醒,起来干活了 if (n <= 1) //唤醒可能存在等待的线程 signalWork(ws, q); return; } //任务入队失败,前面加锁了,这里也要解锁 U.compareAndSwapInt(q, QLOCK, 1, 0); } //Flag3: 不满足上述条件,也就是说上面的这些 WorkQueue[]等都不存在,就要通过这个方法一切从头开始创建 externalSubmit(task); }
上面加了三处 Flag,为了让大家更好的理解代码还是有必要做进一步说明的:
Flag1: ThreadLocalRandom 是 ThreadLocal 的衍生物,每个线程默认的 probe 是 0,当线程调用ThreadLocalRandom.current()时,会初始化 seed 和 probe,维护在线程内部,这里就知道是生成一个随机数就好,具体细节还是值得大家自行看一下
Flag2: 这里包含的信息还是非常多的
// 二进制为:0000 0000 0000 0000 0000 0000 0111 1110 static final int SQMASK = 0x007e; // max 64 (even) slots
- m 的值代表 WorkQueue 数组的最大下表
- m & r 会保证随机数 r 大于 m 的部分不可用
- m & r & SQMASK 因为 SQMASK 最后一位是 0,最终的结果就会是偶数
- r != 0 说明当前线程已经初始化过一些内容
- rs > 0 说明 ForkJoinPool 的 runState 也已经被初始化过
Flag3: 看过 flag2 的描述,你也就很好理解 Flag 3 了,如果是第一次提交任务,必走 Flag 3 的 externalSubmit
方法
externalSubmit
这个方法很长,但没超过 80 行,具体请看方法注释
//初始化所需要的一切 private void externalSubmit(ForkJoinTask<?> task) { int r; // initialize caller's probe //生成随机数 if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); } for (;;) { WorkQueue[] ws; WorkQueue q; int rs, m, k; boolean move = false; // 如果线程池的状态为终止状态,则帮助终止 if ((rs = runState) < 0) { tryTerminate(false, false); // help terminate throw new RejectedExecutionException(); } //Flag1: 再判断一次状态是否为初始化,因为在lockRunState过程中有可能状态被别的线程更改了 else if ((rs & STARTED) == 0 || // initialize ((ws = workQueues) == null || (m = ws.length - 1) < 0)) { int ns = 0; //Flag1.1: 加锁 rs = lockRunState(); try { if ((rs & STARTED) == 0) { // 初始化stealcounter的值(任务窃取计数器,原子变量) U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong()); // create workQueues array with size a power of two //取config的低16位(确切说是低15位),获取并行度 int p = config & SMASK; // ensure at least 2 slots //Flag1.2: 如果你看过HashMap 的源码,这个就很好理解了,获取2次幂大小 int n = (p > 1) ? p - 1 : 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; //初始化 WorkQueue 数组 workQueues = new WorkQueue[n]; // 标记初始化完成 ns = STARTED; } } finally { // 解锁 unlockRunState(rs, (rs & ~RSLOCK) | ns); } } //Flag2 上面分析过,取偶数位槽位,将任务放进偶数槽位 else if ((q = ws[k = r & m & SQMASK]) != null) { // 对 WorkQueue 加锁 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a = q.array; int s = q.top; // 初始化任务提交标识 boolean submitted = false; // initial submission or resizing try { // locked version of push //计算内存偏移量,放任务,更新top值 if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null) { int j = (((a.length - 1) & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); //提交任务成功 submitted = true; } } finally { //WorkQueue解锁 U.compareAndSwapInt(q, QLOCK, 1, 0); } // 任务提交成功了 if (submitted) { //自然要唤醒可能存在等待的线程来处理任务了 signalWork(ws, q); return; } } //任务提交没成功,可以重新计算随机数,再走一次流程 move = true; // move on failure } //Flag3: 接Flag2,如果找到的槽位是空,则要初始化一个WorkQueue else if (((rs = runState) & RSLOCK) == 0) { // create new queue q = new WorkQueue(this, null); // 设置工作队列的窃取线索值 q.hint = r; // 如上面 WorkQueue 中config 的介绍,记录当前WorkQueue在WorkQueue[]数组中的值,和队列模式 q.config = k | SHARED_QUEUE; // 初始化为 inactive 状态 q.scanState = INACTIVE; //加锁 rs = lockRunState(); // publish index if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null) ws[k] = q; // else terminated //解锁 unlockRunState(rs, rs & ~RSLOCK); } else move = true; // move if busy if (move) r = ThreadLocalRandom.advanceProbe(r); } }
Flag1.1 : 有个细节需要说一下,我们在 Java AQS队列同步器以及ReentrantLock的应用 时提到过使用锁的范式以及为什么要这样用,ForkJoinPool
这里同样遵循这种范式
Lock lock = new ReentrantLock(); lock.lock(); try{ ... }finally{ lock.unlock(); }
Flag1.2: 简单描述这个过程,就是根据不同的并行度来初始化不同大小的 WorkQueue[]数组,数组大小要求是 2 的 n 次幂,所以给大家个表格直观理解一下并行度和队列容量的关系:
并行度p | 容量 |
1,2 | 4 |
3,4 | 8 |
5 ~ 8 | 16 |
9 ~ 16 | 32 |
Flag 1,2,3: 如果你理解了上面这个方法,很显然,第一次执行这个方法内部的逻辑顺序应该是 Flag1
——> Flag3
——>Flag2
externalSubmit 如果任务成功提交,就会调用 signalWork
方法了
signalWork
前面铺垫的知识要大规模派上用场(一大波僵尸来袭),are you ready?
如果 ForkJoinPool 的 ctl 成员变量的作用已经忘了,赶紧向上翻重新记忆一下
//常量值 static final int SS_SEQ = 1 << 16; // version count final void signalWork(WorkQueue[] ws, WorkQueue q) { long c; int sp, i; WorkQueue v; Thread p; // ctl 小于零,说明活动的线程数 AC 不够 while ((c = ctl) < 0L) { // too few active // 取ctl的低32位,如果为0,说明没有等待的线程 if ((sp = (int)c) == 0) { // no idle workers // 取TC的高位,如果不等于0,则说明目前的工作着还没有达到并行度 if ((c & ADD_WORKER) != 0L) // too few workers //添加 Worker,也就是说要创建线程了 tryAddWorker(c); break; } //未开始或者已停止,直接跳出 if (ws == null) // unstarted/terminated break; //i=空闲线程栈顶端所属的工作队列索引 if (ws.length <= (i = sp & SMASK)) // terminated break; if ((v = ws[i]) == null) // terminating break; //程序执行到这里,说明有空闲线程,计算下一个scanState,增加了版本号,并且调整为 active 状态 int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState int d = sp - v.scanState; // screen CAS //计算下一个ctl的值,活动线程数 AC + 1,通过stackPred取得前一个WorkQueue的索引,重新设置回sp,行程最终的ctl值 long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); //更新 ctl 的值 if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { v.scanState = vs; // activate v //如果有线程阻塞,则调用unpark唤醒即可 if ((p = v.parker) != null) U.unpark(p); break; } //没有任务,直接跳出 if (q != null && q.base == q.top) // no more work break; } }
假设程序刚开始执行,那么活动线程数以及总线程数肯定都没达到并行度要求,这时就会调用 tryAddWorker
方法了
tryAddWorker
tryAddWorker 的逻辑就非常简单了,因为是操作线程池,同样会用到 lockRunState
/unlockRunState
的锁控制
private void tryAddWorker(long c) { //初始化添加worker表识 boolean add = false; do { //因为要添加Worker,所以AC和TC都要加一 long nc = ((AC_MASK & (c + AC_UNIT)) | (TC_MASK & (c + TC_UNIT))); //ctl还没被改变 if (ctl == c) { int rs, stop; // check if terminating if ((stop = (rs = lockRunState()) & STOP) == 0) //更新ctl 的值, add = U.compareAndSwapLong(this, CTL, c, nc); unlockRunState(rs, rs & ~RSLOCK); if (stop != 0) break; //ctl值更新成功,开始真正的创建Worker if (add) { createWorker(); break; } } // 重新获取ctl,并且没有达到最大线程数,并且没有空闲的线程 } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0); }
一切顺利,就要调用 createWorker 方法来创建真正的 Worker 了,形势逐渐明朗
createWorker
介绍过了 WorkerQueue 和 ForkJoinTask,上文说的三个重要角色中的最后一个 ForkJoinWorkerThread
终于登场了
private boolean createWorker() { ForkJoinWorkerThreadFactory fac = factory; Throwable ex = null; ForkJoinWorkerThread wt = null; try { //如果工厂已经存在了,就用factory来创建线程,会去注册线程,这里的this就是ForkJoinPool对象 if (fac != null && (wt = fac.newThread(this)) != null) { //启动线程 wt.start(); return true; } } catch (Throwable rex) { ex = rex; } //如果创建线程失败,就要逆向注销线程,包括前面对ctl等的操作 deregisterWorker(wt, ex); return false; }
Worker 线程是如何与 WorkQueue 对应的,就藏在 fac.newThread(this)
这个方法里面,下面这点代码展示一下调用过程
public ForkJoinWorkerThread newThread(ForkJoinPool pool); static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } } protected ForkJoinWorkerThread(ForkJoinPool pool) { // Use a placeholder until a useful name can be set in registerWorker super("aForkJoinWorkerThread"); this.pool = pool; this.workQueue = pool.registerWorker(this); }
很显然核心内容在 registerWorker
方法里面了
registerWorker
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) { this.pool = pool; this.owner = owner; // Place indices in the center of array (that is not yet allocated) base = top = INITIAL_QUEUE_CAPACITY >>> 1; } final WorkQueue registerWorker(ForkJoinWorkerThread wt) { UncaughtExceptionHandler handler; //这里线程被设置为守护线程,因为,当只剩下守护线程时,JVM就会推出 wt.setDaemon(true); // configure thread //填补处理异常的handler if ((handler = ueh) != null) wt.setUncaughtExceptionHandler(handler); //创建一个WorkQueue,并且设置当前WorkQueue的owner是当前线程 WorkQueue w = new WorkQueue(this, wt); int i = 0; // assign a pool index //又用到了config的知识,提取出我们期望的WorkQueue模式 int mode = config & MODE_MASK; //加锁 int rs = lockRunState(); try { WorkQueue[] ws; int n; // skip if no array //判断ForkJoinPool的WorkQueue[]都初始化完全 if ((ws = workQueues) != null && (n = ws.length) > 0) { //一种魔数计算方式,用以减少冲突 int s = indexSeed += SEED_INCREMENT; // unlikely to collide //假设WorkQueue的初始长度是16,那这里的m就是15,最终目的就是为了得到一个奇数 int m = n - 1; //和得到偶数的计算方式一样,得到一个小于m的奇数i i = ((s << 1) | 1) & m; // odd-numbered indices //如果这个槽位不为空,说明已经被其他线程初始化过了,也就是有冲突,选取别的槽位 if (ws[i] != null) { // collision int probes = 0; // step by approx half n //步长加2,也就保证step还是奇数 int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; //一直遍历,直到找到空槽位,如果都遍历了一遍,那就需要对WorkQueue[]扩容了 while (ws[i = (i + step) & m] != null) { if (++probes >= n) { workQueues = ws = Arrays.copyOf(ws, n <<= 1); m = n - 1; probes = 0; } } } //初始化一个随机数 w.hint = s; // use as random seed //如文章前面所说,config记录索引值和模式 w.config = i | mode; //扫描状态也记录为索引值,如文章前面所说,奇数表示为scanning状态 w.scanState = i; // publication fence //把初始化好的WorkQueue放到ForkJoinPool的WorkQueue[]数组中 ws[i] = w; } } finally { //解锁 unlockRunState(rs, rs & ~RSLOCK); } //设置worker的前缀名,用于业务区分 wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); //返回当前线程创建的WorkQueue,回到上一层调用栈,也就将WorkQueue注册到ForkJoinWorkerThread里面了 return w; }
到这里线程是顺利创建成功了,可是如果线程没有创建成功,就需要 deregisterWorker来做善后工作了
deregisterWorker
deregisterWorker 方法接收刚刚创建的线程引用和异常作为参数,来做善后工作,将 registerWorker 相关工作撤销回来
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { WorkQueue w = null; if (wt != null && (w = wt.workQueue) != null) { WorkQueue[] ws; // remove index from array //获取当前线程注册的索引值 int idx = w.config & SMASK; //加锁 int rs = lockRunState(); //如果奇数槽位都不为空,则清空内容 if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w) ws[idx] = null; //解锁 unlockRunState(rs, rs & ~RSLOCK); } long c; // decrement counts //死循环式CAS更改ctl的值,将前面AC和TC加1的值再减1,ctl就在那里,不增不减 do {} while (!U.compareAndSwapLong (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | (SP_MASK & c)))); //清空WorkQueue,将其中的task取消掉 if (w != null) { w.qlock = -1; // ensure set w.transferStealCount(this); w.cancelAll(); // cancel remaining tasks } //可能的替换操作 for (;;) { // possibly replace WorkQueue[] ws; int m, sp; //如果线程池终止了,那就跳出循环即可 if (tryTerminate(false, false) || w == null || w.array == null || (runState & STOP) != 0 || (ws = workQueues) == null || (m = ws.length - 1) < 0) // already terminating break; //当前线程创建失败,通过sp判断,如果还存在空闲线程,则调用tryRelease来唤醒这个线程,然后跳出 if ((sp = (int)(c = ctl)) != 0) { // wake up replacement if (tryRelease(c, ws[sp & m], AC_UNIT)) break; } //如果没空闲线程,并且还没有达到满足并行度的条件,那就得再次尝试创建一个线程,弥补刚刚的失败 else if (ex != null && (c & ADD_WORKER) != 0L) { tryAddWorker(c); // create replacement break; } else // don't need replacement break; } if (ex == null) // help clean on way out //处理异常 ForkJoinTask.helpExpungeStaleExceptions(); else // rethrow ForkJoinTask.rethrow(ex); }
总之 deregisterWorker 方法从线程池里注销线程,清空WorkQueue,同时更新ctl,最后做可能的替换,根据线程池的状态决定是否找一个自己的替代者:
- 有空闲线程,则唤醒一个
- 没有空闲线程,再次尝试创建一个新的工作线程
deregisterWorker 线程解释清楚了是为了帮助大家完整理解流程,但 registerWorker 成功后的流程还没走完,咱得继续,有了 Worker,那就调用 wt.start()
干活吧
run
ForkJoinWorkerThread 继承自Thread,调用start() 方法后,自然要调用自己重写的 run() 方法
public void run() { if (workQueue.array == null) { // only run once Throwable exception = null; try { onStart(); //Work开始工作,处理workQueue中的任务 pool.runWorker(workQueue); } catch (Throwable ex) { exception = ex; } finally { try { onTermination(exception); } catch (Throwable ex) { if (exception == null) exception = ex; } finally { pool.deregisterWorker(this, exception); } } } }
方法的重点自然是进入到 runWorker
runWorker
runWorker 是很常规的三部曲操作:
- scan: 通过扫描获取任务
- runTask:执行扫描到的任务
- awaitWork:没任务进入等待
具体请看注释
final void runWorker(WorkQueue w) { //初始化队列,并根据需要是否扩容为原来的2倍 w.growArray(); // allocate queue int seed = w.hint; // initially holds randomization hint int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift //死循环更新偏移r,为扫描任务作准备 for (ForkJoinTask<?> t;;) { //扫描任务 if ((t = scan(w, r)) != null) //扫描到就执行任务 w.runTask(t); //没扫描到就等待,如果等也等不到任务,那就跳出循环别死等了 else if (!awaitWork(w, r)) break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } }
先来看 scan 方法
scan
ForkJoinPool 的任务窃取机制要来了,如何 steal 的,就藏在scan 方法中
private ForkJoinTask<?> scan(WorkQueue w, int r) { WorkQueue[] ws; int m; //再次验证workQueue[]数组的初始化情况 if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { //获取当前扫描状态 int ss = w.scanState; // initially non-negative //又一个死循环,注意到出口位置就好 //和前面逻辑类似,随机一个起始位置,并赋值给k for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t; int b, n; long c; //如果k槽位不为空 if ((q = ws[k]) != null) { //base-top小于零,并且任务q不为空 if ((n = (b = q.base) - q.top) < 0 && (a = q.array) != null) { // non-empty //获取base的偏移量,赋值给i long i = (((a.length - 1) & b) << ASHIFT) + ABASE; //从base端获取任务,和前文的描述的steal搭配上了,是从base端steal if ((t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i))) != null && q.base == b) { //是active状态 if (ss >= 0) { //更新WorkQueue中数组i索引位置为空,并且更新base的值 if (U.compareAndSwapObject(a, i, t, null)) { q.base = b + 1; //n<-1,说明当前队列还有剩余任务,继续唤醒可能存在的其他线程 if (n < -1) // signal others signalWork(ws, q); //直接返回任务 return t; } } else if (oldSum == 0 && // try to activate w.scanState < 0) tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); } //如果获取任务失败,则准备换位置扫描 if (ss < 0) // refresh ss = w.scanState; r ^= r << 1; r ^= r >>> 3; r ^= r << 10; origin = k = r & m; // move and rescan oldSum = checkSum = 0; continue; } checkSum += b; } //k一直在变,扫描到最后,如果等于origin,说明已经扫描了一圈还没扫描到任务 if ((k = (k + 1) & m) == origin) { // continue until stable if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) { if (ss < 0 || w.qlock < 0) // already inactive break; //准备inactive当前工作队列 int ns = ss | INACTIVE; // try to inactivate //活动线程数AC减1 long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); w.stackPred = (int)c; // hold prev stack top U.putInt(w, QSCANSTATE, ns); if (U.compareAndSwapLong(this, CTL, c, nc)) ss = ns; else w.scanState = ss; // back out } checkSum = 0; } } } return null; }
如果顺利扫描到任务,那就要调用 runTask 方法来真正的运行这个任务了
runTask
马上就接近真相了,steal 到任务了,就干点正事吧
final void runTask(ForkJoinTask<?> task) { if (task != null) { scanState &= ~SCANNING; // mark as busy //Flag1: 记录当前的任务是偷来的,至于如何执行task,是我们写在compute方法中的,我们一会看doExec() 方法 (currentSteal = task).doExec(); U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC execLocalTasks(); ForkJoinWorkerThread thread = owner; //累加偷来的数量,亲兄弟明算帐啊,虽然算完也没啥实际意义 if (++nsteals < 0) // collect on overflow transferStealCount(pool); //任务执行完后,就重新更新scanState为SCANNING scanState |= SCANNING; if (thread != null) thread.afterTopLevelExec(); } }
Flag1: doExec 方法才是真正执行任务的关键,它是链接我们自定义 compute 方法的核心,来看 doExec 方法