Java7中的ForkJoin并发框架初探(下)

简介:

前两篇文章已经对Fork Join的设计和JDK中源码的简要分析。这篇文章,我们来简单地看看我们在开发中怎么对JDK提供的工具类进行应用,以提高我们的需求处理效率。

Fork Join这东西确实用好了能给我们的任务处理提高效率,也为开发带来方便。但Fork Join不是那么容易用好的,我们先来看几个例子(反例)。

0. 反例错误分析

我们先来看看这篇文章中提供的例子:http://www.iteye.com/topic/643724 (因为是反例,就不提供超链接了,只以普通文本给出URL)

这篇文章是我学习和整理Fork Join时搜索到的一篇文章,其实总的来说这篇文章前面分析得还是比较好的,只是给出的第一个例子(有返回结果的RecursiveTask应用的例子)没有正确地对Fork Join进行应用。为了方便分析,还是贴下这个例子中具体的的代码吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public  class  Calculator  extends  RecursiveTask {
  
     private  static  final  int  THRESHOLD =  100 ;
     private  int  start;
     private  int  end;
  
     public  Calculator( int  start,  int  end) {
         this .start = start;
         this .end = end;
     }
  
     @Override
     protected  Integer compute() {
         int  sum =  0 ;
         if ((start - end) < THRESHOLD){
             for ( int  i = start; i< end;i++){
                 sum += i;
             }
         } else {
             int  middle = (start + end) / 2 ;
             Calculator left =  new  Calculator(start, middle);
             Calculator right =  new  Calculator(middle +  1 , end);
             left.fork();
             right.fork();
  
             sum = left.join() + right.join();
         }
         return  sum;
     }
  
}

我们看到其中一段已经高亮的代码,显示对两个子任务进行fork()调用,即分别提交给当前线程的任务队列,依次加到末尾。紧接着,又按照调用fork()的顺序执行两个子任务对象的join()方法。

其实,这样就有一个问题,在每次迭代中,第一个子任务会被放到线程队列的倒数第二个位置,第二个子任务是最后一个位置。当执行join()调用的时候,由于第一个子任务不在队列尾而不能通过执行ForkJoinWorkerThread的unpushTask()方法取出任务并执行,线程最终只能挂起阻塞,等待通知。而Fork Join本来的做法是想通过子任务的合理划分,避免过多的阻塞情况出现。这样,这个例子中的操作就违背了Fork Join的初衷,每次子任务的迭代,线程都会因为第一个子任务的join()而阻塞,加大了代码运行的成本,提高了资源开销,不利于提高程序性能。

除此之外,这段程序还是不能进入Fork Join的过程,因为还有一个低级错误。看下第15、16行代码的条件,就清楚了。按照逻辑,start必然是比end小的。这将导致所有任务都将以循环累加的方式完成,而不会执行fork()和join()。

由此可见,Fork Join的使用还是要注意对其本身的理解和对开发过程中细节的把握的。我们看下JDK中RecursiveAction和RecursiveTask这两个类。

1. RecursiveAction分析及应用实例

这两个类都是继承了ForkJoinTask,本身给出的实现逻辑并不多不复杂,在JDK的类文件中,它的注释比源码还要多。我们可以看下它的实现代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public  abstract  class  RecursiveAction  extends  ForkJoinTask<Void> {
     private  static  final  long  serialVersionUID = 5232453952276485070L;
  
     protected  abstract  void  compute();
  
     public  final  Void getRawResult() {  return  null ; }
  
     protected  final  void  setRawResult(Void mustBeNull) { }
  
     protected  final  boolean  exec() {
         compute();
         return  true ;
     }
}

我们看到其中两个方法是关于处理空返回值的方法。而exec方法则是调用了compute(),这个compute就是我们使用Fork Join时需要自己实现的逻辑。

我们可以看下API中给出的一个最简单最具体的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class  IncrementTask  extends  RecursiveAction {
    final  long [] array;  final  int  lo;  final  int  hi;
    IncrementTask( long [] array,  int  lo,  int  hi) {
      this .array = array;  this .lo = lo;  this .hi = hi;
    }
    protected  void  compute() {
      if  (hi - lo < THRESHOLD) {
        for  ( int  i = lo; i < hi; ++i)
          array[i]++;
      }
      else  {
        int  mid = (lo + hi) >>>  1 ;
        invokeAll( new  IncrementTask(array, lo, mid),
                  new  IncrementTask(array, mid, hi));
      }
    }
  }

大致的逻辑就是,对给定一个特定数组的某段,进行逐个加1的操作。我们看到else中的代码块,显示取一个lo和hi的中间值,此后分割成两个子任务,并进行invokeAll()调用。我们来看下继承自FutureTask的invokeAll()方法实现。很简单:

1
2
3
4
5
public  static  void  invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
     t2.fork();
     t1.invoke();
     t2.join();
}

对于参数中的两个子任务,对第二个子任务进行fork(),即放入线程对应队列的结尾,然后执行第一个子任务,再调用第二个子任务的join(),实际上就是跳转到第二个子任务,进行执行(当然如果不能执行,就需要阻塞等待了)。

