Stream学习笔记(二)map与reduce

简介: Stream学习笔记(二)map与reduce

前言

我们今天主要介绍的是Stream中的map与reduce方法,为什么介绍这个呢?原因在于我之前看过一个大数据领域的框架叫MapReduce,在这个大数据框架中核心关键词就是Map和Reduce,同时这两个关键词也是MapReduce框架中的两个关键函数,Map函数的作用是从获取输入并将其做为key-value对,当作函数的入参,经过Map函数的处理,返回key-value对。Reduce对结果进行处理也就是合并,下面的图演示了MapReduce过程:

input输入,split分割,map映射转换,combine组合,reduce是一个动词,减少数量、价格等。这让我很迷惑,从示意图来看reduce执行的是最后的合并过程,我们姑且将其理解为聚合吧。在上面的图例中,combine想key相同的进行聚合,最后分成四组,而reduce阶段则是对每个小块本身进行聚合。基本同Stream的reduce函数是类似的。

reduce

Java的Stream里面也有map、reduce。我们这里先讲reduce,reduce这个函数更难理解一点,在Stream中reduce函数一共有三个重载:

1. Optional<T> reduce(BinaryOperator<T> accumulator);
2. T reduce(T identity, BinaryOperator<T> accumulator);
3. <U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
理解难度从上往下依次递增。

我们一个一个来看,第一个有下面这样的注释:

boolean foundAny = false;
     T result = null;
     for (T element : this stream) {
         if (!foundAny) {
             foundAny = true;
             result = element;
         }
         else
             result = accumulator.apply(result, element);
     }
     return foundAny ? Optional.of(result) : Optional.empty();

第一个函数等价于上面的代码,上面的代码首先从集合中找到第一个元素,然后和集合中的其他元素做apply运算,得到结果之后,这个结果再和其他元素做运算,直到所有元素参与运算完毕。这样说可能有点抽象,一个简单的例子是累加求和,如下图所示

所以reduce函数表达的意思是: reduce() 方法对流中的每个元素按序执行一个由您提供的 reducer 函数,每一次运行 reducer 会将先前元素的计算结果作为参数传入,最后将其结果汇总为单个返回值。第一次执行reduce函数时,没有先前元素的计算结果,可以从外部指定(reduce函数的第二个重载), 如果外部没指定默认选取第一个元素中作为上一次reduce的计算结果(对应reduce函数的第一个重载)。

那第三个重载是做什么的,或者说第三个函数的第三个参数是用来做什么的?首先让我们来看下上面的注释:

Performs a reduction on the elements of this stream, using the provided identity, accumulation and combining functions. 
使用提供的初始值、accumulation、combining对流执行reduction操作。等价于下面这个函数
This is equivalent to:
   U result = identity;
     for (T element : this stream)
         result = accumulator.apply(result, element)
     return result;
but is not constrained to execute sequentially.
但是不限于顺序流
The identity value must be an identity for the combiner function. This means that for all u, combiner(identity, u) is equal to u. Additionally, the combiner function must be compatible(兼容的) with the accumulator function; for all u and t, the following must hold:
 combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
特征值一定要作为combine函数的特征, 这也就是说对于任意元素u,combine(identity,u) 与 u相等。除此之外,combiner一定要与accumulator函数兼容也就是说,对于任意的u、t, 要求满足 combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)

看这个注释是不是有点云里雾里,没关系,我们看例子,在例子中分析combiner和accumulator在流里面是如何运作的。

@Test
 public void reduceDemo03(){
        String[] arr = {"lorem", "ipsum", "sit", "amet"};
        List<String> strs = Arrays.asList(arr);
        int ijk = strs.stream().reduce(1,
                (identity, element) -> {
                    System.out.println("execute Thread name"+Thread.currentThread().getName());
                    System.out.println("Accumulator, identity = " + identity + ", element = " + element);
                    return identity + element.length();
                },
                (leftResult, rightResult) -> {
                    System.out.println("execute Thread name"+Thread.currentThread().getName());
                    System.out.println("combine, identity = " + leftResult + ", element = " + rightResult);
                    return leftResult * rightResult;
                });
        System.out.println(ijk);
    }

上面的例子,我们传入的accumulator函数是将传入的元素进行相加,combiner函数将传入的元素进行相乘。执行结果如下:

好像我们传入的combine函数压根就没用,我们换成并行流试试看:

@Test 
 public void reduceDemo05(){
        String[] arr = {"lorem", "ipsum", "sit", "amet"};
        List<String> strs = Arrays.asList(arr);
        int reduceResult  = strs.parallelStream().reduce(1,
                (identity, element) -> {
                    System.out.println("Accumulator execute Thread name: "+Thread.currentThread().getName());
                    System.out.println("Accumulator, identity = " + identity + ", element = " + element);
                    return identity + element.length();
                },
                (leftResult, rightResult) -> {
                    System.out.println("combine execute Thread name"+Thread.currentThread().getName());
                    System.out.println("combine, identity = " + leftResult + ", element = " + rightResult);
                    return leftResult * leftResult;
                });
        System.out.println("reduceResult: "+reduceResult);
    }

输出结果如下:

我们换成并行流之后combine函数就执行了,执行过程为先让我们传入的identity和流中的元素做accumulator运算, 流里面又有四个元素,经过accumulator运算之后,得到了四个结果,最后由combine函数将这四个结果进行合并。

