并发容器与框架——Fork/Join框架

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器镜像服务 ACR,镜像仓库100个 不限时长
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 参考资料《Java并发编程的艺术》

1. Fork/Join框架概念

    Fork/Join框架是Java提供的一个用于并行执行任务的框架,它会将一个大任务分成多个小任务,并且将每个小任务的最终结果汇总得到大任务结果的框架。比如对1+2+3+····+100求和,可以分成十个子任务分别对10个数求和,最后再汇总这十个子任务的结果。

2.工作窃取算法

    工作窃取算法是指某个线程从其他任务队列里窃取任务来执行。

    假如我们可以将一个总任务分割成多个互不相干的子任务,为了减少线程的竞争,我们会将这些子任务放在不同的队列中,并为每个队列都建造一个线程执行该队列的任务,线程和队列一一对应。但是有些线程可能会很早的执行完自己队列中的所有任务,而其他线程还会处理自己拥有的队列中的任务,此时已处理完任务的线程与其等待其他线程执行任务,不如帮助其他线程一起执行剩余任务。这时他们会从其他线程的队列里窃取一个线程执行任务,所以为了避免因为工作窃取引起的两个线程之间的竞争,通常任务队列会使用双端队列。任务队列线程从头部取任务,窃取线程从尾部取任务。

    优点是充分利用线程进行并行运算,并减少了竞争,缺点就是还是存在竞争情况,比如队列中任务数为1时,还会因为创建多个线程和队列造成更多的资源消耗。

3. Fork/Join框架设计实现类

    通过前面的介绍我们可以了解到Fork/Join框架主要实现两个步骤:分割任务以及执行任务并汇总结果。

  1. 分割任务:我们需要一个Fork类来分割任务,并且要将大任务分割的足够小。
  2. 执行任务并汇总结果:分割的子任务分别放在双端队列里,然后几个启动线程分 别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程 从队列里拿数据,然后合并这些数据。

    Fork/Join框架设计了两个类来完成以上两个步骤的功能。

  1. ForkJoinTask:这是一个抽象类,主要使用该类来创建一个ForkJoin任务,它提供在任务中执行fork()和join()操作的机制。但Fork/Join框架提供了ForkJoinTask的两个抽象子类, RecursiveAction(用于没有返回结果的任务); RecursiveTask(用于有返回结果的任务)。通过继承这两个子类来创建一个ForkJoin任务。
  2. ②ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当 一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

4.简单使用Fork/Join框架

    用Fork/Join框架来计算一个1+2+3+····+n的结果。注意,我们必须先想好如何分割任务,分割计算1+2+3+····+n时,必须至少分割为两个数字一组才能计算,但如果n过大两个数字一组会造成极大资源消耗,所以应考虑好任务分割的程度。此处仅以1+2+3+4为例。

