JUC - ForkJoin

简介: JUC - ForkJoin

Java 7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。

我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:

┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐

└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘

还有一种方法,可以把数组拆成两部分,分别计算,最后加起来就是最终结果,这样可以用两个线程并行执行:

┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐

└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘

┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐

└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘

如果拆成两部分还是很大,我们还可以继续拆,用4个线程并行执行:

┌─┬─┬─┬─┬─┬─┐

└─┴─┴─┴─┴─┴─┘

┌─┬─┬─┬─┬─┬─┐

└─┴─┴─┴─┴─┴─┘

┌─┬─┬─┬─┬─┬─┐

└─┴─┴─┴─┴─┴─┘

┌─┬─┬─┬─┬─┬─┐

└─┴─┴─┴─┴─┴─┘

这就是Fork/Join任务的原理:判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。

我们来看如何使用Fork/Join对大数据进行并行求和:


import java.util.Random;
import java.util.concurrent.*;
public class Main {
    public static void main(String[] args) throws Exception {
        // 创建2000个随机数组成的数组:
        long[] array = new long[2000];
        long expectedSum = 0;
        for (int i = 0; i < array.length; i++) {
            array[i] = random();
            expectedSum += array[i];
        }
        System.out.println("Expected sum: " + expectedSum);
        // fork/join:
        ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
        long startTime = System.currentTimeMillis();
        Long result = ForkJoinPool.commonPool().invoke(task);
        long endTime = System.currentTimeMillis();
        System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
    }
    static Random random = new Random(0);
    static long random() {
        return random.nextInt(10000);
    }
}
class SumTask extends RecursiveTask<Long> {
    static final int THRESHOLD = 500;
    long[] array;
    int start;
    int end;
    SumTask(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }
    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // 如果任务足够小,直接计算:
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += this.array[i];
                // 故意放慢计算速度:
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                }
            }
            return sum;
        }
        // 任务太大,一分为二:
        int middle = (end + start) / 2;
        System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
        SumTask subtask1 = new SumTask(this.array, start, middle);
        SumTask subtask2 = new SumTask(this.array, middle, end);
        invokeAll(subtask1, subtask2);
        Long subresult1 = subtask1.join();
        Long subresult2 = subtask2.join();
        Long result = subresult1 + subresult2;
        System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
        return result;
    }
}
// 输出
Expected sum: 9788366
split 0~2000 ==> 0~1000, 1000~2000
split 0~1000 ==> 0~500, 500~1000
split 1000~2000 ==> 1000~1500, 1500~2000
result = 2391591 + 2419573 ==> 4811164
result = 2485485 + 2491717 ==> 4977202
result = 4811164 + 4977202 ==> 9788366
Fork/join sum: 9788366 in 1091 ms.

观察上述代码的执行过程,一个大的计算任务0~2000首先分裂为两个小任务0~1000和1000~2000,这两个小任务仍然太大,继续分裂为更小的0~500,500~1000,1000~1500,1500~2000,最后,计算结果被依次合并,得到最终结果。

因此,核心代码SumTask继承自RecursiveTask,在compute()方法中,关键是如何“分裂”出子任务并且提交子任务:

class SumTask extends RecursiveTask<Long> {
    protected Long compute() {
        // “分裂”子任务:
        SumTask subtask1 = new SumTask(...);
        SumTask subtask2 = new SumTask(...);
        // invokeAll会并行运行两个子任务:
        invokeAll(subtask1, subtask2);
        // 获得子任务的结果:
        Long result1 = fork1.join();
        Long result2 = fork2.join();
        // 汇总结果:
        return result1 + result2;
    }
}

Fork/Join线程池在Java标准库中就有应用。Java标准库提供的java.util.Arrays.parallelSort(array)可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。

小结

Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。

ForkJoinPool线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTaskRecursiveAction

使用Fork/Join模式可以进行并行计算以提高效率。

目录
相关文章
|
9月前
|
存储 缓存 Java
JUC基础——线程池(2)
JUC基础——线程池
52 0
|
6月前
|
安全 Java 数据库
JUC第二十二讲:JUC线程池-FutureTask详解
JUC第二十二讲:JUC线程池-FutureTask详解
|
9月前
|
Java 编译器 调度
JUC是什么?
JUC是什么?
|
9月前
|
缓存 安全 Java
JUC基础——线程池(1)
JUC基础——线程池
79 0
|
10月前
|
Java
JUC
JUC
64 0
|
10月前
|
Java 调度
【JUC基础】13. 线程池(二)
我们继续前面的《【JUC基础】12.线程池(一)》。
|
11月前
|
缓存 Java 调度
【JUC基础】12. 线程池(一)
我们知道多线程的使用,是为了最大限度发挥现代多核处理器的计算能力,提高系统的吞吐量和性能。但是如果不加以控制和管理,随意使用多线程,对系统性能反而会有不利的影响。线程数量和系统CPU资源是息息相关的,随意使用甚至可能会耗尽系统CPU资源和内存资源。
|
11月前
|
消息中间件 资源调度 Java
【JUC基础】01. 初步认识JUC
前段时间,有朋友跟我说,能否写一些关于JUC的教程文章。本来呢,JUC也有在我的专栏计划之内,只是一直都还没空轮到他,那么既然有这样的一个契机,那就把JUC计划提前吧。那么今天就重点来初步认识一下什么是JUC,以及一些基本的JUC相关基础知识。
126 0
【JUC基础】01. 初步认识JUC
|
SpringCloudAlibaba 前端开发 Java
JUC系列(六) 线程池
线程池知识是多线程必备的一个技术,线程池极大的帮我们在业务中管理了线程资源
JUC系列(六) 线程池
|
Java 编译器 调度
什么是JUC
并发编程学习
461 0
什么是JUC