ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(中)

简介: ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(中)

除了构造方法所构建的成员变量,ForkJoinPool 还有一个非常重要的成员变量 runState,和你之前了解的知识一样,线程池也需要状态来进行管理


volatile int runState;               // lockable status
// runState bits: SHUTDOWN must be negative, others arbitrary powers of two
private static final int  RSLOCK     = 1;       //线程池被锁定
private static final int  RSIGNAL    = 1 << 1;    //线程池有线程需要唤醒
private static final int  STARTED    = 1 << 2;  //线程池已经初始化
private static final int  STOP       = 1 << 29;    //线程池停止
private static final int  TERMINATED = 1 << 30; //线程池终止
private static final int  SHUTDOWN   = 1 << 31; //线程池关闭


runState 有上面 6 种状态切换,按注释所言,只有 SHUTDOWN 状态是负数,其他都是整数,在并发环境更改状态必然要用到锁,ForkJoinPool 对线程池加锁和解锁分别由 lockRunStateunlockRunState 来实现 (这两个方法可以暂且不用深入理解,可以暂时跳过,只需要理解它们是帮助安全更改线程池状态的锁即可)


不深入了解可以,但是我不能不写啊...... 你待会不是得回看吗?


微信图片_20220511195101.png


lockRunState


/**
* Acquires the runState lock; returns current (locked) runState.
*/
// 从方法注释中看到,该方法一定会返回 locked 的 runState,也就是说一定会加锁成功
private int lockRunState() {
  int rs;
  return ((((rs = runState) & RSLOCK) != 0 ||
           !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ?
          awaitRunStateLock() : rs);
}


  • 因为 RSLOCK = 1,如果 runState & RSLOCK == 0,则说明目前没有加锁,进入或运算的下半段 CAS


  • 先通过 CAS 尝试加锁,尝试成功直接返回,尝试失败则要调用 awaitRunStateLock 方法


/**
* Spins and/or blocks until runstate lock is available.  See
* above for explanation.
*/
private int awaitRunStateLock() {
  Object lock;
  boolean wasInterrupted = false;
  for (int spins = SPINS, r = 0, rs, ns;;) {
    //判断是否加锁(==0表示未加锁)
    if (((rs = runState) & RSLOCK) == 0) {
      // 通过CAS加锁
      if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) {
        if (wasInterrupted) {
          try {
            // 重置线程终端标记
            Thread.currentThread().interrupt();
          } catch (SecurityException ignore) {
            // 这里竟然 catch 了个寂寞
          }
        }
        // 加锁成功返回最新的 runState,for 循环的唯一正常出口
        return ns;
      }
    }
    else if (r == 0)
      r = ThreadLocalRandom.nextSecondarySeed();
    else if (spins > 0) {
      r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
      if (r >= 0)
        --spins;
    }
    // Flag1 如果是其他线程正在初始化占用锁,则调用 yield 方法让出 CPU,让其快速初始化
    else if ((rs & STARTED) == 0 || (lock = stealCounter) == null)
      Thread.yield();   // initialization race
    // Flag2 如果其它线程持有锁,并且线程池已经初始化,则将唤醒位标记为1
    else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {
      // 进入互斥锁
      synchronized (lock) {
        // 再次判断,如果等于0,说明进入互斥锁前刚好有线程进行了唤醒,就不用等待,直接进行唤醒操作即可,否则就进入等待
        if ((runState & RSIGNAL) != 0) {
          try {
            lock.wait();
          } catch (InterruptedException ie) {
            if (!(Thread.currentThread() instanceof
                  ForkJoinWorkerThread))
              wasInterrupted = true;
          }
        }
        else
          lock.notifyAll();
      }
    }
  }
}


上面代码 33 ~ 34 (Flag1)行以及 36 ~ 50 (Flag2) 行,如果你没看后续代码,现在来理解是有些困难的,我这里先提前说明一下:


Flag1: 当完整的初始化 ForkJoinPool 时,直接利用了 stealCounter 这个原子变量,因为初始化时(调用 externalSubmit 时),才会对 StealCounter 赋值。所以,这里的逻辑是,当状态不是 STARTED 或者 stealCounter 为空,让出线程等待,也就是说,别的线程还没初始化完全,让其继续占用锁初始化即可


Flag2: 我们在讲等待/通知模型时就说,不要让无限自旋尝试,如果资源不满足就等待,如果资源满足了就通知,所以,如果 (runState & RSIGNAL) == 0 成立,说明有线程需要唤醒,直接唤醒就好,否则也别浪费资源,主动等待一会


当阅读到这的代码时,马上就抛出来两个问题:


