Java 8 - Stream基本实例及Stream的并行处理在线程上的表现

简介: Java 8 - Stream基本实例及Stream的并行处理在线程上的表现

20200510181139786.png


什么是流


集合是Java中使用最多的API 。 可以让你把数据分组并加以处理。尽管集合对于几乎任何一个Java应用都是不可或缺的,但集合操作却远远算不上完美。


流是Java API的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。就现在来说,可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理,无需写任何多线程代码 !


Java 8中的集合支持一个新的stream 方法,它会返回一个流(接口定义在 java.util.stream.Stream 里)


元素序列

就像集合一样,流也提供了一个接口,可以访问特定元素类型的一组有序值。因为集合是数据结构,所以它的主要目的是以特定的时间/空间复杂度存储和访问元素(如 ArrayList 与 LinkedList )。但流的目的在于表达计算,比如 filter 、 sorted 和 map 。集合讲的是数据,流讲的是计算。


流会使用一个提供数据的源,如集合、数组或输入/输出资源。 请注意,从有序集合生成流时会保留原有的顺序。由列表生成的流,其元素顺序与列表一致。


数据处理操作

流的数据处理功能支持类似于数据库的操作,以及函数式编程语言中的常用操作,如 filter 、 map 、 reduce 、 find 、 match 、 sort 等。流操作可以顺序执行,也可并行执行


此外,流操作有两个重要的特点


流水线

很多流操作本身会返回一个流,这样多个操作就可以链接起来,形成一个大

的流水线。 流水线的操作可以看作对数据源进行数据库式查询。


内部迭代

与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的。


实例解释

   /**
     * 需求: 卡路里前三的dish的名字
     * @param dishList
     * @return
     */
    public static List<String> getTop3HighCalories(List<Dish> dishList){
        return dishList.stream().filter(dish->dish.getCalories()>300)
                .map(Dish::getName)
                .limit(3)
                .collect(Collectors.toList());
    }



20210307235344794.png


20210307235352191.png


先是对 menu 调用 stream 方法,得到一个流。 数据源是dishList ,它给流提供一个元素列表


接下来,对流应用一系列数据处理操作: filter 、 map 、 limit除了 collect 之外,所有这些操作都会返回另一个流,这样它们就可以接成一条 流水线 , 于是就可以看作对源的一个查询


最后, collect 操作开始处理流水线,并返回结果(它和别的操作不一样,因为它返回的不是流,在这里是一个 List )


在调用 collect 之前,没有任何结果产生,实际上根本就没有从 menu 里选择元素。可以这么理解:链中的方法调用都在排队等待,直到调用 collect 。


20210308000040802.png


filter :接受Lambda,从流中排除某些元素。在本例中,通过传递lambda d ->d.getCalories() > 300 ,选择出超过300卡路里的Dish

map : 接受一个Lambda,将元素转换成其他形式或提取信息。在本例中,通过传递方法引用 Dish::getName ,相当于Lambda d -> d.getName() ,提取了名字。

limit :截断流,使其元素不超过给定数量。

collect :将流转换为其他形式。在本例中,流被转换为一个列表。 可以把 collect 看作能够接受各种方案作为参数,并将流中的元素累计成为一个汇总结果的操作。 这里的toList() 就是将流转换为列表的方案。


流 VS 集合


Java现有的集合概念和新的流概念都提供了接口,来配合代表元素型有序值的数据接口。


粗略地说,集合与流之间的差异就在于什么时候进行计算。集合是一个内存中的数据结构,它包含数据结构中目前所有的值——集合中的每个元素都得先算出来才能添加到集合中。(你可以往集合里加东西或者?东西,但是不管什么时候,集合中的每个元素都是放在内存里的,元素都得先算出来才能成为集合的一部分。)


相比之下,流则是在概念上固定的数据结构(你不能添加或删除元素),其元素则是按需计算的。 是一种生产者?消费者的关系。


从另一个角度来说,流就像是一个延迟创建的集合:只有在消费者要求的时候才会计算值 。 与此相反,集合则是急切创建的。


以质数为例,要是想创建一个包含所有质数的集合,那这个程序算起来就没完没了了,因为总有新的质数要算,然后把它加到集合里面。当然这个集合是永远也创建不完的,消费者这辈子都见不着了。


另一个例子是用浏览器进行互联网搜索。假设你搜索的短语在Google或是网?里面有很多匹配项。你用不着等到所有结果和照片的集合下载完,而是得到一个流,里面有最好的10个或20个匹配项,还有一个按钮查看下面10个或20个。当你作为消费者点“下面10个”的时候,供应商就按需计算这些结果,然后再返回你的浏览器上显示。

20210308001110628.png


只能遍历一次


和迭代器类似,流只能遍历一次。遍历完之后,我们就说这个流已经被消费了。可以从原始数据源那里再获得一个新的流来重新遍历一遍,就像迭代器一样(这里假设它是集合之类的可重复的源,如果是I/O通道就不行了)


20210308002044765.png


    public static void  testConsumeMoreTime(List<Dish> dishList){
        Stream<Dish> stream = dishList.stream();
        stream.forEach(System.out::println);
        stream.forEach(System.out::println);
    }

20210308002025345.png


集合和流的另一个关键区别在于它们遍历数据的方式.


内部迭代与外部迭代


使用 Collection 接口需要用户去做迭代(比如用 for-each ),这称为外部迭代。 相反,Streams库使用内部迭代——它帮你把迭代做了,还把得到的流值存在了某个地方,你只要给出一个函数说要干什么就可以了。下面的代码列表说明了这种区别。


【集合 】


用 for-each 循环外部迭代


20210308002241130.png


  • 用背后的迭代器做外部迭代


