ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(下)

简介: ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(下)

doExec


形势一片大好,挺住,揭开 exec 的面纱,就看到本质了


//ForkJoinTask中的抽象方法,RecursiveTask 和 RecursiveAction 都重写了它
protected abstract boolean exec();
final int doExec() {
  int s; boolean completed;
  if ((s = status) >= 0) {
    try {
      completed = exec();
    } catch (Throwable rex) {
      return setExceptionalCompletion(rex);
    }
    if (completed)
      s = setCompletion(NORMAL);
  }
  return s;
}
//RecursiveTask重写的内容,终于看到我们文章开头 demo 中的compute 了
protected final boolean exec() {
  result = compute();
  return true;
}


到这里,我们已经看到本质了,绕了这么一大圈,终于和我们自己重写的compute方法联系到了一起,真是不容易,但是 runWorker 三部曲还差最后一曲 awaitWork 没谱,我们来看看


awaitWork


上面说的是 scan 到了任务,要是没有scan到任务,那就得将当前线程阻塞一下,具体标注在注释中,可以简单了解一下


private boolean awaitWork(WorkQueue w, int r) {
  if (w == null || w.qlock < 0)                 // w is terminating
    return false;
  for (int pred = w.stackPred, spins = SPINS, ss;;) {
    if ((ss = w.scanState) >= 0)
      break;
    else if (spins > 0) {
      r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
      if (r >= 0 && --spins == 0) {         // randomize spins
        WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
        if (pred != 0 && (ws = workQueues) != null &&
            (j = pred & SMASK) < ws.length &&
            //前驱任务队列还在
            (v = ws[j]) != null &&        // see if pred parking
            //并且工作队列已经激活,说明任务来了了
            (v.parker == null || v.scanState >= 0))
          //继续自旋等一会,别返回false
          spins = SPINS;                // continue spinning
      }
    }
    //自旋之后,再次检查工作队列是否终止,若是,退出扫描
    else if (w.qlock < 0)                     // recheck after spins
      return false;
    else if (!Thread.interrupted()) {
      long c, prevctl, parkTime, deadline;
      int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
      if ((ac <= 0 && tryTerminate(false, false)) ||
          (runState & STOP) != 0)           // pool terminating
        return false;
      if (ac <= 0 && ss == (int)c) {        // is last waiter
        prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
        int t = (short)(c >>> TC_SHIFT);  // shrink excess spares
        if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
          return false;                 // else use timed wait
        parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
        deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
      }
      else
        prevctl = parkTime = deadline = 0L;
      Thread wt = Thread.currentThread();
      U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupport
      w.parker = wt;
      if (w.scanState < 0 && ctl == c)      // recheck before park
        U.park(false, parkTime);
      U.putOrderedObject(w, QPARKER, null);
      U.putObject(wt, PARKBLOCKER, null);
      if (w.scanState >= 0)
        break;
      if (parkTime != 0L && ctl == c &&
          deadline - System.nanoTime() <= 0L &&
          U.compareAndSwapLong(this, CTL, c, prevctl))
        return false;                     // shrink pool
    }
  }
  return true;
}


到这里,ForkJoinPool 的完整流程算是有个基本了解了,但是我们前面讲的这些内容都是从 submission task 作为切入点的。刚刚聊到的 compute 方法,我们按照分治算法范式写了自己的逻辑,具体请回看文中开头的demo,很关键的一点是,我们在 compute 中调用了 fork 方法,这就给我们了解 worker task 的机会了,继续来看 fork 方法


微信图片_20220511201007.png


fork


Fork 方法的逻辑很简单,如果当前线程是 ForkJoinWorkerThread 类型,也就是说已经通过上文注册的 Worker,那么直接调用 push 方法将 task 放到当前线程拥有的 WorkQueue 中,否则就再调用 externalPush 重走我们已上说的所有逻辑(你敢再走一遍吗?)


public final ForkJoinTask<V> fork() {
  Thread t;
  if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
    ((ForkJoinWorkerThread)t).workQueue.push(this);
  else
    ForkJoinPool.common.externalPush(this);
  return this;
}
//push 方法很简单,这里就不再过多解释了
final void push(ForkJoinTask<?> task) {
  ForkJoinTask<?>[] a; ForkJoinPool p;
  int b = base, s = top, n;
  if ((a = array) != null) {    // ignore if queue removed
    int m = a.length - 1;     // fenced write for task visibility
    U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
    U.putOrderedInt(this, QTOP, s + 1);
    if ((n = s - b) <= 1) {
      if ((p = pool) != null)
        p.signalWork(p.workQueues, this);
    }
    else if (n >= m)
      growArray();
  }
}


有 fork 就有 join,继续看一下 join 方法()


join


join 的核心调用在 doJoin,但是看到这么多级联三元运算符,我慌了


public final V join() {
  int s;
  if ((s = doJoin() & DONE_MASK) != NORMAL)
    reportException(s);
  return getRawResult();
}
private int doJoin() {
  int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
  //status,task 的运行状态
  return (s = status) < 0 ? s :
  ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
    (w = (wt = (ForkJoinWorkerThread)t).workQueue).
    tryUnpush(this) && (s = doExec()) < 0 ? s :
  wt.pool.awaitJoin(w, this, 0L) :
  externalAwaitDone();
}


我们将 doJoin 方法用我们最熟悉的 if/else 做个改动,是不是就豁然开朗了


