【小家java】java8新特性之---Stream API 详解 (Map-reduce、Collectors收集器、并行流、groupby多字段分组)(下)

简介: 【小家java】java8新特性之---Stream API 详解 (Map-reduce、Collectors收集器、并行流、groupby多字段分组)(下)

现在抽取一些不太常用,稍微不太好理解的一些拿来讲一下:


toMap: 若要线程安全的Map,用**toConcurrentMap、groupingByConcurrent**

如果生成一个Map,我们需要调用toMap方法。由于Map中有Key和Value这两个值,故该方法与toSet、toList等的处理方式是不一样的。toMap最少应接受两个参数,一个用来生成key,另外一个用来生成value。toMap方法有三种变形:


注:使用Collectors.toMap方法时的两个问题:

1、当key重复时,会抛出异常:java.lang.IllegalStateException: Duplicate key **

2、当value为null时,会抛出异常:java.lang.NullPointerException


  1. toMap(Function keyMapper,Function valueMapper) keyMapper: 该Funtion用来生成Key valueMapper:该Funtion用来生成value


  public static void main(String[] args) {
        //使用toMap两个参数的(最常用的) 但遇上相同key和null的value都会抛出异常
        //List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        //Map<Integer, Integer> map = list.stream().collect(Collectors.toMap(k -> k, v -> v));
        //System.out.println(map); //{1=1, 2=2, 3=3, 4=4, 5=5}
        //里面放重复的key
        //List<Integer> list = Arrays.asList(1, 2, 3, 4, 1);
        //list.stream().collect(Collectors.toMap(k -> k, v -> v)); //java.lang.IllegalStateException: Duplicate key 1
        //对于里面有重复key的情况,采用三个参数的toMap进行改良
        // java8处理思路:即 两个key相同时 只能有一个key存在,那对应的value如何处理?  value交由我们自己处理
        //Map<Integer, Integer> map = list.stream().collect(Collectors.toMap(k -> k, v -> v, (oldV, newV) -> oldV + newV + 10));
        //System.out.println(map); //{1=12, 2=2, 3=3, 4=4}
        //里面放null值
        //List<Integer> list = Arrays.asList(1, 2, 3, 4, null);
        //list.stream().collect(Collectors.toMap(x -> x, y -> y)); //java.lang.NullPointerException
        //最后 四个参数的toMap 提供了mergeFunction和mapSupplier 调用者可以自定义希望返回什么类型的Map
        List<Integer> list = Arrays.asList(1, 2, 1, 4);
        HashMap<Integer, Integer> map = list.stream().collect(Collectors.toMap(
                k -> k,
                v -> v,
                (oldV, newV) -> oldV + newV + 10,
                HashMap::new)
        );
        System.out.println(map); //{1=12, 2=2, 4=4}
    }


我们常常遇到要把List转成Map的现象。并且要求保证List的顺序,那么此时我们必须使用LinedHashMap,这点特别重要,处理方式如下:


Map<Integer, NormalPeriodResponse> tmpNormalPeriodMap = tmpNormalPeriods.stream()
                .collect(toMap(t -> t.getId(), Function.identity(), (k1, k2) -> k1, LinkedHashMap::new));


public static void main(String[] args) {
        //这个summarizing 算是一个比较整合的搜集
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        IntSummaryStatistics summary = list.stream().collect(Collectors.summarizingInt(x -> x));
        System.out.println(summary.getCount()); //5
        System.out.println(summary.getAverage()); //3.0
        System.out.println(summary.getSum()); //15
    }
public static void main(String[] args) {
        //连接流中的字符串  可以指定连接符、首位符等
        List<String> list = Arrays.asList("aa", "bb", "cc", "dd");
        String str = list.stream().collect(Collectors.joining(",", "==>", "<=="));
        System.out.println(str); //==>aa,bb,cc,dd<==
    }

当使用maxBy、minBy统计最值时,结果会封装在Optional中。有时候明明我们知道不可能为null,那这个时候我们优雅的处理的方式可以采用collectingAndThen函数包裹maxBy、minBy,从而将maxBy、minBy返回的Optional对象进行转换


 public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        Optional<Integer> max = list.stream().collect(Collectors.maxBy(Integer::compare));
        Integer maxAndThen = list.stream().collect(Collectors.collectingAndThen(Collectors.maxBy(Integer::compare), Optional::get));
        System.out.println(max); //Optional[5]
        System.out.println(maxAndThen); //5
    }


备注:groupBy搜集也是用得非常对的,并且可以无限的分组下去。这里需要注意一点groupingByConcurrent的使用方式。他和groupBy的区别就是,它返回的是ConcurrentMap,而普通的就是返回的Map,需要注意区别,这里不做演示了。


