内容来自《 java8实战 》,本篇文章内容均为非盈利,旨为方便自己查询、总结备份、开源分享。如有侵权请告知,马上删除。
书籍购买地址:java8实战
- 这篇是接上一篇并行数据处理与性能余下的问题:forkjoin进行讲解的
- forkjoin的目的就是以递归的方式来拆分更小的任务,然后将每个小任务处理后的结果在合并,fork就是拆分,join就是合并
-
RecursiveTask
- 要把任务提交到这个池,就必须创建 RecursiveTask的子类,R就是你需要返回的结果类型,如果不返回结果就使用RecursiveAction类型,继承这个抽象类只需要实现一个方法
protected abstract V compute();
- 此方法定以了将任务拆分成子任务的逻辑,以及无法再才分或不方便拆分时生成单个子任务结果的逻辑,(它定义了咋拆分又定义了怎么把子任务聚合)
-
先实现一下,再来说原理
- 还是实现1到一千万的累加和
public class ForkJoinImpl extends java.util.concurrent.RecursiveTask<Long> { //临界值,就是结束值减开始值的结果如果小于这个值那么就不拆分了,大于这个值才会拆分 private final int MEDIAN_NUM = 100000; //从多少计算 private int start_num = 0; //计算到多少 private int end_num = 0; //构造 public ForkJoinImpl(int start_num, int end_num) { this.start_num = start_num; this.end_num = end_num; } @Override protected Long compute() { //结束值减开始值的结果 int temp = end_num - start_num; //判断结束值减开始值的结果是否小于上面定义的临界值 if (temp <= MEDIAN_NUM){ //如果小的话,那么就不进行拆分了,就直接调用方法开始计算 return sequentiallySum(); } //到这就代表结束值减开始值的结果是大于临界值的 //继续进行拆分 //start_num到start_num + temp / 2是把数据的左半部分形成一个新的task //比如0到10,那么就是 10-0=10,temp=10,start_num=0,所以形成的新task就是(0,10/2=5),也就是左半部分 ForkJoinImpl leftTask = new ForkJoinImpl(start_num,start_num + temp / 2); //利用ForkJoinPool中的线程异步执行新创建的子任务 leftTask.fork(); //这创建的就是数据的后半段,start_num + temp / 2 = 0+10/2 = 6,所以形成的新task就是(0+10/2=6,10),也就是右半部分 ForkJoinImpl rightTask = new ForkJoinImpl(start_num + temp / 2,end_num); //同时执行第二个子任务,有可能允许进一步划分 Long rightResult = rightTask.compute(); //读取第一个子任务的结果,如果没有完成就等待 Long leftResult = leftTask.join(); //该任务的结果是两个子任务结果的组合 return rightResult + leftResult; } //计算方法:在不能进行拆分的时候进行计算 private Long sequentiallySum(){ long sum = 0; for (int i = start_num; i <= end_num; i++) { sum += i; } return sum; } } @Test public void test() throws Exception { ForkJoinImpl forkJoin = new ForkJoinImpl(0, 10000000); Long invoke = new ForkJoinPool().invoke(forkJoin); System.out.println("invoke = " + invoke); }
- 上面的流程的总结:当把ForkJoinImpl对象传给ForkJoinPool时,这个任务就由池中的一个线程执行,这个线程会调用任务的compute方法,该方法会检查任务是够孝道足以顺序执行,也就是我们上面定义的临界值,如果不够小就会要求再次拆分数据,并分给一个新的ForkJoinImpl,新的ForkJoinImpl也是由pool安排执行。因此这个过程是递归重复的,把缘任务拆分为更小的任务。这时候达到临街值要求后,会顺序计算每个任务的结果,然后由分支过程创建的任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。如下面的图
-
使用此框架的最佳做法
- 对一个任务嗲用join方法会阻塞调用方,直到该任务做出结果,因此有必要在两个子任务的计算都开始之后在调用它,否则,你得到的版本会比原始的顺序算法更慢更复杂,因为每个子任务都必须去等待另一个任务完成才能启动
- 对子任务调用fork方法可以把他排进pool,同时对左边和右边的子任务调用它似乎很自然,但这样做的效率比直接对其中一个调用compute低,这样做可以以为其中一个子任务重用同一个线程,从而避免在线程池中多分配一个任务造成的开销。比如有些写
//执行子任务 left.fork(); right.fork(); //获取子任务结果 int lResult = left.join(); int rResult = right.join(); 而更高效的写法是 leftTask.fork(); ForkJoinImpl rightTask = new ForkJoinImpl(start_num + temp / 2,end_num); //自己理解的是它一直判断是否有大于临界值,如果有就左半部分进行加入pool,而右边一直去判断是否还有大于临界值的情况 //而不至于两方都在等待,并且上面的写法需要left返回后右边才会返回。如果理解的不对请评论指正 Long rightResult = rightTask.compute(); Long leftResult = leftTask.join();
-
工作窃取
- 对于上面会产生很多的小任务然后分配到内核上进行计算,理想情况下是所有的小任务机会全部同时执行完毕,但是由于线程是随意切换的,这种情况会发生改变:可能A已经空闲了,但是B还是忙的要死,为了解决这样的一个问题,此框架就有工作窃取的技术了
- 在应用中,这意味着这些任务拆不多被平均分配到pool中的所有的线程上。但每个线程都为分配给他的任务保存一个双向队列,每完成一个任务,就会从队列头上取出下一个任务开始执行,当A已经忙完,B还在忙的时候,A并不会闲下来,而是随机选一个别的线程,从队列的尾巴上偷走一个任务,这个过程一直继续下去,直到所有的任务执行完毕,所有队列都情况,这就是为什么不划分为几个大任务而是很多小任务,这样有助于更好的在工作线程之间平衡负载