Fork/Join框架的主要功能
Fork/Join框架主要完成了两件事情:
- 任务分割:首先把大的任务分割成足够小的子任务;
- 执行任务并合并结果:分割的子任务分别存放到双端队列中,然后几个启动线程风别从双端队列中获取任务执行,子任务执行完成的结果都存放在另一个队列里,启动一个线程从队列里取数据,然后合并这些数据;
Fork/Join框架的实现原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成:
- ForkJoinTask数组负责将存放的程序提交给ForkJoinPool;
- ForkJoinWorkerThread负责执行这些任务 当调用ForkJoinTask的fork方法时,程序会把任务放在ForkJoinWorkerThread的putTask的workQueue中,异步的执行这个任务,然后立即返回结果
实例演示
- ForkJoinTask:我们使用Fork/Join框架首先要创建一个ForkJoin任务,需继承其子类:a. RecursiveAction:用于无返回值的任务 b. RecuriveTask:用于又返回值的任务
- ForkJoinPool:ForkJoinTask的执行环境 分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列尾部获取一个任务(工作窃取算法)
//首先继承其子类,这个是有返回值的,所以继承RecursiveTask public class CountTask extends RecursiveTask<Integer> { private static final int THREAD_HOLD = 2; private int satrt; private int end; public CountTask(int start,int end){ this.start = start; this.end = end; } @Override protected Integer compute(){ int sum = 0; boolean flag = (end - start)<=THREAD_HOLD; if(flag){ for(int i = start;i < end;i++){ sum += i; } } else { int middle = (start + end)/2; CountTask one = new CountTask(start,middle); CountTask two = new CountTask(middle+1,end); //执行子任务 one.fork(); two.fork(); //获取子任务执行结果 int oResult = one.join(); int tResult = two.join(); sum = oResult + tResult; } return sum; } } public class ExeWork { public static void main(String[] args){ ForkJoinPool pool = new ForkJoinPool (); CountTask task = new CountTask(1,4); Future<Integer> result = pool.submit(task); try{ //合并这些数据 System.ot.println(result.get()); }catch(Exception e){ e.printStackTrace(); } } }
Fork/Join框架的异常处理
ForkJoinTask在执行任务的时候可能会抛出异常,但主线程无法直接铺获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已将抛出异常或已被取消了,并可以通过ForkJoinTask的getException方法获取异常.