ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(下)

简介: ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(下)

doExec


形势一片大好,挺住,揭开 exec 的面纱,就看到本质了


//ForkJoinTask中的抽象方法,RecursiveTask 和 RecursiveAction 都重写了它
protected abstract boolean exec();
final int doExec() {
  int s; boolean completed;
  if ((s = status) >= 0) {
    try {
      completed = exec();
    } catch (Throwable rex) {
      return setExceptionalCompletion(rex);
    }
    if (completed)
      s = setCompletion(NORMAL);
  }
  return s;
}
//RecursiveTask重写的内容,终于看到我们文章开头 demo 中的compute 了
protected final boolean exec() {
  result = compute();
  return true;
}


到这里,我们已经看到本质了,绕了这么一大圈,终于和我们自己重写的compute方法联系到了一起,真是不容易,但是 runWorker 三部曲还差最后一曲 awaitWork 没谱,我们来看看


awaitWork


上面说的是 scan 到了任务,要是没有scan到任务,那就得将当前线程阻塞一下,具体标注在注释中,可以简单了解一下


private boolean awaitWork(WorkQueue w, int r) {
  if (w == null || w.qlock < 0)                 // w is terminating
    return false;
  for (int pred = w.stackPred, spins = SPINS, ss;;) {
    if ((ss = w.scanState) >= 0)
      break;
    else if (spins > 0) {
      r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
      if (r >= 0 && --spins == 0) {         // randomize spins
        WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
        if (pred != 0 && (ws = workQueues) != null &&
            (j = pred & SMASK) < ws.length &&
            //前驱任务队列还在
            (v = ws[j]) != null &&        // see if pred parking
            //并且工作队列已经激活,说明任务来了了
            (v.parker == null || v.scanState >= 0))
          //继续自旋等一会,别返回false
          spins = SPINS;                // continue spinning
      }
    }
    //自旋之后,再次检查工作队列是否终止,若是,退出扫描
    else if (w.qlock < 0)                     // recheck after spins
      return false;
    else if (!Thread.interrupted()) {
      long c, prevctl, parkTime, deadline;
      int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
      if ((ac <= 0 && tryTerminate(false, false)) ||
          (runState & STOP) != 0)           // pool terminating
        return false;
      if (ac <= 0 && ss == (int)c) {        // is last waiter
        prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
        int t = (short)(c >>> TC_SHIFT);  // shrink excess spares
        if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
          return false;                 // else use timed wait
        parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
        deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
      }
      else
        prevctl = parkTime = deadline = 0L;
      Thread wt = Thread.currentThread();
      U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupport
      w.parker = wt;
      if (w.scanState < 0 && ctl == c)      // recheck before park
        U.park(false, parkTime);
      U.putOrderedObject(w, QPARKER, null);
      U.putObject(wt, PARKBLOCKER, null);
      if (w.scanState >= 0)
        break;
      if (parkTime != 0L && ctl == c &&
          deadline - System.nanoTime() <= 0L &&
          U.compareAndSwapLong(this, CTL, c, prevctl))
        return false;                     // shrink pool
    }
  }
  return true;
}


到这里,ForkJoinPool 的完整流程算是有个基本了解了,但是我们前面讲的这些内容都是从 submission task 作为切入点的。刚刚聊到的 compute 方法,我们按照分治算法范式写了自己的逻辑,具体请回看文中开头的demo,很关键的一点是,我们在 compute 中调用了 fork 方法,这就给我们了解 worker task 的机会了,继续来看 fork 方法


微信图片_20220511201007.png


fork


Fork 方法的逻辑很简单,如果当前线程是 ForkJoinWorkerThread 类型,也就是说已经通过上文注册的 Worker,那么直接调用 push 方法将 task 放到当前线程拥有的 WorkQueue 中,否则就再调用 externalPush 重走我们已上说的所有逻辑(你敢再走一遍吗?)


public final ForkJoinTask<V> fork() {
  Thread t;
  if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
    ((ForkJoinWorkerThread)t).workQueue.push(this);
  else
    ForkJoinPool.common.externalPush(this);
  return this;
}
//push 方法很简单,这里就不再过多解释了
final void push(ForkJoinTask<?> task) {
  ForkJoinTask<?>[] a; ForkJoinPool p;
  int b = base, s = top, n;
  if ((a = array) != null) {    // ignore if queue removed
    int m = a.length - 1;     // fenced write for task visibility
    U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
    U.putOrderedInt(this, QTOP, s + 1);
    if ((n = s - b) <= 1) {
      if ((p = pool) != null)
        p.signalWork(p.workQueues, this);
    }
    else if (n >= m)
      growArray();
  }
}


