ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(上)

简介: ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(上)

知识回顾



并发工具类我们已经讲了很多,这些工具类的「目标」是让我们只关注任务本身,并且忽视线程间合作细节,简化了并发编程难度的同时,也增加了很多安全性。工具类的对使用者的「目标」虽然一致,但每一个工具类本身都有它独特的应用场景,比如:





将上面三种通用场景形象化展示一下:


微信图片_20220511192807.png


结合上图相信你的脑海里已经浮现出这几个工具类的具体实现方式,感觉这已经涵盖了所有的并发场景。


微信图片_20220511192833.png


TYTS,以上这些方式的子线程接到任务后不会再继续拆分成「子子」任务,也就是说,子线程即便接到很大或很复杂的任务也得硬着头皮努力执行完,很显然这个大任务是问题关键


如果能把大任务拆分成更小的子问题,直到子问题简单到可以直接求解就好了,这就是分治的思想


分治思想


在计算机科学中,分治法是一种很重要的算法。字面上的解释是「分而治之」,就是把一个复杂的问题分成两个或更多的相同或相似的子问题,再把子问题分成更小的子问题……直到最后子问题可以简单的直接求解,原问题的解就变成了子问题解的合并


这个技巧是很多高效算法的基础,如排序算法 (快速排序,归并排序),傅立叶变换 (快速傅立叶变换)……,如果你是搞大数据的,MapReduce 就是分支思想的典型,如果你想更详细的理解分治相关的算法,请参考这篇一文图解分治算法和思想


结合上面的描述,相信你脑海中已经构建出来分治的模型了:


微信图片_20220511192908.png


那所有的大任务都能用分治算法来解决吗?很显然不是的


分治法适用的情况


总体来说,分治法所能解决的问题一般具有以下几个特征:


  1. 该问题的规模缩小到一定的程度就可以容易地解决


  1. 该问题可以分解为若干个规模较小的相同问题,即该问题具有最优子结构性质。


  1. 利用该问题分解出的子问题的解可以合并为该问题的解;


  1. 该问题所分解出的各个子问题是相互独立的,即子问题之间不包含公共的子子问题


了解了分治算法的核心思想,我们就来看看 Java 是如何利用分治思想拆分与合并任务的吧


ForkJoin


有子任务,自然要用到多线程。我们很早之前说过,执行子任务的线程不允许单独创建,要用线程池管理。秉承相同设计理念,再结合分治算法, ForkJoin 框架中就出现了 ForkJoinPool 和 ForkJoinTask。正所谓:


天对地,雨对风。大陆对长空。山花对海树,赤曰对苍穹


套用已有知识,简单理解就是这样滴:


微信图片_20220511193006.png


我们之前说过无数次,JDK 不会重复造轮子,这里谈及相似是为了让大家有个简单的直观印象,内里肯定有所差别,我们先大致看一下这两个类:


ForkJoinTask


又是这个男人,Doug Lea,怎么就那么牛(破音)


 /**
 * Abstract base class for tasks that run within a {@link ForkJoinPool}.
 * A {@code ForkJoinTask} is a thread-like entity that is much
 * lighter weight than a normal thread.  Huge numbers of tasks and
 * subtasks may be hosted by a small number of actual threads in a
 * ForkJoinPool, at the price of some usage limitations.
 *
 * @since 1.7
 * @author Doug Lea
 */
public abstract class ForkJoinTask<V> implements Future<V>, Serializable


可以看到 ForkJoinTask 实现了 Future 接口(那就是具有 Future 接口的特性),同样如其名,fork()join() 自然是它的两个核心方法


  • fork() : 异步执行一个子任务(上面说的拆分)


  • join() : 阻塞当前线程等待子任务的执行结果(上面说的合并)


另外,从上面代码中可以看出,ForkJoinTask 是一个抽象类,在分治模型中,它还有两个抽象子类 RecursiveActionRecursiveTask


微信图片_20220511193054.png


那这两个子抽象类有什么差别呢?如果你打开 IDE,你应该一眼就能看出差别,so easy


微信图片_20220511193122.png


