- 方案三:采用ForkJoinPool(Fork/Join)
前面花了点时间讲解了 ForkJoinPool 之前的实现方法,主要为了在代码的编写难度上进行一下对比。现在就列出本篇文章的重点——ForkJoinPool 的实现方法。
/** * 采用ForkJoin来计算求和 * * @author fangshixiang@vipkid.com.cn * @description // * @date 2018/11/5 15:09 */ public class ForkJoinCalculator implements Calculator { private ForkJoinPool pool; //执行任务RecursiveTask:有返回值 RecursiveAction:无返回值 private static class SumTask extends RecursiveTask<Long> { private long[] numbers; private int from; private int to; public SumTask(long[] numbers, int from, int to) { this.numbers = numbers; this.from = from; this.to = to; } //此方法为ForkJoin的核心方法:对任务进行拆分 拆分的好坏决定了效率的高低 @Override protected Long compute() { // 当需要计算的数字个数小于6时,直接采用for loop方式计算结果 if (to - from < 6) { long total = 0; for (int i = from; i <= to; i++) { total += numbers[i]; } return total; } else { // 否则,把任务一分为二,递归拆分(注意此处有递归)到底拆分成多少分 需要根据具体情况而定 int middle = (from + to) / 2; SumTask taskLeft = new SumTask(numbers, from, middle); SumTask taskRight = new SumTask(numbers, middle + 1, to); taskLeft.fork(); taskRight.fork(); return taskLeft.join() + taskRight.join(); } } } public ForkJoinCalculator() { // 也可以使用公用的线程池 ForkJoinPool.commonPool(): // pool = ForkJoinPool.commonPool() pool = new ForkJoinPool(); } @Override public long sumUp(long[] numbers) { Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1)); pool.shutdown(); return result; } } 输出: 耗时:390ms 结果为:50000005000000
可以看出,使用了 ForkJoinPool 的实现逻辑全部集中在了 compute() 这个函数里,仅用了14行就实现了完整的计算过程。特别是,在这段代码里没有显式地“把任务分配给线程”,只是分解了任务,而把具体的任务到线程的映射交给了 ForkJoinPool 来完成。
方案四:采用并行流(JDK8以后的推荐做法)
public static void main(String[] args) { Instant start = Instant.now(); long result = LongStream.rangeClosed(0, 10000000L).parallel().reduce(0, Long::sum); Instant end = Instant.now(); System.out.println("耗时:" + Duration.between(start, end).toMillis() + "ms"); System.out.println("结果为:" + result); // 打印结果500500 } 输出: 耗时:130ms 结果为:50000005000000
并行流底层还是Fork/Join框架,只是任务拆分优化得很好。
耗时效率方面解释:Fork/Join 并行流等当计算的数字非常大的时候,优势才能体现出来。也就是说,如果你的计算比较小,或者不是CPU密集型的任务,不太建议使用并行处理
原理
**我一直以为,要理解一样东西的原理,最好就是自己尝试着去实现一遍。**根据上面的示例代码,可以看出 fork() 和 join() 是 Fork/Join Framework “魔法”的关键。我们可以根据函数名假设一下 fork() 和 join() 的作用:
fork():开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。
join():等待该任务的处理线程处理完毕,获得返回值。
疑问:当任务分解得越来越细时,所需要的线程数就会越来越多,而且大部分线程处于等待状态?
但是如果我们在上面的示例代码加入以下代码
System.out.println(pool.getPoolSize());
这会显示当前线程池的大小,在我的机器上这个值是4,也就是说只有4个工作线程。甚至即使我们在初始化 pool 时指定所使用的线程数为1时,上述程序也没有任何问题——除了变成了一个串行程序以外。
public ForkJoinCalculator() { pool = new ForkJoinPool(1); }
这个矛盾可以导出,我们的假设是错误的,并不是每个 fork() 都会促成一个新线程被创建,而每个 join() 也不是一定会造成线程被阻塞。Fork/Join Framework 的实现算法并不是那么“显然”,而是一个更加复杂的算法——这个算法的名字就叫做work stealing 算法。
1.ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
2.每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
3.每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
4.在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
5.在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
至于Fork和Join源码级别的的细节,本文不做过多描述了~~
submit() 和 fork() 其实没有本质区别,只是提交对象变成了 submitting queue 而已(还有一些同步,初始化的操作)。submitting queue 和其他 work queue 一样,是工作线程”窃取“的对象,因此当其中的任务被一个工作线程成功窃取时,就意味着提交的任务真正开始进入执行阶段。