ForkJoin(分支合并)

简介: ForkJoin(分支合并)

ForkJoin(分支合并)

fork():分支(拆分)   join():合并 是ForkJoin种两个真实的方法

什么是ForkJoin?

它可以根据程序进行调节 下面代码会举例

ForkJoin是JDK1.7之后出现的,一般用来并行执行任务,提高效率,尤其是大数据量的情况下,它是一个线程并发成多个去操作的,主要思想是把大任务拆分为若干个小任务,然后由小任务分别去执行,最后汇总结果

image.png


ForkJoin的特点:工作窃取

这个里面维护的都是双端队列(可以从两个方向自由插入队列去执行任务,而不是传统的只能从左往右)

工作窃取:比如现在有两个线程线程A和线程B,他们分别执行子任务,如果线程A还没有执行完,但是线程B已经执行完毕,此时线程B不会等待线程A执行完毕,而是会把线程A没有执行完的任务偷过来执行,从而提高效率,当然也有弊端,就是线程B执行完毕去抢线程A的资源,但是线程A也执行到这了,就会造成竞争,当然利大于弊,只有大数据量的情况下才会使用它

操作ForkJoin,如何使用它

大概步骤如下所示

1 ForkJoinPool 通过它来执行

2 新建一个计算任务 通过 ForkJoinPool.execute(ForkJoinTask task)

3 计算类继承 RecursiveTask (extends RecursiveTask)

4 通过ForkJoinTask下的 RecursiveAction(递归事件 没有返回值 )  RecursiveTask(递归任务 有返回值)

5 调用ForkJoinTask的submit方法或者execute方法去执行(提交任务) ( submit是异步的  有返回值 execute是同步的 没有返回值)

代码测试


package com.wyh.ForkJoin;
/**
 * @program: JUC
 * @description: 分支合并  求和计算
 * @author: 魏一鹤
 * @createDate: 2022-03-07 22:06
 **/
import java.util.concurrent.RecursiveTask;
/**
 * 如何使用ForkJoin 其实就以下步骤
 * 1  ForkJoinPool 通过它来执行
 * 2  新建一个计算任务 通过 ForkJoinPool.execute(ForkJoinTask task)
 * 3  计算类继承 RecursiveTask  extends RecursiveTask
 * 4  通过ForkJoinTask下的 RecursiveAction(递归事件 没有返回值 )  RecursiveTask(递归任务 有返回值)
**/
//继承RecursiveTask递归任务
public class ForkJoinDemo01 extends RecursiveTask<Long> {
//最小值 用long数据类型要比int大 int最多到20亿
    private long min;
//最大值
    private long max;
//构造器
    public ForkJoinDemo01(long min,long max) {
this.min = min;
this.max = max;
    }
//临界值 超过这个值分成两个任务  1万
    //它可以根据程序进行调节
    private long temp=10000L;
//实现递归任务的compute计算方法
    @Override
protected Long compute() {
//如果最大值减去最小值小于临界值 走普通计算  不需要分支合并
        if((max-min)<temp){
//用于计算总数的变量
            Long sum=0L;
//循环1亿之间的数求和
            for (Long i = min; i <= max; i++) {
                sum+=i;
            }
//计算总数完毕 进行返回
          return sum;
        }//否则走分支合并 ForkJoin
        else{
//计算最大值和最小值的中间值
            long mid = (max + min) / 2;
//创建计算任务
            //把一个大任务拆分为两个小任务
            //第一个任务
            ForkJoinDemo01 task1 = new ForkJoinDemo01(min, mid);
            task1.fork(); //拆分任务 把任务压入线程队列
            //第二个任务
            ForkJoinDemo01 task2 = new ForkJoinDemo01(mid+1,max);
            task2.fork(); //拆分任务 把任务压入线程队列
            //合并结果
            return  task1.join() + task2.join();
        }
    }
}


不同的方式去求和


  1. 异步回调(future 准确来说是CompletableFuture)

发起这个请求后,不一定非要等待结果(不用一直阻塞等待结果)和服务器和客户端通信的ajax一样的道理

