ForkJoinPool线程池

简介: ForkJoinPool线程池

ForkJoinPool线程池

“分而治之”是处理大数据的方法,著名的MapReduce就是采用这种分而治之的思路,简单点说,如果要处理1000个数据,但是不具备处理1000个数据的能力,可以只处理10个数据,可以把1000个数据分阶段处理100次,每次处理10个,把100次的处理结果进行合成,形成最后这1000个数据。

把大任务调用fork()方法分解成小的任务,把小的任务结果进行join()合并为大任务的结果

系统还对ForkJoinPool线程池进行了优化,提交的任务数量与线程的数量不一定是一对一的关系,在多数情况下一个物理线程实际上需要处理多个逻辑任务。

ForkJoinPool就是线程池中最常用的方法就是:

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)

向线程池提交一个ForkJoinTask,支持fork()与Join()等待任务。ForkJoinTask有两个重要的子类RecursiveAction 和RecursiveTask,他们的区别在于RecursiveAction任务没有返回值,而RecursiveTask带有返回值

演示ForkJoinPool线程池的使用

package com;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class Text10 {
    private  static  class Contask extends RecursiveTask<Long> {
        private  static final int Threshold=10000;//数据阈值
        private  static  final int TaskSum=100;//定义每次把大人物分解成100个小任务
        private  long start;//计算起始值
        private  long end;
        public  Contask(long start,long end)
        {
            this.start=start;
            this.end=end;
        }
        @Override
        protected Long compute() {
        long sum=0;
        //当数量超过阈值就继续分解
        if(end-start<Threshold)
        {
            //小于阈值直接计算
            for (long i = start; i <= end; i++) {
                sum+=i;
            }
        }
        else
        {
            long step=(start+end)/TaskSum;
            //如果计算【0,200000】范围内的数列和,把该范围的数列分解成100个小任务,每个任务2000个即可
            //但是任务层次划分很深,即Threshold太小,每个人物计算量很小,系统内线程数量越来越多,导致性能下降,分解次数过多导致栈溢出
            ArrayList<Contask> list=new ArrayList<>();
            long pos=start;
            for (int i = 0; i < TaskSum; i++) {
                long lastOne=pos+step;//每个任务结束位置
                if(lastOne>end)
                {
                    lastOne=end;
                }
                //创建子任务
                Contask contask=new Contask(pos,lastOne);
                //把任务添加到集合
                list.add(contask);
                //调用fork提交子任务
                contask.fork();
                //调整下个任务起始位置
                pos+=step+1;
                //等待所有子任务结束后 计算结果
            }
            for (Contask task:list)
            {
                sum+=task.join();//join会一直等待子任务执行完毕返回结果
            }
        }
        return  sum;
        }
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创建线程池
        ForkJoinPool forkJoinPool=new ForkJoinPool();
        //创建一个大的任务
        Contask contask=new Contask(0L,200000L);
        //把大人物提交给线程池
        ForkJoinTask<Long> result=forkJoinPool.submit(contask);
        System.out.println("结果为"+result.get());
    }
}

代码的意思是把0-20000之间的数字进行求和计算,把20000个数字分成了100组任务,每组任务进行2000个数字求和计算并提交,全部计算完返回结果值

相关文章
|
4月前
|
缓存 Java API
厉害了,线程池就该这么玩
厉害了,线程池就该这么玩
25 0
|
5月前
|
缓存 搜索推荐 Java
线程池之ThreadPoolExecutor
线程池之ThreadPoolExecutor
46 0
|
5月前
|
缓存 搜索推荐 Java
多线程之线程池ThreadPoolExecutor
多线程之线程池ThreadPoolExecutor
27 0
|
10月前
|
前端开发 Java 调度
你了解线程池吗
你了解线程池吗
53 0
|
11月前
|
设计模式 Java C++
|
安全
线程池中CompletionService的应用
线程池中CompletionService的应用
91 0
|
存储 缓存 监控
线程池 ThreadPoolExecutor 详解
对于操作系统而言,创建一个线程的代价是十分昂贵的, 需要给它分配内存、列入调度,同时在线程切换时要执行内存换页,清空 CPU 缓存,切换回来时还要重新从内存中读取信息,破坏了数据的局部性。因此在并发编程中,当线程创建过多时,会影响程序性能,甚至引起程序崩溃。 而线程池属于池化管理模式,具有以下优点: 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的性能消耗。 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。 提高线程的可管理性:能够对线程进行统一分配、调优和监控。
180 0
|
Java 程序员
我是一个线程池
我是一个线程池
线程池之ThreadPoolExecutor使用
通过自定义线程池,我们可以更好的让线程池为我们所用,更加适应我的实际场景