我的Java开发学习之旅------>Java使用Fork/Join框架来并行执行任务

简介: 现代的计算机已经向多CPU方向发展,即使是普通的PC,甚至现在的智能手机、多核处理器已被广泛应用。在未来,处理器的核心数将会发展的越来越多。 虽然硬件上的多核CPU已经十分成熟,但是很多应用程序并未这种多核CPU做好准备,因此并不能很好地利用多核CPU的性能优势。

现代的计算机已经向多CPU方向发展,即使是普通的PC,甚至现在的智能手机、多核处理器已被广泛应用。在未来,处理器的核心数将会发展的越来越多。

虽然硬件上的多核CPU已经十分成熟,但是很多应用程序并未这种多核CPU做好准备,因此并不能很好地利用多核CPU的性能优势。

为了充分利用多CPU、多核CPU的性能优势,级软基软件系统应该可以充分“挖掘”每个CPU的计算能力,决不能让某个CPU处于“空闲”状态。为此,可以考虑把一个任务拆分成多个“小任务”,把多个"小任务"放到多个处理器核心上并行执行。当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

如下面的示意图所示:


第一步分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。

第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。


Java提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合成总的计算结果。

ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。ForkJoinPool提供了如下两个常用的构造器。

  •  public ForkJoinPool(int parallelism):创建一个包含parallelism个并行线程的ForkJoinPool
  •  public ForkJoinPool() :以Runtime.getRuntime().availableProcessors()的返回值作为parallelism来创建ForkJoinPool

创建ForkJoinPool实例后,可以钓鱼ForkJoinPool的submit(ForkJoinTask<T> task)或者invoke(ForkJoinTask<T> task)来执行指定任务。其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它有两个抽象子类:RecursiveAction和RecursiveTask。

  • RecursiveTask代表有返回值的任务
  • RecursiveAction代表没有返回值的任务。


一、RecursiveAction

下面以一个没有返回值的大任务为例,介绍一下RecursiveAction的用法。

大任务是:打印0-200的数值。

小任务是:每次只能打印50个数值。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

//RecursiveAction为ForkJoinTask的抽象子类,没有返回值的任务
class PrintTask extends RecursiveAction {
	// 每个"小任务"最多只打印50个数
	private static final int MAX = 50;

	private int start;
	private int end;

	PrintTask(int start, int end) {
		this.start = start;
		this.end = end;
	}

	@Override
	protected void compute() {
		// 当end-start的值小于MAX时候,开始打印
		if ((end - start) < MAX) {
			for (int i = start; i < end; i++) {
				System.out.println(Thread.currentThread().getName() + "的i值:"
						+ i);
			}
		} else {
			// 将大任务分解成两个小任务
			int middle = (start + end) / 2;
			PrintTask left = new PrintTask(start, middle);
			PrintTask right = new PrintTask(middle, end);
			// 并行执行两个小任务
			left.fork();
			right.fork();
		}
	}
}

public class ForkJoinPoolTest {
	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		// 提交可分解的PrintTask任务
		forkJoinPool.submit(new PrintTask(0, 200));
		forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束
		// 关闭线程池
		forkJoinPool.shutdown();
	}

}

运行结果如下:

