Java 8 - Stream基本实例及Stream的并行处理在线程上的表现

简介: Java 8 - Stream基本实例及Stream的并行处理在线程上的表现

20200510181139786.png


什么是流


集合是Java中使用最多的API 。 可以让你把数据分组并加以处理。尽管集合对于几乎任何一个Java应用都是不可或缺的,但集合操作却远远算不上完美。


流是Java API的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。就现在来说,可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理,无需写任何多线程代码 !


Java 8中的集合支持一个新的stream 方法,它会返回一个流(接口定义在 java.util.stream.Stream 里)


元素序列

就像集合一样,流也提供了一个接口,可以访问特定元素类型的一组有序值。因为集合是数据结构,所以它的主要目的是以特定的时间/空间复杂度存储和访问元素(如 ArrayList 与 LinkedList )。但流的目的在于表达计算,比如 filter 、 sorted 和 map 。集合讲的是数据,流讲的是计算。


流会使用一个提供数据的源,如集合、数组或输入/输出资源。 请注意,从有序集合生成流时会保留原有的顺序。由列表生成的流,其元素顺序与列表一致。


数据处理操作

流的数据处理功能支持类似于数据库的操作,以及函数式编程语言中的常用操作,如 filter 、 map 、 reduce 、 find 、 match 、 sort 等。流操作可以顺序执行,也可并行执行


此外,流操作有两个重要的特点


流水线

很多流操作本身会返回一个流,这样多个操作就可以链接起来,形成一个大

的流水线。 流水线的操作可以看作对数据源进行数据库式查询。


内部迭代

与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的。


实例解释

   /**
     * 需求: 卡路里前三的dish的名字
     * @param dishList
     * @return
     */
    public static List<String> getTop3HighCalories(List<Dish> dishList){
        return dishList.stream().filter(dish->dish.getCalories()>300)
                .map(Dish::getName)
                .limit(3)
                .collect(Collectors.toList());
    }



20210307235344794.png


20210307235352191.png


先是对 menu 调用 stream 方法,得到一个流。 数据源是dishList ,它给流提供一个元素列表


接下来,对流应用一系列数据处理操作: filter 、 map 、 limit除了 collect 之外,所有这些操作都会返回另一个流,这样它们就可以接成一条 流水线 , 于是就可以看作对源的一个查询


最后, collect 操作开始处理流水线,并返回结果(它和别的操作不一样,因为它返回的不是流,在这里是一个 List )


在调用 collect 之前,没有任何结果产生,实际上根本就没有从 menu 里选择元素。可以这么理解:链中的方法调用都在排队等待,直到调用 collect 。


20210308000040802.png


filter :接受Lambda,从流中排除某些元素。在本例中,通过传递lambda d ->d.getCalories() > 300 ,选择出超过300卡路里的Dish

map : 接受一个Lambda,将元素转换成其他形式或提取信息。在本例中,通过传递方法引用 Dish::getName ,相当于Lambda d -> d.getName() ,提取了名字。

limit :截断流,使其元素不超过给定数量。

collect :将流转换为其他形式。在本例中,流被转换为一个列表。 可以把 collect 看作能够接受各种方案作为参数,并将流中的元素累计成为一个汇总结果的操作。 这里的toList() 就是将流转换为列表的方案。


流 VS 集合


Java现有的集合概念和新的流概念都提供了接口,来配合代表元素型有序值的数据接口。


粗略地说,集合与流之间的差异就在于什么时候进行计算。集合是一个内存中的数据结构,它包含数据结构中目前所有的值——集合中的每个元素都得先算出来才能添加到集合中。(你可以往集合里加东西或者?东西,但是不管什么时候,集合中的每个元素都是放在内存里的,元素都得先算出来才能成为集合的一部分。)


相比之下,流则是在概念上固定的数据结构(你不能添加或删除元素),其元素则是按需计算的。 是一种生产者?消费者的关系。


从另一个角度来说,流就像是一个延迟创建的集合:只有在消费者要求的时候才会计算值 。 与此相反,集合则是急切创建的。


以质数为例,要是想创建一个包含所有质数的集合,那这个程序算起来就没完没了了,因为总有新的质数要算,然后把它加到集合里面。当然这个集合是永远也创建不完的,消费者这辈子都见不着了。


另一个例子是用浏览器进行互联网搜索。假设你搜索的短语在Google或是网?里面有很多匹配项。你用不着等到所有结果和照片的集合下载完,而是得到一个流,里面有最好的10个或20个匹配项,还有一个按钮查看下面10个或20个。当你作为消费者点“下面10个”的时候,供应商就按需计算这些结果,然后再返回你的浏览器上显示。

20210308001110628.png


只能遍历一次


和迭代器类似,流只能遍历一次。遍历完之后,我们就说这个流已经被消费了。可以从原始数据源那里再获得一个新的流来重新遍历一遍,就像迭代器一样(这里假设它是集合之类的可重复的源,如果是I/O通道就不行了)