如何使用?

  1. 使用Future准确来说是CompletableFuture来操作
  2. 使用CompletableFuture的supplyAsync(有返回类型回调)或者runAsync(没有返回值回调)去异步回调任务 通过get方法阻塞获取执行结果 get需要抛异常
  3. 使用completableFuture的whenComplete(t,u)成功回调 t=正常的返回结果  u=如果出错就是异常信息
  4. 使用completableFuture的exceptionally(e) e是失败回调返回的错误信息 通过e.getMessage()来接受

代码测试  

有/没有返回值的异步回调 成功回调/异步回调


package com.wyh.ForkJoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
/**
 * @program: JUC
 * @description: ForkJoin测试
 * @author: 魏一鹤
 * @createDate: 2022-03-07 22:39
 **/
public class ForkJoinTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1(); //sum=500000000500000000,使用时间:272
        test2();  //sum=500000000500000000,使用时间:4772
        test3(); //sum=500000000500000000,耗费时间:113
    }
   //方式1 普通求和
    public static void test1(){
//求和变量
        long sum=0;
//开始时间
        long start = System.currentTimeMillis();
for (int i = 1; i <= 10_0000_0000; i++) {
//求和
            sum+=i;
        }
//结束时间
        long end = System.currentTimeMillis();
        System.out.println("sum="+sum+",使用时间:"+(end-start));
//sum=500000000500000000,使用时间:263
    }
   //方式2  使用ForkJoin分支合并
    public static void test2() throws ExecutionException, InterruptedException {
//开始时间
        long start = System.currentTimeMillis();
//创建ForkJoinPool池
        ForkJoinPool forkJoinPool = new ForkJoinPool();
//创建ForkJoinTask任务
        ForkJoinTask<Long> task = new ForkJoinDemo01(0L, 10_0000_0000);
//执行/提交 ForkJoinTask任务
        //forkJoinPool.submit()  提交任务  submit是异步的  有返回值
       // forkJoinPool.execute(task); 执行任务  execute是同步的 没有返回值
        ForkJoinTask<Long> submit = forkJoinPool.submit(task); //提交任务
        //获取结果 get会阻塞等待结果 需要抛出异常
        Long sum = submit.get();
//结束时间
        long end = System.currentTimeMillis();
        System.out.println("sum="+sum+",使用时间:"+(end-start));
//sum=500000000500000000,使用时间:5278
    }
//方式3 并行流
    public static void test3(){
//开始时间
        long start = System.currentTimeMillis();
//计算交给Stream流    parallel:并行 reduce获取结果 开始值,二进制操作结果
        long sum = LongStream.rangeClosed(0, 10_0000_0000).parallel().reduce(0, Long::sum);
//结束时间
        long end = System.currentTimeMillis();
        System.out.println("sum="+sum+",耗费时间:"+(end-start));
//sum=500000000500000000,耗费时间:113
    }
}


package com.wyh.async;
/**
 * @program: JUC
 * @description: 异步回调
 * @author: 魏一鹤
 * @createDate: 2022-03-08 23:38
 **/
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
 * 和ajax是一样的
 * 异步执行
 * 成功回调
 * 失败回调
**/
public class asyncDemo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//没有返回值的异步回调 runAsync
        //发起一个请求  runAsync执行异步任务 参数是一个Runnable线程 可以使用lambda表达式
      //  CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            //休眠2s
            //try {
            //    TimeUnit.SECONDS.sleep(2);
            //} catch (InterruptedException e) {
            //    e.printStackTrace();
            //}
            //System.out.println(Thread.currentThread().getName()+"runAsync->Void");
            //1111111
            //ForkJoinPool.commonPool-worker-9runAsync->Void
      //  });
      //  System.out.println("1111111");
        //阻塞获取线程执行结果 get需要抛异常
      //  completableFuture.get();
        //有返回值的异步回调 supplyAsync
        //有返回值只有两种情况 要么成功要么失败 成功返回成功数据 失败返回错误信息
        CompletableFuture<String> completableFuture =  CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"supplyAsync->String");
//return和泛型数据类型一样的结果 参数类型自定义
            //故意让代码报错执行错误回调
            int a=10/0;
return "1024";
        });
