Java并行流陷阱:为什么指定线程池可能是个坏主意

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。

Java并行流陷阱:为什么指定线程池可能是个坏主意

Java Stream 并不支持指定线程池,实际编码中,有些开发者可能会使用一些“技巧”来指定线程池。实际上,所谓的技巧不仅降低了可读性,而且很容易出现bug。本文将分析并行流式编程的设计思想、”技巧“会带来的问题,并提出相关的解决方案。

1. Stream 为什么不支持指定线程池

简单总结就是官方考虑过相关方案,认为没必要,parallel 提供了简单直接的使用方式,官方的初衷就是流式编程需要轻松实现并行支持,而指定线程池会使API更复杂化,不便使用,还会有线程安全性问题。需要指出的是,官方考虑过相关方案。

并行流默认使用公共线程池,基本思想为分治。公共池类型为 ForkJoinPool, 公共线程池并发度为CUP核数 - 1,适用于处理CPU密集型任务。任务为递归型任务,任务可以划分为子任务,空闲线程可以”窃取“待执行任务,充分利用线程池。一般情况下,使用公用池时,任务队列中会存在比较多的小任务。使用公用池的好处是可以避免创建过多无用的线程,特别是对于CPU密集型任务,新增线程反而会增加上下文开销。

流式编程可能是函数式编程最被大众接受的一种编程方式。理论上,流的处理过程中,所有的方法都应该是纯函数,遵循引用透明原则,内部可以对具体执行流程进行优化,其不为 IO 密集型任务是理所应当的。

不妨看看官方对于 ForkJoinTask 的描述:

A ForkJoinTask is a lightweight form of Future. The efficiency of ForkJoinTasks stems from a set of restrictions (that are only partially statically enforceable) reflecting their main use as computational tasks calculating pure functions or operating on purely isolated objects. The primary coordination mechanisms are fork, that arranges asynchronous execution, and join, that doesn't proceed until the task's result has been computed. Computations should ideally avoid synchronized methods or blocks, and should minimize other blocking synchronization apart from joining other tasks or using synchronizers such as Phasers that are advertised to cooperate with fork/ join scheduling. Subdividable tasks should also not perform blocking I/ O, and should ideally access variables that are completely independent of those accessed by other running tasks. These guidelines are loosely enforced by not permitting checked exceptions such as IOExceptions to be thrown.

简单总结就是 Future类型,通过fork、join 编排异步任务执行,应该避免阻塞方法。

若想指定线程池或者实现细粒度的代码执行控制,官方推荐使用 CompletableFuture,JUC 等相关类库。

2. 使用”技巧“执行线程池

以下stackoverflow上有关于指定线程池的解答

java

代码解读

复制代码

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

通过这个”技巧“可以指定 ForkJoinPool。其底层逻辑可以总结为:当 ForkJoin 任务执行时,其可以获得线程池上下文,任务(子任务)会在线程池中执行。

但是,这个 trick 是不可靠的。你需要注意 JDK 的版本,openjdk8u222 之前的版本实现有bug,使用的依然是公共池,官方bug修改相关信息可以看这里Parallel stream execution within a custom ForkJoinPool should obey the parallelism

以下是使用CompletableFuture 的 trick 实现,基本思路是一样的:

java

代码解读

复制代码

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);

需要注意的是:由于forkJoinTask的存在,虽然看似提交了一条任务,实际上提交了很多条。

3. 慎用并行流

首先,无论如何不要在并行流里执行阻塞任务,除非你对于其内部实现非常了解,否则,你会遇到各种各样所谓的坑。

当然,官方也没有完全禁止,其表述如下:

It is possible to define and use ForkJoinTasks that may block, but doing so requires three further considerations: (1) Completion of few if any other tasks should be dependent on a task that blocks on external synchronization or I/ O. Event-style async tasks that are never joined (for example, those subclassing CountedCompleter) often fall into this category. (2) To minimize resource impact, tasks should be small; ideally performing only the (possibly) blocking action. (3) Unless the ForkJoinPool. ManagedBlocker API is used, or the number of possibly blocked tasks is known to be less than the pool's ForkJoinPool. getParallelism level, the pool cannot guarantee that enough threads will be available to ensure progress or good performance.