private int doJoin() {
  int s;
  Thread t;
  ForkJoinWorkerThread wt;
  ForkJoinPool.WorkQueue w;
  if((s = status) < 0) { // 有结果,直接返回
    return s;
  }else {
    if((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {         
      // 如果是 ForkJoinWorkerThread Worker
      if((w = (wt = (ForkJoinWorkerThread) t).workQueue).tryUnpush(this) // 类似上面提到的 scan,但是是专项尝试从本工作队列里取出等待的任务
         // 取出了任务,就去执行它,并返回结果
         && (s = doExec()) < 0) { 
        return s;
      }else {
        // 也有可能别的线程把这个任务偷走了,那就执行内部等待方法
        return wt.pool.awaitJoin(w, this, 0L); 
      }
    }else { 
      // 如果不是 ForkJoinWorkerThread,执行外部等待方法
      return externalAwaitDone();
    }
  }
}


其中 awaitJoin 和 externalAwaitDone 都用到了 Helper(帮助) 和 Compensating(补偿) 两种策略,这两种策略大家完全可以自行阅读了,尤其是 awaitJoin 方法,强烈推荐大家自行阅读,其中 pop 的过程在这里,这里不再展开


到这里,有关 ForkJoinPool 相关的内容就算是结束了,为了让大家有个更好的理解 fork/join 机制,我们还是画几张图解释一下


Fork/Join 图解


假设我们的大任务是 Task(8), 最终被分治成可执行的最小单元是 Task(1)


按照分治思想拆分任务的整体目标就是这样滴:


微信图片_20220511201157.png


从外部先提交一个大的 Task(8),将其放在偶数槽位中(请注意颜色对应


微信图片_20220511201219.png


不满足并行度,会创建 Worker 1 来扫描,并从 base 端窃取到任务 task(8),执行到 compute, fork


出两个 task(4), 并 push到 WorkQueue 中


微信图片_20220511201243.png

在执行任务时始终会确认是否满足并行度要求,如果没有就会继续创建新的Worker,与此同时,也会继续 fork 任务,直到最小单元。Worker1 会从 top 端 pop 出来 task(4) 来继续 compute 和 fork,并重新 push 到 WorkQueue 中


微信图片_20220511201307.png


task(2) 还不是最小单元,所以会继续 pop 出 task(2),并最终 fork 出两个 task(1) push 到 WorkQueue中


微信图片_20220511201328.png


task(1) 已经是最小粒度了,可以直接 pop 出来执行,获取最终结果;在 Worker1 进行这些 pop 操作的同时,为了满足并行度要求也会创建的其他Worker,比如 Worker 2,这时 Worker2 会从 Worker 1 所在队列的 base 端窃取任务


微信图片_20220511201351.png


Worker 2 依旧是按照这个规则进行 pop->fork,到最终可以 exec 任务,假设 Worker 1 的任务先执行完,要 join 结果,当 join task(4) 时,通过 hint 定位到是谁偷走了 task(4),这时顺藤摸瓜找到 Worker2,如果 Worker2 还有任务没执行完,Worker1 再窃取回来帮着执行,这样互帮互助,最终快速完成任务


灵魂追问


  1. 为什么说 ForkjoinPool 效率要更高?同时建议使用 commonPool?


  1. JDK1.8 Stream 底层就充分用到了 ForkJoinPool,你知道还有哪里用到了 ForkJoinPool 了吗


  1. ForkJoinPool 最多会有多少个槽位?


  1. 下面代码有人说不能充分利用 ForkJoinPool,多个 task 的提交要用 invokeAll,你知道为什么吗?如果不用 invokeAll,要怎样使用 fork/join 呢?
protected Long compute() {
    if (任务足够小) {
        return cal();
    }
    SumTask subtask1 = new SumTask(...);
    SumTask subtask2 = new SumTask(...);
    // 分别对子任务调用fork():
    subtask1.fork();
    subtask2.fork();
    // 分别获取合并结果:
    Long subresult1 = subtask1.join();
    Long subresult2 = subtask2.join();
    return subresult1 + subresult2;
}


总结


这又是一篇长文,很多小伙伴私下都建议我将长文拆开,一方面读者好消化,另一方面我自己也在数量的体现上变得高产。几次想拆开,但好多文章拆开就失去了连续性(大家都有遗忘曲线)。过年没回老家,就有时间撸文章了。为了更好的理解源码,文章的基础铺垫内容很多,看到这,你应该很累了,想要将更零散的知识点串起来,那就多看代码注释回味一下,然后一起膜拜 Doug Lea 吧



目录
打赏
0
1
0
0
1
分享
相关文章
0039Java程序设计-基于java校园闲置物交易系统论文
0039Java程序设计-基于java校园闲置物交易系统论文
98 0
手机可跑,3.8B参数量超越GPT-3.5!微软发布Phi-3技术报告:秘密武器是洗干净数据
【5月更文挑战第16天】微软发布 Phi-3 技术报告,介绍了一个拥有3.8B参数的新语言模型,超越GPT-3.5,成为最大模型之一。 Phi-3 在手机上运行的特性开启了大型模型移动应用新纪元。报告强调数据清洗是关键,通过优化设计实现高效运行。实验显示 Phi-3 在多项NLP任务中表现出色,但泛化能力和数据隐私仍是挑战。该模型预示着AI领域的未来突破。[[论文链接](https://arxiv.org/pdf/2404.14219.pdf)]
106 2
如何做到一站检索前沿主流 AIGC / GPT 文章?定时任务抓取文章!
如何做到一站检索前沿主流 AIGC / GPT 文章?定时任务抓取文章!
310 0
手把手一步一步教你使用Java开发一个大型街机动作闯关类游戏02支持中文及显示FPS
手把手一步一步教你使用Java开发一个大型街机动作闯关类游戏02支持中文及显示FPS
152 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等