浅谈JDK 1.8中的PrarllelStream

简介: Stream是JDK 1.8中新增加的一个特性,就如同一个高级版迭代器(Iterator),可无限数据源,单向,不可往复,遍历过一次后即用尽了,正如流水一去不复返。而和迭代器只能命令式地、串行化操作不同,Stream可以并行化操作; 而ParallelStream正是一个并行执行的流,它是通过默认...

Stream是JDK 1.8中新增加的一个特性,就如同一个高级版迭代器(Iterator),可无限数据源,单向,不可往复,遍历过一次后即用尽了,正如流水一去不复返。而和迭代器只能命令式地、串行化操作不同,Stream可以并行化操作;
而ParallelStream正是一个并行执行的流,它是通过默认的ForkJoinPool提高多线程任务处理速度;

1. 一个栗子

static List<String> construct() {

        List<String> Strings = new ArrayList<String>();
        for (int i = 0; i < 50; i++) {
            String p = "name" + i;
            Strings.add(p);
        }
        return Strings;
    }

    static void doFor(List<String> Strings) {
        long start = System.currentTimeMillis();

        for (String p : Strings) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
            // System.out.println(p);
        }

        long end = System.currentTimeMillis();
        System.out.println("doFor cost:" + (end - start));
    }

    static void doStream(List<String> Strings) {
        long start = System.currentTimeMillis();

        Strings.stream().forEach(x -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
            // System.out.println(x);
        });

        long end = System.currentTimeMillis();
        System.out.println("doStream cost:" + (end - start));
    }

    static void doParallelStream(List<String> Strings) {

        long start = System.currentTimeMillis();

        Strings.parallelStream().forEach(x -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
            // System.out.println(x);
        });

        long end = System.currentTimeMillis();

        System.out.println("doParallelStream cost:" + (end - start));
    }

运行结果:

doFor cost:5119
doStream cost:5221
doParallelStream cost:724

从执行结果来看,stream顺序输出,而parallelStream无序输出;parallelStream执行效率最快;
下面来刨析下背后的运行机制;

2. 认识 ForkJoin

ForkJoin是JDK 1.7中推出的一个新特性,它同ThreadPoolExecutor一样实现了Executor和ExecutorService接口。核心线程的数量默认值采用当前可用的CPU数量,并使用了一个无限队列来保存需要执行的任务;

ForkJoinPool的核心算法主要是分治法(Divide-and-Conquer Algorithm),可以将一个任务分拆为多个子任务(所有子任务都完成之后才执行主任务),子任务执行完毕后,再把结果合并起来;能够使用相对少的线程来处理大量的任务,并且这些任务之间是有父子依赖的,必须是子任务执行完成后,父任务才能执行;也可以让其中的线程创建新的任务,并挂起当前的任务,任务以及子任务会保留在一个内部队列中,此时线程就能够从队列中选择任务顺序执行。
_

如在典型的快速排序算法应用中:需要对1000万、个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于2时,会停止分割,转而使用插入排序对它们进行排序。所有的任务加起来会有大概2000000+个;

而同样的任务交由ThreadPoolExecutor则几乎是不可能的任务,因为ThreadPoolExecutor中的线程无法向任务队列中再添加一个任务并且在等待该任务完成之后再继续执行,同时也无法选择优先执行子任务,当需要完成200万个具有父子关系的任务时,需要200万个并行线程,显然这是不可行的。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行;

3. 了解 Work Stealing / 工作窃取

工作窃取(work-stealing)算法是整个forkjion框架的核心理念,是指某个线程从其他队列里窃取任务来执行;充分的利用了现代CPU多核,提高新能;

那么为什么需要使用工作窃取算法呢?
假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行;

4. ParallelStream 运行机制

JDK 1.8中ForkJoinPool添加了一个默认线程数量为CPU核心数的通用线程池静态类型,用来处理那些没有被显式提交到任何线程池的任务。如在上例子中,对于列表中的元素的操作都会以并行的方式执行。forEach方法会为每个元素的计算操作创建一个任务,该任务会被通用线程池处理;代码的可读性和代码量较ThreadPoolExecutor明显更胜一筹;

ForkJoinPool的线程数量可以通过设置系统属性:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=N (N为线程数量)