有 fork 就有 join,继续看一下 join 方法()


join


join 的核心调用在 doJoin,但是看到这么多级联三元运算符,我慌了


public final V join() {
  int s;
  if ((s = doJoin() & DONE_MASK) != NORMAL)
    reportException(s);
  return getRawResult();
}
private int doJoin() {
  int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
  //status,task 的运行状态
  return (s = status) < 0 ? s :
  ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
    (w = (wt = (ForkJoinWorkerThread)t).workQueue).
    tryUnpush(this) && (s = doExec()) < 0 ? s :
  wt.pool.awaitJoin(w, this, 0L) :
  externalAwaitDone();
}


我们将 doJoin 方法用我们最熟悉的 if/else 做个改动,是不是就豁然开朗了


private int doJoin() {
  int s;
  Thread t;
  ForkJoinWorkerThread wt;
  ForkJoinPool.WorkQueue w;
  if((s = status) < 0) { // 有结果,直接返回
    return s;
  }else {
    if((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {         
      // 如果是 ForkJoinWorkerThread Worker
      if((w = (wt = (ForkJoinWorkerThread) t).workQueue).tryUnpush(this) // 类似上面提到的 scan,但是是专项尝试从本工作队列里取出等待的任务
         // 取出了任务,就去执行它,并返回结果
         && (s = doExec()) < 0) { 
        return s;
      }else {
        // 也有可能别的线程把这个任务偷走了,那就执行内部等待方法
        return wt.pool.awaitJoin(w, this, 0L); 
      }
    }else { 
      // 如果不是 ForkJoinWorkerThread,执行外部等待方法
      return externalAwaitDone();
    }
  }
}


其中 awaitJoin 和 externalAwaitDone 都用到了 Helper(帮助) 和 Compensating(补偿) 两种策略,这两种策略大家完全可以自行阅读了,尤其是 awaitJoin 方法,强烈推荐大家自行阅读,其中 pop 的过程在这里,这里不再展开


到这里,有关 ForkJoinPool 相关的内容就算是结束了,为了让大家有个更好的理解 fork/join 机制,我们还是画几张图解释一下


Fork/Join 图解


假设我们的大任务是 Task(8), 最终被分治成可执行的最小单元是 Task(1)


按照分治思想拆分任务的整体目标就是这样滴:


微信图片_20220511201157.png


从外部先提交一个大的 Task(8),将其放在偶数槽位中(请注意颜色对应


微信图片_20220511201219.png


不满足并行度,会创建 Worker 1 来扫描,并从 base 端窃取到任务 task(8),执行到 compute, fork


出两个 task(4), 并 push到 WorkQueue 中


微信图片_20220511201243.png

在执行任务时始终会确认是否满足并行度要求,如果没有就会继续创建新的Worker,与此同时,也会继续 fork 任务,直到最小单元。Worker1 会从 top 端 pop 出来 task(4) 来继续 compute 和 fork,并重新 push 到 WorkQueue 中


微信图片_20220511201307.png


task(2) 还不是最小单元,所以会继续 pop 出 task(2),并最终 fork 出两个 task(1) push 到 WorkQueue中


微信图片_20220511201328.png


task(1) 已经是最小粒度了,可以直接 pop 出来执行,获取最终结果;在 Worker1 进行这些 pop 操作的同时,为了满足并行度要求也会创建的其他Worker,比如 Worker 2,这时 Worker2 会从 Worker 1 所在队列的 base 端窃取任务


微信图片_20220511201351.png


Worker 2 依旧是按照这个规则进行 pop->fork,到最终可以 exec 任务,假设 Worker 1 的任务先执行完,要 join 结果,当 join task(4) 时,通过 hint 定位到是谁偷走了 task(4),这时顺藤摸瓜找到 Worker2,如果 Worker2 还有任务没执行完,Worker1 再窃取回来帮着执行,这样互帮互助,最终快速完成任务


灵魂追问


  1. 为什么说 ForkjoinPool 效率要更高?同时建议使用 commonPool?


  1. JDK1.8 Stream 底层就充分用到了 ForkJoinPool,你知道还有哪里用到了 ForkJoinPool 了吗


  1. ForkJoinPool 最多会有多少个槽位?


  1. 下面代码有人说不能充分利用 ForkJoinPool,多个 task 的提交要用 invokeAll,你知道为什么吗?如果不用 invokeAll,要怎样使用 fork/join 呢?
protected Long compute() {
    if (任务足够小) {
        return cal();
    }
    SumTask subtask1 = new SumTask(...);
    SumTask subtask2 = new SumTask(...);
    // 分别对子任务调用fork():
    subtask1.fork();
    subtask2.fork();
    // 分别获取合并结果:
    Long subresult1 = subtask1.join();
    Long subresult2 = subtask2.join();
    return subresult1 + subresult2;
}


总结


这又是一篇长文,很多小伙伴私下都建议我将长文拆开,一方面读者好消化,另一方面我自己也在数量的体现上变得高产。几次想拆开,但好多文章拆开就失去了连续性(大家都有遗忘曲线)。过年没回老家,就有时间撸文章了。为了更好的理解源码,文章的基础铺垫内容很多,看到这,你应该很累了,想要将更零散的知识点串起来,那就多看代码注释回味一下,然后一起膜拜 Doug Lea 吧



相关文章
|
4月前
|
数据可视化 安全 关系型数据库
写给工程师的 MacBook 商用级大模型知识库部署方案(上)
写给工程师的 MacBook 商用级大模型知识库部署方案(上)
506 2
|
10月前
|
存储 前端开发 安全
0039Java程序设计-基于java校园闲置物交易系统论文
0039Java程序设计-基于java校园闲置物交易系统论文
70 0
|
11月前
|
机器学习/深度学习 人工智能 自然语言处理
科普神文,一次性讲透AI大模型的核心概念
令牌,向量,嵌入,注意力,这些AI大模型名词是否一直让你感觉熟悉又陌生,如果答案肯定的话,那么朋友,今天这篇科普神文不容错过。我将结合大量示例及可视化的图形手段,为你由浅入深一次性讲透AI大模型的核心概念。本文转载至:https://baijiahao.baidu.com/s?id=1779925030313909037&wfr=spider&for=pc。确实是一篇很不错的文,很好的解释了大模型底层的一些基本概念,对于我这种AI新手非常友好哈哈哈
科普神文,一次性讲透AI大模型的核心概念
|
4月前
|
新零售 人工智能 供应链
写给工程师的 MacBook 商用级大模型知识库部署方案(下)
写给工程师的 MacBook 商用级大模型知识库部署方案(下)
311 2
|
4月前
|
NoSQL 关系型数据库 API
写给工程师的 MacBook 商用级大模型知识库部署方案(中)
写给工程师的 MacBook 商用级大模型知识库部署方案(中)
224 1
|
11月前
|
机器学习/深度学习 自然语言处理
社区供稿 | EcomGPT:基于任务链数据的电商大模型(附魔搭推理实践)
在电商领域中,自然语言处理和深度学习的发展对电商技术的推进做出了很大的贡献。通过这些技术,可以实现从产品信息提取到用户查询理解等多种能力,尤其是近期各类大语言模型(Large Language Models,LLMs)的涌现,让我们看到了它们在电商领域引用的潜力。然而,通用的大语言模型并不是专门为电商领域设计的,这可能导致它们在电商任务中表现不佳。
|
人工智能 自然语言处理 前端开发
阿里云智能媒体服务IMS在视频剪辑中花式抠图的代码实操与案例详述
本文介绍阿里云智能媒体服务IMS,围绕视频剪辑及数字人训练中的抠图需求,如何运用 绿幕抠图、实景抠图能力,实现高效、便捷的视频制作及合成体验。
376 0
|
JavaScript 前端开发 开发工具
如何做到一站检索前沿主流 AIGC / GPT 文章?定时任务抓取文章!
如何做到一站检索前沿主流 AIGC / GPT 文章?定时任务抓取文章!
276 0
如何用ChatGPT做内容营销方案和选题计划,同时生产和优化内容?
该场景对应的关键词库(31个): 内容营销、目标、主题、类型、选题计划、素材、推广策略、优化方案、渠道、目标受众、竞争对手、行业背景、转化率、品牌知名度、客户参与度、销售、发布频率、选题阶段、生产阶段、推广阶段、预算分配、人群特征、话题标签、视觉元素、电子邮件、SEO、数字广告、线下广告、在线聊天、社交媒体、赞助
512 0
|
机器学习/深度学习 人工智能 PyTorch
AutoGPT star量破10万,这是首篇系统介绍自主智能体的文章(1)
AutoGPT star量破10万,这是首篇系统介绍自主智能体的文章
118 0