ForkJoinPool-1-worker-2的i值:75
ForkJoinPool-1-worker-2的i值:76
ForkJoinPool-1-worker-2的i值:77
ForkJoinPool-1-worker-2的i值:78
ForkJoinPool-1-worker-2的i值:79
ForkJoinPool-1-worker-2的i值:80
ForkJoinPool-1-worker-2的i值:81
ForkJoinPool-1-worker-2的i值:82
ForkJoinPool-1-worker-2的i值:83
ForkJoinPool-1-worker-2的i值:84
ForkJoinPool-1-worker-2的i值:85
ForkJoinPool-1-worker-2的i值:86
ForkJoinPool-1-worker-2的i值:87
ForkJoinPool-1-worker-2的i值:88
ForkJoinPool-1-worker-2的i值:89
ForkJoinPool-1-worker-2的i值:90
ForkJoinPool-1-worker-2的i值:91
ForkJoinPool-1-worker-2的i值:92
ForkJoinPool-1-worker-2的i值:93
ForkJoinPool-1-worker-2的i值:94
ForkJoinPool-1-worker-2的i值:95
ForkJoinPool-1-worker-2的i值:96
ForkJoinPool-1-worker-2的i值:97
ForkJoinPool-1-worker-2的i值:98
ForkJoinPool-1-worker-2的i值:99
ForkJoinPool-1-worker-2的i值:50
ForkJoinPool-1-worker-2的i值:51
ForkJoinPool-1-worker-2的i值:52
ForkJoinPool-1-worker-2的i值:53
ForkJoinPool-1-worker-2的i值:54
ForkJoinPool-1-worker-2的i值:55
ForkJoinPool-1-worker-2的i值:56
ForkJoinPool-1-worker-2的i值:57
ForkJoinPool-1-worker-2的i值:58
ForkJoinPool-1-worker-2的i值:59
ForkJoinPool-1-worker-2的i值:60
ForkJoinPool-1-worker-2的i值:61
ForkJoinPool-1-worker-2的i值:62
ForkJoinPool-1-worker-2的i值:63
ForkJoinPool-1-worker-2的i值:64
ForkJoinPool-1-worker-2的i值:65
ForkJoinPool-1-worker-2的i值:66
ForkJoinPool-1-worker-2的i值:67
ForkJoinPool-1-worker-2的i值:68
ForkJoinPool-1-worker-2的i值:69
ForkJoinPool-1-worker-1的i值:175
ForkJoinPool-1-worker-1的i值:176
ForkJoinPool-1-worker-1的i值:177
ForkJoinPool-1-worker-1的i值:178
ForkJoinPool-1-worker-1的i值:179
ForkJoinPool-1-worker-1的i值:180
ForkJoinPool-1-worker-1的i值:181
ForkJoinPool-1-worker-1的i值:182
ForkJoinPool-1-worker-1的i值:183
ForkJoinPool-1-worker-1的i值:184
ForkJoinPool-1-worker-1的i值:185
ForkJoinPool-1-worker-1的i值:186
ForkJoinPool-1-worker-1的i值:187
ForkJoinPool-1-worker-1的i值:188
ForkJoinPool-1-worker-1的i值:189
ForkJoinPool-1-worker-1的i值:190
ForkJoinPool-1-worker-1的i值:191
ForkJoinPool-1-worker-1的i值:192
ForkJoinPool-1-worker-1的i值:193
ForkJoinPool-1-worker-1的i值:194
ForkJoinPool-1-worker-1的i值:195
ForkJoinPool-1-worker-1的i值:196
ForkJoinPool-1-worker-1的i值:197
ForkJoinPool-1-worker-1的i值:198
ForkJoinPool-1-worker-1的i值:199
ForkJoinPool-1-worker-1的i值:150
ForkJoinPool-1-worker-1的i值:151
ForkJoinPool-1-worker-1的i值:152
ForkJoinPool-1-worker-1的i值:153
ForkJoinPool-1-worker-1的i值:154
ForkJoinPool-1-worker-1的i值:155
ForkJoinPool-1-worker-1的i值:156
ForkJoinPool-1-worker-1的i值:157
ForkJoinPool-1-worker-1的i值:158
ForkJoinPool-1-worker-1的i值:159
ForkJoinPool-1-worker-1的i值:160
ForkJoinPool-1-worker-1的i值:161
ForkJoinPool-1-worker-1的i值:162
ForkJoinPool-1-worker-1的i值:163
ForkJoinPool-1-worker-1的i值:164
ForkJoinPool-1-worker-1的i值:165
ForkJoinPool-1-worker-1的i值:166
ForkJoinPool-1-worker-1的i值:167
ForkJoinPool-1-worker-1的i值:168
ForkJoinPool-1-worker-1的i值:169
ForkJoinPool-1-worker-1的i值:170
ForkJoinPool-1-worker-1的i值:171
ForkJoinPool-1-worker-1的i值:172
ForkJoinPool-1-worker-1的i值:173
ForkJoinPool-1-worker-1的i值:174
ForkJoinPool-1-worker-1的i值:125
ForkJoinPool-1-worker-1的i值:126
ForkJoinPool-1-worker-1的i值:127
ForkJoinPool-1-worker-1的i值:128
ForkJoinPool-1-worker-1的i值:129
ForkJoinPool-1-worker-1的i值:130
ForkJoinPool-1-worker-1的i值:131
ForkJoinPool-1-worker-1的i值:132
ForkJoinPool-1-worker-1的i值:133
ForkJoinPool-1-worker-1的i值:134
ForkJoinPool-1-worker-1的i值:135
ForkJoinPool-1-worker-1的i值:136
ForkJoinPool-1-worker-1的i值:137
ForkJoinPool-1-worker-1的i值:138
ForkJoinPool-1-worker-1的i值:139
ForkJoinPool-1-worker-1的i值:140
ForkJoinPool-1-worker-1的i值:141
ForkJoinPool-1-worker-1的i值:142
ForkJoinPool-1-worker-1的i值:143
ForkJoinPool-1-worker-1的i值:144
ForkJoinPool-1-worker-1的i值:145
ForkJoinPool-1-worker-1的i值:146
ForkJoinPool-1-worker-1的i值:147
ForkJoinPool-1-worker-1的i值:148
ForkJoinPool-1-worker-1的i值:149
ForkJoinPool-1-worker-1的i值:100
ForkJoinPool-1-worker-1的i值:101
ForkJoinPool-1-worker-1的i值:102
ForkJoinPool-1-worker-1的i值:103
ForkJoinPool-1-worker-1的i值:104
ForkJoinPool-1-worker-1的i值:105
ForkJoinPool-1-worker-1的i值:106
ForkJoinPool-1-worker-1的i值:107
ForkJoinPool-1-worker-1的i值:108
ForkJoinPool-1-worker-1的i值:109
ForkJoinPool-1-worker-1的i值:110
ForkJoinPool-1-worker-1的i值:111
ForkJoinPool-1-worker-1的i值:112
ForkJoinPool-1-worker-1的i值:113
ForkJoinPool-1-worker-1的i值:114
ForkJoinPool-1-worker-1的i值:115
ForkJoinPool-1-worker-1的i值:116
ForkJoinPool-1-worker-1的i值:117
ForkJoinPool-1-worker-1的i值:118
ForkJoinPool-1-worker-1的i值:119
ForkJoinPool-1-worker-1的i值:120
ForkJoinPool-1-worker-1的i值:121
ForkJoinPool-1-worker-1的i值:122
ForkJoinPool-1-worker-1的i值:123
ForkJoinPool-1-worker-1的i值:124
ForkJoinPool-1-worker-1的i值:25
ForkJoinPool-1-worker-1的i值:26
ForkJoinPool-1-worker-1的i值:27
ForkJoinPool-1-worker-1的i值:28
ForkJoinPool-1-worker-1的i值:29
ForkJoinPool-1-worker-1的i值:30
ForkJoinPool-1-worker-1的i值:31
ForkJoinPool-1-worker-1的i值:32
ForkJoinPool-1-worker-1的i值:33
ForkJoinPool-1-worker-1的i值:34
ForkJoinPool-1-worker-1的i值:35
ForkJoinPool-1-worker-1的i值:36
ForkJoinPool-1-worker-1的i值:37
ForkJoinPool-1-worker-1的i值:38
ForkJoinPool-1-worker-1的i值:39
ForkJoinPool-1-worker-1的i值:40
ForkJoinPool-1-worker-1的i值:41
ForkJoinPool-1-worker-1的i值:42
ForkJoinPool-1-worker-1的i值:43
ForkJoinPool-1-worker-1的i值:44
ForkJoinPool-1-worker-1的i值:45
ForkJoinPool-1-worker-1的i值:46
ForkJoinPool-1-worker-1的i值:47
ForkJoinPool-1-worker-1的i值:48
ForkJoinPool-1-worker-1的i值:49
ForkJoinPool-1-worker-1的i值:0
ForkJoinPool-1-worker-1的i值:1
ForkJoinPool-1-worker-1的i值:2
ForkJoinPool-1-worker-1的i值:3
ForkJoinPool-1-worker-1的i值:4
ForkJoinPool-1-worker-1的i值:5
ForkJoinPool-1-worker-1的i值:6
ForkJoinPool-1-worker-1的i值:7
ForkJoinPool-1-worker-1的i值:8
ForkJoinPool-1-worker-1的i值:9
ForkJoinPool-1-worker-1的i值:10
ForkJoinPool-1-worker-1的i值:11
ForkJoinPool-1-worker-1的i值:12
ForkJoinPool-1-worker-1的i值:13
ForkJoinPool-1-worker-1的i值:14
ForkJoinPool-1-worker-1的i值:15
ForkJoinPool-1-worker-1的i值:16
ForkJoinPool-1-worker-1的i值:17
ForkJoinPool-1-worker-1的i值:18
ForkJoinPool-1-worker-1的i值:19
ForkJoinPool-1-worker-1的i值:20
ForkJoinPool-1-worker-1的i值:21
ForkJoinPool-1-worker-1的i值:22
ForkJoinPool-1-worker-1的i值:23
ForkJoinPool-1-worker-1的i值:24
ForkJoinPool-1-worker-2的i值:70
ForkJoinPool-1-worker-2的i值:71
ForkJoinPool-1-worker-2的i值:72
ForkJoinPool-1-worker-2的i值:73
ForkJoinPool-1-worker-2的i值:74