Q1: 既然是加锁,为什么不用已有的轮子 ReentrantLock 呢?


PS:如果你读过并发系列 Java AQS队列同步器以及ReentrantLock的应用 ,你会知道 ReentrantLock 是用一个完整字段 state 来控制同步状态。但这里在竞争锁的时候还会判断线程池的状态,如果是初始化状态主动 yield 放弃 CPU 来减少竞争;另外,用一个完整的 runState 不同位来表示状态也体现出更细的粒度吧


Q2: synchronized 大法虽好,但是我们都知道这是比较重量级的锁,为什么还在这里应用了呢?


PS: 首先 synchronized 经过不断优化,没有它刚诞生时那么重,另外按照 Flag 2 的代码含义,进入 synchronized 同步块的概率还是很低的,可以用最简单的方式稳稳兜底(奥卡姆剃刀了原理?)


有加锁自然要解锁,向下看 unlockRunState


unlockRunState


解锁的逻辑相对简单多了,总体目标是清除锁标记位。如果顺利将状态修改为目标状态,自然解锁成功;否则表示有别的线程进入了wait,需要调用notifyAll唤醒,重新尝试竞争


    /**
     * Unlocks and sets runState to newRunState.
     *
     * @param oldRunState a value returned from lockRunState
     * @param newRunState the next value (must have lock bit clear).
     */
    private void unlockRunState(int oldRunState, int newRunState) {
        if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) {
            Object lock = stealCounter;
            runState = newRunState;              // clears RSIGNAL bit
            if (lock != null)
                synchronized (lock) { lock.notifyAll(); }
        }
    }


这两个方法贯穿着后续代码分析的始终,多注意 unlockRunState 的入参即可,另外你也看到了通知都是用的 notifyAll,而不是 notify,这个问题我们之前重点说明过,你还记得为什么吗?如果不记得,打开并发编程之等待通知机制 回忆一下吧


微信图片_20220511195344.png


第一层知识铺垫已经差不多了,前进

invoke/submit/execute


回到本文最开始带有 main 函数的 demo,我们向 ForkJoinPool 提交任务调用的是 invoke 方法, 其实 ForkJoinPool 还支持 submit 和 execute 两种方式来提交任务。并发的玩法非常类似,这三类方法的作业也很好区分:


  • invoke:提交任务,并等待返回执行结果


  • submit:提交并立刻返回任务,ForkJoinTask实现了Future,可以充分利用 Future 的特性


  • execute:只提交任务


在这三大类基础上又重载了几个更细粒度的方法,这里不一一列举:


public <T> T invoke(ForkJoinTask<T> task) {
  if (task == null)
    throw new NullPointerException();
  externalPush(task);
  return task.join();
}
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
  if (task == null)
    throw new NullPointerException();
  externalPush(task);
  return task;
}
public void execute(ForkJoinTask<?> task) {
  if (task == null)
    throw new NullPointerException();
  externalPush(task);
}


相信你已经发现了,提交任务的方法都会调用 externalPush(task) 这个用法,源码的主角终于要登场了


微信图片_20220511195539.gif


但是......


如果你看 externalPush 代码,第一行就是声明一个 WorkQueue 数组变量,为了后续流程更加丝滑,咱还得铺垫一点 WorkQueue 的知识(又要铺垫)


WorkQueue


一看这么多成员变量,还是很慌的,不过,我们只需要把我几个主要的就足够了


微信图片_20220511195619.png


//初始队列容量
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
//最大队列容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// Instance fields
volatile int scanState;    // versioned, <0: inactive; odd:scanning
int stackPred;             // pool stack (ctl) predecessor  前任池(WorkQueue[])索引,由此构成一个栈
int nsteals;               // number of steals  偷取的任务个数
int hint;                  // randomization and stealer index hint  记录偷取者的索引,方便后面顺藤摸瓜
int config;                // pool index and mode
volatile int qlock;        // 1: locked, < 0: terminate; else 0
volatile int base;         // index of next slot for poll
int top;                   // index of next slot for push
ForkJoinTask<?>[] array;   // the elements (initially unallocated)  任务数组
final ForkJoinPool pool;   // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared  当前工作队列的工作线程,共享模式下为null
volatile Thread parker;    // == owner during call to park; else null  调用park阻塞期间为owner,其他情况为null
volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin  记录当前join来的任务
volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer  记录从其他工作队列偷取过来的任务


我们上面说了,WorkQueue 是一个双端队列,线程池有 runState,WorkQueue 有 scanState


  • 小于零:inactive (未激活状态)


  • 奇数:scanning (扫描状态)


  • 偶数:running (运行状态)


