java8中修改parallelStream默认并发数

简介: java8中修改parallelStream默认并发数

一、parallelStream说明


Java 8引入了流的概念去对数据进行复杂的操作,而且使用并行流(Parallel Steams)支持并发,大大加快了运行效率。


parallelStream默认使用了fork-join框架,其默认线程数是CPU核心数。


二、parallelStream默认的并发数


@Test
  public void testParallelism1() throws ExecutionException, InterruptedException {
        int cupNum = Runtime.getRuntime().availableProcessors();
        log.info("CPU num:{}",cupNum);
        long firstNum = 1;
        long lastNum = 10000;
        List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
                .collect(Collectors.toList());
        aList.parallelStream().forEach(e->{
              log.info("输出:{}",e);
        });
  }


执行结果:

95.png

说明:

可以发现有8个线程参与任务执行,分别是main主线程、ForkJoinPool.commonPool-worker-1 到ForkJoinPool.commonPool-worker-7,正好与CPU的核心数8匹配。


默认情况下,parallelStream使用的是ForkJoinPool.commonPool(),这是一个公用的线程池,被整个程序所使用。


可以通过以下方法修改parallelStream默认的多线程数量:


三、设置ForkJoinPool.commonPool()的并发数


设置parallelStream默认的公用线程池的全局并发数:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");


实战:


@Test
 public void testParallelism3() throws ExecutionException, InterruptedException {
       int cupNum = Runtime.getRuntime().availableProcessors();
       log.info("CPU num:{}",cupNum);
       long firstNum = 1;
       long lastNum = 10000;
       List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
               .collect(Collectors.toList());
       System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");
       aList.parallelStream().forEach(e->{
             log.info("输出:{}",e);
       });
 }


执行结果:

94.png

说明:

执行结果中出现ForkJoinPool.commonPool-worker-0到ForkJoinPool.commonPool-worker-3,说明公共线程池的并发数配置的并发数4确实生效了。

此时任务的并发数是5, mian主线程 + ForkJoinPool.commonPool(0~4)


推荐:

由于主线程也会参与任务抢占CPU,所以ForkJoinPool.commonPool的线程数尽量设置为

(CPU核心数*N - 1)


四、通过ForkJoinPool定义私有线程池


@Test
public void testParallelism4() {
       int cupNum = Runtime.getRuntime().availableProcessors();
       log.info("CPU num:{}",cupNum);
       long firstNum = 1;
       long lastNum = 10000;
       List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
               .collect(Collectors.toList());
       ForkJoinPool forkJoinPool = new ForkJoinPool(8);
       try{
             List<Long> longs = forkJoinPool.submit(() -> aList.parallelStream().map(e->{
                   return e+1;
             }).collect(Collectors.toList())).get();
             //通过调用get方法,等待任务执行完毕
             System.out.println(longs.size());
             System.out.println("执行结束");
       }catch (InterruptedException e) {
             e.printStackTrace();
       } catch (Exception e){
             e.printStackTrace();
       }finally {
             forkJoinPool.shutdown();
       }
 }


执行结果:

###|||2021-08-27 15:03:31.080|||INFO|||-|||-|||main|||SimpleTest--->CPU num:8
10000
执行结束


说明:

采用自定义的forkJoinPool线程池去提交任务,主线程不会参与计算。

forkJoinPool线程池采用submit异步提交任务,通过get方法阻塞主线程,直到任务执行完成,再调用shutdown方法关闭线程池。

注意,等待提交任务执行完毕不能采用awaitTermination()方法,该方法是等待指定时间后强制关闭线程池。


五、ForkJoinPool的错误使用


错误一:

通过forkJoinPool线程池结合parallelStream.forEach并发提交任务,get方法不能起到阻塞主线程、等待任务执行完毕的作用。


@Test
public void testParallelism5() {
      long firstNum = 1;
      long lastNum = 1000;
      List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
              .collect(Collectors.toList());
      ForkJoinPool forkJoinPool = new ForkJoinPool(8);
      try{
            ForkJoinTask future = forkJoinPool.submit(() -> aList.parallelStream().forEach(e->{
                  log.info("输出:{}",e);
            }));
            //通过调用get方法,等待任务执行完毕
            future.get(10, TimeUnit.MINUTES);
            //这里不能使用log打印日志
            //log.info("执行结束");
            System.out.println("执行结束");
      }catch (InterruptedException e) {
            e.printStackTrace();
      } catch (Exception e){
            e.printStackTrace();
      }finally {
            forkJoinPool.shutdown();
      }
}


执行结果:

ForkJoinPool线程池中的任务没有执行完成,主线程中就打印执行结束。get方法没有起到阻塞任务、等待任务执行完成的作用。

93.png


错误二:

通过forkJoinPool线程池的awaitTermination()方法是等待指定时间后关闭线程池,而不是等待任务结束后关闭线程池。

