java8中修改parallelStream默认并发数

简介: java8中修改parallelStream默认并发数

一、parallelStream说明


Java 8引入了流的概念去对数据进行复杂的操作,而且使用并行流(Parallel Steams)支持并发,大大加快了运行效率。


parallelStream默认使用了fork-join框架,其默认线程数是CPU核心数。


二、parallelStream默认的并发数


@Test
  public void testParallelism1() throws ExecutionException, InterruptedException {
        int cupNum = Runtime.getRuntime().availableProcessors();
        log.info("CPU num:{}",cupNum);
        long firstNum = 1;
        long lastNum = 10000;
        List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
                .collect(Collectors.toList());
        aList.parallelStream().forEach(e->{
              log.info("输出:{}",e);
        });
  }


执行结果:

95.png

说明:

可以发现有8个线程参与任务执行,分别是main主线程、ForkJoinPool.commonPool-worker-1 到ForkJoinPool.commonPool-worker-7,正好与CPU的核心数8匹配。


默认情况下,parallelStream使用的是ForkJoinPool.commonPool(),这是一个公用的线程池,被整个程序所使用。


可以通过以下方法修改parallelStream默认的多线程数量:


三、设置ForkJoinPool.commonPool()的并发数


设置parallelStream默认的公用线程池的全局并发数:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");


实战:


@Test
 public void testParallelism3() throws ExecutionException, InterruptedException {
       int cupNum = Runtime.getRuntime().availableProcessors();
       log.info("CPU num:{}",cupNum);
       long firstNum = 1;
       long lastNum = 10000;
       List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
               .collect(Collectors.toList());
       System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");
       aList.parallelStream().forEach(e->{
             log.info("输出:{}",e);
       });
 }


执行结果:

94.png

说明:

执行结果中出现ForkJoinPool.commonPool-worker-0到ForkJoinPool.commonPool-worker-3,说明公共线程池的并发数配置的并发数4确实生效了。

此时任务的并发数是5, mian主线程 + ForkJoinPool.commonPool(0~4)


推荐:

由于主线程也会参与任务抢占CPU,所以ForkJoinPool.commonPool的线程数尽量设置为

(CPU核心数*N - 1)


四、通过ForkJoinPool定义私有线程池


@Test
public void testParallelism4() {
       int cupNum = Runtime.getRuntime().availableProcessors();
       log.info("CPU num:{}",cupNum);
       long firstNum = 1;
       long lastNum = 10000;
       List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
               .collect(Collectors.toList());
       ForkJoinPool forkJoinPool = new ForkJoinPool(8);
       try{
             List<Long> longs = forkJoinPool.submit(() -> aList.parallelStream().map(e->{
                   return e+1;
             }).collect(Collectors.toList())).get();
             //通过调用get方法,等待任务执行完毕
             System.out.println(longs.size());
             System.out.println("执行结束");
       }catch (InterruptedException e) {
             e.printStackTrace();
       } catch (Exception e){
             e.printStackTrace();
       }finally {
             forkJoinPool.shutdown();
       }
 }


执行结果:

###|||2021-08-27 15:03:31.080|||INFO|||-|||-|||main|||SimpleTest--->CPU num:8
10000
执行结束


说明:

采用自定义的forkJoinPool线程池去提交任务,主线程不会参与计算。

forkJoinPool线程池采用submit异步提交任务,通过get方法阻塞主线程,直到任务执行完成,再调用shutdown方法关闭线程池。

注意,等待提交任务执行完毕不能采用awaitTermination()方法,该方法是等待指定时间后强制关闭线程池。


五、ForkJoinPool的错误使用


错误一:

通过forkJoinPool线程池结合parallelStream.forEach并发提交任务,get方法不能起到阻塞主线程、等待任务执行完毕的作用。


@Test
public void testParallelism5() {
      long firstNum = 1;
      long lastNum = 1000;
      List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
              .collect(Collectors.toList());
      ForkJoinPool forkJoinPool = new ForkJoinPool(8);
      try{
            ForkJoinTask future = forkJoinPool.submit(() -> aList.parallelStream().forEach(e->{
                  log.info("输出:{}",e);
            }));
            //通过调用get方法,等待任务执行完毕
            future.get(10, TimeUnit.MINUTES);
            //这里不能使用log打印日志
            //log.info("执行结束");
            System.out.println("执行结束");
      }catch (InterruptedException e) {
            e.printStackTrace();
      } catch (Exception e){
            e.printStackTrace();
      }finally {
            forkJoinPool.shutdown();
      }
}


执行结果:

ForkJoinPool线程池中的任务没有执行完成,主线程中就打印执行结束。get方法没有起到阻塞任务、等待任务执行完成的作用。

93.png


错误二:

通过forkJoinPool线程池的awaitTermination()方法是等待指定时间后关闭线程池,而不是等待任务结束后关闭线程池。

@Test
public void testParallelism5() {
      long firstNum = 1;
      long lastNum = 1000;
      List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
              .collect(Collectors.toList());
      ForkJoinPool forkJoinPool = new ForkJoinPool(8);
      try{
            ForkJoinTask future = forkJoinPool.submit(() -> aList.parallelStream().forEach(e->{
                  log.info("输出:{}",e);
            }));
            long startTime = System.currentTimeMillis();
            forkJoinPool.awaitTermination(20,TimeUnit.SECONDS);
            long totalTime = (System.currentTimeMillis() - startTime)/1000;
            System.out.println("耗时:"+totalTime);
            System.out.println("执行结束");
      }catch (InterruptedException e) {
            e.printStackTrace();
      } catch (Exception e){
            e.printStackTrace();
      }finally {
            forkJoinPool.shutdown();
      }
}