操作线程池需要锁,操作队列也是需要锁的,qlock 就派上用场了


  • 1: 锁定


  • 0:未锁定


  • 小于零:终止状态


WorkQueue 中也有个 config,但是和 ForkJoinPool 中的是不一样的,WorkQueue 中的config 记录了该 WorkQueue 在 WorkQueue[] 数组的下标以及 mode


其他字段的含义我们就写在代码注释中吧,主角重新登场,这次是真的


externalPush


文章前面说过,task 会细分成 submission taskworker taskworker taskfork 出来的,那从这个入口进入的,自然也就是 submission task 了,也就是说:


  • 通过invoke()submit() | execute() 等方法提交的 task, 是 submission task,会放到 WorkQueue 数组的偶数索引位置
  • 调用 fork() 方法生成出的任务,叫 worker task,会放到 WorkQueue 数组的奇数索引位置


该方法上的注释也写的很清楚,具体请参考代码注释


    /**
     * Tries to add the given task to a submission queue at
     * submitter's current queue. Only the (vastly) most common path
     * is directly handled in this method, while screening for need
     * for externalSubmit.
     *
     * @param task the task. Caller must ensure non-null.
     */
    final void externalPush(ForkJoinTask<?> task) {
        WorkQueue[] ws; WorkQueue q; int m;
          //Flag1: 通过ThreadLocalRandom产生随机数,用于下面计算槽位索引
        int r = ThreadLocalRandom.getProbe();
        int rs = runState; //初始状态为0
          //Flag2: 如果ws,即ForkJoinPool中的WorkQueue数组已经完成初始化,且根据随机数定位的index存在workQueue,且cas的方式加锁成功
        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
            //对WorkQueue操作加锁
            U.compareAndSwapInt(q, QLOCK, 0, 1)) {
            ForkJoinTask<?>[] a; int am, n, s;
              //WorkQueue中的任务数组不为空
            if ((a = q.array) != null && 
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {  //组长度大于任务个数,不需要扩容
                int j = ((am & s) << ASHIFT) + ABASE; //WorkQueue中的任务数组不为空
                U.putOrderedObject(a, j, task); //向Queue中放入任务
                U.putOrderedInt(q, QTOP, s + 1);//top值加一
                U.putIntVolatile(q, QLOCK, 0);  //对WorkQueue操作解锁
                  //任务个数小于等于1,那么此槽位上的线程有可能等待,如果大家都没任务,可能都在等待,新任务来了,唤醒,起来干活了
                  if (n <= 1)
                      //唤醒可能存在等待的线程
                    signalWork(ws, q);
                return;
            }
              //任务入队失败,前面加锁了,这里也要解锁
            U.compareAndSwapInt(q, QLOCK, 1, 0);
        }
          //Flag3: 不满足上述条件,也就是说上面的这些 WorkQueue[]等都不存在,就要通过这个方法一切从头开始创建
        externalSubmit(task);
    }


上面加了三处 Flag,为了让大家更好的理解代码还是有必要做进一步说明的:


Flag1: ThreadLocalRandom 是 ThreadLocal 的衍生物,每个线程默认的 probe 是 0,当线程调用ThreadLocalRandom.current()时,会初始化 seed 和 probe,维护在线程内部,这里就知道是生成一个随机数就好,具体细节还是值得大家自行看一下


Flag2: 这里包含的信息还是非常多的


// 二进制为:0000 0000 0000 0000 0000 0000 0111 1110 
static final int SQMASK       = 0x007e;        // max 64 (even) slots


  • m 的值代表 WorkQueue 数组的最大下表


  • m & r 会保证随机数 r 大于 m 的部分不可用


  • m & r & SQMASK 因为 SQMASK 最后一位是 0,最终的结果就会是偶数


  • r != 0 说明当前线程已经初始化过一些内容


  • rs > 0 说明 ForkJoinPool 的 runState 也已经被初始化过


Flag3: 看过 flag2 的描述,你也就很好理解 Flag 3 了,如果是第一次提交任务,必走 Flag 3 的 externalSubmit 方法


externalSubmit