理解了这个过程之后我们再来看上面的注释: combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t), 我们取上面过程的值代入看看式子是不是成立,  u是执行完accumulator操纵之后的值,所以我们选择u=6,t是流中的元素,我们选取ipsum

  • combiner.apply(6, accumulator.apply(1, 5)) = 36
  • accumulator.apply(6, 5) = 30

combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)不成立,如果成立会由什么样的效果?我们尝试改写上面的reduceDemo05,让combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t),看看会是什么结果:

@Test
 public void reduceDemo05(){
        String[] arr = {"lorem", "ipsum", "sit", "amet"};
        List<String> strs = Arrays.asList(arr);
        int parallelStreamReduceResult  = strs.parallelStream().reduce(0,
                (identity, element) -> {
                    System.out.println("Accumulator execute Thread name: "+Thread.currentThread().getName());
                    System.out.println("Accumulator, identity = " + identity + ", element = " + element);
                    return identity + element.length();
                },
                (leftResult, rightResult) -> {
                    System.out.println("combine execute Thread name"+Thread.currentThread().getName());
                    System.out.println("combine, identity = " + leftResult + ", element = " + rightResult);
                    return leftResult + rightResult;
                });
        int  streamResult = strs.stream().reduce(0,
                (identity, element) -> {
                    System.out.println("Accumulator execute Thread name: " + Thread.currentThread().getName());
                    System.out.println("Accumulator, identity = " + identity + ", element = " + element);
                    return identity + element.length();
                },
                (leftResult, rightResult) -> {
                    System.out.println("combine execute Thread name" + Thread.currentThread().getName());
                    System.out.println("combine, identity = " + leftResult + ", element = " + rightResult);
                    return leftResult + rightResult;
                });
        System.out.println("parallelStreamReduceResult: "+parallelStreamReduceResult);
        System.out.println("streamResult: "+streamResult);
    }

结论是对combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)成立, 则reduce的第三个重载函数在并行流和串行流下结果相等。

并行流中如何指定自定义线程池

记得之前有人再微信群里问过一个问题,并行流中如何指定自定义的ForkJoinPool,如果不指定所有的并行流都会使用一个线程池,我们有的时候想再流中指定线程池,我记得我当时的回答是不能,想来是有些误认子弟了,  事实上可以通过下面的方式用自定义的线程池:

@Test
 public void forkJoinPool(){
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        forkJoinPool.submit(()->{
            Stream.of("lorem", "ipsum", "sit", "amet").parallel().forEach(e->{
                System.out.println(Thread.currentThread().getName());
            });
        });
  }

输出结果:

参考资料

相关文章
|
2月前
|
Python
高阶函数如`map`, `filter`, `reduce`和`functools.partial`在Python中用于函数操作
【6月更文挑战第20天】高阶函数如`map`, `filter`, `reduce`和`functools.partial`在Python中用于函数操作。装饰器如`@timer`接收或返回函数,用于扩展功能,如记录执行时间。`timer`装饰器通过包裹函数并计算执行间隙展示时间消耗,如`my_function(2)`执行耗时2秒。
26 3
|
1月前
|
人工智能 算法 大数据
算法金 | 推导式、生成器、向量化、map、filter、reduce、itertools,再见 for 循环
这篇内容介绍了编程中避免使用 for 循环的一些方法,特别是针对 Python 语言。它强调了 for 循环在处理大数据或复杂逻辑时可能导致的性能、可读性和复杂度问题。
35 6
算法金 | 推导式、生成器、向量化、map、filter、reduce、itertools,再见 for 循环
|
12天前
|
分布式计算 Python
【python笔记】高阶函数map、filter、reduce
【python笔记】高阶函数map、filter、reduce
|
1月前
|
JavaScript API
js【最佳实践】遍历数组的八种方法(含数组遍历 API 的对比)for,forEach,for of,map,filter,reduce,every,some
js【最佳实践】遍历数组的八种方法(含数组遍历 API 的对比)for,forEach,for of,map,filter,reduce,every,some
39 1
|
2月前
|
Python
在Python中,`map()`, `filter()` 和 `reduce()` 是函数式编程中的三个核心高阶函数。
【6月更文挑战第24天】Python的`map()`应用函数到序列元素,返回新序列;`filter()`筛选满足条件的元素,生成新序列;`reduce()`累计操作序列元素,返回单一结果。
28 3
|
2月前
|
存储 C++ 容器
【C++】学习笔记——map和set
【C++】学习笔记——map和set
18 0
|
2月前
|
JavaScript 前端开发
JavaScript 数组的函数 map/forEach/reduce/filter
JavaScript 数组的函数 map/forEach/reduce/filter
|
3月前
|
JavaScript 前端开发
JavaScript 的数组方法 map()、filter() 和 reduce() 提供了函数式编程处理元素的方式
【5月更文挑战第11天】JavaScript 的数组方法 map()、filter() 和 reduce() 提供了函数式编程处理元素的方式。map() 用于创建新数组,其中元素是原数组元素经过指定函数转换后的结果;filter() 则筛选出通过特定条件的元素生成新数组;reduce() 将数组元素累计为单一值。这三个方法使代码更简洁易读,例如:map() 可用于数组元素乘以 2,filter() 用于选取偶数,reduce() 计算数组元素之和。
32 2
|
3月前
|
前端开发 JavaScript 程序员
Javascript:forEach、map、filter、reduce、reduceRight
Javascript:forEach、map、filter、reduce、reduceRight
|
3月前
|
JavaScript 前端开发
解一下操作数组的方法reduce,some,map,find
解一下操作数组的方法reduce,some,map,find
18 0