现实生活中的分治
分治的思想,顾名思义分而治之。就像古代的王想治理好天下,单单靠他一个人是不够的,还需要大臣的辅助,把天下划分为一块块区域,分派的下面的人负责,然后下面的人又分派给他们的属下负责,层层传递。
这就是分治,也就是把一个复杂的问题分解成相似的子问题,然后子问题再分子问题,直到问题分的很简单不必再划分了。然后层层返回问题的结果,最终上报给王!
分治在算法上有很多应用,类似大数据的MapReduce,归并算法、快速排序算法等。 在JUC中也提供了一个叫Fork/Join的并行计算框架用来处理分治的情况,它类似于单机版的 MapReduce
。
Fork/Join
分治分为两个阶段,第一个阶段分解任务,把任务分解为一个个小任务直至小任务可以简单的计算返回结果。
第二阶段合并结果,把每个小任务的结果合并返回得到最终结果。而Fork
就是分解任务,Join
就是合并结果。
Fork/Join框架主要包含两部分:ForkJoinPool、ForkJoinTask
。
ForkJoinPool
就是治理分治任务的线程池。它和在之前的文章提到ThreadPoolExecutor
线程池,共同点都是消费者-生产者模式的实现,但是有一些不同。ThreadPoolExecuto
r的线程池是只有一个任务队列的,而ForkJoinPool
有多个任务队列。通过ForkJoinPool
的invoke
或submit
或execute
提交任务的时候会根据一定规则分配给不同的任务队列,并且任务队列的双端队列。
execute 异步,无返回结果 invoke 同步,有返回结果 (会阻塞) submit 异步,有返回结果 (Future)
为啥要双端队列呢?因为ForkJoinPool
有一个机制,当某个工作线程对应消费的任务队列空闲的时候它会去别的忙的任务队列的尾部分担(stealing)任务过来执行(好伙伴啊)。然后那个忙的任务队列还是头部出任务给它对应的工作线程消费。这样双端就井然有序,不会有任务争抢的情况。
ForkJoinTask
这就是分治任务啦,就等同于我们平日用的Runnable
。它是一个抽象类,核心方法就是fork
和join
。fork
方法用来异步执行一个子任务,join
方法会阻塞当前线程等待子任务返回。
ForkJoinTask
有两个子类分别是RecursiveAction
和RecursiveTask
。这两个子类也是抽象类,都是通过递归来执行分治任务。每个子类都有抽象方法compute
差别就在于RecursiveAction
的没有返回值而RecursiveTask
有返回值。
简单应用
这样分治思想用递归实现的经典案例就是斐波那契数列了。
斐波那契数列:1、1、2、3、5、8、13、21、34、…… 公式 :F(1)=1,F(2)=1, F(n)=F(n-1)+F(n-2)(n>=3,n∈N*)
public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(4); // 最大并发数4 Fibonacci fibonacci = new Fibonacci(20); long startTime = System.currentTimeMillis(); Integer result = forkJoinPool.invoke(fibonacci); long endTime = System.currentTimeMillis(); System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms."); } //以下为官方API文档示例 static class Fibonacci extends RecursiveTask<Integer> { final int n; Fibonacci(int n) { this.n = n; } @Override protected Integer compute() { if (n <= 1) { return n; } Fibonacci f1 = new Fibonacci(n - 1); f1.fork(); Fibonacci f2 = new Fibonacci(n - 2); return f2.compute() + f1.join(); } }
当然你也可以两个任务都fork
,要注意的是两个任务都fork
的情况,必须按照f1.fork(),f2.fork(), f2.join(),f1.join()
这样的顺序,不然有性能问题。JDK官方文档有说明,有兴趣的可以去研究下。
我是推荐使用invokeAll方法
Fibonacci f1 = new Fibonacci(n - 1); Fibonacci f2 = new Fibonacci(n - 2); invokeAll(f1,f2); return f2.join() + f1.join();
Method invokeAll (available in multiple versions) performs the most common form of parallel invocation: forking a set of tasks and joining them all.
官方API文档是这样写到的,所以平日用invokeAll
就好了。invokeAll会把传入的任务的第一个交给当前线程来执行,其他的任务都fork加入工作队列,这样等于利用当前线程也执行任务了。以下为invokeAll
源码
public static void invokeAll(ForkJoinTask<?>... tasks) { Throwable ex = null; int last = tasks.length - 1; for (int i = last; i >= 0; --i) { ForkJoinTask<?> t = tasks[i]; if (t == null) { if (ex == null) ex = new NullPointerException(); } else if (i != 0) //除了第一个都fork t.fork(); else if (t.doInvoke() < NORMAL && ex == null) //留一个自己执行 ex = t.getException(); } for (int i = 1; i <= last; ++i) { ForkJoinTask<?> t = tasks[i]; if (t != null) { if (ex != null) t.cancel(false); else if (t.doJoin() < NORMAL) ex = t.getException(); } } if (ex != null) rethrow(ex); }
结语
Fork/Join
就是利用了分治的思想组建的框架,平日里很多场景都能利用到分治思想。框架的核心ForkJoinPool
,因为含有任务队列和窃取的特性所以能更好的利用资源。
最后祝大家五一快乐!