知识回顾
并发工具类我们已经讲了很多,这些工具类的「目标」是让我们只关注任务本身,并且忽视线程间合作细节,简化了并发编程难度的同时,也增加了很多安全性。工具类的对使用者的「目标」虽然一致,但每一个工具类本身都有它独特的应用场景,比如:
- 我会手动创建线程,为什么要使用线程池? 介绍了使用线程池管理线程将一个大任务分解成多个子任务来简单执行,借助 不会用Java Future,我怀疑你泡茶没我快, 又是超长图文!! 的 Future 特性获取子任务执行结果——二者结合使用就可以处理简单的并行任务
- 搞定 CompletableFuture,并发异步编程和编写串行程序还有什么区别? 借助 CompletableFuture 大大降低了异步编程的难度——使用串行的思维对任务进行编排执行(AND 或 OR 聚合)
- 既生 ExecutorService, 何生 CompletionService? 由于任务完成时间有先后,为避免等待阻塞——CompletionService 是批量并行任务的最佳选择
将上面三种通用场景形象化展示一下:
结合上图相信你的脑海里已经浮现出这几个工具类的具体实现方式,感觉这已经涵盖了所有的并发场景。
TYTS,以上这些方式的子线程接到任务后不会再继续拆分成「子子」任务,也就是说,子线程即便接到很大或很复杂的任务也得硬着头皮努力执行完,很显然这个大任务是问题关键
如果能把大任务拆分成更小的子问题,直到子问题简单到可以直接求解就好了,这就是分治的思想
分治思想
在计算机科学中,分治法是一种很重要的算法。字面上的解释是「分而治之」,就是把一个复杂的问题分成两个或更多的相同或相似的子问题,再把子问题分成更小的子问题……直到最后子问题可以简单的直接求解,原问题的解就变成了子问题解的合并。
这个技巧是很多高效算法的基础,如排序算法 (快速排序,归并排序),傅立叶变换 (快速傅立叶变换)……,如果你是搞大数据的,MapReduce 就是分支思想的典型,如果你想更详细的理解分治相关的算法,请参考这篇一文图解分治算法和思想
结合上面的描述,相信你脑海中已经构建出来分治的模型了:
那所有的大任务都能用分治算法来解决吗?很显然不是的
分治法适用的情况
总体来说,分治法所能解决的问题一般具有以下几个特征:
- 该问题的规模缩小到一定的程度就可以容易地解决
- 该问题可以分解为若干个规模较小的相同问题,即该问题具有最优子结构性质。
- 利用该问题分解出的子问题的解可以合并为该问题的解;
- 该问题所分解出的各个子问题是相互独立的,即子问题之间不包含公共的子子问题
了解了分治算法的核心思想,我们就来看看 Java 是如何利用分治思想拆分与合并任务的吧
ForkJoin
有子任务,自然要用到多线程。我们很早之前说过,执行子任务的线程不允许单独创建,要用线程池管理。秉承相同设计理念,再结合分治算法, ForkJoin 框架中就出现了 ForkJoinPool 和 ForkJoinTask。正所谓:
天对地,雨对风。大陆对长空。山花对海树,赤曰对苍穹
套用已有知识,简单理解就是这样滴:
我们之前说过无数次,JDK 不会重复造轮子,这里谈及相似是为了让大家有个简单的直观印象,内里肯定有所差别,我们先大致看一下这两个类:
ForkJoinTask
又是这个男人,Doug Lea
,怎么就那么牛(破音)
/** * Abstract base class for tasks that run within a {@link ForkJoinPool}. * A {@code ForkJoinTask} is a thread-like entity that is much * lighter weight than a normal thread. Huge numbers of tasks and * subtasks may be hosted by a small number of actual threads in a * ForkJoinPool, at the price of some usage limitations. * * @since 1.7 * @author Doug Lea */ public abstract class ForkJoinTask<V> implements Future<V>, Serializable
可以看到 ForkJoinTask
实现了 Future
接口(那就是具有 Future 接口的特性),同样如其名,fork()
和 join()
自然是它的两个核心方法
fork()
: 异步执行一个子任务(上面说的拆分)
join()
: 阻塞当前线程等待子任务的执行结果(上面说的合并)
另外,从上面代码中可以看出,ForkJoinTask
是一个抽象类,在分治模型中,它还有两个抽象子类 RecursiveAction
和 RecursiveTask
那这两个子抽象类有什么差别呢?如果你打开 IDE,你应该一眼就能看出差别,so easy
public abstract class RecursiveAction extends ForkJoinTask<Void>{ ... /** * The main computation performed by this task. */ protected abstract void compute(); ... } public abstract class RecursiveTask<V> extends ForkJoinTask<V>{ ... protected abstract void compute(); ... }
两个类里面都定义了一个抽象方法 compute()
,需要子类重写实现具体逻辑
那子类要遵循什么逻辑重写这个方法呢?
遵循分治思想,重写的逻辑很简单,就是回答三个问题:
- 什么时候进一步拆分任务?
- 什么时候满足最小可执行任务,即不再进行拆分?
- 什么时候汇总子任务结果
用「伪代码」再翻译一下上面这段话,大概就是这样滴:
if(任务小到不用继续拆分){ 直接计算得到结果 }else{ 拆分子任务 调用子任务的fork()进行计算 调用子任务的join()合并计算结果 }
(作为程序员,如果你写过递归运算,这个逻辑理解起来是非常简单的)
介绍到这里,就可以用 ForkJoin 干些事情了——经典 Fibonacci 计算就可以用分治思想(不信,你逐条按照上面分治算法适用情况自问自答一下?),直接借用官方 Docs (注意看 compute 方法),额外添加个 main 方法来看一下:
@Slf4j public class ForkJoinDemo { public static void main(String[] args) { int n = 20; // 为了追踪子线程名称,需要重写 ForkJoinWorkerThreadFactory 的方法 final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> { final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); worker.setName("my-thread" + worker.getPoolIndex()); return worker; }; //创建分治任务线程池,可以追踪到线程名称 ForkJoinPool forkJoinPool = new ForkJoinPool(4, factory, null, false); // 快速创建 ForkJoinPool 方法 // ForkJoinPool forkJoinPool = new ForkJoinPool(4); //创建分治任务 Fibonacci fibonacci = new Fibonacci(n); //调用 invoke 方法启动分治任务 Integer result = forkJoinPool.invoke(fibonacci); log.info("Fibonacci {} 的结果是 {}", n, result); } } @Slf4j class Fibonacci extends RecursiveTask<Integer> { final int n; Fibonacci(int n) { this.n = n; } @Override public Integer compute() { //和递归类似,定义可计算的最小单元 if (n <= 1) { return n; } // 想查看子线程名称输出的可以打开下面注释 //log.info(Thread.currentThread().getName()); Fibonacci f1 = new Fibonacci(n - 1); // 拆分成子任务 f1.fork(); Fibonacci f2 = new Fibonacci(n - 2); // f1.join 等待子任务执行结果 return f2.compute() + f1.join(); } }
执行结果如下:
进展到这里,相信基本的使用就已经搞定了,上面代码中使用了 ForkJoinPool,那问题来了:
池化既然是一类思想,Java 已经有了ThreadPoolExecutor
,为什么又要搞出个ForkJoinPool
呢?
借助下面这张图,先来回忆一下 ThreadPoolExecutor 的实现原理(详情请看为什么要使用线程池):
一眼就能看出来这是典型的生产者/消费者
模式,消费者线程都从一个共享的 Task Queue 中消费提交的任务。ThreadPoolExecutor 简单的并行操作主要是为了执行时间不确定的任务(I/O 或定时任务等)
JDK 重复造轮子是不可能的,分治思想其实也可以理解成一种父子任务依赖的关系,当依赖层级非常深,用 ThreadPoolExecutor
来处理这种关系很显然是不太现实的,所以 ForkJoinPool
作为功能补充就出现了
ForkJoinPool
任务拆分后有依赖关系,还得减少线程之间的竞争,那就让线程执行属于自己的 task 就可以了呗,所以较 ThreadPoolExecutor
的单个 TaskQueue 的形式,ForkJoinPool
是多个 TaskQueue的形式,简单用图来表示,就是这样滴:
有多个任务队列,所以在 ForkJoinPool 中就有一个数组形式的成员变量 WorkQueue[]
。那问题又来了
任务队列有多个,提交的任务放到哪个队列中呢?(上图中的
Router Rule
部分)
这就需要一套路由规则,从上面的代码 Demo 中可以理解,提交的任务主要有两种:
- 有外部直接提交的(submission task)
- 也有任务自己 fork 出来的(worker task)
为了进一步区分这两种 task,Doug Lea 就设计一个简单的路由规则:
- 将
submission task
放到 WorkQueue 数组的「偶数」
下标中
- 将
worker task
放在 WorkQueue 的「奇数」
下标中,并且只有奇数下标才有线程( worker )与之相对
应局部丰富一下上图就是这样滴:
每个任务执行时间都是不一样的(当然是在 CPU 眼里),执行快的线程的工作队列的任务就可能是空的,为了最大化利用 CPU 资源,就允许空闲线程拿取其它任务队列中的内容,这个过程就叫做 work-stealing
(工作窃取)
当前线程要执行一个任务,其他线程还有可能过来窃取任务,这就会产生竞争,为了减少竞争,WorkQueue 就设计成了一个双端队列:
- 支持 LIFO(last-in-first-out) 的push(放)和pop(拿)操作——操作 top 端
- 支持 FIFO (first-in-first-out) 的 poll (拿)操作——操作 base 端
线程(worker)操作自己的 WorkQueue 默认是 LIFO 操作(可选FIFO),当线程(worker)尝试窃取其他 WorkQueue 里的任务时,这个时候执行的是FIFO操作,即从 base 端窃取,用图丰富一下就是这样滴:
这样的好处非常明显了:
- LIFO 操作只有对应的 worker 才能执行,push和pop不需要考虑并发
- 拆分时,越大的任务越在WorkQueue的base端,尽早分解,能够尽快进入计算
从 WorkQueue 的成员变量的修饰符中也能看出一二了(base 有 volatile 修饰,而 top 却没有):
volatile int base; // index of next slot for poll int top; // index of next slot for push
到这里,相信你已经了解 ForkJoinPool 的基本实现原理了,但也会伴随着很多疑问(这都是怎么实现的?),比如:
- 有竞争就需要锁,ForkJoinPool 是如何控制状态的呢?
- ForkJoinPool 的线程数是怎么控制的呢?
- 上面说的路由规则的具体逻辑是什么呢?
- ......
保留住这些问题,一点点看源码来了解一下吧:
源码分析(JDK 1.8)
ForkJoinPool 的源码涉及到大量的位运算,这里会把核心部分说清楚,想要理解的更深入,还需要大家自己一点点追踪查看
结合上面的铺垫,你应该知道 ForkJoinPool 里有三个重要的角色:
- ForkJoinWorkerThread(继承 Thread):就是我们上面说的线程(Worker)
- WorkQueue:双向的任务队列
- ForkJoinTask:Worker 执行的对象
源码分析的整个流程也是围绕这几个类的方法来说明,但在了解这三个角色之前,我们需要先了解 ForkJoinPool 都为这三个角色铺垫了哪些内容
故事就得从 ForkJoinPool 的构造方法说起
ForkJoinPool 构造方法
public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); }
除了以上三个构造方法之外,在 JDK1.8 中还增加了另外一种初始化 ForkJoinPool 对象的方式(QQ:这是什么设计模式?):
static final ForkJoinPool common; /** * @return the common pool instance * @since 1.8 */ public static ForkJoinPool commonPool() { // assert common != null : "static init error"; return common; }
Common 是在静态块里面初始化的(只会被执行一次):
common = java.security.AccessController.doPrivileged (new java.security.PrivilegedAction<ForkJoinPool>() { public ForkJoinPool run() { return makeCommonPool(); }}); private static ForkJoinPool makeCommonPool() { int parallelism = -1; ... 其他默认初始化内容 if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP; // 执行上面的构造方法 return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, "ForkJoinPool.commonPool-worker-"); }
因为这是一个单例通用的 ForkJoinPool,所以切记:
如果使用通用 ForkJoinPool,最好只做 CPU 密集型的计算操作,不要有不确定性的 I/O 内容在任务里面,以防拖垮整体
上面所有的构造方法最后都会调用这个私有方法:
private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
参数有点多,在这里解释一下每个参数的含义:
序号 | 参数名 | 描述/解释 |
1 | parallelism | 并行度,这并不是定义的线程数,具体线程数,以及 WorkQueue 的长度等都是根据这个并行度来计算的,通过上面 makeCommonPool 方法可以知道,parallelism 默认值是 CPU 核心线程数减 1 |
2 | factory | 很常见了,创建 ForkJoinWorkerThread 的工厂接口 |
3 | handler | 每个线程的异常处理器 |
4 | mode | 上面说的 WorkQueue 的模式,LIFO/FIFO; |
5 | workerNamePrefix | ForkJoinWorkerThread的前缀名称 |
6 | ctl | 线程池的核心控制线程字段 |
在构造方法中就已经有位运算了,太难了:
想知道 ForkJoinPool 的成员变量 config 要表达的意思,就要仔细拆开来看
static final int SMASK = 0xffff; // short bits == max index this.config = (parallelism & SMASK) | mode;
parallelism & SMASK
其实就是要保证并行度的值不能大于 SMASK,上面所有的构造方法在传入 parallelism 的时候都会调用 checkParallelism
来检查合法性:
static final int MAX_CAP = 0x7fff; // max #workers - 1 private static int checkParallelism(int parallelism) { if (parallelism <= 0 || parallelism > MAX_CAP) throw new IllegalArgumentException(); return parallelism; }
可以看到 parallelism 的最大值就是 MAX_CAP
了,0x7fff
肯定小于0xffff
。所以 config 的值其实就是:
this.config = parallelism | mode;
这里假设 parallelism 就是 MAX_CAP
, 然后与 mode 进行或运算
,其中 mode 有三种:
- LIFO_QUEUE
- FIFO_QUEUE
- SHARED_QUEUE
下面以 LIFO_QUEUE 和 FIFO_QUEUE 举例说明:
// Mode bits for ForkJoinPool.config and WorkQueue.config static final int MODE_MASK = 0xffff << 16; // top half of int static final int LIFO_QUEUE = 0; static final int FIFO_QUEUE = 1 << 16; static final int SHARED_QUEUE = 1 << 31; // must be negative
所以 parallelism | mode
根据 mode 的不同会产生两种结果,但是会得到一个确认的信息:
config 的第 17 位表示模式,低 15 位表示并行度 parallelism
当我们需要从 config 中获取模式 mode 时候,只需要用mode 掩码 (MODE_MASK)和 config 做与运算
就可以了
所以一张图概括 config 就是:
long np = (long)(-parallelism); // offset ctl counts
上面这段代码就是将并行度 parallelism 补码转换为 long 型,以 MAX_CAP
作为并行度为例,np 的值就是
这个 np 的值,就会用作 ForkJoinPool 成员变量 ctl 的计算:
// Active counts 活跃线程数 private static final int AC_SHIFT = 48; private static final long AC_UNIT = 0x0001L << AC_SHIFT; private static final long AC_MASK = 0xffffL << AC_SHIFT; // Total counts 总线程数 private static final int TC_SHIFT = 32; private static final long TC_UNIT = 0x0001L << TC_SHIFT; private static final long TC_MASK = 0xffffL << TC_SHIFT; private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign // 计算 ctl this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
np << AC_SHIFT
即 np 向左移动 48 位,这样原来的低 16 位变成了高 16 位,再用 AC 掩码(AC_MASK) 做与运算
,也就是说 ctl 的 49 ~ 64 位表示活跃线程数
np << TC_SHIFT
即 np 向左移动 32 位,这样原来的低 16 位变成了 33 ~ 48 位,再用 TC 掩码做与运算
,也就是说 ctl 的 33 ~ 48 位表示总线程数
最后二者再进行或运算,如果并行度还是 MAX_CAP
,那 ctl 的最后结果就是:
到这里,我们才阅读完一个构造函数的内容,从最终的结论可以看出,初始化后 AC = TC,并且 ctl 是一个小于零的数,ctl 是 64 位的 long 类型,低 32 位是如何构造的并没有在构造函数中体现出来,但注释给了明确的说明:
/* * Bits and masks for field ctl, packed with 4 16 bit subfields: * AC: Number of active running workers minus target parallelism * TC: Number of total workers minus target parallelism * SS: version count and status of top waiting thread * ID: poolIndex of top of Treiber stack of waiters * * When convenient, we can extract the lower 32 stack top bits * (including version bits) as sp=(int)ctl. The offsets of counts * by the target parallelism and the positionings of fields makes * it possible to perform the most common checks via sign tests of * fields: When ac is negative, there are not enough active * workers, when tc is negative, there are not enough total * workers. When sp is non-zero, there are waiting workers. To * deal with possibly negative fields, we use casts in and out of * "short" and/or signed shifts to maintain signedness. * * Because it occupies uppermost bits, we can add one active count * using getAndAddLong of AC_UNIT, rather than CAS, when returning * from a blocked join. Other updates entail multiple subfields * and masking, requiring CAS. */
这段注释主要说明了低 32 位的作用(后面会从源码中体现出来,这里先有个印象会对后面源码阅读有帮助),按注释含义先完善一下 ctl 的值:
- SS:栈顶工作线程状态和版本数(每一个线程在挂起时都会持有前一个等待线程所在工作队列的索引,由此构成一个等待的工作线程栈,栈顶是最新等待的线程),第一位表示状态
1:不活动(inactive)
;0:活动(active)
,后15表示版本号,防止 ABA 问题
- ID: 栈顶工作线程所在工作队列的索引
注释中还说,另 sp=(int)ctl
,即获取 64 位 ctl 的低 32 位(SS | ID
),因为低 32 位都是创建出线程之后才会存在的值,所以推断出,如果 sp != 0, 就存在等待的工作线程,唤醒使用就行,不用创建新的线程。这样就通过 ctl 可以获取到有关线程所需要的一切信息了