ForkJoin(分支合并)

简介: ForkJoin(分支合并)

ForkJoin(分支合并)

fork():分支(拆分)   join():合并 是ForkJoin种两个真实的方法

什么是ForkJoin?

它可以根据程序进行调节 下面代码会举例

ForkJoin是JDK1.7之后出现的,一般用来并行执行任务,提高效率,尤其是大数据量的情况下,它是一个线程并发成多个去操作的,主要思想是把大任务拆分为若干个小任务,然后由小任务分别去执行,最后汇总结果

image.png


ForkJoin的特点:工作窃取

这个里面维护的都是双端队列(可以从两个方向自由插入队列去执行任务,而不是传统的只能从左往右)

工作窃取:比如现在有两个线程线程A和线程B,他们分别执行子任务,如果线程A还没有执行完,但是线程B已经执行完毕,此时线程B不会等待线程A执行完毕,而是会把线程A没有执行完的任务偷过来执行,从而提高效率,当然也有弊端,就是线程B执行完毕去抢线程A的资源,但是线程A也执行到这了,就会造成竞争,当然利大于弊,只有大数据量的情况下才会使用它

操作ForkJoin,如何使用它

大概步骤如下所示

1 ForkJoinPool 通过它来执行

2 新建一个计算任务 通过 ForkJoinPool.execute(ForkJoinTask task)

3 计算类继承 RecursiveTask (extends RecursiveTask)

4 通过ForkJoinTask下的 RecursiveAction(递归事件 没有返回值 )  RecursiveTask(递归任务 有返回值)

5 调用ForkJoinTask的submit方法或者execute方法去执行(提交任务) ( submit是异步的  有返回值 execute是同步的 没有返回值)

代码测试


package com.wyh.ForkJoin;
/**
 * @program: JUC
 * @description: 分支合并  求和计算
 * @author: 魏一鹤
 * @createDate: 2022-03-07 22:06
 **/
import java.util.concurrent.RecursiveTask;
/**
 * 如何使用ForkJoin 其实就以下步骤
 * 1  ForkJoinPool 通过它来执行
 * 2  新建一个计算任务 通过 ForkJoinPool.execute(ForkJoinTask task)
 * 3  计算类继承 RecursiveTask  extends RecursiveTask
 * 4  通过ForkJoinTask下的 RecursiveAction(递归事件 没有返回值 )  RecursiveTask(递归任务 有返回值)
**/
//继承RecursiveTask递归任务
public class ForkJoinDemo01 extends RecursiveTask<Long> {
//最小值 用long数据类型要比int大 int最多到20亿
    private long min;
//最大值
    private long max;
//构造器
    public ForkJoinDemo01(long min,long max) {
this.min = min;
this.max = max;
    }
//临界值 超过这个值分成两个任务  1万
    //它可以根据程序进行调节
    private long temp=10000L;
//实现递归任务的compute计算方法
    @Override
protected Long compute() {
//如果最大值减去最小值小于临界值 走普通计算  不需要分支合并
        if((max-min)<temp){
//用于计算总数的变量
            Long sum=0L;
//循环1亿之间的数求和
            for (Long i = min; i <= max; i++) {
                sum+=i;
            }
//计算总数完毕 进行返回
          return sum;
        }//否则走分支合并 ForkJoin
        else{
//计算最大值和最小值的中间值
            long mid = (max + min) / 2;
//创建计算任务
            //把一个大任务拆分为两个小任务
            //第一个任务
            ForkJoinDemo01 task1 = new ForkJoinDemo01(min, mid);
            task1.fork(); //拆分任务 把任务压入线程队列
            //第二个任务
            ForkJoinDemo01 task2 = new ForkJoinDemo01(mid+1,max);
            task2.fork(); //拆分任务 把任务压入线程队列
            //合并结果
            return  task1.join() + task2.join();
        }
    }
}


不同的方式去求和


  1. 异步回调(future 准确来说是CompletableFuture)

发起这个请求后,不一定非要等待结果(不用一直阻塞等待结果)和服务器和客户端通信的ajax一样的道理

如何使用?

  1. 使用Future准确来说是CompletableFuture来操作
  2. 使用CompletableFuture的supplyAsync(有返回类型回调)或者runAsync(没有返回值回调)去异步回调任务 通过get方法阻塞获取执行结果 get需要抛异常
  3. 使用completableFuture的whenComplete(t,u)成功回调 t=正常的返回结果  u=如果出错就是异常信息
  4. 使用completableFuture的exceptionally(e) e是失败回调返回的错误信息 通过e.getMessage()来接受

代码测试  

有/没有返回值的异步回调 成功回调/异步回调


package com.wyh.ForkJoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
/**
 * @program: JUC
 * @description: ForkJoin测试
 * @author: 魏一鹤
 * @createDate: 2022-03-07 22:39
 **/
public class ForkJoinTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1(); //sum=500000000500000000,使用时间:272
        test2();  //sum=500000000500000000,使用时间:4772
        test3(); //sum=500000000500000000,耗费时间:113
    }
   //方式1 普通求和
    public static void test1(){
//求和变量
        long sum=0;
//开始时间
        long start = System.currentTimeMillis();
for (int i = 1; i <= 10_0000_0000; i++) {
//求和
            sum+=i;
        }
//结束时间
        long end = System.currentTimeMillis();
        System.out.println("sum="+sum+",使用时间:"+(end-start));
//sum=500000000500000000,使用时间:263
    }
   //方式2  使用ForkJoin分支合并
    public static void test2() throws ExecutionException, InterruptedException {
//开始时间
        long start = System.currentTimeMillis();
//创建ForkJoinPool池
        ForkJoinPool forkJoinPool = new ForkJoinPool();
//创建ForkJoinTask任务
        ForkJoinTask<Long> task = new ForkJoinDemo01(0L, 10_0000_0000);
//执行/提交 ForkJoinTask任务
        //forkJoinPool.submit()  提交任务  submit是异步的  有返回值
       // forkJoinPool.execute(task); 执行任务  execute是同步的 没有返回值
        ForkJoinTask<Long> submit = forkJoinPool.submit(task); //提交任务
        //获取结果 get会阻塞等待结果 需要抛出异常
        Long sum = submit.get();
//结束时间
        long end = System.currentTimeMillis();
        System.out.println("sum="+sum+",使用时间:"+(end-start));
//sum=500000000500000000,使用时间:5278
    }
