Java 8中stream流处理(入门教程)

简介: 本文中流处理是指对运动中的数据的处理,在生成或接收数据直接计算数据。应用程序中分析和查询不断存在,数据不断地流经它们。在从流中接收到事件时,流处理应用程序对该事件作出反应。

基础概念

“流”是一个抽象的概念,它是对输入输出设备的一种抽象理解,在java中,对数据的输入输出操作都是以“流”的方式进行的。“流”具有方向性,输入流、输出流是相对的。当程序需要从数据源中读入数据的时候就会开启一个输入流,相反,写出数据到某个数据源目的地的时候也会开启一个输出流。数据源可以是文件、内存或者网络等。

本文中流处理是指对运动中的数据的处理,在生成或接收数据直接计算数据。应用程序中分析和查询不断存在,数据不断地流经它们。在从流中接收到事件时,流处理应用程序对该事件作出反应。

如果我们使用传统的循环迭代方式对数据集进行复杂计算,常常会带来两个弊端:

  1. 迭代次数多,迭代次数跟函数调用的次数相等。
  2. 频繁产生中间结果,存储开销无法接受。

流处理可以立即对事件做出反应,且可以处理比其他数据处理系统大得多的数据量:直接处理事件流,并且只保留数据中有意义的子集。尤其是面对持续生成,本质上是无穷尽的数据集。

Java Stream 类

JDK 1.8 新增。将要处理的元素集合看作一种流,在管道的节点上进行处理。使代码更简洁易读。

集合接口有两个方法来生成流,数据类型将由 Collection 转化为 Stream 。

  • stream() 方法:为集合创建串行流。
  • parallelStream() 方法:为集合创建并行流。
  1. Stream 的遍历方式和结果与 Iterator 无差别(便于转化),其优势在于其原型链的设计使得它可以对遍历处理后的数据进行再处理。
  2. parallelStream 提供了流的并行处理,底层使用 Fork/Join 框架,简单理解就是多线程异步任务的一种实现。处理过程中会有多个线程处理元素,具体由 JDK 负责管理。不保证有序性。
  3. 串行流和并行流之间可以通过 parallel 和 sequential 方法相互转化。
Stream<Integer> stream = list.stream();                     // 声明作为流处理
ParellerStream<Integer> pStream = stream.parallel();        // 转化为并行流

流操作

流处理的每个操作阶段都会封装到一个 Sink 接口里,处理数据后再将数据传递给下游的 Sink。
Stream 上的所有操作分为两类:中间操作和结束操作。
Stream 是延迟执行的,只有调用到结束操作,才触发整个流水线的执行。如果未定义结束操作,那么流处理什么也不会做。

流的初体验
long count = strings.stream()                   //声明作为流处理
                .filter(e -> e.isEmpty())      //中间操作,过滤空元素
                .count();                     //结束操作,计算

中间操作

过滤filter

==filter()方法用于通过设置的条件过滤出元素==

 List<String> strings =Arrays.asList("aa", "bb", "cc", "", "");
        long count = strings.stream()           //声明作为流处理
                .filter(e -> e.isEmpty())      //中间操作,过滤空元素
                .count();                      //结束操作,计算
        System.out.println(count);

筛选limit/skip

==limit()方法用于获取指定数量的流(前n个),skip()方法用于去除指定数量的流(前n个)==

Random random = new Random();
        random.ints()
                .limit(10)
                .skip(5)
                .forEach(value -> System.out.println(value));

映射map

==map()方法用于映射每个元素到对应的结果,其实就是对结果进行转化==

List<Integer> integers = Arrays.asList(3, 2, 5, 4, 6, 8, 9, 1);
        List<Integer> list = integers.stream()
                                     .map(i -> i * i)
                                     .collect(Collectors.toList());
        System.out.println(list.toString());

排序sorted

==sorted()方法通过Comparable接口对流进行排序,也可以自定义==

 Random random = new Random();
        random.ints().limit(10)
                     .sorted()
                     .forEach(System.out::println);

使用Comparator 来排序一个list
说明:统计学生毕业后薪资,将同学按年龄由大到小进行排序

 ArrayList<Student> students = new ArrayList<>();
        students.add(new Student("zx",28,8000));
        students.add(new Student("zr",29,8500));
        students.add(new Student("zt",25,8000));
        students.add(new Student("zy",35,10000));
        students.add(new Student("zu",22,6000));
        
        //默认升序排列,使用reversed()达到降序效果
        students.stream().sorted(Comparator.comparing(Student::getAge).reversed()) 
                         .forEach(System.out::println);

通过Comparator.thenComparing(Comparator<? super T> other) 实现多字段排序
说明:统计学生毕业后薪资,将同学按年龄由小到大进行排序,年龄相等时,按薪资升序排序

 ArrayList<Student> students = new ArrayList<>();
        students.add(new Student("zx",25,8000));
        students.add(new Student("zr",25,8500));
        students.add(new Student("zt",25,8000));
        students.add(new Student("zy",35,10000));
        students.add(new Student("zu",22,6000));

        students.stream().sorted(Comparator.comparing(Student::getAge).thenComparing(Student::getSalary))
                         .forEach(System.out::println);

Student类的代码

public class Student {
    private String name;
    private int age;
    private int salary;

    public Student() {
    }