这个方法很长,但没超过 80 行,具体请看方法注释


  //初始化所需要的一切  
    private void externalSubmit(ForkJoinTask<?> task) {
        int r;                                    // initialize caller's probe
          //生成随机数
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            WorkQueue[] ws; WorkQueue q; int rs, m, k;
            boolean move = false;
              // 如果线程池的状态为终止状态,则帮助终止
            if ((rs = runState) < 0) {
                tryTerminate(false, false);     // help terminate
                throw new RejectedExecutionException();
            }
              //Flag1: 再判断一次状态是否为初始化,因为在lockRunState过程中有可能状态被别的线程更改了
            else if ((rs & STARTED) == 0 ||     // initialize
                     ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                int ns = 0;
                  //Flag1.1: 加锁
                rs = lockRunState();
                try {
                    if ((rs & STARTED) == 0) {
                          // 初始化stealcounter的值(任务窃取计数器,原子变量)
                        U.compareAndSwapObject(this, STEALCOUNTER, null,
                                               new AtomicLong());
                        // create workQueues array with size a power of two
                          //取config的低16位(确切说是低15位),获取并行度
                        int p = config & SMASK; // ensure at least 2 slots
                          //Flag1.2: 如果你看过HashMap 的源码,这个就很好理解了,获取2次幂大小
                        int n = (p > 1) ? p - 1 : 1;
                        n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                        n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                          //初始化 WorkQueue 数组
                        workQueues = new WorkQueue[n];
                          // 标记初始化完成
                        ns = STARTED;
                    }
                } finally {
                      // 解锁
                    unlockRunState(rs, (rs & ~RSLOCK) | ns);
                }
            }
              //Flag2 上面分析过,取偶数位槽位,将任务放进偶数槽位
            else if ((q = ws[k = r & m & SQMASK]) != null) {
                  // 对 WorkQueue 加锁
                if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                    ForkJoinTask<?>[] a = q.array;
                    int s = q.top;
                      // 初始化任务提交标识
                    boolean submitted = false; // initial submission or resizing
                    try {                      // locked version of push
                          //计算内存偏移量,放任务,更新top值
                        if ((a != null && a.length > s + 1 - q.base) ||
                            (a = q.growArray()) != null) {
                            int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                            U.putOrderedObject(a, j, task);
                            U.putOrderedInt(q, QTOP, s + 1);
                              //提交任务成功
                            submitted = true;
                        }
                    } finally {
                          //WorkQueue解锁
                        U.compareAndSwapInt(q, QLOCK, 1, 0);
                    }
                      // 任务提交成功了
                    if (submitted) {
                          //自然要唤醒可能存在等待的线程来处理任务了
                        signalWork(ws, q);
                        return;
                    }
                }
                  //任务提交没成功,可以重新计算随机数,再走一次流程
                move = true;                   // move on failure
            }
              //Flag3: 接Flag2,如果找到的槽位是空,则要初始化一个WorkQueue
            else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                q = new WorkQueue(this, null);
                  // 设置工作队列的窃取线索值
                q.hint = r;
                  // 如上面 WorkQueue 中config 的介绍,记录当前WorkQueue在WorkQueue[]数组中的值,和队列模式
                q.config = k | SHARED_QUEUE;
                  // 初始化为 inactive 状态
                q.scanState = INACTIVE;
                  //加锁
                rs = lockRunState();           // publish index
                if (rs > 0 &&  (ws = workQueues) != null &&
                    k < ws.length && ws[k] == null)
                    ws[k] = q;                 // else terminated
                  //解锁
                unlockRunState(rs, rs & ~RSLOCK);
            }
            else
                move = true;                   // move if busy
            if (move)
                r = ThreadLocalRandom.advanceProbe(r);
        }
    }


Flag1.1 : 有个细节需要说一下,我们在 Java AQS队列同步器以及ReentrantLock的应用 时提到过使用锁的范式以及为什么要这样用,ForkJoinPool 这里同样遵循这种范式


Lock lock = new ReentrantLock();
lock.lock();
try{
    ...
}finally{
    lock.unlock();
}


Flag1.2: 简单描述这个过程,就是根据不同的并行度来初始化不同大小的 WorkQueue[]数组,数组大小要求是 2 的 n 次幂,所以给大家个表格直观理解一下并行度和队列容量的关系:


并行度p 容量
1,2 4
3,4 8
5 ~ 8 16
9 ~ 16 32


Flag 1,2,3: 如果你理解了上面这个方法,很显然,第一次执行这个方法内部的逻辑顺序应该是 Flag1——> Flag3——>Flag2


externalSubmit 如果任务成功提交,就会调用 signalWork 方法了


signalWork


前面铺垫的知识要大规模派上用场(一大波僵尸来袭),are you ready?


微信图片_20220511200013.gif


如果 ForkJoinPool 的 ctl 成员变量的作用已经忘了,赶紧向上翻重新记忆一下