public abstract class RecursiveAction extends ForkJoinTask<Void>{
    ...
  /**
   * The main computation performed by this task.
   */
  protected abstract void compute();
  ...
}
public abstract class RecursiveTask<V> extends ForkJoinTask<V>{
    ...
  protected abstract void compute();
  ...
}


两个类里面都定义了一个抽象方法 compute() ,需要子类重写实现具体逻辑


那子类要遵循什么逻辑重写这个方法呢?


遵循分治思想,重写的逻辑很简单,就是回答三个问题:


  • 什么时候进一步拆分任务?


  • 什么时候满足最小可执行任务,即不再进行拆分?


  • 什么时候汇总子任务结果


用「伪代码」再翻译一下上面这段话,大概就是这样滴:


if(任务小到不用继续拆分){
    直接计算得到结果
}else{
    拆分子任务
    调用子任务的fork()进行计算
    调用子任务的join()合并计算结果
}


(作为程序员,如果你写过递归运算,这个逻辑理解起来是非常简单的)


介绍到这里,就可以用 ForkJoin 干些事情了——经典 Fibonacci 计算就可以用分治思想(不信,你逐条按照上面分治算法适用情况自问自答一下?),直接借用官方 Docs (注意看 compute 方法),额外添加个 main 方法来看一下:


@Slf4j
public class ForkJoinDemo {
    public static void main(String[] args) {
        int n = 20;
        // 为了追踪子线程名称,需要重写 ForkJoinWorkerThreadFactory 的方法
        final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
            final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName("my-thread" + worker.getPoolIndex());
            return worker;
        };
        //创建分治任务线程池,可以追踪到线程名称
        ForkJoinPool forkJoinPool = new ForkJoinPool(4, factory, null, false);
        // 快速创建 ForkJoinPool 方法
        // ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        //创建分治任务
        Fibonacci fibonacci = new Fibonacci(n);
        //调用 invoke 方法启动分治任务
        Integer result = forkJoinPool.invoke(fibonacci);
        log.info("Fibonacci {} 的结果是 {}", n, result);
    }
}
@Slf4j
class Fibonacci extends RecursiveTask<Integer> {
    final int n;
    Fibonacci(int n) {
        this.n = n;
    }
    @Override
    public Integer compute() {
        //和递归类似,定义可计算的最小单元
        if (n <= 1) {
            return n;
        }
        // 想查看子线程名称输出的可以打开下面注释
        //log.info(Thread.currentThread().getName());
        Fibonacci f1 = new Fibonacci(n - 1);
        // 拆分成子任务
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        // f1.join 等待子任务执行结果
        return f2.compute() + f1.join();
    }
}


执行结果如下:


微信图片_20220511193257.png


进展到这里,相信基本的使用就已经搞定了,上面代码中使用了 ForkJoinPool,那问题来了:


池化既然是一类思想,Java 已经有了 ThreadPoolExecutor ,为什么又要搞出个 ForkJoinPool 呢?


微信图片_20220511193348.png


借助下面这张图,先来回忆一下 ThreadPoolExecutor 的实现原理(详情请看为什么要使用线程池):


微信图片_20220511193409.png


一眼就能看出来这是典型的生产者/消费者模式,消费者线程都从一个共享的 Task Queue 中消费提交的任务。ThreadPoolExecutor 简单的并行操作主要是为了执行时间不确定的任务(I/O 或定时任务等)


JDK 重复造轮子是不可能的,分治思想其实也可以理解成一种父子任务依赖的关系,当依赖层级非常深,用 ThreadPoolExecutor 来处理这种关系很显然是不太现实的,所以 ForkJoinPool 作为功能补充就出现了


ForkJoinPool


任务拆分后有依赖关系,还得减少线程之间的竞争,那就让线程执行属于自己的 task 就可以了呗,所以较 ThreadPoolExecutor 的单个 TaskQueue 的形式,ForkJoinPool 是多个 TaskQueue的形式,简单用图来表示,就是这样滴:


微信图片_20220511193442.png


有多个任务队列,所以在 ForkJoinPool 中就有一个数组形式的成员变量 WorkQueue[]。那问题又来了


