Java并行流陷阱:为什么指定线程池可能是个坏主意

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。

Java并行流陷阱:为什么指定线程池可能是个坏主意

Java Stream 并不支持指定线程池,实际编码中,有些开发者可能会使用一些“技巧”来指定线程池。实际上,所谓的技巧不仅降低了可读性,而且很容易出现bug。本文将分析并行流式编程的设计思想、”技巧“会带来的问题,并提出相关的解决方案。

1. Stream 为什么不支持指定线程池

简单总结就是官方考虑过相关方案,认为没必要,parallel 提供了简单直接的使用方式,官方的初衷就是流式编程需要轻松实现并行支持,而指定线程池会使API更复杂化,不便使用,还会有线程安全性问题。需要指出的是,官方考虑过相关方案。

并行流默认使用公共线程池,基本思想为分治。公共池类型为 ForkJoinPool, 公共线程池并发度为CUP核数 - 1,适用于处理CPU密集型任务。任务为递归型任务,任务可以划分为子任务,空闲线程可以”窃取“待执行任务,充分利用线程池。一般情况下,使用公用池时,任务队列中会存在比较多的小任务。使用公用池的好处是可以避免创建过多无用的线程,特别是对于CPU密集型任务,新增线程反而会增加上下文开销。

流式编程可能是函数式编程最被大众接受的一种编程方式。理论上,流的处理过程中,所有的方法都应该是纯函数,遵循引用透明原则,内部可以对具体执行流程进行优化,其不为 IO 密集型任务是理所应当的。

不妨看看官方对于 ForkJoinTask 的描述:

A ForkJoinTask is a lightweight form of Future. The efficiency of ForkJoinTasks stems from a set of restrictions (that are only partially statically enforceable) reflecting their main use as computational tasks calculating pure functions or operating on purely isolated objects. The primary coordination mechanisms are fork, that arranges asynchronous execution, and join, that doesn't proceed until the task's result has been computed. Computations should ideally avoid synchronized methods or blocks, and should minimize other blocking synchronization apart from joining other tasks or using synchronizers such as Phasers that are advertised to cooperate with fork/ join scheduling. Subdividable tasks should also not perform blocking I/ O, and should ideally access variables that are completely independent of those accessed by other running tasks. These guidelines are loosely enforced by not permitting checked exceptions such as IOExceptions to be thrown.

简单总结就是 Future类型,通过fork、join 编排异步任务执行,应该避免阻塞方法。

若想指定线程池或者实现细粒度的代码执行控制,官方推荐使用 CompletableFuture,JUC 等相关类库。

2. 使用”技巧“执行线程池

以下stackoverflow上有关于指定线程池的解答

java

代码解读

复制代码

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

通过这个”技巧“可以指定 ForkJoinPool。其底层逻辑可以总结为:当 ForkJoin 任务执行时,其可以获得线程池上下文,任务(子任务)会在线程池中执行。

但是,这个 trick 是不可靠的。你需要注意 JDK 的版本,openjdk8u222 之前的版本实现有bug,使用的依然是公共池,官方bug修改相关信息可以看这里Parallel stream execution within a custom ForkJoinPool should obey the parallelism

以下是使用CompletableFuture 的 trick 实现,基本思路是一样的:

java

代码解读

复制代码

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);

需要注意的是:由于forkJoinTask的存在,虽然看似提交了一条任务,实际上提交了很多条。

3. 慎用并行流

首先,无论如何不要在并行流里执行阻塞任务,除非你对于其内部实现非常了解,否则,你会遇到各种各样所谓的坑。

当然,官方也没有完全禁止,其表述如下:

It is possible to define and use ForkJoinTasks that may block, but doing so requires three further considerations: (1) Completion of few if any other tasks should be dependent on a task that blocks on external synchronization or I/ O. Event-style async tasks that are never joined (for example, those subclassing CountedCompleter) often fall into this category. (2) To minimize resource impact, tasks should be small; ideally performing only the (possibly) blocking action. (3) Unless the ForkJoinPool. ManagedBlocker API is used, or the number of possibly blocked tasks is known to be less than the pool's ForkJoinPool. getParallelism level, the pool cannot guarantee that enough threads will be available to ensure progress or good performance.

