Fork/Join框架(三)加入任务的结果

简介:

加入任务的结果

Fork/Join框架提供了执行返回一个结果的任务的能力。这些任务的类型是实现了RecursiveTask类。这个类继承了ForkJoinTask类和实现了执行者框架提供的Future接口。

在任务中,你必须使用Java API方法推荐的结构:


1 If (problem size < size){
2 tasks=Divide(task);
3 execute(tasks);
4 groupResults()
5 return result;
6 } else {
7 resolve problem;
8 return result;
9 }

如果这个任务必须解决一个超过预定义大小的问题,你应该将这个任务分解成更多的子任务,并且用Fork/Join框架来执行这些子任务。当这些子任务完成执行,发起的任务将获得所有子任务产生的结果 ,对这些结果进行分组,并返回最终的结果。最终,当在池中执行的发起的任务完成它的执行,你将获取整个问题地最终结果。

在这个指南中,你将学习如何使用Fork/Join框架解决这种问题,开发一个在文档中查找单词的应用程序。你将会实现以下两种任务类型:

  • 一个文档任务,将在文档中的行集合中查找一个单词。
  • 一个行任务,将在文档的一部分数据中查找一个单词。

所有任务将返回单词在文档的一部分中或行中出现的次数。

如何做…

根据以下这些步骤来实现这个例子:

1.创建一个Document类,它将产生用来模拟文档的字符串的二维数组。


