Java8 Stream 并行流

简介: Java8 Stream 并行流

并行流就是把一系列数据自动拆分成多个数据块,并使用多个线程来处理这些数据块,这样就可以利用现代CPU多核的优势,把计算任务分配给多个CPU核心,最后再汇总结果。让它们都忙起来~


# 并行流使用的线程池


  • 先来看看并行流所使用的线程

public static void main(String[] args) {
    Random random = new Random();
    //非并行流
    Stream.generate(() -> random.nextInt(1000))
            .limit(10)
            .forEach(x -> System.out.println(x + ": " + Thread.currentThread().getName()));
    //并行流
    Stream.generate(() -> random.nextInt(1000))
            .limit(1000)
            //转换成并行流
            .parallel()
            .forEach(x -> System.out.println(x + ": " + Thread.currentThread().getName()));
}


  • 结果: 可以看到并行流除了使用main线程,还使用了ForkJoinPool线程。


image.png

image.png


  • ForkJoinPool使用演示

@Getter
@Setter
@Slf4j
public class ForkJoinCalculator extends RecursiveTask<Long> {
    /**
     * 获取通用的ForkJoinPool
     */
    private static final ForkJoinPool FORK_JOIN_POOL = ForkJoinPool.commonPool();
    /**
     * 最小批次元素数量
     */
    private static final int MIN_BATCH_SIZE = 1000;
    private long[] dataArray;
    private int startIndex;
    private int endIndex;
    public ForkJoinCalculator(long[] dataArray) {
        this.dataArray = dataArray;
        this.startIndex = 0;
        this.endIndex = dataArray.length;
    }
    private ForkJoinCalculator(long[] dataArray, int startIndex, int endIndex) {
        this.dataArray = dataArray;
        this.startIndex = startIndex;
        this.endIndex = endIndex;
    }
    @Override
    protected Long compute() {
        ForkJoinCalculator.printWithThread("cur startIndex=%s,endIndex=%s", startIndex, endIndex);
        long curTotal = 0L;
        if (endIndex - startIndex <= MIN_BATCH_SIZE) {
            //如果需要计算的元素个数小于最小阈值则直接计算
            for (int i = startIndex; i < endIndex; i++) {
                curTotal += dataArray[i];
            }
            ForkJoinCalculator.printWithThread("直接计算curTotal=%s", curTotal);
        } else {
            // 如要要计算的元素个数大于设定的最小阈值,则进行任务拆分
            // 将元素startIndex~endIndex个任务拆分成两份
            // 计算中间索引
            int middleIndex = (startIndex + endIndex) / 2;
            ForkJoinCalculator leftForkJoinCalculator = new ForkJoinCalculator(dataArray, startIndex, middleIndex);
            ForkJoinCalculator rightForkJoinCalculator = new ForkJoinCalculator(dataArray, middleIndex, endIndex);
            // fork():将任务push到线程的工作队列
            // join(): 计算结果
            long leftTotal = leftForkJoinCalculator.fork().join();
            // 第二个子任务,有可能继续划分
            long rightTotal = rightForkJoinCalculator.compute();
            ForkJoinCalculator.printWithThread("leftTotal=%s,rightTotal=%s", leftTotal, rightTotal);
            curTotal = leftTotal + rightTotal;
        }
        return curTotal;
    }
    public static void printWithThread(String format, Object... args) {
        String formatStr = String.format(format, args);
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + ": " + formatStr);
    }
}


  • 工作窃取: work stealing
  • 为了保证每个线程完成的任务量相对平均,每个线程都会将分配给自己的任务保存在一个双向队列,每执行完一个任务,就会从队列头取出下一个任务执行,如果当前线程比较优秀,早早地完成了自己队列内的所有任务,则会从其他线程的队列的尾巴上"窃取"一个任务来执行,直到所有线程的队列都清空,以保证以最快的速度执行完所有的任务。


# 性能与注意点


使用并行流并不保证性能一定比非并行流和for循环好,有时候可能更差,这取决于要处理的数据集的数据结构。并且,在使用并行流之前,必须确保用的对,否则可能出现计算结果错误的严重后果。请记住,并行化并不是没有代价的。


  • 演示一个错误使用并行流的例子

public static class Add {
    private long total;
    public void add(long curVal) {
        total += curVal;
    }
}
public void streamDemo() {
    Add add = new Add();
    LongStream.rangeClosed(1, 1_000_000)
            .forEach(add::add);
    System.out.println(add.total);
}
public void parallelStreamDemo() {
    Add add = new Add();
    LongStream.rangeClosed(1, 1_000_000)
            .parallel()
            .forEach(add::add);
    System.out.println(add.total);
}


  • 结果: 使用并行流计算出来的结果与正确结果500000500000出现了非常大的差异,这是因为在使用并行流时,多个线程同时访问total+=curVal;,会出现线程安全问题。当然,你可以将add()设置为synchronized同步方法,但是很显然性能会很差。

500000500000
53692171876