public class TestFork_JoinFrame extends RecursiveTask<Integer>{
	private static final int THRESHOLD=2;//设置阀值,也就是任务最小分割程度为2个数字相加
	private int start;
	private int end;
	public TestFork_JoinFrame(int s,int e) {
		start=s;
		end=e;
	}
	public static void main(String[] args) {
		ForkJoinPool pool=new ForkJoinPool();
		TestFork_JoinFrame task=new TestFork_JoinFrame(1, 4);
		
		//执行任务
		Future<Integer> f=pool.submit(task);
		try {
			System.out.println("最终结果:"+f.get());
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	@Override
	protected Integer compute() {
		int sum=0;
		boolean canCompute=(end-start)<=THRESHOLD;//如果已经是最小分割值,则进行运算,否则继续分割
		if(canCompute){
			for(int i=start;i<=end;i++){
				sum+=i;
			}
		}else{
			//分隔成两个子任务
			int middle=(start+end)/2;
			TestFork_JoinFrame leftTask=new TestFork_JoinFrame(start, middle);
			TestFork_JoinFrame rightTask=new TestFork_JoinFrame(middle+1, end);
			//执行子任务
			leftTask.fork();
			rightTask.fork();
			//等待子任务完成并获取结果汇总
			sum=leftTask.join()+rightTask.join();
		}
		return sum;
	}
}

5.异常处理

    ForkJoinTask在执行过程中可能会抛出异常,但是与普通线程任务一样,我们无法在主线程中对其进行捕获,所以以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被 取消了,并且可以通过ForkJoinTask的getException方法获取异常。getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如 果任务没有完成或者没有抛出异常则返回null。

public class TestFork_JoinFrame extends RecursiveTask<Integer>{
	private static final int THRESHOLD=2;//设置阀值,也就是任务最小分割程度为2个数字相加
	private int start;
	private int end;
	public TestFork_JoinFrame(int s,int e) {
		start=s;
		end=e;
	}
	public static void main(String[] args) {
		ForkJoinPool pool=new ForkJoinPool();
		TestFork_JoinFrame task=new TestFork_JoinFrame(1, 4);
		
		//执行任务
		Future<Integer> f=pool.submit(task);
		try {
			if(task.isCompletedAbnormally()){
				System.out.println(task.getException());
			}else{
				System.out.println("最终结果:"+f.get());
			}
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	@Override
	protected Integer compute() {
		int sum=0;
		boolean canCompute=(end-start)<=THRESHOLD;//如果已经是最小分割值,则进行运算,否则继续分割
		if(canCompute){
			for(int i=start;i<=end;i++){
				sum+=i;
			}
		}else{
			//分隔成两个子任务
			int middle=(start+end)/2;
			TestFork_JoinFrame leftTask=new TestFork_JoinFrame(start, middle);
			TestFork_JoinFrame rightTask=new TestFork_JoinFrame(middle+1, end);
			//执行子任务
			leftTask.fork();
			rightTask.fork();
			//等待子任务完成并获取结果汇总
			sum=leftTask.join()+rightTask.join();
		}
		return sum;
	}
}
	}

6.框架实现原理

    ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。

   1.ForkJoinTask的fork()方法原理:调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果。源码如下

public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

      pushTask方法把当前任务存放在ForkJoinTask数组队列里。然后再调用ForkJoinPool的 signalWork()方法唤醒或创建一个工作线程来执行任务。源码码如下。

 final void push(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; ForkJoinPool p;
            int b = base, s = top, n;
            if ((a = array) != null) {    // ignore if queue removed
                int m = a.length - 1;     // fenced write for task visibility
                U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
                U.putOrderedInt(this, QTOP, s + 1);
                if ((n = s - b) <= 1) {
                    if ((p = pool) != null)
                        p.signalWork(p.workQueues, this);
                }
                else if (n >= m)
                    growArray();
            }
        }

    2.ForkJoinTask的join方法实现原理:Join方法的主要作用是阻塞当前线程并等待获取结果。源码如下

public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
 private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }

        首先,它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断返回什么结 果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常 (EXCEPTIONAL)。如果任务状态是已完成,则直接返回任务结果。如果任务状态是被取消,则直接抛出CancellationException。如果任务状态是抛出异常,则直接抛出对应的异常。

        doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成, 则直接返回任务状态;如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。

相关文章
|
6月前
|
IDE API 开发工具
鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之Counter容器组件
鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之Counter容器组件
151 1
|
6月前
|
IDE 开发工具 Windows
鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之RowSplit容器组件
鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之RowSplit容器组件
154 0
|
6月前
|
IDE 开发工具 Windows
鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之ColumnSplit容器组件
鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之ColumnSplit容器组件
90 0
|
6月前
|
IDE API 开发工具
鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之Column容器组件
鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之Column容器组件
128 0
|
6月前
|
Java 持续交付 Docker
微服务框架(二十一)Piplin 持续部署 Docker 容器
此系列文章将会描述Java框架Spring Boot、服务治理框架Dubbo、应用容器引擎Docker,及使用Spring Boot集成Dubbo、Mybatis等开源框架,其中穿插着Spring Boot中日志切面等技术的实现,然后通过gitlab-CI以持续集成为Docker镜像。 本文为使用Piplin 持续部署 Docker 容器
|
4月前
|
安全 程序员 C++
C++一分钟之-C++中的并发容器
【7月更文挑战第17天】C++11引入并发容器,如`std::shared_mutex`、`std::atomic`和线程安全的集合,以解决多线程中的数据竞争和死锁。常见问题包括原子操作的误用、锁的不当使用和迭代器失效。避免陷阱的关键在于正确使用原子操作、一致的锁管理以及处理迭代器失效。通过示例展示了如何安全地使用这些工具来提升并发编程的安全性和效率。
61 1
|
4月前
|
缓存 安全 Java
Java中的并发容器:ConcurrentHashMap详解
Java中的并发容器:ConcurrentHashMap详解
|
4月前
|
安全 Java 容器
第一篇:并发容器学习开篇介绍
第一篇:并发容器学习开篇介绍
40 4
|
4月前
|
存储 安全 算法
(九)深入并发编程之并发容器:阻塞队列、写时复制容器、锁分段容器原理详谈
相信大家在学习JavaSE时都曾接触过容器这一内容,一般Java中的容器可分为四类:Map、List、Queue以及Set容器,而在使用过程中,对于ArrayList、HashMap等这类容器都是经常使用的,但问题在于这些容器在并发环境下都会存在线程安全问题。
|
6月前
|
安全 Java 容器
Java一分钟之-并发编程:并发容器(ConcurrentHashMap, CopyOnWriteArrayList)
【5月更文挑战第18天】本文探讨了Java并发编程中的`ConcurrentHashMap`和`CopyOnWriteArrayList`,两者为多线程数据共享提供高效、线程安全的解决方案。`ConcurrentHashMap`采用分段锁策略,而`CopyOnWriteArrayList`适合读多写少的场景。注意,`ConcurrentHashMap`的`forEach`需避免手动同步,且并发修改时可能导致`ConcurrentModificationException`。`CopyOnWriteArrayList`在写操作时会复制数组。理解和正确使用这些特性是优化并发性能的关键。
62 1