1 public class Document {

2.创建一个带有一些单词的字符串数组。这个数组将被用来生成字符串二维数组。


1 private String words[]={"the","hello""goodbye","packt", "java","thread","pool","random","class","main"};

3.实现generateDocument()方法。它接收以下参数:行数、每行的单词数。这个例子返回一个字符串二维数组,来表示将要查找的单词。


1 public String[][] generateDocument(int numLines, int numWords,String word){

4.首先,创建生成这个文档必需的对象:字符串二维对象和生成随机数的Random对象。


1 int counter=0;
2 String document[][]=new String[numLines][numWords];
3 Random random=new Random();

5.用字符串填充这个数组。存储在每个位置的字符串是单词数组的随机位置,统计这个程序将要在生成的数组中查找的单词出现的次数。你可以使用这个值来检查程序是否执行正确。


1 for (int i=0; i<numLines; i++){
2 for (int j=0; j<numWords; j++) {
3 int index=random.nextInt(words.length);
4 document[i][j]=words[index];
5 if (document[i][j].equals(word)){
6 counter++;
7 }
8 }
9 }

6.将单词出现的次数写入控制台,并返回生成的二维数组。


1 System.out.println("DocumentMock: The word appears "+counter+" times in the document");
2  
3 return document;

7.创建一个DocumentTask类,指定它继承RecursiveTask类,并参数化为Integer类型。该类将实现统计单词在一组行中出现的次数的任务。


1 public class DocumentTask extends RecursiveTask<Integer> {

8.声明一个私有的String类型的二维数组document,两个私有的int类型的属性名为start和end,一个私有的String类型的属性名为word。


1 private String document[][];
2 private int start, end;
3 private String word;

9.实现这个类的构造器,用来初始化这些属性。


1 public DocumentTask (String document[][], int start, int end, String word){
2 this.document=document;
3 this.start=start;
4 this.end=end;
5 this.word=word;
6 }

10.实现compute()方法。如果属性end和start的差小于10,那么这个任务统计单词位于行在调用processLines()方法的这些位置中出现的次数。


1 @Override
2 protected Integer compute() {
3 int result;
4 if (end-start<10){
5 result=processLines(document, start, end, word);

11.否则,用两个对象分解行组,创建两个新的DocumentTask对象用来处理这两个组,并且在池中使用invokeAll()方法来执行它们。


1 } else {
2 int mid=(start+end)/2;
3 DocumentTask task1=new DocumentTask(document,start,mid,word);
4 DocumentTask task2=new DocumentTask(document,mid,end,word);
5 invokeAll(task1,task2);

12.然后,使用groupResults()方法将这两个任务返回的结果相加。最后,返回这个任务统计的结果。


1 try {
2 result=groupResults(task1.get(),task2.get());
3 } catch (InterruptedException | ExecutionException e) {
4 e.printStackTrace();
5 }
6 }
7 return result;

13.实现processLines()方法。它接收以下参数:字符串二维数组、start属性、end属性、任务将要查找的word属性。


1 private Integer processLines(String[][] document, int start, int
2 end,String word) {

14.对于任务要处理的每行,创建LineTask对象来处理整行,并且将它们存储在任务数列中。


1 List&lt;LineTask&gt; tasks=new ArrayList&lt;LineTask&gt;();
2 for (int i=start; i&lt;end; i++){
3 LineTask task=new LineTask(document[i], 0, document[i].
4 length, word);
5 tasks.add(task);
6 }

15.在那个数列中使用invokeAll()执行所有任务。


1 invokeAll(tasks);

16.合计所有这些任务返回的值,并返回这个结果。


01 int result=0;
02 for (int i=0; i&lt;tasks.size(); i++) {
03 LineTask task=tasks.get(i);
04 try {
05 result=result+task.get();
06 } catch (InterruptedException | ExecutionException e) {
07 e.printStackTrace();
08 }
09 }
10 return new Integer(result);

17.实现groupResults()方法。它相加两个数,并返回这个结果。


1 private Integer groupResults(Integer number1, Integer number2) {
2 Integer result;
3 result=number1+number2;
4 return result;
5 }

18.创建LineTask类,指定它继承RecursiveTask类,并参数化为Integer类型。这个类将实现统计单词在一行中出现的次数的任务。


1 public class LineTask extends RecursiveTask&lt;Integer&gt;{

19.声明这个类的序列号版本UID。这个元素是必需的,因为RecursiveTask类的父类,ForkJoinTask类实现了Serializable接口。声明一个私有的、String类型的属性line,两个私有的、int类型的属性start和end,一个私有的、String类型的属性word。


1 private static final long serialVersionUID = 1L;
2 private String line[];
3 private int start, end;
4 private String word;

20.实现这个类的构造器,初始化这些属性。


1 public LineTask(String line[], int start, int end, String word)
2 {
3 this.line=line;
4 this.start=start;
5 this.end=end;
6 this.word=word;
7 }

21.实现这个类的compute()方法。如果属性end和start之差小于100,这个任务在行中由start和end属性使用count()方法决定的片断中查找单词。


1 @Override
2 protected Integer compute() {
3 Integer result=null;
4 if (end-start&lt;100) {
5 result=count(line, start, end, word);

22.否则,将行中的单词组分成两部分,创建两个新的LineTask对象来处理这两个组,在池中使用invokeAll()方法执行它们。


1 } else {
2 int mid=(start+end)/2;
3 LineTask task1=new LineTask(line, start, mid, word);
4 LineTask task2=new LineTask(line, mid, end, word);
5 invokeAll(task1, task2);

23.然后,使用groupResults()方法将这两个任务返回的值相加。最后,返回这个任务计算的结果。


1 try {
2 result=groupResults(task1.get(),task2.get());
3 } catch (InterruptedException | ExecutionException e) {
4 e.printStackTrace();
5 }
6 }
7 return result;

24.实现count()方法。它接收以下参数:完整行的字符串数组、start属性、end属性、任务将要查找的word属性。


1 private Integer count(String[] line, int start, int end, String
2 word) {

25.比较这个任务将要查找的word属性中的在start和end属性之间的位置的单词,如果它们相等,则增加count变量。


1 int counter;
2 counter=0;
3 for (int i=start; i&lt;end; i++){
4 if (line[i].equals(word)){
5  
6 counter++;
7 }
8 }

26.为了显示示例的执行,令任务睡眠10毫秒。


1 try {
2 Thread.sleep(10);
3 } catch (InterruptedException e) {
4 e.printStackTrace();
5 }

27.返回counter变量的值。


1 return counter;

28.实现groupResults()方法。它合计两个数的值,并返回这个结果。


1 private Integer groupResults(Integer number1, Integer number2) {
2 Integer result;
3 result=number1+number2;
4 return result;
5 }

29.实现示例的主类,通过创建Main类,并实现main()方法。


1 public class Main{
2 public static void main(String[] args) {

30.使用DocumentMock类,创建一个带有100行,每行1000个单词的Document。


1 DocumentMock mock=new DocumentMock();
2 String[][] document=mock.generateDocument(100, 1000, "the");

31.创建一个新的DocumentTask对象,用来更新整个文档的产品。参数start值为0,参数end值为100。


1 DocumentTask task=new DocumentTask(document, 0, 100, &quot;the&quot;);

32.使用无参构造器创建一个ForkJoinPool对象,在池中使用execute()方法执行这个任务。


1 ForkJoinPool pool=new ForkJoinPool();
2 pool.execute(task);

33.实现一个代码块,用来显示关于池变化的信息。每秒向控制台写入池的某些参数的值,直到任务完成它的执行。


01 do {
02 System.out.printf("******************************************\n");
03 System.out.printf("Main: Parallelism: %d\n",pool.getParallelism());
04 System.out.printf("Main: Active Threads: %d\n",pool.getActiveThreadCount());
05 System.out.printf("Main: Task Count: %d\n",pool.getQueuedTaskCount());
06 System.out.printf("Main: Steal Count: %d\n",pool.getStealCount());
07 System.out.printf("******************************************\n");
08 try {
09 TimeUnit.SECONDS.sleep(1);
10 } catch (InterruptedException e) {
11 e.printStackTrace();
12 }
13 } while (!task.isDone());

34.使用shutdown()方法关闭这个池。


1 pool.shutdown();

35.使用awaitTermination()方法等待任务的结束。


1 try {
2 System.out.printf("Main: The word appears %d in the document",task.get());
3 } catch (InterruptedException | ExecutionException e) {
4 e.printStackTrace();
5 }

36.打印单词在文档中出现的次数。检查这个数是否与DocumentMock类中写入的数一样。


1 try {
2 System.out.printf("Main: The word appears %d in the document",task.get());
3 } catch (InterruptedException | ExecutionException e) {
4 e.printStackTrace();
5 }

它是如何工作的…

在这个示例中,你已经实现了两种不同的任务:

  • DocumentTask类:这个类的任务将处理由start和end属性决定的文档中的行组。如果这个行组的大小小于10,它为每行创建LineTask对象,并且当它们完成它们的执行时,它合计这些任务的结果,并返回这个合计值。如果这个任务要处理的行组大小不小于10,它将这个组分成两个并创建两个DocumentTask对象来处理这些新组。当这些任务完成它们的执行时,这个任务合计它们的结果,并返回这个合计值。
  • LineTask类:这个类的任务将处理文档中的一行的单词组。如果这个单词组小于10,这个任务直接在这个单词组中查找单词,并且返回这个单词出现的次数。否则,它将这个单词组分成两个并创建两个LineTask对象来处理。当这些任务完成它们的执行,这个任务合计这些任务的结果并返回这个合计值。

在Main类中,你已经使用默认构造器一个ForkJoinPool对象,并且你在它里面执行一个DocumentTask类,这个类将处理一个拥有100行,每行有1000个单词的文档。这个任务将使用其他的DocumentTask对象和LineTask对象来分解这个问题,当所有任务完成它们的执行,你可以使用启动任务来获取单词在整个文档中出现的总次数。由于任务返回一个结果,所以它们继承RecursiveTask类。

为了获取Task返回的结果,你已经使用了get()方法 。这个方法是在Future接口中声明的,由RecursiveTask类实现的。

当你执行这个程序,你可以比较在控制台中的第一行和最后一行。第一行是文档生成时计算的单词出现的次数,最后一行是由Fork/Join任务计算的。

不止这些…

ForkJoinTask类提供其他的方法来完成一个任务的执行,并返回一个结果,这就是complete()方法。这个方法接收一个RecursiveTask类的参数化类型的对象,并且当join()方法被调用时,将这个对象作为任务的结果返回。 它被推荐使用在:提供异步任务结果。

由于RecursiveTask类实现了Future接口,get()方法其他版本如下:

  • get(long timeout, TimeUnit unit):这个版本的get()方法,如果任务的结果不可用,在指定的时间内等待它。如果超时并且结果不可用,那么这个方法返回null值。TimeUnit类是一个枚举类,它有以下常量:DAYS, HOURS,MICROSECONDS,MILLISECONDS, MINUTES, NANOSECONDS和SECONDS。

参见

  • 在第5章,Fork/Join框架中的创建一个Fork/Join池的指南
  • 在第8章,测试并发应用程序中的监控Fork/Join池的指南
目录
相关文章
|
6月前
|
JavaScript Java 大数据
分享Fork/Join经典案例
`shigen`是位专注于Java、Python、Vue和Shell的博主,分享技术成长和认知。上篇文章探讨了Java的Fork/Join框架,它类似线程池,通过拆分大任务并并行执行提升效率。以大序列求和为例展示了ForkJoinPool的使用,与普通线程池对比,Fork/Join效率提升约50%。适合递归任务、独立子任务和长执行时间的任务。注意任务粒度、避免共享状态和死锁。推荐观看相关视频深入理解。一起学习,每天进步!
41 0
分享Fork/Join经典案例
|
6月前
|
并行计算 安全 算法
探索Java并发编程:Fork/Join框架的应用与优化
在多核处理器普及的今天,并发编程已经成为提高程序性能的重要手段。Java提供了多种并发工具,其中Fork/Join框架是处理分治任务的强大工具。本文将深入探讨Fork/Join框架的核心原理、使用场景以及性能优化技巧,帮助开发者更好地利用这一框架解决实际问题。通过实例分析,我们将看到如何有效地使用Fork/Join框架来加速计算密集型任务,并提供一系列最佳实践,以确保高效和线程安全的并发执行。
|
6月前
|
并行计算 算法 Java
Java并发 -- Fork/Join框架
Java并发 -- Fork/Join框架
61 0
|
并行计算 负载均衡 算法
什么是 Fork/Join 框架?Java 中如何使用 Fork/Join 框架?
什么是 Fork/Join 框架?Java 中如何使用 Fork/Join 框架?
|
算法
Fork/Join框架的学习和浅析
Fork/Join框架的学习和浅析
79 0
|
分布式计算 算法 Java
【JUC基础】16. Fork Join
“分而治之”一直是一个非常有效的处理大量数据的方法。著名的MapReduce也是采取了分而治之的思想。。简单地说,就是如果你要处理 1000 个数据,但是你并不具备处理 1000个数据的能力,那么你可以只处理其中的 10 个,然后分阶段处理 100 次,将 100 次的结进行合成,就是最终想要的对原始 1000 个数据的处理结果。而这就是Fork Join的基本思想。
105 1
|
并行计算 算法 Java
Fork/Join解读
Fork/Join解读
|
并行计算 算法 Java
【JAVA并发编程专题】Fork/Join框架的理解和使用
【JAVA并发编程专题】Fork/Join框架的理解和使用
|
缓存 并行计算 算法
浅析 Fork/Join 基本概念和实战
在 JDK 1.7 版本中提供了 Fork/Join 并行执行任务框架,它主要的作用是把大任务分割成若干个小任务,再对每个小任务得到的结果进行汇总,此种开发方法也叫做分治编程,分治编程可以极大的利用 CPU 资源,提高任务执行效率。
217 0
浅析 Fork/Join 基本概念和实战
|
并行计算 算法
Fork/Join框架简介
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
260 0
Fork/Join框架简介