如果你能够对以上描述有深入理解,比如 ManagedBlocker 的工作原理,那么可能没人可以拦住你使用阻塞任务。但是,由于公共池是公用的,每一次任务的成功执行不一定能保证整体执行多条任务时能够执行成功(这也是推荐使用自定义线程池的原因之一)。

其次,应该理解并行流的基本执行流程。不是所有stream 都可以 parallel 的。笔者遇到的生产代码没有遇到非要使用parallel 才能解决问题的,而且99%遇到的parallel 使用都是有问题。

Effective Java 第48条(谨慎使用并行流)指出了一些规则:

  1. stream.iterate 、数据源为 迭代器、中间操作使用了limit,使用并行流不能提高性能。
  2. 使用方便切分的数据结构,如ArrayList、HashMap、数组、range 等。你需要理解 Spliterator 的底层实现。
  3. 使用规约 reduce 方法的效率最高,短路操作(如 anyMatch、allMatch)次之,collect 操作最低。

再次,你需要能写出无需状态、无副作用的纯函数。

最后,需要进行性能测试。实际上,使用并行流不一定提高性能。我们可以在最初阶段估算并行度,比如并行排序,一方面只有可以并行的运算才可以提高性能;另一方面,任务划分可能会划分过多的子任务,结果收集难以并行运算,还有线程上下文切换、数据同步等开销。性能测试不是一次性的,应该尽可能模拟实际生产场景。

总之,轻松使用并行流可能应了这句话:“理想很丰满,现实很骨感”。我们已经有了容易理解的API(CompletableFuture、线程池、JUC等工具类),没必要舍近求远使用复杂、易错、性能不一定好的实现。

4. Parallel collector

Stream类中可拓展性最好的方法是 collect, 你可以传入不同的Collector 实现,比如 使用 toConcurrentMap 返回并发支持Map、Guava 中使用 toImmutableList, toImmutableSet 返回不可变集合、使用 Comparators. greatest(k, comparator) 高效计算 topK问题等等。

对于阻塞任务,开源类库 Parallel Collector 提供了收集阻塞任务的能力,示例代码如下:

java

代码解读

复制代码

list.stream()
  .collect(parallel(i -> blockingOp(i), toList()))
  	// 加入超时机制,提高系统韧性
    .orTimeout(1000, MILLISECONDS)
    .thenAcceptAsync(System.out::println, executor)
    .thenRun(() -> System.out.println("Finished!"));

总结其特点如下:

  1. 其返回类型为 CompletableFuture,无需等待即可返回。实现了 CompletableFuture 和 Stream 流的转换。
  2. 其可以指定执行器和并发度,方便并发控制。
  3. 和标准库无缝衔接,没有所谓的 trick,做法仅仅是实现 Collector 接口,没有对Stream底层实现有依赖。
  4. 成熟,目前支持 JDK21+(虚拟线程) 和 JDK8+(线程池)。截至本文发表前 JDK8+支持版本为
    com.pivovarit:parallel-collectors:2.6.1


转载来源:https://juejin.cn/post/7419869992617050152

相关文章
|
8天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
48 17
|
19天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
4天前
|
缓存 安全 算法
Java 多线程 面试题
Java 多线程 相关基础面试题
|
21天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
21天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。
|
21天前
|
安全 Java 编译器
深入理解Java中synchronized三种使用方式:助您写出线程安全的代码
`synchronized` 是 Java 中的关键字,用于实现线程同步,确保多个线程互斥访问共享资源。它通过内置的监视器锁机制,防止多个线程同时执行被 `synchronized` 修饰的方法或代码块。`synchronized` 可以修饰非静态方法、静态方法和代码块,分别锁定实例对象、类对象或指定的对象。其底层原理基于 JVM 的指令和对象的监视器,JDK 1.6 后引入了偏向锁、轻量级锁等优化措施,提高了性能。
45 3
|
21天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
116 2
|
29天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
48 6
|
1月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
1月前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
59 3