多字段分组案例


此处为我后续新增内容,因为很多同学问我多字段怎么groupby,其实非常简单哈。看一下API就能知道怎么处理


    public static <T, K, A, D>
    Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
                                          Collector<? super T, A, D> downstream) {
        return groupingBy(classifier, HashMap::new, downstream);
    }


给个栗子:多字段分组


Map<String, Map<String, List<Article>>> result = articles.stream()
                .collect(Collectors.groupingBy(Article::getCountryCode
                ,Collectors.groupingBy(Article::getProvince)));

给个栗子:分组统计

 //统计每个应用实际支付总额
Map<Long, Long> tradeAmountMap = list.stream().filter(o->o.getStatus()==2)
                .collect(Collectors.groupingBy(OrdersDO::getAppId
                //downstream其实可以做任何搜集的作用
                ,Collectors.summingLong(OrdersDO::getTradeAmount)));


collectingAndThen可用于很多实例,进行持续操作。比如先根据某属性去重,然后再收集等等


分区:partitioningBy

partitioningBy(Predicate predicate)
partitioningBy(Predicate predicate,Collector downstream)

分区是分组的一种特殊情况,它只能分成true、false两组。


下面这个实例:其实就是数据在手上,可以各种玩

public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 5, 5, 6, 9);
        Map<Boolean, List<Integer>> partition = list.stream().collect(Collectors.partitioningBy(x -> x >= 4));
        System.out.println(partition); //{false=[1, 2, 3], true=[4, 5, 5, 5, 6, 9]}
        Map<Boolean, Map<Boolean, List<Integer>>> partition2 = list.stream().collect(Collectors.partitioningBy(x -> x >= 4, Collectors.partitioningBy(x -> x > 6)));
        System.out.println(partition2); //{false={false=[1, 2, 3], true=[]}, true={false=[4, 5, 5, 5, 6], true=[9]}}
        //也可以结合groupBy搞
        Map<Boolean, Map<Integer, List<Integer>>> collect = list.stream().collect(Collectors.partitioningBy(x -> x >= 4, Collectors.groupingBy(x -> x)));
        System.out.println(collect); //{false={1=[1], 2=[2], 3=[3]}, true={4=[4], 5=[5, 5, 5], 6=[6], 9=[9]}}
    }


mapping :跟map操作类似


 String str = Stream.of("a", "b", "c").collect(Collectors.mapping(x -> x.toUpperCase(), Collectors.joining(",")));
        System.out.println(str); //A,B,C

它的源码声明如下:

mapping(Function<? super T, ? extends U> mapper,
                               Collector<? super U, A, R> downstream)

现在我们有了mapping,可以更加优雅的处理如下


List<String> list = Arrays.asList("1", "2", "3");
        Map<Integer, List<Integer>> collect = list.stream().collect(groupingBy(x -> x.hashCode(), mapping(x -> Integer.parseInt(x), toList())));
        System.out.println(collect); //{1=[1], 2=[2], 3=[3]}


生成统计信息(IntSummaryStatistics、DoubleSummaryStatistics等)


另一组非常有用的收集器是用来产生统计信息的收集器。这能够在像int、double和long这样的原始数据类型上起到作用;并且能被用来生成像下面这样的统计信息。


IntSummaryStatistics summaryStatistics = tasks.stream().map(Task::getTitle).collect(summarizingInt(String::length));
System.out.println(summaryStatistics.getAverage()); //32.4
System.out.println(summaryStatistics.getCount()); //5
System.out.println(summaryStatistics.getMax()); //44
System.out.println(summaryStatistics.getMin()); //24
System.out.println(summaryStatistics.getSum()); //162


也有其它的变种形式,像针对其它原生类型的LongSummaryStatistics和DoubleSummaryStatistics。


    public static void main(String[] args) {
        IntSummaryStatistics statistics1 = new IntSummaryStatistics();
        IntSummaryStatistics statistics2 = new IntSummaryStatistics();
        statistics2.combine(statistics1);
    }


你也可以通过使用combine操作来将一个IntSummaryStatistics与另一个组合起来(必须是同一类型哦)。


    public static void main(String[] args) {
        IntSummaryStatistics statistics1 = new IntSummaryStatistics();
        statistics1.accept(10);
        System.out.println(statistics1.getSum()); //10
        System.out.println(statistics1.getCount()); //1
        IntSummaryStatistics statistics2 = new IntSummaryStatistics();
        statistics2.accept(20);
        statistics1.combine(statistics2);
        System.out.println(statistics1.getSum()); //30
        System.out.println(statistics1.getCount()); //2
    }


