Java并行流指北

简介: Java并行流,方便了 并发操作,但是不注意可能会导致问题。如 最大线程数,怎么控制并发数,类加载器,线程上下文变化,ForkJoinPool 的 execute、submit、invoke 方法的区别 等。

一、前言

  • Java并行流,方便了 并发操作,但是不注意可能会导致问题。
  • 如 最大线程数,怎么控制并发数,类加载器,线程上下文变化,ForkJoinPool 的 execute、submit、invoke 方法的区别 等。
  • 注意:本文以 openjdk 11.0.10 为例,没有特殊说明时,都是指 ForkJoinPool.commonPool()

二、注意点

1. 并行度

  • 并行度 不等于 最大线程数(maximumPoolSize),下图 commonPool 有49个线程,但是 并行度为1
  • 默认的 并行度为 CPU核数 - 1,最小为 1
  • 可通过 -Djava.util.concurrent.ForkJoinPool.common.parallelism=数量 设置
    image.png

2. 容器里面的并行度

  • 下图中,/sys/fs/cgroup/cpu/cpu.cfs_quota_us 除以 /sys/fs/cgroup/cpu/cpu.cfs_period_us = cpu核数
  • 不等于 nproc,更不等于 获得宿主机的 lscpu | grep 'CPU(s):'
    image.png

3. 最大线程数

  • 并行度 不等于 最大线程数(maximumPoolSize)
  • 即使 并行度 parallelism 为1,还有 备用线程(maximumPoolSize、COMMON_MAX_SPARES)
  • commonPool 默认 256,自定义 ForkJoinPool() 默认 32767。这样看,比较少会出现 线程数不够的情况。
    image.png

4. 并发太大,压垮后端

  • 假如 ForkJoinPool.commonPool() 线程比较多,并行流集合的元素也比较多时,给下游较大压力
  • jstack pid | grep -c commonPool

5. 线程上下文变化

如:获取不到用户信息了,可以获取到用户信息以后,传到并行流使用

final String deviceUdid = RequestUtils.getDeviceUdid();
data.parallelStream().forEach(d -> {
   
    // use deviceUdid instead of RequestUtils.getDeviceUdid() do something
});

6. ForkJoinPool 的 execute、submit、invoke 方法的区别

  • 有些简单的任务,不想单独创建线程池,可以用 ForkJoinPool.commonPool()
  • execute():异步执行,没有返回值,不能等待执行完成
  • submit():异步执行,返回 ForkJoinTask,需增加 .join() 等待完成
  • invoke():等于 submit() + join()

7. spring boot使用Java并行流发送kafka消息报错

  • 类加载器不一样,详见 spring boot 使用 Java 并行流发送 kafka 消息报错
  • 使用 spring-boot-maven-plugin 打包以后,依赖在 jar里面自定义位置(BOOT-INF/lib/),使用 org.springframework.boot.loader.LaunchedURLClassLoader 加载
  • ForkJoinPool.commonPool 默认使用 DefaultForkJoinWorkerThreadFactory,用的 系统ClassLoader,所以 并行流加载不到依赖的 class
  • 可通过 -Djava.util.concurrent.ForkJoinPool.common.threadFactory 设置 自定义线程工厂,使用当前 ClassLoader 解决
    image.png

8. 自定义并行流线程池

参考 concurrency - Custom thread pool in Java 8 parallel stream - Stack Overflow

  • 方案一(各种情况都有效)

    CompletableFuture.runAsync(list.parallelStream().forEach(), new ForkJoinPool(2)).join()
    
  • 方案二(部分场景似乎没有效果)

    // 第4个参数 asyncMode,默认 false,设置为 true 适用于 FIFO
    ForkJoinPool forkJoinPool = new ForkJoinPool(2, pool -> new ForkJoinWorkerThread(pool) {
         
    }, null, false);
    forkJoinPool.invoke(() -> list.parallelStream().forEach());
    

9. 控制并发数

  • 可考虑把 集合切分成需要的份数,然后 parallelStream()
    List<String> list = List.of("a", "b", "c");
    CollUtil.split(list, list.size() / 2 + 1).parallelStream().forEach(b -> {
         
      b.stream().forEach(System.out::println);
    });
    

10. 顺序消费

  • 如 forEachOrdered 会导致没有并发效果
  • 需要并行,还要使用输入顺序的,可考虑把 集合切分成需要的份数,然后 parallelStream()

三、总结

本文遵守【CC BY-NC】协议,转载请保留原文出处及本版权声明,否则将追究法律责任。
本文首先发布于 https://www.890808.xyz/ ,其他平台需要审核更新慢一些。

相关文章
|
6月前
|
算法 Java 数据处理
Dating Java8系列之并行数据处理
Dating Java8系列之并行数据处理
74 0
|
13天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
16天前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
4月前
|
并行计算 Java 大数据
Java中的高效并行计算与多线程编程技术
Java中的高效并行计算与多线程编程技术
|
4月前
|
存储 Java 调度
线程操纵术并行策略问题之Java的并行编程优势问题如何解决
线程操纵术并行策略问题之Java的并行编程优势问题如何解决
|
5月前
|
Java
Java并行流问题之parallelStream的使用方式
Java并行流问题之parallelStream的使用方式
108 1
|
5月前
|
Java 程序员
Java多线程编程是指在一个进程中创建并运行多个线程,每个线程执行不同的任务,并行地工作,以达到提高效率的目的
【6月更文挑战第18天】Java多线程提升效率,通过synchronized关键字、Lock接口和原子变量实现同步互斥。synchronized控制共享资源访问,基于对象内置锁。Lock接口提供更灵活的锁管理,需手动解锁。原子变量类(如AtomicInteger)支持无锁的原子操作,减少性能影响。
46 3
|
4月前
|
Java 调度 Windows
Java面试之程序、进程、线程、管程和并发、并行的概念
Java面试之程序、进程、线程、管程和并发、并行的概念
29 0
|
4月前
|
并行计算 监控 Java
Java中的并行计算与任务分发策略
Java中的并行计算与任务分发策略
|
4月前
|
安全 Java 测试技术
Java中的并行流详解
Java中的并行流详解
下一篇
无影云桌面