Java8 - 一文搞定Fork/Join 框架

简介: Java8 - 一文搞定Fork/Join 框架

20200510181139786.png


概述


分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。

它是 ExecutorService 接口的一个实现,它把子任务分配给线程池(称为 ForkJoinPool )中的工作线程。


CPU密集型 vs IO密集型


通常来讲,任务可以划分为计算密集型和IO密集型


计算密集型任务


特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。


这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。


计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。


IO密集型


涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。


对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。


IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少 。


简单示例


来看个最简单的求和

public class ForkJoinTest {
    private static int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    public static void main(String[] args) {
        System.out.println("result=> " + calc());
        System.out.println("result=> " + calcByStream());
    }
    private static int calc() {
        int result = 0;
        for (int i = 0; i < data.length; i++) {
            result += data[i];
        }
        return result;
    }
    private static Long calcByStream() {
        return LongStream.rangeClosed(0,10).reduce(0, Long::sum);
    }
 }


Fork/Join常用的类


  • ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制 。


通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:


RecursiveAction:用于没有返回结果的任务。 比如写数据到磁盘,然后就退出了。 一个RecursiveAction可以把自己的工作分割成更小的几块, 这样它们可以由独立的线程或者CPU执行。 我们可以通过继承来实现一个RecursiveAction


RecursiveTask :用于有返回结果的任务。 可以将自己的工作分割为若干更小任务,并将这些子任务的执行合并到一个集体结果。 可以有几个水平的分割和合并


CountedCompleter: 在任务完成执行后会触发执行一个自定义的钩子函数


ForkJoinPool :ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

20210328233620942.png


RecursiveTask 实现 并行计算


要把任务提交到这个池,必须创建 RecursiveTask<R> 的一个子类,其中 R 是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是 RecursiveAction 类型 。

要定义 RecursiveTask, 只需实现它唯一的抽象方法compute

protected abstract R compute();

这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。

if (任务足够小或不可分) {
  顺序计算该任务
} else {
  将任务分成两个子任务
  递归调用本方法,拆分每个子任务,等待所有子任务完成
  合并每个子任务的结果
}


一般来说并没有确?的标准决定一个任务是否应该再拆分,但有几种试探方法可以帮助你


2021032919164647.png


事实上,这只不过是著名的分治算法的并行版本而已。

20210329191754929.png


现在编写一个方法来并行对前n个自然数求和就很简单了。你只需把想要的数字数组传给ForkJoinSumCalculator 的构造函数:

public static long forkJoinSum(long n) {
  long[] numbers = LongStream.rangeClosed(1, n).toArray();
  ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
  return new ForkJoinPool().invoke(task);
}



这里用了一个 LongStream 来生成包含前n个自然数的数组,然后创建一个 ForkJoinTask( RecursiveTask 的父类),并把数组传递给 ForkJoinSumCalculator 的公共构造函数。


最后,创建了一个新的 ForkJoinPool ,并把任务传给它的调用方法 。在ForkJoinPool 中执行时,最后一个方法返回的值就是 ForkJoinSumCalculator 类定义的任务结果。


在实际应用时,使用多个 ForkJoinPool 是没有什么意义的。正是出于这个原因,一般来说把它实例化一次,然后把实例保存在静态字段中,使之成为单例,这样就可以在软件中任何部分方便地重用了。这里创建时用了其默认的无参数构造函数,这意味着想让线程池使用JVM能够使用的所有处理器。更确切地说,该构造函数将使用 Runtime.availableProcessors 的返回值来决定线程池使用的线程数。请注意 availableProcessors 方法虽然看起来是处理器,但它实际上返回的是可用内核的数量,包括超线程生成的虚拟内核。


当把 ForkJoinSumCalculator 任务传给 ForkJoinPool 时,这个任务就由池中的一个线程执行,这个线程会调用任务的 compute 方法。


该方法会检查任务是否小到足以顺序执行,如果不够小则会把要求和的数组分成两半,分给两个新的 ForkJoinSumCalculator ,而它们也由ForkJoinPool 安排执行。


因此,这一过程可以递归重复,把原任务分为更小的任务,直到满足不方便或不可能再进一步拆分的条件(本例中是求和的项目数小于等于10 000)。


这时会顺序计算每个任务的结果,然后由分支过程创建的(隐含的)任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。


2021032919241916.png


package com.artisan.java8;
import java.util.concurrent.RecursiveTask;
public class AccumulatorRecursiveTask extends RecursiveTask<Integer> {
    private final int start;
    private final int end;
    private final int[] data;
    private final int LIMIT = 3;
    public AccumulatorRecursiveTask(int start, int end, int[] data) {
        this.start = start;
        this.end = end;
        this.data = data;
    }
    @Override
    protected Integer compute() {
        if ((end - start) <= LIMIT) {
            int result = 0;
            for (int i = start; i < end; i++) {
                result += data[i];
            }
            return result;
        }
        int mid = (start + end) / 2;
        AccumulatorRecursiveTask left = new AccumulatorRecursiveTask(start, mid, data);
        AccumulatorRecursiveTask right = new AccumulatorRecursiveTask(mid, end, data);
        left.fork();
        Integer rightResult = right.compute();
        Integer leftResult = left.join();
        return rightResult + leftResult;
    }
}


