ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(下)

简介: ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(下)

doExec


形势一片大好,挺住,揭开 exec 的面纱,就看到本质了


//ForkJoinTask中的抽象方法,RecursiveTask 和 RecursiveAction 都重写了它
protected abstract boolean exec();
final int doExec() {
  int s; boolean completed;
  if ((s = status) >= 0) {
    try {
      completed = exec();
    } catch (Throwable rex) {
      return setExceptionalCompletion(rex);
    }
    if (completed)
      s = setCompletion(NORMAL);
  }
  return s;
}
//RecursiveTask重写的内容,终于看到我们文章开头 demo 中的compute 了
protected final boolean exec() {
  result = compute();
  return true;
}


到这里,我们已经看到本质了,绕了这么一大圈,终于和我们自己重写的compute方法联系到了一起,真是不容易,但是 runWorker 三部曲还差最后一曲 awaitWork 没谱,我们来看看


awaitWork


上面说的是 scan 到了任务,要是没有scan到任务,那就得将当前线程阻塞一下,具体标注在注释中,可以简单了解一下


private boolean awaitWork(WorkQueue w, int r) {
  if (w == null || w.qlock < 0)                 // w is terminating
    return false;
  for (int pred = w.stackPred, spins = SPINS, ss;;) {
    if ((ss = w.scanState) >= 0)
      break;
    else if (spins > 0) {
      r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
      if (r >= 0 && --spins == 0) {         // randomize spins
        WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
        if (pred != 0 && (ws = workQueues) != null &&
            (j = pred & SMASK) < ws.length &&
            //前驱任务队列还在
            (v = ws[j]) != null &&        // see if pred parking
            //并且工作队列已经激活,说明任务来了了
            (v.parker == null || v.scanState >= 0))
          //继续自旋等一会,别返回false
          spins = SPINS;                // continue spinning
      }
    }
    //自旋之后,再次检查工作队列是否终止,若是,退出扫描
    else if (w.qlock < 0)                     // recheck after spins
      return false;
    else if (!Thread.interrupted()) {
      long c, prevctl, parkTime, deadline;
      int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
      if ((ac <= 0 && tryTerminate(false, false)) ||
          (runState & STOP) != 0)           // pool terminating
        return false;
      if (ac <= 0 && ss == (int)c) {        // is last waiter
        prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
        int t = (short)(c >>> TC_SHIFT);  // shrink excess spares
        if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
          return false;                 // else use timed wait
        parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
        deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
      }
      else
        prevctl = parkTime = deadline = 0L;
      Thread wt = Thread.currentThread();
      U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupport
      w.parker = wt;
      if (w.scanState < 0 && ctl == c)      // recheck before park
        U.park(false, parkTime);
      U.putOrderedObject(w, QPARKER, null);
      U.putObject(wt, PARKBLOCKER, null);
      if (w.scanState >= 0)
        break;
      if (parkTime != 0L && ctl == c &&
          deadline - System.nanoTime() <= 0L &&
          U.compareAndSwapLong(this, CTL, c, prevctl))
        return false;                     // shrink pool
    }
  }
  return true;
}


到这里,ForkJoinPool 的完整流程算是有个基本了解了,但是我们前面讲的这些内容都是从 submission task 作为切入点的。刚刚聊到的 compute 方法,我们按照分治算法范式写了自己的逻辑,具体请回看文中开头的demo,很关键的一点是,我们在 compute 中调用了 fork 方法,这就给我们了解 worker task 的机会了,继续来看 fork 方法


微信图片_20220511201007.png


fork


Fork 方法的逻辑很简单,如果当前线程是 ForkJoinWorkerThread 类型,也就是说已经通过上文注册的 Worker,那么直接调用 push 方法将 task 放到当前线程拥有的 WorkQueue 中,否则就再调用 externalPush 重走我们已上说的所有逻辑(你敢再走一遍吗?)


public final ForkJoinTask<V> fork() {
  Thread t;
  if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
    ((ForkJoinWorkerThread)t).workQueue.push(this);
  else
    ForkJoinPool.common.externalPush(this);
  return this;
}
//push 方法很简单,这里就不再过多解释了
final void push(ForkJoinTask<?> task) {
  ForkJoinTask<?>[] a; ForkJoinPool p;
  int b = base, s = top, n;
  if ((a = array) != null) {    // ignore if queue removed
    int m = a.length - 1;     // fenced write for task visibility
    U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
    U.putOrderedInt(this, QTOP, s + 1);
    if ((n = s - b) <= 1) {
      if ((p = pool) != null)
        p.signalWork(p.workQueues, this);
    }
    else if (n >= m)
      growArray();
  }
}


有 fork 就有 join,继续看一下 join 方法()


join