//常量值
static final int SS_SEQ       = 1 << 16;       // version count
final void signalWork(WorkQueue[] ws, WorkQueue q) {
        long c; int sp, i; WorkQueue v; Thread p;
          // ctl 小于零,说明活动的线程数 AC 不够
        while ((c = ctl) < 0L) {                       // too few active
              // 取ctl的低32位,如果为0,说明没有等待的线程
            if ((sp = (int)c) == 0) {                  // no idle workers
                  // 取TC的高位,如果不等于0,则说明目前的工作着还没有达到并行度
                if ((c & ADD_WORKER) != 0L)            // too few workers
                      //添加 Worker,也就是说要创建线程了
                    tryAddWorker(c);
                break;
            }
              //未开始或者已停止,直接跳出
            if (ws == null)                            // unstarted/terminated
                break;
              //i=空闲线程栈顶端所属的工作队列索引
            if (ws.length <= (i = sp & SMASK))         // terminated
                break;
            if ((v = ws[i]) == null)                   // terminating
                break;
              //程序执行到这里,说明有空闲线程,计算下一个scanState,增加了版本号,并且调整为 active 状态
            int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
            int d = sp - v.scanState;                  // screen CAS
            //计算下一个ctl的值,活动线程数 AC + 1,通过stackPred取得前一个WorkQueue的索引,重新设置回sp,行程最终的ctl值
              long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
              //更新 ctl 的值
            if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                v.scanState = vs;                      // activate v
                  //如果有线程阻塞,则调用unpark唤醒即可 
                  if ((p = v.parker) != null)
                    U.unpark(p);
                break;
            }
              //没有任务,直接跳出
            if (q != null && q.base == q.top)          // no more work
                break;
        }
    }


假设程序刚开始执行,那么活动线程数以及总线程数肯定都没达到并行度要求,这时就会调用 tryAddWorker 方法了


tryAddWorker


tryAddWorker 的逻辑就非常简单了,因为是操作线程池,同样会用到 lockRunState/unlockRunState 的锁控制


    private void tryAddWorker(long c) {
          //初始化添加worker表识
        boolean add = false;
        do {
              //因为要添加Worker,所以AC和TC都要加一
            long nc = ((AC_MASK & (c + AC_UNIT)) |
                       (TC_MASK & (c + TC_UNIT)));
              //ctl还没被改变
            if (ctl == c) {
                int rs, stop;                 // check if terminating
                if ((stop = (rs = lockRunState()) & STOP) == 0)
                      //更新ctl 的值,
                    add = U.compareAndSwapLong(this, CTL, c, nc);
                unlockRunState(rs, rs & ~RSLOCK);
                if (stop != 0)
                    break;
                  //ctl值更新成功,开始真正的创建Worker
                if (add) {
                    createWorker();
                    break;
                }
            }
         // 重新获取ctl,并且没有达到最大线程数,并且没有空闲的线程
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }


一切顺利,就要调用 createWorker 方法来创建真正的 Worker 了,形势逐渐明朗


createWorker


介绍过了 WorkerQueue 和 ForkJoinTask,上文说的三个重要角色中的最后一个 ForkJoinWorkerThread 终于登场了


    private boolean createWorker() {
        ForkJoinWorkerThreadFactory fac = factory;
        Throwable ex = null;
        ForkJoinWorkerThread wt = null;
        try {
              //如果工厂已经存在了,就用factory来创建线程,会去注册线程,这里的this就是ForkJoinPool对象
            if (fac != null && (wt = fac.newThread(this)) != null) {
              //启动线程  
              wt.start();
                return true;
            }
        } catch (Throwable rex) {
            ex = rex;
        }
          //如果创建线程失败,就要逆向注销线程,包括前面对ctl等的操作
        deregisterWorker(wt, ex);
        return false;
    }


Worker 线程是如何与 WorkQueue 对应的,就藏在 fac.newThread(this) 这个方法里面,下面这点代码展示一下调用过程


public ForkJoinWorkerThread newThread(ForkJoinPool pool);
static final class DefaultForkJoinWorkerThreadFactory
  implements ForkJoinWorkerThreadFactory {
  public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    return new ForkJoinWorkerThread(pool);
  }
}
protected ForkJoinWorkerThread(ForkJoinPool pool) {
  // Use a placeholder until a useful name can be set in registerWorker
  super("aForkJoinWorkerThread");
  this.pool = pool;
  this.workQueue = pool.registerWorker(this);
}


很显然核心内容在 registerWorker 方法里面了


registerWorker


WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
  this.pool = pool;
  this.owner = owner;
  // Place indices in the center of array (that is not yet allocated)
  base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}  
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
  UncaughtExceptionHandler handler;
  //这里线程被设置为守护线程,因为,当只剩下守护线程时,JVM就会推出
  wt.setDaemon(true);                           // configure thread
  //填补处理异常的handler
  if ((handler = ueh) != null)
    wt.setUncaughtExceptionHandler(handler);
  //创建一个WorkQueue,并且设置当前WorkQueue的owner是当前线程
  WorkQueue w = new WorkQueue(this, wt);
  int i = 0;                                    // assign a pool index
  //又用到了config的知识,提取出我们期望的WorkQueue模式
  int mode = config & MODE_MASK;
  //加锁
  int rs = lockRunState();
  try {
    WorkQueue[] ws; int n;                    // skip if no array
    //判断ForkJoinPool的WorkQueue[]都初始化完全
    if ((ws = workQueues) != null && (n = ws.length) > 0) {
      //一种魔数计算方式,用以减少冲突
      int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
      //假设WorkQueue的初始长度是16,那这里的m就是15,最终目的就是为了得到一个奇数
      int m = n - 1;
      //和得到偶数的计算方式一样,得到一个小于m的奇数i
      i = ((s << 1) | 1) & m;               // odd-numbered indices
      //如果这个槽位不为空,说明已经被其他线程初始化过了,也就是有冲突,选取别的槽位
      if (ws[i] != null) {                  // collision
        int probes = 0;                   // step by approx half n
        //步长加2,也就保证step还是奇数
        int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
        //一直遍历,直到找到空槽位,如果都遍历了一遍,那就需要对WorkQueue[]扩容了
        while (ws[i = (i + step) & m] != null) {
          if (++probes >= n) {
            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
            m = n - 1;
            probes = 0;
          }
        }
      }
      //初始化一个随机数
      w.hint = s;                           // use as random seed
      //如文章前面所说,config记录索引值和模式
      w.config = i | mode;
      //扫描状态也记录为索引值,如文章前面所说,奇数表示为scanning状态
      w.scanState = i;                      // publication fence
      //把初始化好的WorkQueue放到ForkJoinPool的WorkQueue[]数组中
      ws[i] = w;
    }
  } finally {
    //解锁
    unlockRunState(rs, rs & ~RSLOCK);
  }
  //设置worker的前缀名,用于业务区分
  wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
  //返回当前线程创建的WorkQueue,回到上一层调用栈,也就将WorkQueue注册到ForkJoinWorkerThread里面了
  return w;
}


到这里线程是顺利创建成功了,可是如果线程没有创建成功,就需要 deregisterWorker来做善后工作了


deregisterWorker


deregisterWorker 方法接收刚刚创建的线程引用和异常作为参数,来做善后工作,将 registerWorker 相关工作撤销回来


final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
  WorkQueue w = null;
  if (wt != null && (w = wt.workQueue) != null) {
    WorkQueue[] ws;                           // remove index from array
    //获取当前线程注册的索引值
    int idx = w.config & SMASK;
    //加锁
    int rs = lockRunState();
    //如果奇数槽位都不为空,则清空内容
    if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
      ws[idx] = null;
    //解锁
    unlockRunState(rs, rs & ~RSLOCK);
  }
  long c;                                       // decrement counts
  //死循环式CAS更改ctl的值,将前面AC和TC加1的值再减1,ctl就在那里,不增不减
  do {} while (!U.compareAndSwapLong
               (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                     (TC_MASK & (c - TC_UNIT)) |
                                     (SP_MASK & c))));
  //清空WorkQueue,将其中的task取消掉
  if (w != null) {
    w.qlock = -1;                             // ensure set
    w.transferStealCount(this);
    w.cancelAll();                            // cancel remaining tasks
  }
  //可能的替换操作
  for (;;) {                                    // possibly replace
    WorkQueue[] ws; int m, sp;
    //如果线程池终止了,那就跳出循环即可
    if (tryTerminate(false, false) || w == null || w.array == null ||
        (runState & STOP) != 0 || (ws = workQueues) == null ||
        (m = ws.length - 1) < 0)              // already terminating
      break;
    //当前线程创建失败,通过sp判断,如果还存在空闲线程,则调用tryRelease来唤醒这个线程,然后跳出
    if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
      if (tryRelease(c, ws[sp & m], AC_UNIT))
        break;
    }
    //如果没空闲线程,并且还没有达到满足并行度的条件,那就得再次尝试创建一个线程,弥补刚刚的失败
    else if (ex != null && (c & ADD_WORKER) != 0L) {
      tryAddWorker(c);                      // create replacement
      break;
    }
    else                                      // don't need replacement
      break;
  }
  if (ex == null)                               // help clean on way out
    //处理异常
    ForkJoinTask.helpExpungeStaleExceptions();
  else                                          // rethrow
    ForkJoinTask.rethrow(ex);
}


