Stream学习笔记(二)map与reduce

简介: Stream学习笔记(二)map与reduce

前言

我们今天主要介绍的是Stream中的map与reduce方法,为什么介绍这个呢?原因在于我之前看过一个大数据领域的框架叫MapReduce,在这个大数据框架中核心关键词就是Map和Reduce,同时这两个关键词也是MapReduce框架中的两个关键函数,Map函数的作用是从获取输入并将其做为key-value对,当作函数的入参,经过Map函数的处理,返回key-value对。Reduce对结果进行处理也就是合并,下面的图演示了MapReduce过程:

input输入,split分割,map映射转换,combine组合,reduce是一个动词,减少数量、价格等。这让我很迷惑,从示意图来看reduce执行的是最后的合并过程,我们姑且将其理解为聚合吧。在上面的图例中,combine想key相同的进行聚合,最后分成四组,而reduce阶段则是对每个小块本身进行聚合。基本同Stream的reduce函数是类似的。

reduce

Java的Stream里面也有map、reduce。我们这里先讲reduce,reduce这个函数更难理解一点,在Stream中reduce函数一共有三个重载:

1. Optional<T> reduce(BinaryOperator<T> accumulator);
2. T reduce(T identity, BinaryOperator<T> accumulator);
3. <U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
理解难度从上往下依次递增。

我们一个一个来看,第一个有下面这样的注释:

boolean foundAny = false;
     T result = null;
     for (T element : this stream) {
         if (!foundAny) {
             foundAny = true;
             result = element;
         }
         else
             result = accumulator.apply(result, element);
     }
     return foundAny ? Optional.of(result) : Optional.empty();

第一个函数等价于上面的代码,上面的代码首先从集合中找到第一个元素,然后和集合中的其他元素做apply运算,得到结果之后,这个结果再和其他元素做运算,直到所有元素参与运算完毕。这样说可能有点抽象,一个简单的例子是累加求和,如下图所示

所以reduce函数表达的意思是: reduce() 方法对流中的每个元素按序执行一个由您提供的 reducer 函数,每一次运行 reducer 会将先前元素的计算结果作为参数传入,最后将其结果汇总为单个返回值。第一次执行reduce函数时,没有先前元素的计算结果,可以从外部指定(reduce函数的第二个重载), 如果外部没指定默认选取第一个元素中作为上一次reduce的计算结果(对应reduce函数的第一个重载)。

那第三个重载是做什么的,或者说第三个函数的第三个参数是用来做什么的?首先让我们来看下上面的注释:

Performs a reduction on the elements of this stream, using the provided identity, accumulation and combining functions. 
使用提供的初始值、accumulation、combining对流执行reduction操作。等价于下面这个函数
This is equivalent to:
   U result = identity;
     for (T element : this stream)
         result = accumulator.apply(result, element)
     return result;
but is not constrained to execute sequentially.
但是不限于顺序流
The identity value must be an identity for the combiner function. This means that for all u, combiner(identity, u) is equal to u. Additionally, the combiner function must be compatible(兼容的) with the accumulator function; for all u and t, the following must hold:
 combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
特征值一定要作为combine函数的特征, 这也就是说对于任意元素u,combine(identity,u) 与 u相等。除此之外,combiner一定要与accumulator函数兼容也就是说,对于任意的u、t, 要求满足 combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)

看这个注释是不是有点云里雾里,没关系,我们看例子,在例子中分析combiner和accumulator在流里面是如何运作的。

@Test
 public void reduceDemo03(){
        String[] arr = {"lorem", "ipsum", "sit", "amet"};
        List<String> strs = Arrays.asList(arr);
        int ijk = strs.stream().reduce(1,
                (identity, element) -> {
                    System.out.println("execute Thread name"+Thread.currentThread().getName());
                    System.out.println("Accumulator, identity = " + identity + ", element = " + element);
                    return identity + element.length();
                },
                (leftResult, rightResult) -> {
                    System.out.println("execute Thread name"+Thread.currentThread().getName());
                    System.out.println("combine, identity = " + leftResult + ", element = " + rightResult);
                    return leftResult * rightResult;
                });
        System.out.println(ijk);
    }

上面的例子,我们传入的accumulator函数是将传入的元素进行相加,combiner函数将传入的元素进行相乘。执行结果如下:

好像我们传入的combine函数压根就没用,我们换成并行流试试看:

@Test 
 public void reduceDemo05(){
        String[] arr = {"lorem", "ipsum", "sit", "amet"};
        List<String> strs = Arrays.asList(arr);
        int reduceResult  = strs.parallelStream().reduce(1,
                (identity, element) -> {
                    System.out.println("Accumulator execute Thread name: "+Thread.currentThread().getName());
                    System.out.println("Accumulator, identity = " + identity + ", element = " + element);
                    return identity + element.length();
                },
                (leftResult, rightResult) -> {
                    System.out.println("combine execute Thread name"+Thread.currentThread().getName());
                    System.out.println("combine, identity = " + leftResult + ", element = " + rightResult);
                    return leftResult * leftResult;
                });
        System.out.println("reduceResult: "+reduceResult);
    }

输出结果如下:

我们换成并行流之后combine函数就执行了,执行过程为先让我们传入的identity和流中的元素做accumulator运算, 流里面又有四个元素,经过accumulator运算之后,得到了四个结果,最后由combine函数将这四个结果进行合并。

理解了这个过程之后我们再来看上面的注释: combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t), 我们取上面过程的值代入看看式子是不是成立,  u是执行完accumulator操纵之后的值,所以我们选择u=6,t是流中的元素,我们选取ipsum

  • combiner.apply(6, accumulator.apply(1, 5)) = 36
  • accumulator.apply(6, 5) = 30

combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)不成立,如果成立会由什么样的效果?我们尝试改写上面的reduceDemo05,让combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t),看看会是什么结果:

@Test
 public void reduceDemo05(){
        String[] arr = {"lorem", "ipsum", "sit", "amet"};
        List<String> strs = Arrays.asList(arr);
        int parallelStreamReduceResult  = strs.parallelStream().reduce(0,
                (identity, element) -> {
                    System.out.println("Accumulator execute Thread name: "+Thread.currentThread().getName());
                    System.out.println("Accumulator, identity = " + identity + ", element = " + element);
                    return identity + element.length();
                },
                (leftResult, rightResult) -> {
                    System.out.println("combine execute Thread name"+Thread.currentThread().getName());
                    System.out.println("combine, identity = " + leftResult + ", element = " + rightResult);
                    return leftResult + rightResult;
                });
        int  streamResult = strs.stream().reduce(0,
                (identity, element) -> {
                    System.out.println("Accumulator execute Thread name: " + Thread.currentThread().getName());
                    System.out.println("Accumulator, identity = " + identity + ", element = " + element);
                    return identity + element.length();
                },
                (leftResult, rightResult) -> {
                    System.out.println("combine execute Thread name" + Thread.currentThread().getName());
                    System.out.println("combine, identity = " + leftResult + ", element = " + rightResult);
                    return leftResult + rightResult;
                });
        System.out.println("parallelStreamReduceResult: "+parallelStreamReduceResult);
        System.out.println("streamResult: "+streamResult);
    }

结论是对combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)成立, 则reduce的第三个重载函数在并行流和串行流下结果相等。

并行流中如何指定自定义线程池

记得之前有人再微信群里问过一个问题,并行流中如何指定自定义的ForkJoinPool,如果不指定所有的并行流都会使用一个线程池,我们有的时候想再流中指定线程池,我记得我当时的回答是不能,想来是有些误认子弟了,  事实上可以通过下面的方式用自定义的线程池:

@Test
 public void forkJoinPool(){
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        forkJoinPool.submit(()->{
            Stream.of("lorem", "ipsum", "sit", "amet").parallel().forEach(e->{
                System.out.println(Thread.currentThread().getName());
            });
        });
  }

输出结果:

参考资料

相关文章
|
3月前
|
索引
ES5常见的数组方法:forEach ,map ,filter ,some ,every ,reduce (除了forEach,其他都有回调,都有return)
ES5常见的数组方法:forEach ,map ,filter ,some ,every ,reduce (除了forEach,其他都有回调,都有return)
|
3月前
|
Java C# Swift
Java Stream中peek和map不为人知的秘密
本文通过一个Java Stream中的示例,探讨了`peek`方法在流式处理中的应用及其潜在问题。首先介绍了`peek`的基本定义与使用,并通过代码展示了其如何在流中对每个元素进行操作而不返回结果。接着讨论了`peek`作为中间操作的懒执行特性,强调了如果没有终端操作则不会执行的问题。文章指出,在某些情况下使用`peek`可能比`map`更简洁,但也需注意其懒执行带来的影响。
177 2
Java Stream中peek和map不为人知的秘密
|
3月前
|
JavaScript 前端开发
js map和reduce
js map和reduce
|
5月前
|
人工智能 算法 大数据
算法金 | 推导式、生成器、向量化、map、filter、reduce、itertools,再见 for 循环
这篇内容介绍了编程中避免使用 for 循环的一些方法,特别是针对 Python 语言。它强调了 for 循环在处理大数据或复杂逻辑时可能导致的性能、可读性和复杂度问题。
59 6
算法金 | 推导式、生成器、向量化、map、filter、reduce、itertools,再见 for 循环
|
4月前
|
存储 算法 Java
Stream很好,Map很酷,但答应我别滥用toMap()!
【8月更文挑战第27天】在Java的世界里,Stream API和Map数据结构无疑是现代编程中的两大瑰宝。Stream API以其函数式编程的优雅和强大的数据处理能力,让集合操作变得简洁而高效;而Map则以其键值对的存储方式,为数据的快速检索和更新提供了便利。然而,当这两者相遇,特别是当我们试图通过Stream的toMap()方法将流中的元素转换为Map时,一些潜在的问题和陷阱便悄然浮现。今天,我们就来深入探讨一下这个话题,并探讨如何更加安全、高效地利用这些强大的工具。
63 0
|
4月前
|
分布式计算 Python
【python笔记】高阶函数map、filter、reduce
【python笔记】高阶函数map、filter、reduce
|
5月前
|
JavaScript API
js【最佳实践】遍历数组的八种方法(含数组遍历 API 的对比)for,forEach,for of,map,filter,reduce,every,some
js【最佳实践】遍历数组的八种方法(含数组遍历 API 的对比)for,forEach,for of,map,filter,reduce,every,some
90 1
|
6月前
|
Python
在Python中,`map()`, `filter()` 和 `reduce()` 是函数式编程中的三个核心高阶函数。
【6月更文挑战第24天】Python的`map()`应用函数到序列元素,返回新序列;`filter()`筛选满足条件的元素,生成新序列;`reduce()`累计操作序列元素,返回单一结果。
42 3
|
6月前
|
存储 C++ 容器
【C++】学习笔记——map和set
【C++】学习笔记——map和set
37 0
|
6月前
|
Java
java中Stream流中的forEach、filter、map、count、limit、skip、concat
java中Stream流中的forEach、filter、map、count、limit、skip、concat
224 0