其实invokeAll()是个重载方法,同名的还有另外两个,基本逻辑都是一样的,我们拿出一个通用一点的来看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public  static  void  invokeAll(ForkJoinTask<?>... tasks) {
     Throwable ex =  null ;
     int  last = tasks.length -  1 ;
     for  ( int  i = last; i >=  0 ; --i) {
         ForkJoinTask<?> t = tasks[i];
         if  (t ==  null ) {
             if  (ex ==  null )
                 ex =  new  NullPointerException();
         }
         else  if  (i !=  0 )
             t.fork();
         else  if  (t.doInvoke() < NORMAL && ex ==  null )
             ex = t.getException();
     }
     for  ( int  i =  1 ; i <= last; ++i) {
         ForkJoinTask<?> t = tasks[i];
         if  (t !=  null ) {
             if  (ex !=  null )
                 t.cancel( false );
             else  if  (t.doJoin() < NORMAL && ex ==  null )
                 ex = t.getException();
         }
     }
     if  (ex !=  null )
         UNSAFE.throwException(ex);
}

我们发现第一个子任务(i==0的情况)没有进行fork,而是直接执行,其余的统统先调用fork()放入任务队列,之后再逐一join()。其实我们注意到一个要点就是第一个任务不要fork()再join(),也就是上面中例子的错误所在,这样会造成阻塞,而不能充分利用Fork Join的特点,也就不能保证任务执行的性能。

Oracle的JavaSE7 API中在RecursiveAction里还有一个更复杂的例子,是计算double数组平方和的,由于代码较长,就不列在这里了。总体思路和上面是一样的,额外增加了动态阈值的判断,感兴趣的想深入理解的可以到这里去参考一下。

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/RecursiveAction.html

2. RecursiveTask简要说明

其实说完了RecursiveAction,RecursiveTask可以用“同理”来解释。实现代码也很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public  abstract  class  RecursiveTask<V>  extends  ForkJoinTask<V> {
     private  static  final  long  serialVersionUID = 5232453952276485270L;
  
     V result;
  
     protected  abstract  V compute();
  
     public  final  V getRawResult() {
         return  result;
     }
  
     protected  final  void  setRawResult(V value) {
         result = value;
     }
  
     protected  final  boolean  exec() {
         result = compute();
         return  true ;
     }
  
}

我们看到唯一不同的是返回结果的处理,其余都可以和RecursiveAction一样使用。

3. Fork Join应用小结

Fork Join是为我们提供了一个非常好的“分而治之”思想的实现平台,并且在一定程度上实现了“变串行并发为并行”。但Fork Join不是万能的页不完全是通用的,对于可很好分解成子任务的场景,我们可以对其进行应用,更多时候要考虑需


特别说明:尊重作者的劳动成果,转载请注明出处哦~~~http://blog.yemou.net/article/query/info/tytfjhfascvhzxcytp86
相关文章
|
并行计算 算法 Java
深入理解Java中的ForkJoin框架原理
深入理解Java中的ForkJoin框架原理
|
算法 Java 程序员
论文翻译 | 【深入挖掘Java技术】「底层原理专题」深入分析一下并发编程之父Doug Lea的纽约州立大学的ForkJoin框架的本质和原理
本文深入探讨了一个Java框架的设计、实现及其性能。该框架遵循并行编程的理念,通过递归方式将问题分解为多个子任务,并利用工作窃取技术进行并行处理。所有子任务完成后,其结果被整合以形成完整的并行程序。 在总体设计上,该框架借鉴了Cilk工作窃取框架的核心理念。其核心技术主要聚焦于高效的任务队列构建和管理,以及工作线程的管理。经过实际性能测试,我们发现大多数程序的并行加速效果显著,但仍有优化空间,未来可能需要进一步研究改进方案。
160 3
论文翻译 | 【深入挖掘Java技术】「底层原理专题」深入分析一下并发编程之父Doug Lea的纽约州立大学的ForkJoin框架的本质和原理
java8中的并行流,封装ForkJoin
并行流就是执行任务的时候分配给多个线程队列执行
|
Java
详解java中一个分而治之的框架ForkJoin
在古代,皇帝要想办成一件事肯定不会自己亲自去动手,而是把任务细分发给下面的大臣,下面的大臣也懒呀,于是把任务继续分成几个部分,继续下发,于是到了最后最终负责的人就完成了一个小功能。上面的领导再把这些结果一层一层汇总,最终返回给皇帝。这就是分而治之的思想,也是我们今天的主题ForkJoin。
268 0
详解java中一个分而治之的框架ForkJoin
|
运维 Java 大数据
Java并发JUC(java.util.concurrent)ForkJoin/异步回调
Java并发JUC(java.util.concurrent)ForkJoin/异步回调
Java并发JUC(java.util.concurrent)ForkJoin/异步回调
|
Java
java8学习:ForkJoin
内容来自《 java8实战 》,本篇文章内容均为非盈利,旨为方便自己查询、总结备份、开源分享。如有侵权请告知,马上删除。书籍购买地址:java8实战 这篇是接上一篇并行数据处理与性能余下的问题:forkjoin进行讲解的 forkjoin的目的就是以递归的方式来拆分更小的任务,然后将每个小任务处理.
3194 0
|
Java 并行计算 分布式计算
Java并发-ForkJoin
主要用于并行计算中,和 MapReduce 原理类似,都是把大的计算任务拆分成多个小任务并行计算。 public class ForkJoinExample extends RecursiveTask { private final int ...
887 0