任务队列有多个,提交的任务放到哪个队列中呢?(上图中的 Router Rule 部分)


这就需要一套路由规则,从上面的代码 Demo 中可以理解,提交的任务主要有两种:


  • 有外部直接提交的(submission task


  • 也有任务自己 fork 出来的(worker task


为了进一步区分这两种 task,Doug Lea 就设计一个简单的路由规则:


  • submission task 放到 WorkQueue 数组的「偶数」下标中


  • worker task 放在 WorkQueue 的「奇数」下标中,并且只有奇数下标才有线程( worker )与之相对


应局部丰富一下上图就是这样滴:


微信图片_20220511193522.png


每个任务执行时间都是不一样的(当然是在 CPU 眼里),执行快的线程的工作队列的任务就可能是空的,为了最大化利用 CPU 资源,就允许空闲线程拿取其它任务队列中的内容,这个过程就叫做 work-stealing (工作窃取)


当前线程要执行一个任务,其他线程还有可能过来窃取任务,这就会产生竞争,为了减少竞争,WorkQueue 就设计成了一个双端队列:


  • 支持 LIFO(last-in-first-out) 的push(放)和pop(拿)操作——操作 top 端


  • 支持 FIFO (first-in-first-out) 的 poll (拿)操作——操作 base 端


线程(worker)操作自己的 WorkQueue 默认是 LIFO 操作(可选FIFO),当线程(worker)尝试窃取其他 WorkQueue 里的任务时,这个时候执行的是FIFO操作,即从 base 端窃取,用图丰富一下就是这样滴:


微信图片_20220511193556.png


这样的好处非常明显了:


  1. LIFO 操作只有对应的 worker 才能执行,push和pop不需要考虑并发


  1. 拆分时,越大的任务越在WorkQueue的base端,尽早分解,能够尽快进入计算


从 WorkQueue 的成员变量的修饰符中也能看出一二了(base 有 volatile 修饰,而 top 却没有):


volatile int base;         // index of next slot for poll
int top;                   // index of next slot for push


到这里,相信你已经了解 ForkJoinPool 的基本实现原理了,但也会伴随着很多疑问(这都是怎么实现的?),比如:


  • 有竞争就需要锁,ForkJoinPool 是如何控制状态的呢?


  • ForkJoinPool 的线程数是怎么控制的呢?


  • 上面说的路由规则的具体逻辑是什么呢?


  • ......


保留住这些问题,一点点看源码来了解一下吧:


微信图片_20220511193700.png


源码分析(JDK 1.8)


ForkJoinPool 的源码涉及到大量的位运算,这里会把核心部分说清楚,想要理解的更深入,还需要大家自己一点点追踪查看


结合上面的铺垫,你应该知道 ForkJoinPool 里有三个重要的角色:


  • ForkJoinWorkerThread(继承 Thread):就是我们上面说的线程(Worker)


  • WorkQueue:双向的任务队列


  • ForkJoinTask:Worker 执行的对象


源码分析的整个流程也是围绕这几个类的方法来说明,但在了解这三个角色之前,我们需要先了解 ForkJoinPool 都为这三个角色铺垫了哪些内容


image.gif


故事就得从 ForkJoinPool 的构造方法说起


ForkJoinPool 构造方法

public ForkJoinPool() {
  this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
       defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism) {
  this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
  this(checkParallelism(parallelism),
       checkFactory(factory),
       handler,
       asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
       "ForkJoinPool-" + nextPoolId() + "-worker-");
  checkPermission();
}


除了以上三个构造方法之外,在 JDK1.8 中还增加了另外一种初始化 ForkJoinPool 对象的方式(QQ:这是什么设计模式?):


static final ForkJoinPool common;
/**
     * @return the common pool instance
     * @since 1.8
     */
public static ForkJoinPool commonPool() {
  // assert common != null : "static init error";
  return common;
}


Common 是在静态块里面初始化的(只会被执行一次):


common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
private static ForkJoinPool makeCommonPool() {
  int parallelism = -1;
  ... 其他默认初始化内容 
    if (parallelism < 0 && // default 1 less than #cores
        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
      parallelism = 1;
  if (parallelism > MAX_CAP)
    parallelism = MAX_CAP;
  // 执行上面的构造方法
  return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                          "ForkJoinPool.commonPool-worker-");
}


因为这是一个单例通用的 ForkJoinPool,所以切记:


如果使用通用 ForkJoinPool,最好只做 CPU 密集型的计算操作,不要有不确定性的 I/O 内容在任务里面,以防拖垮整体


上面所有的构造方法最后都会调用这个私有方法:


private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
  this.workerNamePrefix = workerNamePrefix;
  this.factory = factory;
  this.ueh = handler;
  this.config = (parallelism & SMASK) | mode;
  long np = (long)(-parallelism); // offset ctl counts
  this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}    


参数有点多,在这里解释一下每个参数的含义:


序号 参数名 描述/解释
1 parallelism 并行度,这并不是定义的线程数,具体线程数,以及 WorkQueue 的长度等都是根据这个并行度来计算的,通过上面 makeCommonPool 方法可以知道,parallelism 默认值是 CPU 核心线程数减 1
2 factory 很常见了,创建 ForkJoinWorkerThread 的工厂接口
3 handler 每个线程的异常处理器
4 mode 上面说的 WorkQueue 的模式,LIFO/FIFO;
5 workerNamePrefix ForkJoinWorkerThread的前缀名称
6 ctl 线程池的核心控制线程字段


在构造方法中就已经有位运算了,太难了:


微信图片_20220511194005.png


想知道 ForkJoinPool 的成员变量 config 要表达的意思,就要仔细拆开来看


static final int SMASK        = 0xffff;        // short bits == max index
this.config = (parallelism & SMASK) | mode;


parallelism & SMASK 其实就是要保证并行度的值不能大于 SMASK,上面所有的构造方法在传入 parallelism 的时候都会调用 checkParallelism 来检查合法性:


static final int MAX_CAP      = 0x7fff;        // max #workers - 1
private static int checkParallelism(int parallelism) {
        if (parallelism <= 0 || parallelism > MAX_CAP)
            throw new IllegalArgumentException();
        return parallelism;
    }


可以看到 parallelism 的最大值就是 MAX_CAP 了,0x7fff 肯定小于0xffff。所以 config 的值其实就是:


this.config = parallelism | mode;


这里假设 parallelism 就是 MAX_CAP , 然后与 mode 进行或运算,其中 mode 有三种:


  • LIFO_QUEUE


  • FIFO_QUEUE


  • SHARED_QUEUE


下面以 LIFO_QUEUE 和 FIFO_QUEUE 举例说明:


 // Mode bits for ForkJoinPool.config and WorkQueue.config
 static final int MODE_MASK    = 0xffff << 16;  // top half of int
 static final int LIFO_QUEUE   = 0;
 static final int FIFO_QUEUE   = 1 << 16;
 static final int SHARED_QUEUE = 1 << 31;       // must be negative


微信图片_20220511194155.png


所以 parallelism | mode 根据 mode 的不同会产生两种结果,但是会得到一个确认的信息:


config 的第 17 位表示模式,低 15 位表示并行度 parallelism


当我们需要从 config 中获取模式 mode 时候,只需要用mode 掩码 (MODE_MASK)和 config 做与运算就可以了


微信图片_20220511194227.png


所以一张图概括 config 就是:


微信图片_20220511194249.png


微信图片_20220511194305.png


long np = (long)(-parallelism); // offset ctl counts


上面这段代码就是将并行度 parallelism 补码转换为 long 型,以 MAX_CAP 作为并行度为例,np 的值就是


微信图片_20220511194334.png


这个 np 的值,就会用作 ForkJoinPool 成员变量 ctl 的计算:


// Active counts 活跃线程数
private static final int  AC_SHIFT   = 48;
private static final long AC_UNIT    = 0x0001L << AC_SHIFT;
private static final long AC_MASK    = 0xffffL << AC_SHIFT;
// Total counts 总线程数
private static final int  TC_SHIFT   = 32;
private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
private static final long TC_MASK    = 0xffffL << TC_SHIFT;
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
// 计算 ctl 
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);


  • np << AC_SHIFT 即 np 向左移动 48 位,这样原来的低 16 位变成了高 16 位,再用 AC 掩码(AC_MASK) 做与运算,也就是说 ctl 的 49 ~ 64 位表示活跃线程数


  • np << TC_SHIFT 即 np 向左移动 32 位,这样原来的低 16 位变成了 33 ~ 48 位,再用 TC 掩码做与运算,也就是说 ctl 的 33 ~ 48 位表示总线程数