@Test
public void testParallelism5() {
      long firstNum = 1;
      long lastNum = 1000;
      List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
              .collect(Collectors.toList());
      ForkJoinPool forkJoinPool = new ForkJoinPool(8);
      try{
            ForkJoinTask future = forkJoinPool.submit(() -> aList.parallelStream().forEach(e->{
                  log.info("输出:{}",e);
            }));
            long startTime = System.currentTimeMillis();
            forkJoinPool.awaitTermination(20,TimeUnit.SECONDS);
            long totalTime = (System.currentTimeMillis() - startTime)/1000;
            System.out.println("耗时:"+totalTime);
            System.out.println("执行结束");
      }catch (InterruptedException e) {
            e.printStackTrace();
      } catch (Exception e){
            e.printStackTrace();
      }finally {
            forkJoinPool.shutdown();
      }
}


执行结果:

任务几秒就执行完毕了,但是却等待了20秒才继续往下执行。显然用awaitTermination来等待任务执行完成是不合适的。

92.png

小结:

采用get方法阻塞主线程,等待ForkJoinPool线程池中的任务执行完毕,适合聚合运算有结果返回的情况,不适合forEach这样的遍历操作。


六、执行效率对比


@Test
  public void testParallelism2() throws ExecutionException, InterruptedException {
        long firstNum = 1;
        long lastNum = 1_000_000;
        List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
                .collect(Collectors.toList());
        //4     055755750
        //8     091087375
        //20    250872750
        ForkJoinPool customThreadPool = new ForkJoinPool(8);
        StopWatch stopWatch = new StopWatch("task");
        stopWatch.start("parallelStream");
        long actualTotal = customThreadPool.submit(
                () -> aList.parallelStream().reduce(0L, Long::sum)).get();
        stopWatch.stop();
        log.info("result:{}",actualTotal);
        log.info("耗时统计:\n" +stopWatch.prettyPrint());
  }


执行结果:

并发数

耗时

4

055755750
8 091087375
20 250872750

说明:

针对这类高密度的CPU计算任务,提高线程池的并发数,反而会降低任务的执行效率,因为CPU抢占和大量线程频繁切换会增加任务的耗时。


总结


本文主要介绍了java8中如何修改parallelStream的默认并发数。

主要有以下两种方式:

1、设置ForkJoinPool.commonPool公共池的全局并发数。

2、自定义ForkJoinPool线程池指定并发数。

然后分析了ForkJoinPool并发任务如何阻塞等待任务执行完毕。

最后通过一个简单的测试,说明了针对CPU密集型计算任务,线程池的并发数越大,任务执行效率反而更低。

目录
相关文章
|
3月前
|
Java
Java并行流问题之parallelStream的使用方式
Java并行流问题之parallelStream的使用方式
63 1
|
4月前
|
Java 数据处理
Java8的新特性parallelStream()的概念、对比线程优势与实战
parallelStream() 是 Java 8 中新增的一个方法,它是 Stream 类的一种扩展,提供了将集合数据并行处理的能力。普通的 stream() 方法是使用单线程对集合数据进行顺序处理,而 parallelStream() 方法则可以将集合数据分成多个小块,分配到多个线程并行处理,从而提高程序的执行效率。
428 3
|
11月前
|
安全 Java
Java8中的Stream()与ParallelStream()的区别
Java8中的Stream()与ParallelStream()的区别
64 0
|
并行计算 算法 安全
谨慎使用 Java8 新特性 ParallelStream并行流
谨慎使用 Java8 新特性 ParallelStream并行流
984 0
|
安全 Java
使用Java8新特性parallelStream遇到的坑
使用Java8新特性parallelStream遇到的坑
|
前端开发 安全 Java
面试官:java8中parallelStream提升数倍查询效率是怎样实现的
业务场景 在很多项目中,都有类似数据汇总的业务场景,查询今日注册会员数,在线会员数,订单总金额,支出总金额等。。。这些业务通常都不是存在同一张表中,我们需要依次查询出来然后封装成所需要的对象返回给前端。那么在此过程中,就可以把这个接口中“大任务”拆分成N个小任务,异步执行这些小任务,等到最后一个小任务执行完,把所有任务的执行结果封装到返回结果中,统一返回到前端展示。
面试官:java8中parallelStream提升数倍查询效率是怎样实现的
|
安全 Java
java8的ParallelStream踩坑记录
java8中的新特性stream流处理,让集合操作变得非常的简单,但是因为没有源码支持,所以里面有很多坑,只有踩过才知道 首先上代码 图1-1 代码很简单,就是利用并行流把一个list里面的数据导入到另外一个list中,看起来看简单,接下来...
5210 0
|
7天前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)
|
1天前
|
缓存 Java 应用服务中间件
Java虚拟线程探究与性能解析
本文主要介绍了阿里云在Java-虚拟-线程任务中的新进展和技术细节。
|
18天前
|
监控 Java 调度
【Java学习】多线程&JUC万字超详解
本文详细介绍了多线程的概念和三种实现方式,还有一些常见的成员方法,CPU的调动方式,多线程的生命周期,还有线程安全问题,锁和死锁的概念,以及等待唤醒机制,阻塞队列,多线程的六种状态,线程池等
79 6
【Java学习】多线程&JUC万字超详解