总之 deregisterWorker 方法从线程池里注销线程,清空WorkQueue,同时更新ctl,最后做可能的替换,根据线程池的状态决定是否找一个自己的替代者:


  • 有空闲线程,则唤醒一个


  • 没有空闲线程,再次尝试创建一个新的工作线程


deregisterWorker 线程解释清楚了是为了帮助大家完整理解流程,但 registerWorker 成功后的流程还没走完,咱得继续,有了 Worker,那就调用 wt.start() 干活吧


run


ForkJoinWorkerThread 继承自Thread,调用start() 方法后,自然要调用自己重写的 run() 方法


public void run() {
  if (workQueue.array == null) { // only run once
    Throwable exception = null;
    try {
      onStart();
      //Work开始工作,处理workQueue中的任务
      pool.runWorker(workQueue);
    } catch (Throwable ex) {
      exception = ex;
    } finally {
      try {
        onTermination(exception);
      } catch (Throwable ex) {
        if (exception == null)
          exception = ex;
      } finally {
        pool.deregisterWorker(this, exception);
      }
    }
  }
}


方法的重点自然是进入到 runWorker


runWorker


runWorker 是很常规的三部曲操作:


  • scan: 通过扫描获取任务


  • runTask:执行扫描到的任务


  • awaitWork:没任务进入等待


具体请看注释


    final void runWorker(WorkQueue w) {
          //初始化队列,并根据需要是否扩容为原来的2倍
        w.growArray();                   // allocate queue
        int seed = w.hint;               // initially holds randomization hint
        int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
          //死循环更新偏移r,为扫描任务作准备  
          for (ForkJoinTask<?> t;;) {
              //扫描任务
            if ((t = scan(w, r)) != null)
                  //扫描到就执行任务
                w.runTask(t);
              //没扫描到就等待,如果等也等不到任务,那就跳出循环别死等了
            else if (!awaitWork(w, r))
                break;
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
        }
    }


先来看 scan 方法


scan


ForkJoinPool 的任务窃取机制要来了,如何 steal 的,就藏在scan 方法中


private ForkJoinTask<?> scan(WorkQueue w, int r) {
  WorkQueue[] ws; int m;
  //再次验证workQueue[]数组的初始化情况
  if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
    //获取当前扫描状态
    int ss = w.scanState;                     // initially non-negative
    //又一个死循环,注意到出口位置就好
    //和前面逻辑类似,随机一个起始位置,并赋值给k
    for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
      WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
      int b, n; long c;
      //如果k槽位不为空
      if ((q = ws[k]) != null) {
        //base-top小于零,并且任务q不为空
        if ((n = (b = q.base) - q.top) < 0 &&
            (a = q.array) != null) {      // non-empty
          //获取base的偏移量,赋值给i
          long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
          //从base端获取任务,和前文的描述的steal搭配上了,是从base端steal
          if ((t = ((ForkJoinTask<?>)
                    U.getObjectVolatile(a, i))) != null &&
              q.base == b) {
            //是active状态
            if (ss >= 0) {
              //更新WorkQueue中数组i索引位置为空,并且更新base的值
              if (U.compareAndSwapObject(a, i, t, null)) {
                q.base = b + 1;
                //n<-1,说明当前队列还有剩余任务,继续唤醒可能存在的其他线程
                if (n < -1)       // signal others
                  signalWork(ws, q);
                //直接返回任务
                return t;
              }
            }
            else if (oldSum == 0 &&   // try to activate
                     w.scanState < 0)
              tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
          }
          //如果获取任务失败,则准备换位置扫描
          if (ss < 0)                   // refresh
            ss = w.scanState;
          r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
          origin = k = r & m;           // move and rescan
          oldSum = checkSum = 0;
          continue;
        }
        checkSum += b;
      }
      //k一直在变,扫描到最后,如果等于origin,说明已经扫描了一圈还没扫描到任务
      if ((k = (k + 1) & m) == origin) {    // continue until stable
        if ((ss >= 0 || (ss == (ss = w.scanState))) &&
            oldSum == (oldSum = checkSum)) {
          if (ss < 0 || w.qlock < 0)    // already inactive
            break;
          //准备inactive当前工作队列
          int ns = ss | INACTIVE;       // try to inactivate
          //活动线程数AC减1
          long nc = ((SP_MASK & ns) |
                     (UC_MASK & ((c = ctl) - AC_UNIT)));
          w.stackPred = (int)c;         // hold prev stack top
          U.putInt(w, QSCANSTATE, ns);
          if (U.compareAndSwapLong(this, CTL, c, nc))
            ss = ns;
          else
            w.scanState = ss;         // back out
        }
        checkSum = 0;
      }
    }
  }
  return null;
}