执行结果:

任务几秒就执行完毕了,但是却等待了20秒才继续往下执行。显然用awaitTermination来等待任务执行完成是不合适的。

92.png

小结:

采用get方法阻塞主线程,等待ForkJoinPool线程池中的任务执行完毕,适合聚合运算有结果返回的情况,不适合forEach这样的遍历操作。


六、执行效率对比


@Test
  public void testParallelism2() throws ExecutionException, InterruptedException {
        long firstNum = 1;
        long lastNum = 1_000_000;
        List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
                .collect(Collectors.toList());
        //4     055755750
        //8     091087375
        //20    250872750
        ForkJoinPool customThreadPool = new ForkJoinPool(8);
        StopWatch stopWatch = new StopWatch("task");
        stopWatch.start("parallelStream");
        long actualTotal = customThreadPool.submit(
                () -> aList.parallelStream().reduce(0L, Long::sum)).get();
        stopWatch.stop();
        log.info("result:{}",actualTotal);
        log.info("耗时统计:\n" +stopWatch.prettyPrint());
  }


执行结果:

并发数

耗时

4

055755750
8 091087375
20 250872750

说明:

针对这类高密度的CPU计算任务,提高线程池的并发数,反而会降低任务的执行效率,因为CPU抢占和大量线程频繁切换会增加任务的耗时。


总结


本文主要介绍了java8中如何修改parallelStream的默认并发数。

主要有以下两种方式:

1、设置ForkJoinPool.commonPool公共池的全局并发数。

2、自定义ForkJoinPool线程池指定并发数。

然后分析了ForkJoinPool并发任务如何阻塞等待任务执行完毕。

最后通过一个简单的测试,说明了针对CPU密集型计算任务,线程池的并发数越大,任务执行效率反而更低。

目录
相关文章
|
5月前
|
Java
Java并行流问题之parallelStream的使用方式
Java并行流问题之parallelStream的使用方式
96 1
|
6月前
|
Java 数据处理
Java8的新特性parallelStream()的概念、对比线程优势与实战
parallelStream() 是 Java 8 中新增的一个方法,它是 Stream 类的一种扩展,提供了将集合数据并行处理的能力。普通的 stream() 方法是使用单线程对集合数据进行顺序处理,而 parallelStream() 方法则可以将集合数据分成多个小块,分配到多个线程并行处理,从而提高程序的执行效率。
503 3
|
安全 Java
Java8中的Stream()与ParallelStream()的区别
Java8中的Stream()与ParallelStream()的区别
76 0
|
并行计算 算法 安全
谨慎使用 Java8 新特性 ParallelStream并行流
谨慎使用 Java8 新特性 ParallelStream并行流
1010 0
|
安全 Java
使用Java8新特性parallelStream遇到的坑
使用Java8新特性parallelStream遇到的坑
|
前端开发 安全 Java
面试官:java8中parallelStream提升数倍查询效率是怎样实现的
业务场景 在很多项目中,都有类似数据汇总的业务场景,查询今日注册会员数,在线会员数,订单总金额,支出总金额等。。。这些业务通常都不是存在同一张表中,我们需要依次查询出来然后封装成所需要的对象返回给前端。那么在此过程中,就可以把这个接口中“大任务”拆分成N个小任务,异步执行这些小任务,等到最后一个小任务执行完,把所有任务的执行结果封装到返回结果中,统一返回到前端展示。
面试官:java8中parallelStream提升数倍查询效率是怎样实现的
|
安全 Java
java8的ParallelStream踩坑记录
java8中的新特性stream流处理,让集合操作变得非常的简单,但是因为没有源码支持,所以里面有很多坑,只有踩过才知道 首先上代码 图1-1 代码很简单,就是利用并行流把一个list里面的数据导入到另外一个list中,看起来看简单,接下来...
5251 0
|
4天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
71 38
|
1天前
|
安全 Java
java 中 i++ 到底是否线程安全?
本文通过实例探讨了 `i++` 在多线程环境下的线程安全性问题。首先,使用 100 个线程分别执行 10000 次 `i++` 操作,发现最终结果小于预期的 1000000,证明 `i++` 是线程不安全的。接着,介绍了两种解决方法:使用 `synchronized` 关键字加锁和使用 `AtomicInteger` 类。其中,`AtomicInteger` 通过 `CAS` 操作实现了高效的线程安全。最后,通过分析字节码和源码,解释了 `i++` 为何线程不安全以及 `AtomicInteger` 如何保证线程安全。
java 中 i++ 到底是否线程安全?
|
5天前
|
Java 调度
[Java]线程生命周期与线程通信
本文详细探讨了线程生命周期与线程通信。文章首先分析了线程的五个基本状态及其转换过程,结合JDK1.8版本的特点进行了深入讲解。接着,通过多个实例介绍了线程通信的几种实现方式,包括使用`volatile`关键字、`Object`类的`wait()`和`notify()`方法、`CountDownLatch`、`ReentrantLock`结合`Condition`以及`LockSupport`等工具。全文旨在帮助读者理解线程管理的核心概念和技术细节。
20 1
[Java]线程生命周期与线程通信