    public Student(String name, int age, int salary) {
        this.name = name;
        this.age = age;
        this.salary = salary;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public int getSalary() {
        return salary;
    }

    public void setSalary(int salary) {
        this.salary = salary;
    }

    @Override
    public String toString() {
        return "Student{" +
                "name='" + name + '\'' +
                ", age=" + age +
                ", salary=" + salary +
                '}';
    }
}

去重distinct

==distinct()方法通过流元素的hashCode和equals方法去除重复元素==
说明:录入学生毕业后薪资时,确保无重复数据录入

 ArrayList<Student> students = new ArrayList<>();
        students.add(new Student("zx",25,8000));
        students.add(new Student("zx",25,8000));
        students.add(new Student("zt",35,10000));
        students.add(new Student("zt",35,10000));
        students.add(new Student("zu",22,6000));

        students.stream().distinct()
                         .forEach(System.out::println);

在Student类中增加hashCode()和equals()方法

@Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Student student = (Student) o;
        return age == student.age &&
                salary == student.salary &&
                Objects.equals(name, student.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, age, salary);
    }

结束操作

迭代 forEach

==forEach 迭代流中的每个数据,即对每个数据进行最后的处理(比如保存到数据库中或打印)==

// 输出 10 个随机数 
Random random = new Random();
random.ints().limit(10).forEach(System.out::println);

聚合Collectors

==Collectors 类实现了归约操作,例如将流转换成集合和聚合元素,可用于返回列表或字符串==
Stream转化为List

 List<String> strings = Arrays.asList("aa", "bb", "", "cc", "dd", "ee", "", "ff");
        List<String> res = strings.stream()
                                  .filter(string -> !string.isEmpty())
                                  .collect(Collectors.toList());
        System.out.println(res);

Stream转化为Set

 List<String> strings = Arrays.asList("aa", "bb", "", "bb", "dd", "dd", "", "aa");
        Set<String> res = strings.stream()
                                  .filter(string -> !string.isEmpty())
                                  .collect(Collectors.toSet());
        System.out.println(res);

Stream转化为String

joining参数说明:

  1. 第一个参数(delimiter):在每一个元素后追加的元素
  2. 第二个参数(prefix):在转化后的整个字符串首部追加的元素
  3. 第三个参数(suffix):在转化后的整个字符串尾部追加的元素
List<String> strings = Arrays.asList("aa", "bb", "bb");
        String collect = strings.stream()
                                .filter(string -> !string.isEmpty())
                                .collect(Collectors.joining("qq","cc","oo"));
        System.out.println(collect);

输出结果为:ccaaqqbbqqbboo

统计SummaryStatistics

==收集最终产生的统计结果,它们主要用于int,double,long等基本类型上==

 List<Integer> integerList = Arrays.asList(3, 2, 3, 4, 7, 9);
        IntSummaryStatistics statistics = integerList.stream()
                                                     .mapToInt((x) -> x)
                                                     .summaryStatistics();
        System.out.println("列表中最大的数 : " + statistics.getMax());
        System.out.println("列表中最小的数 : " + statistics.getMin());
        System.out.println("所有数之和 : " + statistics.getSum());
        System.out.println("平均数 : " + statistics.getAverage());

结果:
在这里插入图片描述

目录
相关文章
|
1天前
|
分布式计算 Java API
Java 8带来了流处理与函数式编程等新特性,极大提升了开发效率
Java 8带来了流处理与函数式编程等新特性,极大提升了开发效率。流处理采用声明式编程模型,通过filter、map等操作简化数据集处理,提高代码可读性。Lambda表达式支持轻量级函数定义,配合Predicate、Function等接口,使函数式编程无缝融入Java。此外,Optional类及新日期时间API等增强功能,让开发者能更优雅地处理潜在错误,编写出更健壮的应用程序。
7 1
|
4天前
|
Java API 开发者
|
10天前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
32 8
|
12天前
|
存储 算法 Oracle
19 Java8概述(Java8概述+lambda表达式+函数式接口+方法引用+Stream+新时间API)
19 Java8概述(Java8概述+lambda表达式+函数式接口+方法引用+Stream+新时间API)
39 8
|
9天前
|
自然语言处理 Java API
"告别Java8 Stream噩梦,JDFrame神器来袭!让你的代码简洁如诗,效率翻倍,编程新体验等你尝鲜!"
【8月更文挑战第11天】Java 8的Stream API以强大的函数式编程能力革新了集合数据处理方式,但其抽象概念和复杂的链式调用让不少开发者望而却步。为此,JDFrame框架应运而生,通过直观易懂的操作符简化Stream使用,减少代码量并提高效率。
26 3
|
13天前
|
存储 Java API
探索Java中的Stream API: 提升数据处理的效率与优雅
在Java的海洋中,Stream API如同一股清流,为数据处理注入了新的活力。本文将深入探讨Stream API的核心概念、操作以及它如何改变我们编写和理解代码的方式。通过实际案例,我们将揭示这一现代编程范式如何简化集合处理,提高代码的可读性与性能。
|
3天前
|
并行计算 Java API
|
5天前
|
前端开发 Oracle Java
Java 22 新增利器: 使用 Java Stream Gather 优雅地处理流中的状态
本文我们分析了 什么 是 “流”,对比了 Java 上几种常见的 “流”库,引入和详细介绍了 Java 22 中的 Stream Gather API 。同时也简单分享了利用 虚拟线程 如何简化 StreammapConcurrent操作符的实现。
|
5天前
|
Java API
Java8 Lambda 设计和实现问题之在Java 8的Stream API中,parallel=false时collect方法是如何实现的
Java8 Lambda 设计和实现问题之在Java 8的Stream API中,parallel=false时collect方法是如何实现的
|
5天前
|
Java
Java Lambda Stream
Java Lambda Stream
12 0