概念
Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型 运算
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计 算,如归并排序、斐波那契数列、都可以用分治思想进行求解
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运 算效率
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
关键点
Fork/Join(分而治之)是一种并行计算模型,用于解决递归性的、可以被分解为更小子任务的问题。它是 Java 7 引入的一个特性,并在 Java.util.concurrent 包中提供了相应的类来支持。
Fork/Join 模型基于以下两个关键操作:
- Fork(分解):将一个大任务分解为若干个小任务,并把这些小任务放入工作队列中,等待被执行。
- Join(合并):对小任务的执行结果进行合并,得到大任务的最终结果。
Fork/Join 模型的核心思想是递归地将问题划分为更小的子问题,直到子问题足够简单,可以直接求解。然后通过合并子问题的结果,最终得到原始问题的解。
在 Java 中,Fork/Join 模型的实现主要依赖于以下两个类:
- ForkJoinPool:是线程池的实现,用于管理任务的执行。它允许创建一个工作线程组,每个线程都有自己的工作队列。任务会被分发给空闲的工作线程执行,如果一个线程的工作队列为空,它可以从其他线程的队列中窃取任务来执行。
- RecursiveTask 和 RecursiveAction:这两个抽象类是用来表示可分解的任务的。RecursiveTask 用于返回结果的任务,而 RecursiveAction 则用于不返回结果的任务。我们需要继承这些类,并实现 compute() 方法来执行任务划分和合并操作。
典型的使用场景包括在计算密集型任务中,例如大规模数据处理、图像处理、并行排序等。
Fork/Join 模型利用任务的递归分解和合并,能够充分利用多核处理器的性能,并提供了一种简洁高效的并行计算方式。
使用
提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下 面定义了一个对 1~n 之间的整数求和的任务
1. class AddTask extends RecursiveTask<Integer> { 2. 3. int n; 4. 5. public AddTask(int n) { 6. this.n = n; 7. } 8. 9. @Override 10. 11. public String toString() { 12. return "{" + n + '}'; 13. } 14. 15. @Override 16. protected Integer compute() { 17. // 如果 n 已经为 1,可以求得结果了 18. 19. if (n == 1) { 20. System.out.println("join()"+n); 21. 22. return n; 23. } 24. 25. // 将任务进行拆分(fork) 26. 27. AddTask t1 = new AddTask(n - 1); 28. t1.fork(); 29. System.out.println("fork()"+n+" "+ t1); 30. 31. // 合并(join)结果 32. 33. int result = n + t1.join(); 34. System.out.println("join()"+n+"+"+t1+"="+result ); 35. return result; 36. } 37. 38. }
然后提交给 ForkJoinPool 来执行
1. public class Test { 2. public static void main(String[] args) { 3. ForkJoinPool pool = new ForkJoinPool(4); 4. System.out.println(pool.invoke(new AddTask(5))); 5. 6. } 7. 8. }
输出
fork()3 {2}
fork()5 {4}
fork()2 {1}
fork()4 {3}
join()1
join()2+{1}=3
join()3+{2}=6
join()4+{3}=10
join()5+{4}=15
15
用图来表示
改进
1. class AddTask3 extends RecursiveTask<Integer> { 2. 3. int begin; 4. int end; 5. 6. public AddTask3(int begin, int end) { 7. this.begin = begin; 8. this.end = end; 9. } 10. 11. @Override 12. 13. public String toString() { 14. return "{" + begin + "," + end + '}'; 15. } 16. 17. @Override 18. 19. protected Integer compute() { 20. // 5, 5 21. if (begin == end) { 22. log.debug("join() {}", begin); 23. return begin; 24. } 25. // 4, 5 26. if (end - begin == 1) { 27. log.debug("join() {} + {} = {}", begin, end, end + begin); 28. return end + begin; 29. } 30. 31. // 1 5 32. int mid = (end + begin) / 2; 33. 34. // 3 35. AddTask3 t1 = new AddTask3(begin, mid); 36. 37. // 1,3 38. t1.fork(); 39. AddTask3 t2 = new AddTask3(mid + 1, end); 40. // 4,5 41. t2.fork(); 42. log.debug("fork() {} + {} = ?", t1, t2); 43. int result = t1.join() + t2.join(); 44. log.debug("join() {} + {} = {}", t1, t2, result); 45. return result; 46. } 47. }
然后提交给 ForkJoinPool 来执行
1. public static void main(String[] args) { 2. ForkJoinPool pool = new ForkJoinPool(4); 3. System.out.println(pool.invoke(new AddTask3(1, 10))); 4. }
结果
[ForkJoinPool-1-worker-0] - join() 1 + 2 = 3
[ForkJoinPool-1-worker-3] - join() 4 + 5 = 9
[ForkJoinPool-1-worker-0] - join() 3
[ForkJoinPool-1-worker-1] - fork() {1,3} + {4,5} = ?
[ForkJoinPool-1-worker-2] - fork() {1,2} + {3,3} = ?
[ForkJoinPool-1-worker-2] - join() {1,2} + {3,3} = 6
[ForkJoinPool-1-worker-1] - join() {1,3} + {4,5} = 15 15
用图来表示