join 的核心调用在 doJoin,但是看到这么多级联三元运算符,我慌了


public final V join() {
  int s;
  if ((s = doJoin() & DONE_MASK) != NORMAL)
    reportException(s);
  return getRawResult();
}
private int doJoin() {
  int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
  //status,task 的运行状态
  return (s = status) < 0 ? s :
  ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
    (w = (wt = (ForkJoinWorkerThread)t).workQueue).
    tryUnpush(this) && (s = doExec()) < 0 ? s :
  wt.pool.awaitJoin(w, this, 0L) :
  externalAwaitDone();
}


我们将 doJoin 方法用我们最熟悉的 if/else 做个改动,是不是就豁然开朗了


private int doJoin() {
  int s;
  Thread t;
  ForkJoinWorkerThread wt;
  ForkJoinPool.WorkQueue w;
  if((s = status) < 0) { // 有结果,直接返回
    return s;
  }else {
    if((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {         
      // 如果是 ForkJoinWorkerThread Worker
      if((w = (wt = (ForkJoinWorkerThread) t).workQueue).tryUnpush(this) // 类似上面提到的 scan,但是是专项尝试从本工作队列里取出等待的任务
         // 取出了任务,就去执行它,并返回结果
         && (s = doExec()) < 0) { 
        return s;
      }else {
        // 也有可能别的线程把这个任务偷走了,那就执行内部等待方法
        return wt.pool.awaitJoin(w, this, 0L); 
      }
    }else { 
      // 如果不是 ForkJoinWorkerThread,执行外部等待方法
      return externalAwaitDone();
    }
  }
}


其中 awaitJoin 和 externalAwaitDone 都用到了 Helper(帮助) 和 Compensating(补偿) 两种策略,这两种策略大家完全可以自行阅读了,尤其是 awaitJoin 方法,强烈推荐大家自行阅读,其中 pop 的过程在这里,这里不再展开


到这里,有关 ForkJoinPool 相关的内容就算是结束了,为了让大家有个更好的理解 fork/join 机制,我们还是画几张图解释一下


Fork/Join 图解


假设我们的大任务是 Task(8), 最终被分治成可执行的最小单元是 Task(1)


按照分治思想拆分任务的整体目标就是这样滴:


微信图片_20220511201157.png


从外部先提交一个大的 Task(8),将其放在偶数槽位中(请注意颜色对应


微信图片_20220511201219.png


不满足并行度,会创建 Worker 1 来扫描,并从 base 端窃取到任务 task(8),执行到 compute, fork


出两个 task(4), 并 push到 WorkQueue 中


微信图片_20220511201243.png

在执行任务时始终会确认是否满足并行度要求,如果没有就会继续创建新的Worker,与此同时,也会继续 fork 任务,直到最小单元。Worker1 会从 top 端 pop 出来 task(4) 来继续 compute 和 fork,并重新 push 到 WorkQueue 中


微信图片_20220511201307.png


task(2) 还不是最小单元,所以会继续 pop 出 task(2),并最终 fork 出两个 task(1) push 到 WorkQueue中


微信图片_20220511201328.png


task(1) 已经是最小粒度了,可以直接 pop 出来执行,获取最终结果;在 Worker1 进行这些 pop 操作的同时,为了满足并行度要求也会创建的其他Worker,比如 Worker 2,这时 Worker2 会从 Worker 1 所在队列的 base 端窃取任务


微信图片_20220511201351.png


Worker 2 依旧是按照这个规则进行 pop->fork,到最终可以 exec 任务,假设 Worker 1 的任务先执行完,要 join 结果,当 join task(4) 时,通过 hint 定位到是谁偷走了 task(4),这时顺藤摸瓜找到 Worker2,如果 Worker2 还有任务没执行完,Worker1 再窃取回来帮着执行,这样互帮互助,最终快速完成任务


灵魂追问


  1. 为什么说 ForkjoinPool 效率要更高?同时建议使用 commonPool?


  1. JDK1.8 Stream 底层就充分用到了 ForkJoinPool,你知道还有哪里用到了 ForkJoinPool 了吗


  1. ForkJoinPool 最多会有多少个槽位?


  1. 下面代码有人说不能充分利用 ForkJoinPool,多个 task 的提交要用 invokeAll,你知道为什么吗?如果不用 invokeAll,要怎样使用 fork/join 呢?
protected Long compute() {
    if (任务足够小) {
        return cal();
    }
    SumTask subtask1 = new SumTask(...);
    SumTask subtask2 = new SumTask(...);
    // 分别对子任务调用fork():
    subtask1.fork();
    subtask2.fork();
    // 分别获取合并结果:
    Long subresult1 = subtask1.join();
    Long subresult2 = subtask2.join();
    return subresult1 + subresult2;
}


总结


这又是一篇长文,很多小伙伴私下都建议我将长文拆开,一方面读者好消化,另一方面我自己也在数量的体现上变得高产。几次想拆开,但好多文章拆开就失去了连续性(大家都有遗忘曲线)。过年没回老家,就有时间撸文章了。为了更好的理解源码,文章的基础铺垫内容很多,看到这,你应该很累了,想要将更零散的知识点串起来,那就多看代码注释回味一下,然后一起膜拜 Doug Lea 吧



相关文章
|
数据挖掘 索引 Python
数据分析三剑客【AIoT阶段一(下)】(十万字博文 保姆级讲解)—NumPy—Numpy 高级—训练场(1)(十一)
你好,感谢你能点进来本篇博客,请不要着急退出,相信我,如果你有一定的 Python 基础,想要学习 Python数据分析的三大库:numpy,pandas,matplotlib;这篇文章不会让你失望,本篇博客是 【AIoT阶段一(下)】 的内容:Python数据分析,
412 0
数据分析三剑客【AIoT阶段一(下)】(十万字博文 保姆级讲解)—NumPy—Numpy 高级—训练场(1)(十一)
|
19天前
|
JavaScript Java 测试技术
基于Java的双减后初小教育课外学习生活活动平台的设计与实现(源码+lw+部署文档+讲解等)
基于Java的双减后初小教育课外学习生活活动平台的设计与实现(源码+lw+部署文档+讲解等)
22 0
|
9月前
|
新零售 人工智能 供应链
七星创客/艾倍生/推三返一/系统开发方案项目/开发案例/规则玩法/源码程序
  所谓新零售即是个人、企业以互联网为依托,通过运用大数据、人工智能等先进技术手段,对商品的生产、流通与销售过程进行升级改造,进而重塑业态结构与生态圈,并对线上服务、线下体验以及现代物流进行深度融合的零售新模式。
|
10月前
|
存储 运维
两大赛道示例代码现已开放,4重豪礼送不停!
数据洞察创新挑战赛,两大赛道示例代码现已开放,小白也能轻松得分,20万奖金池等你瓜分!更有无门槛参与奖+限时冲榜奖+邀请试用奖,多重好礼送不停!
149 0
两大赛道示例代码现已开放,4重豪礼送不停!
|
7月前
|
存储 安全 前端开发
DApp公排互助预约抢单排单模式系统开发参考版/详细流程/方案逻辑/规则玩法/案例设计/源码程序
需求分析:与团队明确系统的需求、目标和范围,包括公排互助预约抢单排单模式系统的功能、规则、奖励机制等方面
|
11月前
|
算法 机器人 区块链
数字货币量化机器人系统开发(项目案例)/功能说明/逻辑方案/源码平台
  简单地说,量化交易机器人就是能够自动执行交易策略的交易软件。它借助于计算机技术和数学模型,对市场行情进行分析预测,并根据程序设定的规则和条件自动执行交易策略,完成交易操作。Compared with traditional manual trading,quantitative trading robots have faster trading speed,lower transaction costs,and higher trading efficiency.
|
11月前
|
UED
体育赛事直播系统的源码中包含的的互动功能详解
近年来随着和手机的普及,体育直播平台已经成为了当代人收看体育赛事的重要途径之一。而赛事直播系统的源码中包含的的互动功能,则是观众和体育爱好者带来了更丰富、实时参与体验感。本文详解“东莞梦幻网络科技”所提供的体育直播系统源码中常用的互动功能,并讨论其对于体育直播的作用。
|
12月前
|
存储 边缘计算 编解码
《2022中国云游戏行业认知与观察》——第二章、云游戏应用场景与技术实践——2.2 微端:游戏小包分发 提高转化效率——2.2.1 应用案例 十秒完成下载,《三国志·战略版》用了什么黑科技?
《2022中国云游戏行业认知与观察》——第二章、云游戏应用场景与技术实践——2.2 微端:游戏小包分发 提高转化效率——2.2.1 应用案例 十秒完成下载,《三国志·战略版》用了什么黑科技?
159 0
抽签软件免费提供,代码开源,可用作抽奖、课堂抽背、游戏分组等活动场合,可以直接下载
抽签软件免费提供,代码开源,可用作抽奖、课堂抽背、游戏分组等活动场合,可以直接下载
584 1
抽签软件免费提供,代码开源,可用作抽奖、课堂抽背、游戏分组等活动场合,可以直接下载
|
机器学习/深度学习 算法 搜索推荐
基于surprise模块快速搭建旅游产品推荐系统(代码+原理)(一)
基于surprise模块快速搭建旅游产品推荐系统(代码+原理)
448 0
基于surprise模块快速搭建旅游产品推荐系统(代码+原理)(一)