JUC并发编程——ForkJoin模式

简介: JUC并发编程——ForkJoin模式

正文


一、什么是ForkJoin


“分而治之”是一种思想,所谓“分而治之”就是把一个复杂的算法问题按一定的“分解”方法分为规模较小的若干部分,然后逐个解决,分别找出各部分的解,最后把各部分的解合并。而ForkJoin模式就是这种思想,把一个大任务分解成许多个独立的子任务,然后开启多个线程去并行执行这些子任务。对任务一直拆分,直到拆分到最小单位。


111.png


二、ForkJoin简单使用


需求是计算1-1000000的和


package com.xiaojie.fork;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
 * @description: 使用forkJoin 计算1-1000000的和
 * @author xiaojie
 * @date 2022/1/20 9:56
 * @version 1.0
 */
public class ForkJoinDemo extends RecursiveTask<Integer> {
    //RecursiveTask 有返回值的
    //RecursiveAction 没有返回值
    int start;
    int end;
    int max = 5000; //以5000为单位分组
    int sum;
    @Override
    protected Integer compute() {
        if (end - start < max) {
            System.out.println(Thread.currentThread().getName() + "," + "start:" + start + ",end:" + end);
            for (int i = start; i < end; i++) {
                sum += i;
            }
        } else {
            //拆分
            int middle = (start + end) / 2;
            ForkJoinDemo left = new ForkJoinDemo(start, middle);
            left.fork();//拆分
            ForkJoinDemo right = new ForkJoinDemo(middle + 1, end);
            right.fork();//拆分
            try {
                //合并任务
                sum = left.get() + right.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        return sum;
    }
    public ForkJoinDemo(int start, int end) {
        this.start = start;
        this.end = end;
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinDemo forkJoinDemo = new ForkJoinDemo(1, 1000000);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> result = forkJoinPool.submit(forkJoinDemo);
        System.out.println(result.get());
        while (true) {
           // ForkJoinPool在守护进程模式下使用线程,所以一般程序退出时无需显式关闭这样的池
        }
    }
}


注意:


1、ForkJoinPool在守护进程模式下使用线程,所以一般程序退出时无需显式关闭这样的池,而自动结束任务。


2、当任务量很大时才适合使用ForkJoin模式,当任务量不多时,并不适合使用,因为ForkJoin拆分任务时也需要消耗时间和资源(任务量较小时,大部分的时间都用来拆分任务而不划算)。


三、ForkJoin原理


核心API


ForkJoin框架的核心是ForkJoinPool线程池。该线程池使用一个无锁的栈来管理空闲线程,如果一个工作线程暂时取不到可执行的任务,则可能会挂起,而挂起的线程会被压入由ForkJoinPool维护的栈中,等有新任务到来时,再从栈中唤醒这些线程。


构造器


  public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(parallelism, factory, handler, asyncMode,
             0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }


参数解释


int parallelism —— 并行度默认为CPU数量,最小值为1,决定并行执行的线程数量。


ForkJoinWorkerThreadFactory factory ——线程创建工厂


UncaughtExceptionHandler handler——异常处理类,当执行任务出现异常时,被handler捕获。


boolean asyncMode——是否为异步模式,默认值为false,如果为true表示子任务执行遵循FIFO(先进先出)顺序,如果为false表示子任务执行顺序为LIFO(后进先出)顺序,并且子任务可以被合并。


ForkJoin框架为每一个独立工作的线程创建了对应的执行任务的工作队列,这个工作队列是使用数组进行双向组合的双向队列。


无参构造函数


  public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }


这个构造函数的含义是并行度是CPU的核数,线程工厂为defaultForkJoinWorkerThreadFactory,异常处理类为null表示不处理异常,异步模式为false表示执行LIFO(后进先出)可以合并子任务。


工作窃取算法


ForkJoinPool线程池的任务分为“外部任务”和“内部任务(任务拆分出来的子任务)”,两种任务存放位置不同,外部任务存放在ForkJoinPool的全局队列中,子任务会作为“内部任务”放到内部队列中,ForkJoinPool池中的每个线程都会维护一个内部队列,用于存放这些“内部任务”。由于ForkJoinPool有多个线程,那么就会对应的有多个队列,就会出现任务分步不均的问题,有的队列任务多,一直在执行任务,有的队列为空没有任务,一直空闲。那么就需要一种方式来解决这个问题,答案就是使用工作窃取算法。


工作窃取核心思想是,工作线程自己的任务执行完了,就会去查找其他任务队列有没有任务,有的话就去执行其他队列的任务,这样也提高了效率。其实说白了就是我没活了,但是我又闲不住,我就去帮你干活,帮别人干活。那么会存在这样一种情况,自己的任务执行完了,会帮其他线程干活,但是会不会和其他线程同时执行同一个任务呢,就会产生竞争,简化的方案就是线程自己本地的队列采用LIFO(后进先出),窃取其他任务的队列任务采用FIFO(先进先出)策略,就是从队列的两头同时执行。


工作窃取的优点


1、线程不会因为等待某个子任务执行或者没有任务执行而被阻塞等待,而是会扫描所有的队列窃取任务,直到所有的队列都为空时才会挂起。


2、ForkJoin为每个线程维护一个队列,这个对列是一个基于数组的双向对列,可以从首尾两端获取任务,极大的减少竞争的可能性,提高并行的性能。


ForKJoin原理


ForkJoinPool线程池的任务分为“外部任务”和“内部任务”。“内部任务“”和“外部任务”只是个抽象的概念,不是真的内外之分。

“外部任务”存放在ForkJoinPool的全局队列中。

ForkJoinPool池中的每个线程都会维护一个任务队列,用于存放“内部任务”,线程切割任务得到的子任务作为“内部任务”放到内部队列中。

当工作线程想要获取子任务的执行结果时,会先判断子任务有没有完成,如果没有完成,再判断子任务有没有被其他线程窃取,如果没有被窃取,就有本线程来帮忙完成,如果子任务被窃取了,就去执行本线程“内部队列”的其他任务,或者扫描其他队列并窃取任务。

当工作线程完成“内部任务”处于空闲状态时,就会扫描其他任务队列窃取任务,尽可能不会阻塞等待。

ForkJoin线程在等待一个任务完成时,要么自己来完成这个任务,要么在其他线程窃取了这个任务的情况下,去执行其他任务,是不会阻塞等待的,从而避免资源浪费,除非所有的任务队列为空。


参考


《JAVA高并发核心编程(卷2):多线程、锁、JMM、JUC、高并发设计》-尼恩编著


相关文章
|
3月前
|
安全 Java 编译器
高并发编程之什么是 JUC
高并发编程之什么是 JUC
33 1
|
3月前
|
Java
JUC并发编程之等待唤醒机制
在JUC(Java Util Concurrent)并发编程中,线程等待唤醒机制是实现线程之间协作和同步的重要手段。这种机制允许一个线程挂起等待某个条件满足后被唤醒,以及另一个线程在满足某个条件后唤醒等待的线程。在Java中,有多种实现线程等待唤醒机制的方式,包括使用Object的wait()和notify()方法、Condition接口以及LockSupport类。
|
2月前
|
存储 缓存 Java
深入剖析Java并发库(JUC)之StampedLock的应用与原理
深入剖析Java并发库(JUC)之StampedLock的应用与原理
深入剖析Java并发库(JUC)之StampedLock的应用与原理
|
2月前
|
存储 安全 Java
Java并发基础-线程间通信
Java并发基础-线程间通信
11 0
|
3月前
|
缓存 Java 编译器
关于Java并发多线程的一点思考
关于Java并发多线程的一点思考
52 1
|
10月前
|
存储 Dubbo Java
JUC第二十五讲:JUC线程池-CompletableFuture 实现原理与实践
JUC第二十五讲:JUC线程池-CompletableFuture 实现原理与实践
|
11月前
|
Java
并发编程——JUC并发工具
JUC 是Java并发编程工具类库,提供了一些常用的并发工具,例如锁、信号量、计数器、事件循环、线程池、并发集合等。这些工具可以帮助开发人员简化并发编程的复杂性,提高程序效率和可靠性。
45 0
|
11月前
|
存储 消息中间件 算法
多线程和并发编程(4)—并发数据结构之BlockingQueue
多线程和并发编程(4)—并发数据结构之BlockingQueue
69 0
|
并行计算 Java 应用服务中间件
JUC并发编程超详细详解篇(一)
JUC并发编程超详细详解篇
1540 1
JUC并发编程超详细详解篇(一)
|
存储 缓存 安全
【并发编程】线程池实现原理
你知道线程池通过什么技术维护多个线程,和等待管理者分配可并发执行任务吗? 看到这个问题你应该能想到“池化技术”、“阻塞队列”但内部的实现原理可能不太清楚,下面让我们来一起深入一下线程池的内部实现原理。
144 0
【并发编程】线程池实现原理