java 中的fork join框架

简介: Java中的Fork Join框架于Java 7引入,旨在提升并行计算能力。它通过“分而治之”的思想,将大任务拆分为多个小任务(fork),再将结果合并(join)。核心组件包括:ForkJoinPool(管理线程池和工作窃取机制)、ForkJoinWorkerThread(执行具体任务的工作线程)和ForkJoinTask(定义任务逻辑,常用子类为RecursiveAction和RecursiveTask)。框架支持通过invoke、fork/join等方式提交任务,广泛应用于高性能并发场景。

java 中的fork join框架

fork join框架是java 7中引入框架,这个框架的引入主要是为了提升并行计算的能力。

fork join主要有两个步骤,第一就是fork,将一个大任务分成很多个小任务,第二就是join,将第一个任务的结果join起来,生成最后的结果。如果第一步中并没有任何返回值,join将会等到所有的小任务都结束。

还记得之前的文章我们讲到了thread pool的基本结构吗?

  1. ExecutorService - ForkJoinPool 用来调用任务执行。
  2. workerThread - ForkJoinWorkerThread 工作线程,用来执行具体的任务。
  3. task - ForkJoinTask 用来定义要执行的任务。

下面我们从这三个方面来详细讲解fork join框架。

ForkJoinPool

ForkJoinPool是一个ExecutorService的一个实现,它提供了对工作线程和线程池的一些便利管理方法。

scala

代码解读

复制代码

public class ForkJoinPool extends AbstractExecutorService 

一个work thread一次只能处理一个任务,但是ForkJoinPool并不会为每个任务都创建一个单独的线程,它会使用一个特殊的数据结构double-ended queue来存储任务。这样的结构可以方便的进行工作窃取(work-stealing)。

什么是work-stealing呢?

默认情况下,work thread从分配给自己的那个队列头中取出任务。如果这个队列是空的,那么这个work thread会从其他的任务队列尾部取出任务来执行,或者从全局队列中取出。这样的设计可以充分利用work thread的性能,提升并发能力。

下面看下怎么创建一个ForkJoinPool。

最常见的方法就是使用ForkJoinPool.commonPool()来创建,commonPool()为所有的ForkJoinTask提供了一个公共默认的线程池。

ini

代码解读

复制代码

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

另外一种方式是使用构造函数:

ini

代码解读

复制代码

ForkJoinPool forkJoinPool = new ForkJoinPool(2);

这里的参数是并行级别,2指的是线程池将会使用2个处理器核心。

ForkJoinWorkerThread

ForkJoinWorkerThread是使用在ForkJoinPool的工作线程。

scala

代码解读

复制代码

public class ForkJoinWorkerThread extends Thread
}

和一般的线程不一样的是它定义了两个变量:

arduino

代码解读

复制代码

    final ForkJoinPool pool;                // the pool this thread works in
    final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

一个是该worker thread所属的ForkJoinPool。 另外一个是支持 work-stealing机制的Queue。

再看一下它的run方法:

ini

代码解读

复制代码

   public void run() {
        if (workQueue.array == null) { // only run once
            Throwable exception = null;
            try {
                onStart();
                pool.runWorker(workQueue);
            } catch (Throwable ex) {
                exception = ex;
            } finally {
                try {
                    onTermination(exception);
                } catch (Throwable ex) {
                    if (exception == null)
                        exception = ex;
                } finally {
                    pool.deregisterWorker(this, exception);
                }
            }
        }
    }

简单点讲就是从Queue中取出任务执行。

ForkJoinTask

ForkJoinTask是ForkJoinPool中运行的任务类型。通常我们会用到它的两个子类:RecursiveAction和RecursiveTask。

他们都定义了一个需要实现的compute()方法用来实现具体的业务逻辑。不同的是RecursiveAction只是用来执行任务,而RecursiveTask可以有返回值。

既然两个类都带了Recursive,那么具体的实现逻辑也会跟递归有关,我们举个使用RecursiveAction来打印字符串的例子:

java

代码解读

复制代码

public class CustomRecursiveAction extends RecursiveAction {

    private String workload = "";
    private static final int THRESHOLD = 4;

    private static Logger logger =
            Logger.getAnonymousLogger();

    public CustomRecursiveAction(String workload) {
        this.workload = workload;
    }

    @Override
    protected void compute() {
        if (workload.length() > THRESHOLD) {
            ForkJoinTask.invokeAll(createSubtasks());
        } else {
            processing(workload);
        }
    }

    private List<CustomRecursiveAction> createSubtasks() {
        List<CustomRecursiveAction> subtasks = new ArrayList<>();

        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2, workload.length());

        subtasks.add(new CustomRecursiveAction(partOne));
        subtasks.add(new CustomRecursiveAction(partTwo));

        return subtasks;
    }

    private void processing(String work) {
        String result = work.toUpperCase();
        logger.info("This result - (" + result + ") - was processed by "
                + Thread.currentThread().getName());
    }
}

上面的例子使用了二分法来打印字符串。

我们再看一个RecursiveTask的例子:

csharp

代码解读

复制代码

public class CustomRecursiveTask extends RecursiveTask<Integer> {
    private int[] arr;

    private static final int THRESHOLD = 20;

    public CustomRecursiveTask(int[] arr) {
        this.arr = arr;
    }

    @Override
    protected Integer compute() {
        if (arr.length > THRESHOLD) {
            return ForkJoinTask.invokeAll(createSubtasks())
                    .stream()
                    .mapToInt(ForkJoinTask::join)
                    .sum();
        } else {
            return processing(arr);
        }
    }