从上面结果来看,ForkJoinPool启动了两个线程来执行这个打印任务,这是因为笔者的计算机的CPU是双核的。不仅如此,读者可以看到程序虽然打印了0-199这两百个数字,但是并不是连续打印的,这是因为程序将这个打印任务进行了分解,分解后的任务会并行执行,所以不会按顺序从0打印 到199。


二、RecursiveTask

下面以一个有返回值的大任务为例,介绍一下RecursiveTask的用法。

大任务是:计算随机的100个数字的和。

小任务是:每次只能20个数值的和。

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

//RecursiveTask为ForkJoinTask的抽象子类,有返回值的任务
class SumTask extends RecursiveTask<Integer> {
	// 每个"小任务"最多只打印50个数
	private static final int MAX = 20;
	private int arr[];
	private int start;
	private int end;

	SumTask(int arr[], int start, int end) {
		this.arr = arr;
		this.start = start;
		this.end = end;
	}

	@Override
	protected Integer compute() {
		int sum = 0;
		// 当end-start的值小于MAX时候,开始打印
		if ((end - start) < MAX) {
			for (int i = start; i < end; i++) {
				sum += arr[i];
			}
			return sum;
		} else {
			System.err.println("=====任务分解======");
			// 将大任务分解成两个小任务
			int middle = (start + end) / 2;
			SumTask left = new SumTask(arr, start, middle);
			SumTask right = new SumTask(arr, middle, end);
			// 并行执行两个小任务
			left.fork();
			right.fork();
			// 把两个小任务累加的结果合并起来
			return left.join() + right.join();
		}
	}

}

