一、parallelStream说明
Java 8引入了流的概念去对数据进行复杂的操作,而且使用并行流(Parallel Steams)支持并发,大大加快了运行效率。
parallelStream默认使用了fork-join框架,其默认线程数是CPU核心数。
二、parallelStream默认的并发数
@Test public void testParallelism1() throws ExecutionException, InterruptedException { int cupNum = Runtime.getRuntime().availableProcessors(); log.info("CPU num:{}",cupNum); long firstNum = 1; long lastNum = 10000; List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed() .collect(Collectors.toList()); aList.parallelStream().forEach(e->{ log.info("输出:{}",e); }); }
执行结果:
说明:
可以发现有8个线程参与任务执行,分别是main主线程、ForkJoinPool.commonPool-worker-1 到ForkJoinPool.commonPool-worker-7,正好与CPU的核心数8匹配。
默认情况下,parallelStream使用的是ForkJoinPool.commonPool(),这是一个公用的线程池,被整个程序所使用。
可以通过以下方法修改parallelStream默认的多线程数量:
三、设置ForkJoinPool.commonPool()的并发数
设置parallelStream默认的公用线程池的全局并发数:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");
实战:
@Test public void testParallelism3() throws ExecutionException, InterruptedException { int cupNum = Runtime.getRuntime().availableProcessors(); log.info("CPU num:{}",cupNum); long firstNum = 1; long lastNum = 10000; List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed() .collect(Collectors.toList()); System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4"); aList.parallelStream().forEach(e->{ log.info("输出:{}",e); }); }
执行结果:
说明:
执行结果中出现ForkJoinPool.commonPool-worker-0到ForkJoinPool.commonPool-worker-3,说明公共线程池的并发数配置的并发数4确实生效了。
此时任务的并发数是5, mian主线程 + ForkJoinPool.commonPool(0~4)
推荐:
由于主线程也会参与任务抢占CPU,所以ForkJoinPool.commonPool的线程数尽量设置为
(CPU核心数*N - 1)
四、通过ForkJoinPool定义私有线程池
@Test public void testParallelism4() { int cupNum = Runtime.getRuntime().availableProcessors(); log.info("CPU num:{}",cupNum); long firstNum = 1; long lastNum = 10000; List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed() .collect(Collectors.toList()); ForkJoinPool forkJoinPool = new ForkJoinPool(8); try{ List<Long> longs = forkJoinPool.submit(() -> aList.parallelStream().map(e->{ return e+1; }).collect(Collectors.toList())).get(); //通过调用get方法,等待任务执行完毕 System.out.println(longs.size()); System.out.println("执行结束"); }catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e){ e.printStackTrace(); }finally { forkJoinPool.shutdown(); } }
执行结果:
###|||2021-08-27 15:03:31.080|||INFO|||-|||-|||main|||SimpleTest--->CPU num:8 10000 执行结束
说明:
采用自定义的forkJoinPool线程池去提交任务,主线程不会参与计算。
forkJoinPool线程池采用submit异步提交任务,通过get方法阻塞主线程,直到任务执行完成,再调用shutdown方法关闭线程池。
注意,等待提交任务执行完毕不能采用awaitTermination()方法,该方法是等待指定时间后强制关闭线程池。
五、ForkJoinPool的错误使用
错误一:
通过forkJoinPool线程池结合parallelStream.forEach并发提交任务,get方法不能起到阻塞主线程、等待任务执行完毕的作用。
@Test public void testParallelism5() { long firstNum = 1; long lastNum = 1000; List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed() .collect(Collectors.toList()); ForkJoinPool forkJoinPool = new ForkJoinPool(8); try{ ForkJoinTask future = forkJoinPool.submit(() -> aList.parallelStream().forEach(e->{ log.info("输出:{}",e); })); //通过调用get方法,等待任务执行完毕 future.get(10, TimeUnit.MINUTES); //这里不能使用log打印日志 //log.info("执行结束"); System.out.println("执行结束"); }catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e){ e.printStackTrace(); }finally { forkJoinPool.shutdown(); } }
执行结果:
ForkJoinPool线程池中的任务没有执行完成,主线程中就打印执行结束。get方法没有起到阻塞任务、等待任务执行完成的作用。
错误二:
通过forkJoinPool线程池的awaitTermination()方法是等待指定时间后关闭线程池,而不是等待任务结束后关闭线程池。
@Test public void testParallelism5() { long firstNum = 1; long lastNum = 1000; List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed() .collect(Collectors.toList()); ForkJoinPool forkJoinPool = new ForkJoinPool(8); try{ ForkJoinTask future = forkJoinPool.submit(() -> aList.parallelStream().forEach(e->{ log.info("输出:{}",e); })); long startTime = System.currentTimeMillis(); forkJoinPool.awaitTermination(20,TimeUnit.SECONDS); long totalTime = (System.currentTimeMillis() - startTime)/1000; System.out.println("耗时:"+totalTime); System.out.println("执行结束"); }catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e){ e.printStackTrace(); }finally { forkJoinPool.shutdown(); } }
执行结果:
任务几秒就执行完毕了,但是却等待了20秒才继续往下执行。显然用awaitTermination来等待任务执行完成是不合适的。
小结:
采用get方法阻塞主线程,等待ForkJoinPool线程池中的任务执行完毕,适合聚合运算有结果返回的情况,不适合forEach这样的遍历操作。
六、执行效率对比
@Test public void testParallelism2() throws ExecutionException, InterruptedException { long firstNum = 1; long lastNum = 1_000_000; List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed() .collect(Collectors.toList()); //4 055755750 //8 091087375 //20 250872750 ForkJoinPool customThreadPool = new ForkJoinPool(8); StopWatch stopWatch = new StopWatch("task"); stopWatch.start("parallelStream"); long actualTotal = customThreadPool.submit( () -> aList.parallelStream().reduce(0L, Long::sum)).get(); stopWatch.stop(); log.info("result:{}",actualTotal); log.info("耗时统计:\n" +stopWatch.prettyPrint()); }
执行结果:
并发数 |
耗时 |
4 |
055755750 |
8 | 091087375 |
20 | 250872750 |
说明:
针对这类高密度的CPU计算任务,提高线程池的并发数,反而会降低任务的执行效率,因为CPU抢占和大量线程频繁切换会增加任务的耗时。
总结
本文主要介绍了java8中如何修改parallelStream的默认并发数。
主要有以下两种方式:
1、设置ForkJoinPool.commonPool公共池的全局并发数。
2、自定义ForkJoinPool线程池指定并发数。
然后分析了ForkJoinPool并发任务如何阻塞等待任务执行完毕。
最后通过一个简单的测试,说明了针对CPU密集型计算任务,线程池的并发数越大,任务执行效率反而更低。