最后二者再进行或运算,如果并行度还是 MAX_CAP ,那 ctl 的最后结果就是:


微信图片_20220511194417.png

到这里,我们才阅读完一个构造函数的内容,从最终的结论可以看出,初始化后 AC = TC,并且 ctl 是一个小于零的数,ctl 是 64 位的 long 类型,低 32 位是如何构造的并没有在构造函数中体现出来,但注释给了明确的说明:


/*
* Bits and masks for field ctl, packed with 4 16 bit subfields:
* AC: Number of active running workers minus target parallelism
* TC: Number of total workers minus target parallelism
* SS: version count and status of top waiting thread
* ID: poolIndex of top of Treiber stack of waiters
*
* When convenient, we can extract the lower 32 stack top bits
* (including version bits) as sp=(int)ctl.  The offsets of counts
* by the target parallelism and the positionings of fields makes
* it possible to perform the most common checks via sign tests of
* fields: When ac is negative, there are not enough active
* workers, when tc is negative, there are not enough total
* workers.  When sp is non-zero, there are waiting workers.  To
* deal with possibly negative fields, we use casts in and out of
* "short" and/or signed shifts to maintain signedness.
*
* Because it occupies uppermost bits, we can add one active count
* using getAndAddLong of AC_UNIT, rather than CAS, when returning
* from a blocked join.  Other updates entail multiple subfields
* and masking, requiring CAS.
*/