public class ForkJoinPoolTest2 {
	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		int arr[] = new int[100];
		Random random = new Random();
		int total = 0;
		// 初始化100个数字元素
		for (int i = 0; i < arr.length; i++) {
			int temp = random.nextInt(100);
			// 对数组元素赋值,并将数组元素的值添加到total总和中
			total += (arr[i] = temp);
		}
		System.out.println("初始化时的总和=" + total);
		// 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		// 提交可分解的PrintTask任务
		Future<Integer> future = forkJoinPool.submit(new SumTask(arr, 0,
				arr.length));
		System.out.println("计算出来的总和=" + future.get());
		// 关闭线程池
		forkJoinPool.shutdown();
	}

}

计算结果如下:
初始化时的总和=4283
=====任务分解======
=====任务分解======
=====任务分解======
=====任务分解======
=====任务分解======
=====任务分解======
=====任务分解======
计算出来的总和=4283

从上面结果来看,ForkJoinPool将任务分解了7次,程序通过SumTask计算出来的结果,和初始化数组时统计出来的总和是相等的,这表明计算结果一切正常。



读者还参考以下文章加深对ForkJoinPool的理解

http://www.infoq.com/cn/articles/fork-join-introduction/

http://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/


==================================================================================================

  作者:欧阳鹏  欢迎转载,与人分享是进步的源泉!

  转载请保留原文地址http://blog.csdn.net/ouyang_peng