介绍几个Stream的静态方法


of 这个方法不说了


需要注意的是,不能全是null,否则报错。这个在JDK9做了改善


empty 构造一个空流

public static void main(String[] args) {
        List<Integer> list = Stream.<Integer>empty().collect(toList());
        System.out.println(list); //[]
    }


由此课件,流生成的集合,都是不会为null的


iterate、generate 这两个上面已经介绍了,很好用



concat


顾名思义,就是拼接流。这个在很多场合比较实用。比如要合并提取两个或者更多的List集合的时候,就没必要先合并集合,再处理流了,可以一步到位,并且效率很高。


    public static void main(String[] args) {
        List<Integer> list1 = Arrays.asList(1,2,3);
        List<Integer> list2 = Arrays.asList(4,3,2);
        Stream.concat(list1.stream(),list2.stream()).forEach(System.out::print);
    }


并行流(ParallelStream)


首先简单的介绍下Fork/Join 框架(JDK1.7后提出)

Fork/Join 框架与传统线程池的区别:

采用 “工作窃取”模式 (work-stealing) : 当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中


相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上。在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行。那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行。或者当线程任务完成速度快,就会随机抽取其它未完成任务的进程中的最后一个任务进行计算操作。这种方式减少了线程的等待时间,提高了性能


   普通 for(最慢,数据量越大CPU使用率低,速度越慢)


   备注:如果数据量较小,它还是蛮快的,毕竟for循环是偏底层的代码


   ForkJoin框架(比较快) 但任务拆分的代码门槛有点高,使用起来过于复杂


   Java8 并行流(底层使用ForkJoin框架,速度最快 CPU使用率可以达到 100%)


所以,如果是大任务(小任务并行流没有任何效果反而可能还会慢一些),极力推荐使用并行流处理大数量的计算。比如从1加到1000亿的和这种,或者类似的更加耗时的操作(比如多次访问库等等)

Stream的执行原理


Stream的执行原理过于复杂,本文不做过多讨论,请关注后续博文


结束语


Stream 的特性可以归纳为:


不是数据结构


它没有内部存储,它只是用操作管道从 source(数据结构、数组、generator function、IO channel)抓取数据。


它也绝不修改自己所封装的底层数据结构的数据。例如 Stream 的 filter 操作会产生一个不包含被过滤元素的新 Stream,而不是从 source 删除那些元素。


所有 Stream 的操作必须以 lambda 表达式为参数。


不支持索引访问


你可以请求第一个元素,但无法请求第二个,第三个,或最后一个。


惰性化(惰性求值)操作是向后延迟的,一直到它弄清楚了最后需要多少数据才会开始


并行能力(当一个 Stream 是并行化的,就不需要再写多线程代码,所有对它的操作会自动并行进行的)


可以是无限的。集合有固定大小,Stream 则不必。limit(n) 和 findFirst() 这类的 short-circuiting 操作可以对无限的 Stream 进行运算并很快完成。


相关文章
|
1天前
|
Java API
Java基础&API(3)
Java基础&API(3)
|
1天前
|
Java 机器人 API
Java基础&常用API(1)
Java基础&常用API(1)
|
1天前
|
安全 Java 程序员
|
5天前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
24 11
|
7天前
|
安全 Java API
java借助代理ip,解决访问api频繁导致ip被禁的问题
java借助代理ip,解决访问api频繁导致ip被禁的问题
|
9天前
|
存储 安全 Java
说说Java 8 引入的Stream API
说说Java 8 引入的Stream API
12 0
|
存储 搜索推荐 Java
Java8 Stream 数据流,大数据量下的性能效率怎么样?
Stream 是Java SE 8类库中新增的关键抽象,它被定义于 java.util.stream (这个包里有若干流类型:Stream<T> 代表对象引用流,此外还有一系列特化流,如 IntStream,LongStream,DoubleStream等。
Java8 Stream 数据流,大数据量下的性能效率怎么样?
|
算法 IDE Java
Java8 Stream性能如何及评测工具推荐
Java8 Stream性能如何及评测工具推荐
249 0
Java8 Stream性能如何及评测工具推荐
|
Java 测试技术 程序员
牛逼哄洪的 Java 8 Stream,性能也牛逼么?
牛逼哄洪的 Java 8 Stream,性能也牛逼么?
157 0
牛逼哄洪的 Java 8 Stream,性能也牛逼么?
|
1天前
|
缓存 Java
【Java基础】简说多线程(上)
【Java基础】简说多线程(上)
5 0