20210308002044765.png


    public static void  testConsumeMoreTime(List<Dish> dishList){
        Stream<Dish> stream = dishList.stream();
        stream.forEach(System.out::println);
        stream.forEach(System.out::println);
    }

20210308002025345.png


集合和流的另一个关键区别在于它们遍历数据的方式.


内部迭代与外部迭代


使用 Collection 接口需要用户去做迭代(比如用 for-each ),这称为外部迭代。 相反,Streams库使用内部迭代——它帮你把迭代做了,还把得到的流值存在了某个地方,你只要给出一个函数说要干什么就可以了。下面的代码列表说明了这种区别。


【集合 】


用 for-each 循环外部迭代


20210308002241130.png


  • 用背后的迭代器做外部迭代


20210308002332797.png


【流:内部迭代】


20210308002423648.png


内部迭代时,项目可以透明地并行处理,或者用更优化的顺序进行处理

Streams库的内部迭代可以自动选择一种适合你硬件的数据表示和并行实现。

与此相反,一旦通过写 for-each 而选择了外部迭代,那你基本上就要自己管理所有的并行问题了

20210308002524737.png



需求

需求: 输出小于400的Dish的名字 , 并按照卡路里排序

Java 7及之前的实现

package com.artisan.java8.stream;
import com.artisan.java8.Dish;
import java.util.*;
import java.util.stream.Collectors;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/3/7 8:23
 * @mark: show me the code , change the world
 */
public class StreamTest {
    public static void main(String[] args) {
        //have a dish list (menu)
        List<Dish> menu = Arrays.asList(
                new Dish("pork", false, 800, com.artisan.java8.Dish.Type.MEAT),
                new Dish("beef", false, 700, com.artisan.java8.Dish.Type.MEAT),
                new Dish("chicken", false, 400, com.artisan.java8.Dish.Type.MEAT),
                new Dish("french fries", true, 530, com.artisan.java8.Dish.Type.OTHER),
                new Dish("rice", true, 350, com.artisan.java8.Dish.Type.OTHER),
                new Dish("season fruit", true, 120, com.artisan.java8.Dish.Type.OTHER),
                new Dish("pizza", true, 550, com.artisan.java8.Dish.Type.OTHER),
                new Dish("prawns", false, 300, com.artisan.java8.Dish.Type.FISH),
                new Dish("salmon", false, 450, com.artisan.java8.Dish.Type.FISH));
        System.out.println(getDiskNamesByCollections(menu));
    }
    /**
     * 需求: 输出小于400的Dish的名字 , 并按照卡路里排序 
     * @param dishList
     * @return
     */
    public static List<String> getDiskNamesByCollections(List<Dish> dishList){
        List<Dish> lowCalories = new ArrayList<>();
        //  filter  过滤小于400的
        for(Dish dish : dishList){
            if (dish.getCalories() < 400) {
                lowCalories.add(dish);
            }
        }
        // sort   按照卡路里排序
        // Collections.sort(lowCalories,(d1,d2)->Integer.compare(d1.getCalories(),d2.getCalories()));
        Collections.sort(lowCalories,Comparator.comparingInt(Dish::getCalories));
        // 处理排序后的数据
        List<String> dishNames = new ArrayList<>();
        for (Dish dish :lowCalories){
            dishNames.add(dish.getName());
        }
        return dishNames;
    }
}


可以看到需要写这么多代码,这么多步骤


20210307223247585.png


还有一个“垃圾变量” lowCalories ,它唯一的作用就是作为一次性的中间容器。


我们来看下Java8的试下

Java8中流的处理

  /**
     * 需求: 输出小于400的Dish的名字 , 按照卡路里从第到高输出
     * @param dishList
     * @return
     */
    public static List<String> getDiskNamesByStream(List<Dish> dishList){
        return dishList.stream().filter(dish -> dish.getCalories() < 400)
                .sorted(Comparator.comparing(Dish::getCalories))
                .map(Dish::getName).collect(Collectors.toList());
    }


20210307223453691.png


处理流程如下:


20210307223727399.png


可以把几个基础操作链接起来,来表达复杂的数据处理流水线(在 filter 后面接上sorted 、 map 和 collect 操作,如上图所示),同时保持代码清晰可读。 filter 的结果被传给了 sorted 方法,再传给 map 方法,最后传给 collect 方法。


Java8中流的并行处理


为了利用多核架构并行执行这段代码,你只需要把 stream() 换成 parallelStream()

   public static List<String> getDiskNamesByStream(List<Dish> dishList){
        return dishList.parallelStream().filter(dish -> dish.getCalories() < 400)
                .sorted(Comparator.comparing(Dish::getCalories))
                .map(Dish::getName).collect(Collectors.toList());
    }