# 高效使用并行流的建议



  • 如果不确定使用并行流是否能提高程序执行的效率:请测量、测试。使用并行流的结果并不一定会产生与我们预期相符的结果,最好的方法就是在使用之前做足够的基准测试来检测性能。
  • 注意装箱和拆箱成本,尽量使用原始类型特化流IntStream, LongStreamDoubleStream
  • 对于较少的数据量,使用并行流从来都不是一个好的决定。使用并行流带来的好处还抵不过线程的额外开销。
  • 因为并行流计算之前需要将数据集拆分,所以在使用并行流之前需要考虑数据集是否易于拆分。例如,ArrayList就比LinkedList易于拆分,因为ArrayList不需要遍历整个数据集就可拆分,而后者必须完整遍历。
  • 按照可拆分性:


数据源 可拆分性
ArrayList 极佳
LinkedList
Stream.range() 极佳
Stream.iterate()
HashSet
TreeSet


相关文章
|
22天前
|
安全 Java API
告别繁琐编码,拥抱Java 8新特性:Stream API与Optional类助你高效编程,成就卓越开发者!
【8月更文挑战第29天】Java 8为开发者引入了多项新特性,其中Stream API和Optional类尤其值得关注。Stream API对集合操作进行了高级抽象,支持声明式的数据处理,避免了显式循环代码的编写;而Optional类则作为非空值的容器,有效减少了空指针异常的风险。通过几个实战示例,我们展示了如何利用Stream API进行过滤与转换操作,以及如何借助Optional类安全地处理可能为null的数据,从而使代码更加简洁和健壮。
56 0
|
9天前
|
Java API C++
Java 8 Stream Api 中的 peek 操作
本文介绍了Java中`Stream`的`peek`操作,该操作通过`Consumer&lt;T&gt;`函数消费流中的每个元素,但不改变元素类型。文章详细解释了`Consumer&lt;T&gt;`接口及其使用场景,并通过示例代码展示了`peek`操作的应用。此外,还对比了`peek`与`map`的区别,帮助读者更好地理解这两种操作的不同用途。作者为码农小胖哥,原文发布于稀土掘金。
Java 8 Stream Api 中的 peek 操作
|
9天前
|
Java C# Swift
Java Stream中peek和map不为人知的秘密
本文通过一个Java Stream中的示例,探讨了`peek`方法在流式处理中的应用及其潜在问题。首先介绍了`peek`的基本定义与使用,并通过代码展示了其如何在流中对每个元素进行操作而不返回结果。接着讨论了`peek`作为中间操作的懒执行特性,强调了如果没有终端操作则不会执行的问题。文章指出,在某些情况下使用`peek`可能比`map`更简洁,但也需注意其懒执行带来的影响。
Java Stream中peek和map不为人知的秘密
|
21天前
|
Java API
Java 8新特性:Lambda表达式与Stream API的深度解析
【7月更文挑战第61天】本文将深入探讨Java 8中的两个重要特性:Lambda表达式和Stream API。我们将首先介绍Lambda表达式的基本概念和语法,然后详细解析Stream API的使用和优势。最后,我们将通过实例代码演示如何结合使用Lambda表达式和Stream API,以提高Java编程的效率和可读性。
|
19天前
|
Java
盘点java8 stream中隐藏的函数式接口
`shigen`是一位坚持更新文章的博客作者,记录成长历程,分享认知见解,留住感动瞬间。本文介绍了函数式接口的概念及其在Java中的应用,包括`Comparator`、`Runnable`、`Callable`等常见接口,并详细讲解了`Function`、`Predicate`、`Consumer`、`Supplier`和`Comparator`等函数式接口的使用方法及应用场景,展示了如何利用这些接口简化代码并提高编程效率。**个人IP:shigen**,与shigen一起,每天进步一点点!
29 0
盘点java8 stream中隐藏的函数式接口
|
1月前
|
Java API 开发者
|
24天前
|
Java API 网络安全
探索Java中的Stream API:从基础到高级应用云计算与网络安全:技术融合与挑战
【8月更文挑战第27天】在Java的海洋中,Stream API犹如一艘强大的船,让开发者能以声明式的方式处理集合数据。本文将启航,先带你了解Stream的基本概念和用法,再深入探讨其高级特性,如并行流、管道操作以及性能考量。我们将通过具体代码示例,展示如何高效利用Stream API简化数据处理流程,提升代码的可读性和性能。无论你是初学者还是有经验的开发者,这篇文章都将为你打开一扇通往更优雅编程风格的大门。
|
30天前
|
数据可视化 IDE Java
Java8的Stream流太难用了?看看JDFrame如何简化开发
【8月更文挑战第21天】在Java的世界里,Java 8引入的Stream API无疑是一场革命,它极大地提升了集合处理的表达能力和简洁性。然而,对于许多开发者而言,尤其是那些刚从旧版本Java迁移过来的开发者,Stream API的复杂性和抽象性可能会让人感到困惑和挫败。今天,我们就来探讨如何通过JDFrame这样的框架或工具,来简化Java 8 Stream的使用,提升开发效率。
38 0
|
1月前
|
并行计算 Java API
|
1月前
|
前端开发 Oracle Java
Java 22 新增利器: 使用 Java Stream Gather 优雅地处理流中的状态
本文我们分析了 什么 是 “流”,对比了 Java 上几种常见的 “流”库,引入和详细介绍了 Java 22 中的 Stream Gather API 。同时也简单分享了利用 虚拟线程 如何简化 StreammapConcurrent操作符的实现。