1. Fork/Join框架概念
Fork/Join框架是Java提供的一个用于并行执行任务的框架,它会将一个大任务分成多个小任务,并且将每个小任务的最终结果汇总得到大任务结果的框架。比如对1+2+3+····+100求和,可以分成十个子任务分别对10个数求和,最后再汇总这十个子任务的结果。
2.工作窃取算法
工作窃取算法是指某个线程从其他任务队列里窃取任务来执行。
假如我们可以将一个总任务分割成多个互不相干的子任务,为了减少线程的竞争,我们会将这些子任务放在不同的队列中,并为每个队列都建造一个线程执行该队列的任务,线程和队列一一对应。但是有些线程可能会很早的执行完自己队列中的所有任务,而其他线程还会处理自己拥有的队列中的任务,此时已处理完任务的线程与其等待其他线程执行任务,不如帮助其他线程一起执行剩余任务。这时他们会从其他线程的队列里窃取一个线程执行任务,所以为了避免因为工作窃取引起的两个线程之间的竞争,通常任务队列会使用双端队列。任务队列线程从头部取任务,窃取线程从尾部取任务。
优点是充分利用线程进行并行运算,并减少了竞争,缺点就是还是存在竞争情况,比如队列中任务数为1时,还会因为创建多个线程和队列造成更多的资源消耗。
3. Fork/Join框架设计实现类
通过前面的介绍我们可以了解到Fork/Join框架主要实现两个步骤:分割任务以及执行任务并汇总结果。
- 分割任务:我们需要一个Fork类来分割任务,并且要将大任务分割的足够小。
- 执行任务并汇总结果:分割的子任务分别放在双端队列里,然后几个启动线程分 别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程 从队列里拿数据,然后合并这些数据。
Fork/Join框架设计了两个类来完成以上两个步骤的功能。
- ForkJoinTask:这是一个抽象类,主要使用该类来创建一个ForkJoin任务,它提供在任务中执行fork()和join()操作的机制。但Fork/Join框架提供了ForkJoinTask的两个抽象子类, RecursiveAction(用于没有返回结果的任务); RecursiveTask(用于有返回结果的任务)。通过继承这两个子类来创建一个ForkJoin任务。
- ②ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当 一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
4.简单使用Fork/Join框架
用Fork/Join框架来计算一个1+2+3+····+n的结果。注意,我们必须先想好如何分割任务,分割计算1+2+3+····+n时,必须至少分割为两个数字一组才能计算,但如果n过大两个数字一组会造成极大资源消耗,所以应考虑好任务分割的程度。此处仅以1+2+3+4为例。
public class TestFork_JoinFrame extends RecursiveTask<Integer>{
private static final int THRESHOLD=2;//设置阀值,也就是任务最小分割程度为2个数字相加
private int start;
private int end;
public TestFork_JoinFrame(int s,int e) {
start=s;
end=e;
}
public static void main(String[] args) {
ForkJoinPool pool=new ForkJoinPool();
TestFork_JoinFrame task=new TestFork_JoinFrame(1, 4);
//执行任务
Future<Integer> f=pool.submit(task);
try {
System.out.println("最终结果:"+f.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
protected Integer compute() {
int sum=0;
boolean canCompute=(end-start)<=THRESHOLD;//如果已经是最小分割值,则进行运算,否则继续分割
if(canCompute){
for(int i=start;i<=end;i++){
sum+=i;
}
}else{
//分隔成两个子任务
int middle=(start+end)/2;
TestFork_JoinFrame leftTask=new TestFork_JoinFrame(start, middle);
TestFork_JoinFrame rightTask=new TestFork_JoinFrame(middle+1, end);
//执行子任务
leftTask.fork();
rightTask.fork();
//等待子任务完成并获取结果汇总
sum=leftTask.join()+rightTask.join();
}
return sum;
}
}
5.异常处理
ForkJoinTask在执行过程中可能会抛出异常,但是与普通线程任务一样,我们无法在主线程中对其进行捕获,所以以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被 取消了,并且可以通过ForkJoinTask的getException方法获取异常。getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如 果任务没有完成或者没有抛出异常则返回null。
public class TestFork_JoinFrame extends RecursiveTask<Integer>{
private static final int THRESHOLD=2;//设置阀值,也就是任务最小分割程度为2个数字相加
private int start;
private int end;
public TestFork_JoinFrame(int s,int e) {
start=s;
end=e;
}
public static void main(String[] args) {
ForkJoinPool pool=new ForkJoinPool();
TestFork_JoinFrame task=new TestFork_JoinFrame(1, 4);
//执行任务
Future<Integer> f=pool.submit(task);
try {
if(task.isCompletedAbnormally()){
System.out.println(task.getException());
}else{
System.out.println("最终结果:"+f.get());
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
protected Integer compute() {
int sum=0;
boolean canCompute=(end-start)<=THRESHOLD;//如果已经是最小分割值,则进行运算,否则继续分割
if(canCompute){
for(int i=start;i<=end;i++){
sum+=i;
}
}else{
//分隔成两个子任务
int middle=(start+end)/2;
TestFork_JoinFrame leftTask=new TestFork_JoinFrame(start, middle);
TestFork_JoinFrame rightTask=new TestFork_JoinFrame(middle+1, end);
//执行子任务
leftTask.fork();
rightTask.fork();
//等待子任务完成并获取结果汇总
sum=leftTask.join()+rightTask.join();
}
return sum;
}
}
}
6.框架实现原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
1.ForkJoinTask的fork()方法原理:调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果。源码如下
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
pushTask方法把当前任务存放在ForkJoinTask数组队列里。然后再调用ForkJoinPool的 signalWork()方法唤醒或创建一个工作线程来执行任务。源码码如下。
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}
2.ForkJoinTask的join方法实现原理:Join方法的主要作用是阻塞当前线程并等待获取结果。源码如下
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
首先,它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断返回什么结 果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常 (EXCEPTIONAL)。如果任务状态是已完成,则直接返回任务结果。如果任务状态是被取消,则直接抛出CancellationException。如果任务状态是抛出异常,则直接抛出对应的异常。
doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成, 则直接返回任务状态;如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。