    private Collection<CustomRecursiveTask> createSubtasks() {
        List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomRecursiveTask(
                Arrays.copyOfRange(arr, 0, arr.length / 2)));
        dividedTasks.add(new CustomRecursiveTask(
                Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
        return dividedTasks;
    }

    private Integer processing(int[] arr) {
        return Arrays.stream(arr)
                .filter(a -> a > 10 && a < 27)
                .map(a -> a * 10)
                .sum();
    }
}

和上面的例子很像,不过这里我们需要有返回值。

在ForkJoinPool中提交Task

有了上面的两个任务,我们就可以在ForkJoinPool中提交了:

ini

代码解读

复制代码

int[] intArray= {12,12,13,14,15};
        CustomRecursiveTask customRecursiveTask= new CustomRecursiveTask(intArray);

        int result = forkJoinPool.invoke(customRecursiveTask);
        System.out.println(result);

上面的例子中,我们使用invoke来提交,invoke将会等待任务的执行结果。

如果不使用invoke,我们也可以将其替换成fork()和join():

ini

代码解读

复制代码

customRecursiveTask.fork();
        int result2= customRecursiveTask.join();
        System.out.println(result2);

fork() 是将任务提交给pool,但是并不触发执行, join()将会真正的执行并且得到返回结果。


转载来源:https://juejin.cn/post/7091952850879643655

目录
打赏
0
0
0
0
171
分享
相关文章
Java 集合框架中的老炮与新秀:HashTable 和 HashMap 谁更胜一筹?
嗨,大家好,我是技术伙伴小米。今天通过讲故事的方式,详细介绍 Java 中 HashMap 和 HashTable 的区别。从版本、线程安全、null 值支持、性能及迭代器行为等方面对比,帮助你轻松应对面试中的经典问题。HashMap 更高效灵活,适合单线程或需手动处理线程安全的场景;HashTable 较古老,线程安全但性能不佳。现代项目推荐使用 ConcurrentHashMap。关注我的公众号“软件求生”,获取更多技术干货!
63 3
Java机器学习实战:基于DJL框架的手写数字识别全解析
在人工智能蓬勃发展的今天,Python凭借丰富的生态库(如TensorFlow、PyTorch)成为AI开发的首选语言。但Java作为企业级应用的基石,其在生产环境部署、性能优化和工程化方面的优势不容忽视。DJL(Deep Java Library)的出现完美填补了Java在深度学习领域的空白,它提供了一套统一的API,允许开发者无缝对接主流深度学习框架,将AI模型高效部署到Java生态中。本文将通过手写数字识别的完整流程,深入解析DJL框架的核心机制与应用实践。
19 2
|
1月前
|
java语言后台管理ruoyi后台管理框架-登录提示“无效的会话,或者会话已过期,请重新登录。”-扩展知识数据库中密码加密的方法-问题如何解决-以及如何重置若依后台管理框架admin密码-优雅草卓伊凡
java语言后台管理ruoyi后台管理框架-登录提示“无效的会话,或者会话已过期,请重新登录。”-扩展知识数据库中密码加密的方法-问题如何解决-以及如何重置若依后台管理框架admin密码-优雅草卓伊凡
158 3
java语言后台管理ruoyi后台管理框架-登录提示“无效的会话,或者会话已过期,请重新登录。”-扩展知识数据库中密码加密的方法-问题如何解决-以及如何重置若依后台管理框架admin密码-优雅草卓伊凡
SaaS云计算技术的智慧工地源码,基于Java+Spring Cloud框架开发
智慧工地源码基于微服务+Java+Spring Cloud +UniApp +MySql架构,利用传感器、监控摄像头、AI、大数据等技术,实现施工现场的实时监测、数据分析与智能决策。平台涵盖人员、车辆、视频监控、施工质量、设备、环境和能耗管理七大维度,提供可视化管理、智能化报警、移动智能办公及分布计算存储等功能,全面提升工地的安全性、效率和质量。
Java中的Fork/Join框架详解
Fork/Join框架是Java并行计算的强大工具,尤其适用于需要将任务分解为子任务的场景。通过正确使用Fork/Join框架,可以显著提升应用程序的性能和响应速度。在实际应用中,应结合具体需求选择合适的任务拆分策略,以最大化并行计算的效率。
68 23
Java多线程初学者指南(5):join方法的使用
本文为原创,如需转载,请注明作者和出处,谢谢!     在上面的例子中多次使用到了Thread类的join方法。我想大家可能已经猜出来join方法的功能是什么了。
794 0
|
24天前
|
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
154 60
【Java并发】【线程池】带你从0-1入门线程池
Java网络编程,多线程,IO流综合小项目一一ChatBoxes
**项目介绍**:本项目实现了一个基于TCP协议的C/S架构控制台聊天室,支持局域网内多客户端同时聊天。用户需注册并登录,用户名唯一,密码格式为字母开头加纯数字。登录后可实时聊天,服务端负责验证用户信息并转发消息。 **项目亮点**: - **C/S架构**:客户端与服务端通过TCP连接通信。 - **多线程**:采用多线程处理多个客户端的并发请求,确保实时交互。 - **IO流**:使用BufferedReader和BufferedWriter进行数据传输,确保高效稳定的通信。 - **线程安全**:通过同步代码块和锁机制保证共享数据的安全性。
64 23
|
19天前
|
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
当我们创建一个`ThreadPoolExecutor`的时候,你是否会好奇🤔,它到底发生了什么?比如:我传的拒绝策略、线程工厂是啥时候被使用的? 核心线程数是个啥?最大线程数和它又有什么关系?线程池,它是怎么调度,我们传入的线程?...不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界...
90 0
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码