//方式3 并行流
    public static void test3(){
//开始时间
        long start = System.currentTimeMillis();
//计算交给Stream流    parallel:并行 reduce获取结果 开始值,二进制操作结果
        long sum = LongStream.rangeClosed(0, 10_0000_0000).parallel().reduce(0, Long::sum);
//结束时间
        long end = System.currentTimeMillis();
        System.out.println("sum="+sum+",耗费时间:"+(end-start));
//sum=500000000500000000,耗费时间:113
    }
}


package com.wyh.async;
/**
 * @program: JUC
 * @description: 异步回调
 * @author: 魏一鹤
 * @createDate: 2022-03-08 23:38
 **/
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
 * 和ajax是一样的
 * 异步执行
 * 成功回调
 * 失败回调
**/
public class asyncDemo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//没有返回值的异步回调 runAsync
        //发起一个请求  runAsync执行异步任务 参数是一个Runnable线程 可以使用lambda表达式
      //  CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            //休眠2s
            //try {
            //    TimeUnit.SECONDS.sleep(2);
            //} catch (InterruptedException e) {
            //    e.printStackTrace();
            //}
            //System.out.println(Thread.currentThread().getName()+"runAsync->Void");
            //1111111
            //ForkJoinPool.commonPool-worker-9runAsync->Void
      //  });
      //  System.out.println("1111111");
        //阻塞获取线程执行结果 get需要抛异常
      //  completableFuture.get();
        //有返回值的异步回调 supplyAsync
        //有返回值只有两种情况 要么成功要么失败 成功返回成功数据 失败返回错误信息
        CompletableFuture<String> completableFuture =  CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"supplyAsync->String");
//return和泛型数据类型一样的结果 参数类型自定义
            //故意让代码报错执行错误回调
            int a=10/0;
return "1024";
        });
//whenComplete 执行成功的操作
        System.out.println(completableFuture.whenComplete((t,u)->{
            System.out.println("t:"+t); //t=正常的返回结果 1024
            System.out.println("u:"+u);  //如果出错就是异常信息 java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
            //exceptionally执行失败的操作
        }).exceptionally((e)->{
//打印异常信息
            System.out.println(e.getMessage());
//失败的返回值
            return "400";
        }).get());
    }
}


//成功回调结果

ForkJoinPool.commonPool-worker-9supplyAsync->String

t:1024

u:null

1024

//失败回调结果

ForkJoinPool.commonPool-worker-9supplyAsync->String

t:null

u:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

java.lang.ArithmeticException: / by zero

400

目录
相关文章
|
8月前
|
Java
java线程之分支合并框架
java线程之分支合并框架
|
7月前
|
存储 Java 索引
(十二)彻悟并发之JUC分治思想产物-ForkJoin分支合并框架原理剖析下篇
在《(十二)彻悟并发之JUC分治思想产物-ForkJoin分支合并框架原理剖析上篇》中,我们曾初步了解了ForkJoin分支合并框架的使用,也分析框架的成员构成以及任务提交和创建工作的原理实现,在本篇则会对框架的任务执行、任务扫描、线程挂起、结果合并以及任务窃取的源码实现进行分析。
|
7月前
|
存储 监控 Java
(十一)彻悟并发之JUC分治思想产物-ForkJoin分支合并框架原理剖析上篇
在上篇文章《深入理解并发之Java线程池、工作原理、复用原理及源码分析》中,曾详细谈到了Java的线程池框架。在其中也说到了JDK提供的四种原生线程池以及自定义线程池,而本文则再来详细谈谈JDK1.7中新推出的线程池:ForkJoinPool。
|
机器学习/深度学习 存储 算法
|
设计模式 算法 Java
|
Java Linux 数据库
消灭成堆的分支语句之类责任链模式
摘要 分支语句是所有编程语言的基本元素,比如Java语言中的if else和switch语句,它们提供一种能力允许程序根据一些条件动态地选择执行某些代码块。这种动态性给程序带来了很多的灵活性! 正因为if else如此方便如此灵活,很多代码中它都会被滥用,就像下面这样让人崩溃的、嵌套的、成堆的分支语句: if (context.equals("tutorial-room")) { if
1385 0
|
开发工具 git
Git常见分支问题各个击破
本文首发于公众号“AntDream”,欢迎微信搜索“AntDream”或扫描文章底部二维码关注,和我一起每天进步一点点 为什么已经推到远程的分支,我本地checkout的时候还是提示找不到分支? 首先我们要弄清楚的是,checkout 是本地命令,不跟远程仓库打交道。
4761 0
|
9月前
|
Java
面试官:说一说CyclicBarrier的妙用!我:这个没用过...
【5月更文挑战第5天】面试官:说一说CyclicBarrier的妙用!我:这个没用过...
66 2
|
C语言
超详细的分支和循环语句知识点
超详细的分支和循环语句知识点
|
Java
java多线程 -- ForkJoinPool 分支/ 合并框架 工作窃取
Fork/Join 框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行 join 汇总。 Fork/Join 框架与线程池的区别 采用 “工作窃取”模式(work-stealing):当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。
1028 0

热门文章

最新文章