20210308002332797.png


【流:内部迭代】


20210308002423648.png


内部迭代时,项目可以透明地并行处理,或者用更优化的顺序进行处理

Streams库的内部迭代可以自动选择一种适合你硬件的数据表示和并行实现。

与此相反,一旦通过写 for-each 而选择了外部迭代,那你基本上就要自己管理所有的并行问题了

20210308002524737.png



需求

需求: 输出小于400的Dish的名字 , 并按照卡路里排序

Java 7及之前的实现

package com.artisan.java8.stream;
import com.artisan.java8.Dish;
import java.util.*;
import java.util.stream.Collectors;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/3/7 8:23
 * @mark: show me the code , change the world
 */
public class StreamTest {
    public static void main(String[] args) {
        //have a dish list (menu)
        List<Dish> menu = Arrays.asList(
                new Dish("pork", false, 800, com.artisan.java8.Dish.Type.MEAT),
                new Dish("beef", false, 700, com.artisan.java8.Dish.Type.MEAT),
                new Dish("chicken", false, 400, com.artisan.java8.Dish.Type.MEAT),
                new Dish("french fries", true, 530, com.artisan.java8.Dish.Type.OTHER),
                new Dish("rice", true, 350, com.artisan.java8.Dish.Type.OTHER),
                new Dish("season fruit", true, 120, com.artisan.java8.Dish.Type.OTHER),
                new Dish("pizza", true, 550, com.artisan.java8.Dish.Type.OTHER),
                new Dish("prawns", false, 300, com.artisan.java8.Dish.Type.FISH),
                new Dish("salmon", false, 450, com.artisan.java8.Dish.Type.FISH));
        System.out.println(getDiskNamesByCollections(menu));
    }
    /**
     * 需求: 输出小于400的Dish的名字 , 并按照卡路里排序 
     * @param dishList
     * @return
     */
    public static List<String> getDiskNamesByCollections(List<Dish> dishList){
        List<Dish> lowCalories = new ArrayList<>();
        //  filter  过滤小于400的
        for(Dish dish : dishList){
            if (dish.getCalories() < 400) {
                lowCalories.add(dish);
            }
        }
        // sort   按照卡路里排序
        // Collections.sort(lowCalories,(d1,d2)->Integer.compare(d1.getCalories(),d2.getCalories()));
        Collections.sort(lowCalories,Comparator.comparingInt(Dish::getCalories));
        // 处理排序后的数据
        List<String> dishNames = new ArrayList<>();
        for (Dish dish :lowCalories){
            dishNames.add(dish.getName());
        }
        return dishNames;
    }
}


可以看到需要写这么多代码,这么多步骤


20210307223247585.png


还有一个“垃圾变量” lowCalories ,它唯一的作用就是作为一次性的中间容器。


我们来看下Java8的试下

Java8中流的处理

  /**
     * 需求: 输出小于400的Dish的名字 , 按照卡路里从第到高输出
     * @param dishList
     * @return
     */
    public static List<String> getDiskNamesByStream(List<Dish> dishList){
        return dishList.stream().filter(dish -> dish.getCalories() < 400)
                .sorted(Comparator.comparing(Dish::getCalories))
                .map(Dish::getName).collect(Collectors.toList());
    }


20210307223453691.png


处理流程如下:


20210307223727399.png


可以把几个基础操作链接起来,来表达复杂的数据处理流水线(在 filter 后面接上sorted 、 map 和 collect 操作,如上图所示),同时保持代码清晰可读。 filter 的结果被传给了 sorted 方法,再传给 map 方法,最后传给 collect 方法。


Java8中流的并行处理


为了利用多核架构并行执行这段代码,你只需要把 stream() 换成 parallelStream()

   public static List<String> getDiskNamesByStream(List<Dish> dishList){
        return dishList.parallelStream().filter(dish -> dish.getCalories() < 400)
                .sorted(Comparator.comparing(Dish::getCalories))
                .map(Dish::getName).collect(Collectors.toList());
    }


为了方便观察,我们在获取卡路里这一步加个休眠 ,启动Jconsole 来 观察下线程情况

    public static List<String> getDiskNamesByParallStream(List<Dish> dishList){
        return dishList.parallelStream().filter(dish -> {
            try {
                Thread.sleep(1000*1000); // 模拟休眠,观察parallelStream是否开启了多个线程计算
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return dish.getCalories() < 400 ;
        })
                .sorted(Comparator.comparing(Dish::getCalories))
                .map(Dish::getName).collect(Collectors.toList());
    }



20210307222329113.png


相关文章
|
9天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
11天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
11天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。
|
11天前
|
安全 Java 编译器
深入理解Java中synchronized三种使用方式:助您写出线程安全的代码
`synchronized` 是 Java 中的关键字,用于实现线程同步,确保多个线程互斥访问共享资源。它通过内置的监视器锁机制,防止多个线程同时执行被 `synchronized` 修饰的方法或代码块。`synchronized` 可以修饰非静态方法、静态方法和代码块,分别锁定实例对象、类对象或指定的对象。其底层原理基于 JVM 的指令和对象的监视器,JDK 1.6 后引入了偏向锁、轻量级锁等优化措施,提高了性能。
35 3
|
11天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
93 2
|
19天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
46 6
|
28天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
28天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
50 3
|
29天前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####
|
1月前
|
监控 Java 数据库连接
Java线程管理:守护线程与用户线程的区分与应用
在Java多线程编程中,线程可以分为守护线程(Daemon Thread)和用户线程(User Thread)。这两种线程在行为和用途上有着明显的区别,了解它们的差异对于编写高效、稳定的并发程序至关重要。
36 2