RecursiveAction

package com.artisan.java8;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;
public class AccumulatorRecursiveAction extends RecursiveAction {
    private final int start;
    private final int end;
    private final int[] data;
    private final int LIMIT = 3;
    public AccumulatorRecursiveAction(int start, int end, int[] data) {
        this.start = start;
        this.end = end;
        this.data = data;
    }
    @Override
    protected void compute() {
        if ((end - start) <= LIMIT) {
            for (int i = start; i < end; i++) {
                AccumulatorHelper.accumulate(data[i]);
            }
        } else {
            int mid = (start + end) / 2;
            AccumulatorRecursiveAction left = new AccumulatorRecursiveAction(start, mid, data);
            AccumulatorRecursiveAction right = new AccumulatorRecursiveAction(mid, end, data);
            left.fork();
            right.fork();
            left.join();
            right.join();
        }
    }
    static class AccumulatorHelper {
        private static final AtomicInteger result = new AtomicInteger(0);
        static void accumulate(int value) {
            result.getAndAdd(value);
        }
        public static int getResult() {
            return result.get();
        }
        static void rest() {
            result.set(0);
        }
    }
}


Fork/Join执行流程


202103282344056.png


最佳实践


虽然分支/合并框架还算简单易用,不幸的是它也很容易被误用


对一个任务调用 join 方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用它。否则,你得到的版本会比原始的顺序算法更慢更复杂,因为每个子任务都必须等待另一个子任务完成才能启动。

不应该在 RecursiveTask 内部使用 ForkJoinPool 的 invoke 方法。相反,你应该始终直接调用 compute 或 fork 方法,只有顺序代码才应该用 invoke 来启动并行计算。

相关文章
|
1天前
|
设计模式 算法 Java
[设计模式Java实现附plantuml源码~行为型]定义算法的框架——模板方法模式
[设计模式Java实现附plantuml源码~行为型]定义算法的框架——模板方法模式
|
1天前
|
Java Nacos 开发者
Java从入门到精通:4.2.1学习新技术与框架——以Spring Boot和Spring Cloud Alibaba为例
Java从入门到精通:4.2.1学习新技术与框架——以Spring Boot和Spring Cloud Alibaba为例
|
1天前
|
Dubbo Java 应用服务中间件
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
|
6天前
|
Java Maven 开发工具
《Java 简易速速上手小册》第5章:Java 开发工具和框架(2024 最新版)
《Java 简易速速上手小册》第5章:Java 开发工具和框架(2024 最新版)
26 1
|
10天前
|
Java 大数据 云计算
Spring框架:Java后台开发的核心
【4月更文挑战第15天】Spring框架在Java后台开发中占据核心位置,因其控制反转(IoC)、面向切面编程(AOP)、事务管理等特性提升效率和质量。Spring提供数据访问集成、RESTful Web服务和WebSocket支持。优势包括高效开发、灵活扩展、强大生态圈和广泛应用。应用于企业级应用、微服务架构及云计算大数据场景。掌握Spring对Java开发者至关重要。
|
13天前
|
存储 Java 编译器
Java集合丛林:深入了解集合框架的秘密
Java集合丛林:深入了解集合框架的秘密
15 0
Java集合丛林:深入了解集合框架的秘密
|
16天前
|
存储 Java 数据库连接
java使用mp持久化框架,写入5000个字符,但是VARCHAR(255) 会报错
使用Java的MyBatis Plus框架时,如果尝试将超过VARCHAR(255)限制的字符串(如5000个字符)存入数据库,会抛出异常。解决方法是将列类型改为TEXT。可通过在实体类属性上添加`@TableField(typeHandler = JdbcType.CLOB)`注解,如`private String content;`,将属性映射到CLOB类型列,以存储更长字符串。
9 0
|
17天前
|
存储 Java
java反射——设计框架的灵魂
java反射——设计框架的灵魂
|
22天前
|
前端开发 安全 Java
使用Java Web框架:Spring MVC的全面指南
【4月更文挑战第3天】Spring MVC是Spring框架的一部分,用于构建高效、模块化的Web应用。它基于MVC模式,支持多种视图技术。核心概念包括DispatcherServlet(前端控制器)、HandlerMapping(请求映射)、Controller(处理请求)、ViewResolver(视图解析)和ModelAndView(模型和视图容器)。开发流程涉及配置DispatcherServlet、定义Controller、创建View、处理数据、绑定模型和异常处理。
使用Java Web框架:Spring MVC的全面指南
|
24天前
|
Java 关系型数据库 数据库连接
52 类 110 个常用 Java 组件和框架整理
52 类 110 个常用 Java 组件和框架整理
16 0