==================================================================================================




相关文章
|
4天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
42 17
|
15天前
|
移动开发 前端开发 Java
Java最新图形化界面开发技术——JavaFx教程(含UI控件用法介绍、属性绑定、事件监听、FXML)
JavaFX是Java的下一代图形用户界面工具包。JavaFX是一组图形和媒体API,我们可以用它们来创建和部署富客户端应用程序。 JavaFX允许开发人员快速构建丰富的跨平台应用程序,允许开发人员在单个编程接口中组合图形,动画和UI控件。本文详细介绍了JavaFx的常见用法,相信读完本教程你一定有所收获!
Java最新图形化界面开发技术——JavaFx教程(含UI控件用法介绍、属性绑定、事件监听、FXML)
|
1天前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
|
26天前
|
存储 JavaScript 前端开发
基于 SpringBoot 和 Vue 开发校园点餐订餐外卖跑腿Java源码
一个非常实用的校园外卖系统,基于 SpringBoot 和 Vue 的开发。这一系统源于黑马的外卖案例项目 经过站长的进一步改进和优化,提供了更丰富的功能和更高的可用性。 这个项目的架构设计非常有趣。虽然它采用了SpringBoot和Vue的组合,但并不是一个完全分离的项目。 前端视图通过JS的方式引入了Vue和Element UI,既能利用Vue的快速开发优势,
110 13
|
1月前
|
算法 Java API
如何使用Java开发获得淘宝商品描述API接口?
本文详细介绍如何使用Java开发调用淘宝商品描述API接口,涵盖从注册淘宝开放平台账号、阅读平台规则、创建应用并申请接口权限,到安装开发工具、配置开发环境、获取访问令牌,以及具体的Java代码实现和注意事项。通过遵循这些步骤,开发者可以高效地获取商品详情、描述及图片等信息,为项目和业务增添价值。
63 10
|
24天前
|
前端开发 Java 测试技术
java日常开发中如何写出优雅的好维护的代码
代码可读性太差,实际是给团队后续开发中埋坑,优化在平时,没有那个团队会说我专门给你一个月来优化之前的代码,所以在日常开发中就要多注意可读性问题,不要写出几天之后自己都看不懂的代码。
59 2
|
8月前
|
Java Unix 程序员
java 8 新特性讲解Optional类--Fork/Join 框架--新时间日期API--以及接口的新特性和注解
java 8 新特性讲解Optional类--Fork/Join 框架--新时间日期API--以及接口的新特性和注解
105 1
|
8月前
|
并行计算 算法 Java
探索Java并发编程:Fork/Join框架的深度解析
【5月更文挑战第29天】在多核处理器普及的时代,有效利用并发编程以提升程序性能已经成为开发者必须面对的挑战。Java语言提供的Fork/Join框架是一个强大的工具,它旨在利用多线程执行分而治之的任务。本文将通过深入分析Fork/Join框架的工作原理、关键特性以及与传统线程池技术的差异,帮助开发者更好地掌握这一高效处理并发任务的技术手段。
|
6月前
|
并行计算 算法 Java
Java面试题:解释Java中的无锁编程的概念,Java中的Fork/Join框架的作用和使用场景,Java中的CompletableFuture的作用和使用场景
Java面试题:解释Java中的无锁编程的概念,Java中的Fork/Join框架的作用和使用场景,Java中的CompletableFuture的作用和使用场景
46 0
|
8月前
|
Java 开发者
探索Java并发编程:Fork/Join框架的深度解析
【5月更文挑战第25天】在多核处理器日益普及的今天,并发编程成为了提升应用性能的关键。Java语言提供了多种并发工具,其中Fork/Join框架是一个高效且强大的工具,用于处理分而治之的任务。本文将深入探讨Fork/Join框架的原理、使用及其在实际应用中的优化策略,旨在帮助开发者更好地利用这一框架以解决复杂的并发问题。
下一篇
开通oss服务