如果顺利扫描到任务,那就要调用 runTask 方法来真正的运行这个任务了


runTask


马上就接近真相了,steal 到任务了,就干点正事吧


        final void runTask(ForkJoinTask<?> task) {
            if (task != null) {
                scanState &= ~SCANNING; // mark as busy
                  //Flag1: 记录当前的任务是偷来的,至于如何执行task,是我们写在compute方法中的,我们一会看doExec() 方法
                (currentSteal = task).doExec();
                U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
                execLocalTasks();
                ForkJoinWorkerThread thread = owner;
                  //累加偷来的数量,亲兄弟明算帐啊,虽然算完也没啥实际意义
                if (++nsteals < 0)      // collect on overflow
                    transferStealCount(pool);
                  //任务执行完后,就重新更新scanState为SCANNING
                scanState |= SCANNING;
                if (thread != null)
                    thread.afterTopLevelExec();
            }
        }


Flag1: doExec 方法才是真正执行任务的关键,它是链接我们自定义 compute 方法的核心,来看 doExec 方法


相关文章
|
5天前
|
JavaScript Java 测试技术
基于Java的校园闲置物品交易平台的设计与实现(源码+lw+部署文档+讲解等)
基于Java的校园闲置物品交易平台的设计与实现(源码+lw+部署文档+讲解等)
14 0
|
9月前
|
存储 运维
两大赛道示例代码现已开放,4重豪礼送不停!
数据洞察创新挑战赛,两大赛道示例代码现已开放,小白也能轻松得分,20万奖金池等你瓜分!更有无门槛参与奖+限时冲榜奖+邀请试用奖,多重好礼送不停!
149 0
两大赛道示例代码现已开放,4重豪礼送不停!
|
6月前
|
数据采集 传感器 搜索推荐
链动2+1系统开发流程需求/指南功能/玩法详情/案例设计/源码出售
骑行数据采集与展示:系统需要能够通过传感器或者设备连接手机等方式,实时采集骑行数据,包括速度、距离、时间等,并能将这些数据以可视化的方式展示给用户。
|
6月前
|
存储 安全 前端开发
DApp公排互助预约抢单排单模式系统开发参考版/详细流程/方案逻辑/规则玩法/案例设计/源码程序
需求分析:与团队明确系统的需求、目标和范围,包括公排互助预约抢单排单模式系统的功能、规则、奖励机制等方面
|
7月前
|
编解码 前端开发 算法
国网B接口语音对讲和广播技术探究及与GB28181差别
在谈国网B接口的语音广播和语音对讲的时候,大家会觉得,国网B接口是不是和GB28181大同小异?实际上确实信令有差别,但是因为要GB28181设备接入测的对接,再次做国网B接口就简单多了。
|
11月前
|
存储 边缘计算 编解码
《2022中国云游戏行业认知与观察》——第二章、云游戏应用场景与技术实践——2.2 微端:游戏小包分发 提高转化效率——2.2.1 应用案例 十秒完成下载,《三国志·战略版》用了什么黑科技?
《2022中国云游戏行业认知与观察》——第二章、云游戏应用场景与技术实践——2.2 微端:游戏小包分发 提高转化效率——2.2.1 应用案例 十秒完成下载,《三国志·战略版》用了什么黑科技?
154 0
|
算法 C++
ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(下)
ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(下)
ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(下)
|
存储 数据可视化 程序员
选择文库系统的时候需要重点注意和对比哪些东西?
本人程序员出身,接近15年的代码经验,对互联网产品和运营也一直在实践和研究,尤其是对文库产品有着深度理解,因为我自己也一直在运营文库项目。下面是我站在一个普通站长角度给出的一些经验,如果你也想做一个文库网站或文库平台,需要选择一套文库系统产品,请从下面几点出发去做对比,最终做出正确选择。
选择文库系统的时候需要重点注意和对比哪些东西?
抽签软件免费提供,代码开源,可用作抽奖、课堂抽背、游戏分组等活动场合,可以直接下载
抽签软件免费提供,代码开源,可用作抽奖、课堂抽背、游戏分组等活动场合,可以直接下载
574 1
抽签软件免费提供,代码开源,可用作抽奖、课堂抽背、游戏分组等活动场合,可以直接下载
线程同步类实现【国际化翻译案例+机场安检案例】
线程同步类实现【国际化翻译案例+机场安检案例】
57 0