如果你能够对以上描述有深入理解,比如 ManagedBlocker 的工作原理,那么可能没人可以拦住你使用阻塞任务。但是,由于公共池是公用的,每一次任务的成功执行不一定能保证整体执行多条任务时能够执行成功(这也是推荐使用自定义线程池的原因之一)。

其次,应该理解并行流的基本执行流程。不是所有stream 都可以 parallel 的。笔者遇到的生产代码没有遇到非要使用parallel 才能解决问题的,而且99%遇到的parallel 使用都是有问题。

Effective Java 第48条(谨慎使用并行流)指出了一些规则:

  1. stream.iterate 、数据源为 迭代器、中间操作使用了limit,使用并行流不能提高性能。
  2. 使用方便切分的数据结构,如ArrayList、HashMap、数组、range 等。你需要理解 Spliterator 的底层实现。
  3. 使用规约 reduce 方法的效率最高,短路操作(如 anyMatch、allMatch)次之,collect 操作最低。

再次,你需要能写出无需状态、无副作用的纯函数。

最后,需要进行性能测试。实际上,使用并行流不一定提高性能。我们可以在最初阶段估算并行度,比如并行排序,一方面只有可以并行的运算才可以提高性能;另一方面,任务划分可能会划分过多的子任务,结果收集难以并行运算,还有线程上下文切换、数据同步等开销。性能测试不是一次性的,应该尽可能模拟实际生产场景。

总之,轻松使用并行流可能应了这句话:“理想很丰满,现实很骨感”。我们已经有了容易理解的API(CompletableFuture、线程池、JUC等工具类),没必要舍近求远使用复杂、易错、性能不一定好的实现。

4. Parallel collector

Stream类中可拓展性最好的方法是 collect, 你可以传入不同的Collector 实现,比如 使用 toConcurrentMap 返回并发支持Map、Guava 中使用 toImmutableList, toImmutableSet 返回不可变集合、使用 Comparators. greatest(k, comparator) 高效计算 topK问题等等。

对于阻塞任务,开源类库 Parallel Collector 提供了收集阻塞任务的能力,示例代码如下:

java

代码解读

复制代码

list.stream()
  .collect(parallel(i -> blockingOp(i), toList()))
  	// 加入超时机制,提高系统韧性
    .orTimeout(1000, MILLISECONDS)
    .thenAcceptAsync(System.out::println, executor)
    .thenRun(() -> System.out.println("Finished!"));

总结其特点如下:

  1. 其返回类型为 CompletableFuture,无需等待即可返回。实现了 CompletableFuture 和 Stream 流的转换。
  2. 其可以指定执行器和并发度,方便并发控制。
  3. 和标准库无缝衔接,没有所谓的 trick,做法仅仅是实现 Collector 接口,没有对Stream底层实现有依赖。
  4. 成熟,目前支持 JDK21+(虚拟线程) 和 JDK8+(线程池)。截至本文发表前 JDK8+支持版本为
    com.pivovarit:parallel-collectors:2.6.1


转载来源:https://juejin.cn/post/7419869992617050152

相关文章
|
12天前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
14天前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####
|
7天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
7天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
24 3
|
8天前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####
|
13天前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
41 5
|
12天前
|
监控 Java 数据库连接
Java线程管理:守护线程与用户线程的区分与应用
在Java多线程编程中,线程可以分为守护线程(Daemon Thread)和用户线程(User Thread)。这两种线程在行为和用途上有着明显的区别,了解它们的差异对于编写高效、稳定的并发程序至关重要。
21 2
|
12天前
|
监控 Java 开发者
Java线程管理:守护线程与本地线程的深入剖析
在Java编程语言中,线程是程序执行的最小单元,它们可以并行执行以提高程序的效率和响应性。Java提供了两种特殊的线程类型:守护线程和本地线程。本文将深入探讨这两种线程的区别,并探讨它们在实际开发中的应用。
18 1
|
13天前
|
安全 Java 开发者
Java中的多线程编程:从基础到实践
本文深入探讨了Java多线程编程的核心概念和实践技巧,旨在帮助读者理解多线程的工作原理,掌握线程的创建、管理和同步机制。通过具体示例和最佳实践,本文展示了如何在Java应用中有效地利用多线程技术,提高程序性能和响应速度。
50 1
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
51 1
C++ 多线程之初识多线程