线程操纵术并行策略问题之ForkJoinTask提交任务的问题如何解决

简介: 线程操纵术并行策略问题之ForkJoinTask提交任务的问题如何解决

问题一:在下面的示例代码中,为什么要使用join()方法?它的作用是什么?

在下面的示例代码中,为什么要使用join()方法?它的作用是什么?

package learning.multithreading;
import java.util.Random;import java.util.concurrent.ExecutionException;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask;
public class ParallelSumComputationUsingForkJoin {    private static final int[] LARGE_ARR = largeArr();
    private static final int LENGTH = LARGE_ARR.length;
    public static void main(String[] args) throws ExecutionException, InterruptedException {        RecursiveSumTask recursiveTask = new RecursiveSumTask(0, LENGTH, LARGE_ARR);        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();        long start = System.currentTimeMillis();        long sum = forkJoinPool.invoke(recursiveTask);        System.out.println("The sum is : "                + sum                + ", Time Taken by Parallel(Fork/Join) Execution: "                + (System.currentTimeMillis() - start) + " millis");    }
    private static int[] largeArr() {        return new Random().ints(500000000, 10, 1000).toArray();    }
    static class RecursiveSumTask extends RecursiveTask<Long> {
        private static final int SEQUENTIAL_COMPUTE_THRESHOLD = 4000;        private final int startIndex;        private final int endIndex;        private final int[] data;
        RecursiveSumTask(int startIndex, int endIndex, int[] data) {            this.startIndex = startIndex;            this.endIndex = endIndex;            this.data = data;        }
        @Override        protected Long compute() {            if (SEQUENTIAL_COMPUTE_THRESHOLD >= (endIndex - startIndex)) {                long sum = 0;                for (int i = startIndex; i < endIndex; i++) {                    sum += data[i];                }                return sum;            }            int mid = startIndex + (endIndex - startIndex) / 2;            RecursiveSumTask leftSumTask = new RecursiveSumTask(startIndex, mid, data);            RecursiveSumTask rightSumTask = new RecursiveSumTask(mid, endIndex, data);            leftSumTask.fork(); // Fork the Left Task in a Separate Execution            long rightSum = rightSumTask.compute(); // Compute the Right Part            long leftSum = leftSumTask.join(); // Wait for the results from the Left Part            return leftSum + rightSum; // Return Both        }    }}/** * Output: * The sum is : 252235235953, Time Taken by Parallel(Fork/Join) Execution: 139 millis *


参考回答:

join()方法被用于等待左侧子任务的完成,并获取其结果。join()是阻塞调用,调用线程会等待左侧子任务执行完成后才继续执行,从而确保在合并结果之前,所有的子任务都已经完成。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/625186


问题二:在下面的示例代码中,SEQUENTIAL_COMPUTE_THRESHOLD变量起什么作用?

在下面的示例代码中,SEQUENTIAL_COMPUTE_THRESHOLD变量起什么作用?

package learning.multithreading;
import java.util.Random;import java.util.concurrent.ExecutionException;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask;
public class ParallelSumComputationUsingForkJoin { private static final int[] LARGE_ARR = largeArr();
private static final int LENGTH = LARGE_ARR.length;
public static void main(String[] args) throws ExecutionException, InterruptedException { RecursiveSumTask recursiveTask = new RecursiveSumTask(0, LENGTH, LARGE_ARR); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); long start = System.currentTimeMillis(); long sum = forkJoinPool.invoke(recursiveTask); System.out.println("The sum is : " + sum + ", Time Taken by Parallel(Fork/Join) Execution: " + (System.currentTimeMillis() - start) + " millis"); }
private static int[] largeArr() { return new Random().ints(500000000, 10, 1000).toArray(); }
static class RecursiveSumTask extends RecursiveTask<Long> {
private static final int SEQUENTIAL_COMPUTE_THRESHOLD = 4000; private final int startIndex; private final int endIndex; private final int[] data;
RecursiveSumTask(int startIndex, int endIndex, int[] data) { this.startIndex = startIndex; this.endIndex = endIndex; this.data = data; }
@Override protected Long compute() { if (SEQUENTIAL_COMPUTE_THRESHOLD >= (endIndex - startIndex)) { long sum = 0; for (int i = startIndex; i < endIndex; i++) { sum += data[i]; } return sum; } int mid = startIndex + (endIndex - startIndex) / 2; RecursiveSumTask leftSumTask = new RecursiveSumTask(startIndex, mid, data); RecursiveSumTask rightSumTask = new RecursiveSumTask(mid, endIndex, data); leftSumTask.fork(); // Fork the Left Task in a Separate Execution long rightSum = rightSumTask.compute(); // Compute the Right Part long leftSum = leftSumTask.join(); // Wait for the results from the Left Part return leftSum + rightSum; // Return Both } }}/** * Output: * The sum is : 252235235953, Time Taken by Parallel(Fork/Join) Execution: 139 millis *


参考回答:

SEQUENTIAL_COMPUTE_THRESHOLD变量在示例代码中设定了一个阈值,当需要处理的数组元素数量小于这个阈值时,任务将不再进行进一步的拆分,而是直接顺序计算。这是为了避免过度拆分任务造成的额外开销,提高计算效率。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/625188


问题三:ForkJoinTask可以通过哪些方法来提交任务?

ForkJoinTask可以通过哪些方法来提交任务?


参考回答:

可以使用execute()、invoke()和submit()方法来提交任务。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/625189


问题四:ForkJoinPool中任务是如何排队的?

ForkJoinPool中任务是如何排队的?


参考回答:

ForkJoinPool维护一个全局共享队列,所有提交的任务都会在这个共享队列中排队。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/625190


问题五:在ForkJoinPool模型中,工作线程如何从多个来源检查任务?

在ForkJoinPool模型中,工作线程如何从多个来源检查任务?


参考回答:

工作线程会检查来自全局共享队列、本地工作窃取队列以及其他线程的工作窃取队列的任务。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/625191

相关文章
|
28天前
|
Java 测试技术 PHP
父子任务使用不当线程池死锁怎么解决?
在Java多线程编程中,线程池有助于提升性能与资源利用效率,但若父子任务共用同一池,则可能诱发死锁。本文通过一个具体案例剖析此问题:在一个固定大小为2的线程池中,父任务直接调用`outerTask`,而`outerTask`再次使用同一线程池异步调用`innerTask`。理论上,任务应迅速完成,但实际上却超时未完成。经由`jstack`输出的线程调用栈分析发现,线程陷入等待状态,形成“死锁”。原因是子任务需待父任务完成,而父任务则需等待子任务执行完毕以释放线程,从而相互阻塞。此问题在测试环境中不易显现,常在生产环境下高并发时爆发,重启或扩容仅能暂时缓解。
|
20天前
|
数据采集 XML JavaScript
C# 中 ScrapySharp 的多线程下载策略
C# 中 ScrapySharp 的多线程下载策略
|
28天前
|
算法 Java
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
该博客文章综合介绍了Java并发编程的基础知识,包括线程与进程的区别、并发与并行的概念、线程的生命周期状态、`sleep`与`wait`方法的差异、`Lock`接口及其实现类与`synchronized`关键字的对比,以及生产者和消费者问题的解决方案和使用`Condition`对象替代`synchronized`关键字的方法。
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
|
13天前
|
监控 负载均衡 算法
线程数突增!领导说再这么写就GC掉我:深入理解与优化策略
【8月更文挑战第29天】在软件开发的世界里,性能优化总是开发者们绕不开的话题。特别是当面对“线程数突增”这样的紧急情况时,更是考验着我们的技术功底和问题解决能力。今天,我们就来深入探讨这一话题,分享一些工作学习中积累的技术干货,帮助大家避免被“GC”(垃圾回收,也常用来幽默地表示“被炒鱿鱼”)的尴尬。
32 2
|
20天前
|
存储 监控 Java
|
11天前
|
前端开发 JavaScript 大数据
React与Web Workers:开启前端多线程时代的钥匙——深入探索计算密集型任务的优化策略与最佳实践
【8月更文挑战第31天】随着Web应用复杂性的提升,单线程JavaScript已难以胜任高计算量任务。Web Workers通过多线程编程解决了这一问题,使耗时任务独立运行而不阻塞主线程。结合React的组件化与虚拟DOM优势,可将大数据处理等任务交由Web Workers完成,确保UI流畅。最佳实践包括定义清晰接口、加强错误处理及合理评估任务特性。这一结合不仅提升了用户体验,更为前端开发带来多线程时代的全新可能。
19 0
|
20天前
|
Java 调度
|
27天前
|
Cloud Native Java 调度
项目环境测试问题之线程同步器会造成执行完任务的worker等待的情况如何解决
项目环境测试问题之线程同步器会造成执行完任务的worker等待的情况如何解决
|
28天前
|
算法 Java
JDK版本特性问题之想控制 G1 垃圾回收器的并行工作线程数量,如何解决
JDK版本特性问题之想控制 G1 垃圾回收器的并行工作线程数量,如何解决
|
16天前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
43 1