开发者社区> 一切总会归于平淡> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

【JDK8 新特性 7】并行的Stream流&parallelStream背后的技术

简介: 目前我们使用的Stream流是串行的,就是在一个线程上执行。
+关注继续查看

目前我们使用的Stream流是串行的,就是在一个线程上执行。
32.png

1、获取并行Stream流的两种方式

parallelStream是一个并行执行的流。它通过默认的ForkJoinPool,可能提高多线程任务的速度。

1.1 直接获取并行的流

1.png

1.2 将串行流转成并行流

2.png

2、 并行和串行Stream流的效率对比

使用for循环,串行Stream流,并行Stream流来对5亿个数字求和。然后我们就看各自的消耗的时间。、
首先我们看看for循环的表现如何。
3.png
然后是串行的表现:
4.png
最后就是我们的并行了
5.png
我们可以看到parallelStream的效率是最高的。
Stream并行处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作。

3、parallelStream线程安全问题

我们先来看一段代码的执行效果。、
6.png
我们明明是往集合中添加1000个元素,而实际上只有894个元素。
解决方法: 加锁、使用线程安全的集合或者调用Stream的 toArray() / collect() 操作就是满足线程安全的了。

加锁 :
7.png

使用线程安全的集合:
8.png

调用Stream的 toArray() / collect()
9.png

4、parallelStream背后的技术

4.1 Fork/Join框架介绍

  • parallelStream使用的是Fork/Join框架。
  • Fork/Join框架自JDK 7引入。
  • Fork/Join框架可以将一个大任务拆分为很多小任务来异步执行。

Fork/Join框架主要包含三个模块:

  1. 线程池:ForkJoinPool
  2. 任务对象:ForkJoinTask
  3. 执行任务的线程:ForkJoinWorkerThread

11.png

4.2 Fork/Join原理-分治法

ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。
典型的应用比如快速排序算法, ForkJoinPool需要使用相对少的线程来处理大量的任务。
比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。
以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。
比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。
那么到最后,所有的任务加起来会有大概2000000+个。
问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。

4.3 Fork/Join原理-工作窃取算法

Fork/Join最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的cpu,那么如何利用好这个空闲的cpu就成了提高性能的关键,而这里我们要提到的工作窃取(work-stealing)算法就是整个Fork/Join框架的核心理念 。
Fork/Join工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

那么为什么需要使用工作窃取算法呢?
优点:
假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。
但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。
干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。
而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
缺点:
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争, 比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
上文中已经提到了在Java 8引入了自动并行化的概念。它能够让一部分Java代码自动地以并行的方式执行,也就是我们使用了ForkJoinPool的ParallelStream。
对于ForkJoinPool通用线程池的线程数量,通常使用默认值就可以了,即运行时计算机的处理器数量。可以通过设置系统属性:java.util.concurrent.ForkJoinPool.common.parallelism=N (N为线程数量),来调整ForkJoinPool的线程数量,可以尝试调整成不同的参数来观察每次的输出结果。

4.4 Fork/Join案例

需求:
使用Fork/Join计算1-10000的和,当一个任务的计算数量大于3000时拆分任务,数量小于3000时计算。
12.png
首先我们新建一个 类 并让其继承 RecursiveTask

package com.jie.test;

import java.util.concurrent.RecursiveTask;

/**
 * @description:进行求和 RecursiveTask : 一个任务
 * @author: jie
 * @time: 2022/6/7 0:15
 */
public class SumRecursiveTask extends RecursiveTask<Long> {
    /**
     * 拆分的临界值
     */
    private static final long THRESHOLD = 3000L;

    /**
     * 起始值
     */
    private final long start;

    /**
     * 结束值
     */
    private final long end;

    public SumRecursiveTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * @description: 计算
     * @author: jie
     * @time: 2022/6/7 0:17
     */
    @Override
    protected Long compute() {
        long length = end - start;
        // 如果拆分后的值小于三千
        if (length <= THRESHOLD) {
            // 任务不用再拆分了.可以计算了
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            System.out.println("计算: " + start + " -> " + end + ",结果为: " + sum);
            return sum;
        } else {
            // 数量大于预定的数量,任务还需要再拆分
            long middle = (start + end) / 2;
            System.out.println("拆分: 左边 " + start + " -> " + middle + ", 右边 " + (middle + 1) + " -> " + end);
            // 左边的变量
            SumRecursiveTask left = new SumRecursiveTask(start, middle);
            // 表示新开了一个任务
            left.fork();
            SumRecursiveTask right = new SumRecursiveTask(middle + 1, end);
            right.fork();
            return left.join() + right.join();
        }
    }
}

然后我们进行测试:
13.png

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
【已解决】Dubbo版本升级引起的循环注册异常
【已解决】Dubbo版本升级引起的循环注册异常
104 0
PostgreSQL 并行计算解说 之21 - parallel partition table wise agg
标签 PostgreSQL , cpu 并行 , smp 并行 , 并行计算 , gpu 并行 , 并行过程支持 背景 PostgreSQL 11 优化器已经支持了非常多场合的并行。简单估计,已支持27余种场景的并行计算。 parallel seq scan
157 0
全志A33 linux led驱动编程(附实测参考代码)
开发平台 * 芯灵思SinlinxA33开发板 实验原理 在芯灵思开发板上,没有led灯模块,只能通过引脚电平观察: 这里我选择LS-INT引脚。 全志A33一共有10组IO口,每组IO有9个相关功能控制器,LS-INT属于PB7,相关寄存器如图 本次实验只用到这两个寄存器,在程序中命名为gpio_con,gpio_dat ,设置为输出引脚。
1451 0
《OpenACC并行编程实战》—— 3.5 计算构件parallel
OpenACC中的计算构件有两个,一个是前面介绍的kernels构件,一个就是这里要介绍的parallel构件。两个计算构件的作用都是将循环并行化,但有一些重要区别。本节将结合一些例子详细对比介绍。 parallel这个基本构件开启加速器设备上的并行执行。
2084 0
Atom飞行手册翻译: 2.14 小结
小结 到目前为止,你应该是一个Atom高级用户了。你应该能够像一个行家那样浏览和处理文本和文件。你也应该能够从里到外定制Atom,来让它看起来和表现得和你想象中一样。
631 0
MFC SDI 中 通过注册表保存当前窗体的 显示状态位置
    在 框架类响应 WM_CLOSE消息的时候添加如下代码    HKEY  key;        //用于接收注册表项句柄  WINDOWPLACEMENT  info;  //获得窗体位置状态信息  GetWindowPlacement(&info);  //C...
607 0
+关注
一切总会归于平淡
在活着的每一天,跳舞吧。 用脚尖着地,
72
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载