//whenComplete 执行成功的操作
        System.out.println(completableFuture.whenComplete((t,u)->{
            System.out.println("t:"+t); //t=正常的返回结果 1024
            System.out.println("u:"+u);  //如果出错就是异常信息 java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
            //exceptionally执行失败的操作
        }).exceptionally((e)->{
//打印异常信息
            System.out.println(e.getMessage());
//失败的返回值
            return "400";
        }).get());
    }
}


//成功回调结果

ForkJoinPool.commonPool-worker-9supplyAsync->String

t:1024

u:null

1024

//失败回调结果

ForkJoinPool.commonPool-worker-9supplyAsync->String

t:null

u:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

java.lang.ArithmeticException: / by zero

400

目录
相关文章
|
7月前
|
资源调度 前端开发 算法
鸿蒙OS架构设计探秘:从分层设计到多端部署
本文深入探讨了鸿蒙OS的架构设计,从独特的“1+8+N”分层架构到模块化设计,再到智慧分发和多端部署能力。分层架构让系统更灵活,模块化设计通过Ability机制实现跨设备一致性,智慧分发优化资源调度,多端部署提升开发效率。作者结合实际代码示例,分享了开发中的实践经验,并指出生态建设是未来的关键挑战。作为国产操作系统的代表,鸿蒙的发展值得每一位开发者关注与支持。
|
9月前
|
存储 SQL 缓存
PolarDB-X 在 ClickBench 数据集的优化实践
本文介绍了 PolarDB-X 在 ClickBench 数据集上的优化实践,PolarDB-X 通过增加优化器规则、优化执行器层面的 DISTINCT 和自适应两阶段 AGG、MPP 压缩等手段,显著提升了在 ClickBench 上的性能表现,达到了业内领先水平。
|
9月前
|
机器学习/深度学习 人工智能 自然语言处理
Agent Laboratory:AI自动撰写论文,AMD开源自动完成科研全流程的多智能体框架
Agent Laboratory 是由 AMD 和约翰·霍普金斯大学联合推出的自主科研框架,基于大型语言模型,能够加速科学发现、降低成本并提高研究质量。
690 23
Agent Laboratory:AI自动撰写论文,AMD开源自动完成科研全流程的多智能体框架
|
10月前
|
存储 弹性计算 分布式计算
阿里云服务器租用价格:包年包月收费标准与月付、1年、3年活动价格
租用阿里云服务器3个月、6个月、1年、3年多少钱?云服务器收费标准是怎样的?根据目前的价格信息,阿里云特价云服务器价格38元、99元、199元、298元,本文分享阿里云服务器最新的租用费用,包括包年包月的收费标准和月付3个月和6个月以及1年、3年活动价格表。
|
Go C语言
golang的类型转换
【9月更文挑战第28天】本文介绍了Go语言中的基本数据类型转换,包括数值类型之间的转换及字符串与数值类型的互转,提供了具体代码示例说明如何使用如`float64(a)`和`strconv.Atoi`等方法。同时,文章还讲解了接口类型转换,包括类型断言和类型开关的使用方法,并展示了如何在运行时获取具体类型。最后,提到了指针类型转换的注意事项及其应用场景。
195 7
|
存储 人工智能 移动开发
突发!Runway一夜删库跑路,HuggingFace已清空
活久见,Runway 一夜间清空 HuggingFace 和 GitHub,直接跑路了?很多人猜测,此事与版权纠纷有关,这就翻出了 Runway 和 Stability AI 之间的一段陈年旧案。
|
安全 网络安全 数据安全/隐私保护
Python渗透测试之流量分析:流量嗅探工具编程
Python渗透测试之流量分析:流量嗅探工具编程
161 0
|
UED
Flutter之ListView实现自动滑动到底部
Flutter之ListView实现自动滑动到底部
475 1
|
小程序 前端开发
【非常全】微信小程序下载图片到相册,微信小程序自动获取分享图片到相册
【非常全】微信小程序下载图片到相册,微信小程序自动获取分享图片到相册
1051 3
|
机器学习/深度学习 编解码 算法
YOLOv8改进 | 主干篇 | 低照度增强网络PE-YOLO改进主干(改进暗光条件下的物体检测模型)
YOLOv8改进 | 主干篇 | 低照度增强网络PE-YOLO改进主干(改进暗光条件下的物体检测模型)
547 0