另外需要注意是调用forEach方法时它会将执行forEach本身的线程也作为线程池中的一个工作线程。因此,即使将ForkJoinPool的通用线程池的线程数量设置为1,实际上也会有2个工作线程。因此在使用forEach的时候,线程数为1的ForkJoinPool通用线程池和线程数为2的ThreadPoolExecutor是等价的;所以当ForkJoinPool通用线程池实际需要4个工作线程时,可以将它设置成3,那么在运行时可用的工作线程就是4了;

5. 线程安全考虑

再看一个例子:

static void doThreadUnSafe() {
        List<Integer> listFor = new ArrayList<>(1000);
        List<Integer> listParallel = new ArrayList<>(1000);

        IntStream.range(0, 1000).forEach(listFor::add);
        IntStream.range(0, 1000).parallel().forEach(listParallel::add);

        System.out.println("listFor size :" + listFor.size());
        System.out.println("listParallel size :" + listParallel.size());
    }

运行结果:

listFor size :1000
listParallel size :917

显而易见,stream.parallel.forEach()中执行的操作并非线程安全。如果需要线程安全,可以把集合转换为同步集合,即:Collections.synchronizedList(new ArrayList<>())。

6. 正确使用 ParallelStream

当我们对parallelStream有了足够的了解之后,再来考虑是否需要使用ParallelStream:

  1. 使用ParallelStream可以简洁高效的写出并发代码;
  2. ParallelStream并行执行是无序的,因此对于依赖于顺序的任务而言,并行化也许不能给出正确的结果,在这种情况下需要慎重选择;
  3. ParallelStream提供了更简单的并发执行的实现,但并不意味着更高的性能;比如当数据量不大时,顺序执行往往比并行执行更快。毕竟线程池准备,频繁切换线程是耗时的。但是当任务涉及到I/O操作并且任务之间不互相依赖时,将这类程序并行化之后,执行速度将会明显的提升;
  4. 任务之间最好是状态无关的,因为ParallelStream默认是非线程安全的,可能带来结果的不确定性。
目录
相关文章
|
6月前
|
API 索引
JDK8之findAny和findFirst
JDK8之findAny和findFirst
294 0
|
并行计算 安全 Java
JDK1.8介绍
JDK 1.8是Java Development Kit(Java开发工具包)的一个版本,也被称为Java 8。它引入了许多新特性和改进,对Java编程语言和平台进行了重要的更新。以下是JDK 1.8的一些主要特点:
1306 0
|
7天前
|
Oracle Java 关系型数据库
安装 JDK 时应该注意哪些问题
选择合适的JDK版本需考虑项目需求与兼容性,推荐使用LTS版本如JDK 17或21。安装时注意操作系统适配,配置环境变量PATH和JAVA_HOME,确保合法使用许可证,并进行安装后测试以验证JDK功能正常。
|
存储 算法 Java
带你了解JDK
JDK(Java Development Kit)是Java开发工具包,它提供了开发和运行Java应用程序所需的工具、库和资源。下面是JDK的一些重点介绍: 1. Java编译器(javac):JDK包含了Java编译器,可以将Java源代码编译为Java字节码。通过编译器,开发人员可以将Java源代码转换为可在JVM上运行的字节码文件。 2. 核心类库(Core Libraries):JDK提供了丰富的核心类库,其中包含了常用的类和接口,用于处理字符串、集合、IO、网络通信等各种操作。开发人员可以利用这些类库来构建功能丰富的Java应用程序。 3. 调试工具(Debugging Tools)
98 0
|
存储 网络协议 安全
JDK 9 介绍
Java 9提供了超过150项新功能特性,包括备受期待的模块化系统、可交互的REPL工具: jshell, JDK编译工具,语法层面的改变:Java公共API和私有代码,以及安全增强、扩展提升、性能管理改善等。
112 0
|
Java Shell 开发工具
安装多个jdk
安装多个jdk
|
NoSQL Java 数据库
JDK的安装
现在电脑上很多软件都是基于JAVA语言开发的,并且在学习JAVA编程时,JDK的安装变得十分平常。这里展示了一个关联NEO4J图数据库的JDK安装教程。
131 0
|
Java 程序员 编译器
【错误收集】JDK的安装
【错误收集】JDK的安装
89 0
|
Java
jdk问题
错误: java.lang.UnsatisfiedLinkError: no tcnative-1 in java.library.path: [C:\User
128 0
jdk问题
|
Oracle Java 关系型数据库
JDK的安装-详细版
JDK的安装-详细版
147 0
JDK的安装-详细版