这段注释主要说明了低 32 位的作用(后面会从源码中体现出来,这里先有个印象会对后面源码阅读有帮助),按注释含义先完善一下 ctl 的值:


微信图片_20220511194506.png


  • SS:栈顶工作线程状态和版本数(每一个线程在挂起时都会持有前一个等待线程所在工作队列的索引,由此构成一个等待的工作线程栈,栈顶是最新等待的线程),第一位表示状态 1:不活动(inactive)0:活动(active),后15表示版本号,防止 ABA 问题


  • ID: 栈顶工作线程所在工作队列的索引


注释中还说,另 sp=(int)ctl,即获取 64 位 ctl 的低 32 位(SS | ID),因为低 32 位都是创建出线程之后才会存在的值,所以推断出,如果 sp != 0, 就存在等待的工作线程,唤醒使用就行,不用创建新的线程。这样就通过 ctl 可以获取到有关线程所需要的一切信息了


微信图片_20220511194545.png


微信图片_20220511194559.png






目录
打赏
0
1
0
0
1
分享
相关文章
0039Java程序设计-基于java校园闲置物交易系统论文
0039Java程序设计-基于java校园闲置物交易系统论文
98 0
手机可跑,3.8B参数量超越GPT-3.5!微软发布Phi-3技术报告:秘密武器是洗干净数据
【5月更文挑战第16天】微软发布 Phi-3 技术报告,介绍了一个拥有3.8B参数的新语言模型,超越GPT-3.5,成为最大模型之一。 Phi-3 在手机上运行的特性开启了大型模型移动应用新纪元。报告强调数据清洗是关键,通过优化设计实现高效运行。实验显示 Phi-3 在多项NLP任务中表现出色,但泛化能力和数据隐私仍是挑战。该模型预示着AI领域的未来突破。[[论文链接](https://arxiv.org/pdf/2404.14219.pdf)]
106 2
ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(下)
ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(下)
ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(下)
如何做到一站检索前沿主流 AIGC / GPT 文章?定时任务抓取文章!
如何做到一站检索前沿主流 AIGC / GPT 文章?定时任务抓取文章!
310 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等