为了方便观察,我们在获取卡路里这一步加个休眠 ,启动Jconsole 来 观察下线程情况

    public static List<String> getDiskNamesByParallStream(List<Dish> dishList){
        return dishList.parallelStream().filter(dish -> {
            try {
                Thread.sleep(1000*1000); // 模拟休眠,观察parallelStream是否开启了多个线程计算
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return dish.getCalories() < 400 ;
        })
                .sorted(Comparator.comparing(Dish::getCalories))
                .map(Dish::getName).collect(Collectors.toList());
    }



20210307222329113.png


相关文章
|
5天前
|
Java 程序员 开发者
深入理解Java并发编程:线程同步与锁机制
【4月更文挑战第30天】 在多线程的世界中,确保数据的一致性和线程间的有效通信是至关重要的。本文将深入探讨Java并发编程中的核心概念——线程同步与锁机制。我们将从基本的synchronized关键字开始,逐步过渡到更复杂的ReentrantLock类,并探讨它们如何帮助我们在多线程环境中保持数据完整性和避免常见的并发问题。文章还将通过示例代码,展示这些同步工具在实际开发中的应用,帮助读者构建对Java并发编程深层次的理解。
|
5天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第30天】本文将深入探讨Java并发编程中的一个重要主题——线程池。我们将从线程池的基本概念入手,了解其工作原理和优势,然后详细介绍如何使用Java的Executor框架创建和管理线程池。最后,我们将讨论一些高级主题,如自定义线程工厂和拒绝策略。通过本文的学习,你将能够更好地理解和使用Java的线程池,提高你的并发编程能力。
|
5天前
|
安全 Java 调度
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第30天】本文将深入探讨Java并发编程的核心概念,包括线程安全、同步机制、锁优化以及性能调优。我们将通过实例分析如何确保多线程环境下的数据一致性,同时介绍一些常见的并发模式和最佳实践,旨在帮助开发者在保证线程安全的同时,提升系统的性能和响应能力。
|
3天前
|
存储 缓存 前端开发
Java串口通信技术探究3:RXTX库线程 优化系统性能的SerialPortEventListener类
Java串口通信技术探究3:RXTX库线程 优化系统性能的SerialPortEventListener类
13 3
|
3天前
|
Java
JAVA难点包括异常处理、多线程、泛型和反射,以及复杂的分布式系统知识
JAVA难点包括异常处理、多线程、泛型和反射,以及复杂的分布式系统知识。入坑JAVA因它的面向对象特性、平台无关性、强大的标准库和活跃的社区支持。
14 2
|
3天前
|
Java 调度 开发者
Java中的多线程编程:基础与实践
【5月更文挑战第2天】本文将深入探讨Java中的多线程编程,从基础概念到实际应用,为读者提供全面的理解和实践指导。我们将首先介绍线程的基本概念和重要性,然后详细解析Java中实现多线程的两种主要方式:继承Thread类和实现Runnable接口。接着,我们将探讨线程同步的问题,包括synchronized关键字和Lock接口的使用。最后,我们将通过一个实际的生产者-消费者模型来演示多线程编程的实践应用。
|
3天前
|
安全 Java 程序员
Java中的多线程编程:从理论到实践
【5月更文挑战第2天】 在计算机科学中,多线程编程是一项重要的技术,它允许多个任务在同一时间段内并发执行。在Java中,多线程编程是通过创建并管理线程来实现的。本文将深入探讨Java中的多线程编程,包括线程的概念、如何创建和管理线程、以及多线程编程的一些常见问题和解决方案。
12 1
|
3天前
|
分布式计算 Java API
Java 8新特性之Lambda表达式与Stream API
【5月更文挑战第1天】本文将介绍Java 8中的两个重要特性:Lambda表达式和Stream API。Lambda表达式是一种新的函数式编程语法,可以简化代码并提高可读性。Stream API是一种用于处理集合的新工具,可以方便地进行数据操作和转换。通过结合Lambda表达式和Stream API,我们可以更加简洁高效地编写Java代码。
|
4天前
|
存储 安全 Java
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第1天】本文将深入探讨Java并发编程的核心概念,包括线程安全和性能优化。我们将详细分析线程安全问题的根源,以及如何通过合理的设计和编码实践来避免常见的并发问题。同时,我们还将探讨如何在保证线程安全的前提下,提高程序的并发性能,包括使用高效的同步机制、减少锁的竞争以及利用现代硬件的并行能力等技术手段。
|
4天前
|
并行计算 Java 数据处理
Java中的多线程编程:基础知识与实践
【5月更文挑战第1天】本文将深入探讨Java中的多线程编程,包括其基本概念、实现方式以及实际应用。我们将从理论和实践两个角度出发,详细解析线程的创建、启动、控制以及同步等关键问题,并通过实例代码演示如何在Java中有效地使用多线程。