Java Stream API:集合操作与并行处理

简介: Stream API 是 Java 8 提供的集合处理工具,通过声明式编程简化数据操作。它支持链式调用、延迟执行和并行处理,能够高效实现过滤、转换、聚合等操作,提升代码可读性和性能。

💡 摘要:你是否曾为复杂的集合操作编写冗长的循环代码?是否想知道如何利用多核处理器加速数据处理?是否对函数式编程的声明式风格感到好奇?

别担心,Stream API是Java 8引入的现代集合处理工具,它能让你用更简洁的代码实现更强大的功能。

本文将带你从Stream的基本概念讲起,理解流与集合的根本区别。然后深入流的操作类型,学习中间操作和终止操作的用法。

接着通过实战案例展示Stream在数据过滤、转换、聚合等方面的强大能力。最后重点探索并行流的使用技巧和性能优化。从顺序处理到并行计算,从简单操作到复杂流水线,让你全面掌握Stream API的精髓。文末附性能对比和面试高频问题,助你写出更高效的集合处理代码。

一、Stream API概述:为什么需要流?

1. 传统集合处理的痛点

命令式编程的缺点

java

List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David", "Anna", "Andrew");


// 传统方式:找出以A开头的名字,转换为大写,排序,收集到列表

List<String> result = new ArrayList<>();

for (String name : names) {

   if (name.startsWith("A")) {

       result.add(name.toUpperCase());

   }

}

Collections.sort(result);

System.out.println(result);


// 问题分析:

// 🔴 代码冗长:需要多个步骤和临时集合

// 🔴 意图模糊:业务逻辑被实现细节淹没

// 🔴 难以并行:手动实现并行处理复杂易错

2. Stream的解决方案

声明式编程的优势

java

List<String> result = names.stream()

   .filter(name -> name.startsWith("A"))  // 过滤

   .map(String::toUpperCase)              // 转换

   .sorted()                              // 排序

   .collect(Collectors.toList());         // 收集


System.out.println(result);


// 优势分析:

// ✅ 代码简洁:链式调用表达业务逻辑

// ✅ 意图清晰:每个操作明确表达做什么

// ✅ 易于并行:只需调用parallel()方法

3. Stream vs Collection

根本区别

特性 Collection Stream
存储 存储数据 不存储数据,只是数据的视图
操作 外部迭代(用户控制循环) 内部迭代(Stream控制循环)
数据处理 eager(急切执行) lazy(延迟执行)
可重用性 可多次使用 一次性使用(终端操作后关闭)
并行能力 需要手动实现 内置并行支持

二、Stream操作类型:中间操作与终止操作

1. 操作分类图解

text

Stream源 → 中间操作 → 中间操作 → ... → 终止操作 → 结果

          (filter)   (map)           (collect)

         

特点:延迟执行,遇到终止操作才触发处理

2. 中间操作(Intermediate Operations)

常用的中间操作

filter() - 过滤

java

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);


// 过滤偶数

List<Integer> evens = numbers.stream()

   .filter(n -> n % 2 == 0)

   .collect(Collectors.toList()); // [2, 4, 6, 8, 10]


// 多重过滤

List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David", "Anna");

List<String> result = names.stream()

   .filter(name -> name.length() > 3)    // 长度大于3

   .filter(name -> name.contains("a"))   // 包含字母a

   .collect(Collectors.toList());        // [Alice, Charlie, David, Anna]

map() - 转换

java

// 字符串转换为长度

List<String> words = Arrays.asList("Java", "Stream", "API");

List<Integer> lengths = words.stream()

   .map(String::length)

   .collect(Collectors.toList()); // [4, 6, 3]


// 对象属性提取

class Person {

   String name;

   int age;

   // getters

}


List<Person> people = Arrays.asList(

   new Person("Alice", 25),

   new Person("Bob", 30),

   new Person("Charlie", 35)

);


List<String> names = people.stream()

   .map(Person::getName)

   .collect(Collectors.toList()); // [Alice, Bob, Charlie]

flatMap() - 扁平化转换

java

// 将多个列表合并为一个

List<List<String>> listOfLists = Arrays.asList(

   Arrays.asList("Apple", "Banana"),

   Arrays.asList("Orange", "Grape"),

   Arrays.asList("Peach", "Pear")

);


List<String> flatList = listOfLists.stream()

   .flatMap(List::stream)

   .collect(Collectors.toList()); // [Apple, Banana, Orange, Grape, Peach, Pear]


// 拆分字符串为单词

List<String> sentences = Arrays.asList(

   "Hello world",

   "Java Stream API",

   "Functional programming"

);


List<String> words = sentences.stream()

   .flatMap(sentence -> Arrays.stream(sentence.split(" ")))

   .collect(Collectors.toList()); // [Hello, world, Java, Stream, API, Functional, programming]

distinct() - 去重

java

List<Integer> numbers = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4);

List<Integer> distinctNumbers = numbers.stream()

   .distinct()

   .collect(Collectors.toList()); // [1, 2, 3, 4]

sorted() - 排序

java

List<String> names = Arrays.asList("Charlie", "Alice", "Bob");

List<String> sortedNames = names.stream()

   .sorted()

   .collect(Collectors.toList()); // [Alice, Bob, Charlie]


// 自定义排序

List<String> customSorted = names.stream()

   .sorted((s1, s2) -> s2.compareTo(s1)) // 逆序

   .collect(Collectors.toList()); // [Charlie, Bob, Alice]


// 多字段排序

List<Person> sortedPeople = people.stream()

   .sorted(Comparator.comparing(Person::getAge).thenComparing(Person::getName))

   .collect(Collectors.toList());

limit() 和 skip() - 分页

java

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);


// 分页操作:每页3条数据,获取第2页

List<Integer> page2 = numbers.stream()

   .skip(3)        // 跳过前3个

   .limit(3)       // 取3个

   .collect(Collectors.toList()); // [4, 5, 6]

3. 终止操作(Terminal Operations)

collect() - 收集

java

// 收集到List

List<String> list = stream.collect(Collectors.toList());


// 收集到Set

Set<String> set = stream.collect(Collectors.toSet());


// 收集到Map

Map<String, Integer> map = people.stream()

   .collect(Collectors.toMap(Person::getName, Person::getAge));


// 连接字符串

String joined = names.stream()

   .collect(Collectors.joining(", ")); // "Alice, Bob, Charlie"


// 分组

Map<Integer, List<Person>> peopleByAge = people.stream()

   .collect(Collectors.groupingBy(Person::getAge));


// 分区

Map<Boolean, List<Person>> partitioned = people.stream()

   .collect(Collectors.partitioningBy(p -> p.getAge() > 30));

forEach() - 遍历

java

// 遍历处理

names.stream()

   .filter(name -> name.startsWith("A"))

   .forEach(System.out::println); // 输出所有以A开头的名字


// 注意:forEach是终止操作,执行后流关闭

reduce() - 归约

java

// 求和

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

int sum = numbers.stream()

   .reduce(0, Integer::sum); // 15


// 最大值

Optional<Integer> max = numbers.stream()

   .reduce(Integer::max); // Optional[5]


// 字符串连接

List<String> words = Arrays.asList("Hello", "World", "!");

Optional<String> combined = words.stream()

   .reduce((s1, s2) -> s1 + " " + s2); // Optional["Hello World !"]

匹配操作

java

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);


// 任意匹配

boolean anyEven = numbers.stream().anyMatch(n -> n % 2 == 0); // true


// 全部匹配  

boolean allEven = numbers.stream().allMatch(n -> n % 2 == 0); // false


// 无匹配

boolean noneNegative = numbers.stream().noneMatch(n -> n < 0); // true

查找操作

java

Optional<Integer> firstEven = numbers.stream()

   .filter(n -> n % 2 == 0)

   .findFirst(); // Optional[2]


Optional<Integer> anyEven = numbers.stream()

   .filter(n -> n % 2 == 0)

   .findAny(); // 可能是Optional[2]或Optional[4]

三、并行流:利用多核处理能力

1. 创建并行流

多种创建方式

java

List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David");


// 从集合创建并行流

Stream<String> parallelStream1 = names.parallelStream();


// 将顺序流转换为并行流

Stream<String> parallelStream2 = names.stream().parallel();


// 并行数组流

int[] numbers = {1, 2, 3, 4, 5};

IntStream parallelIntStream = Arrays.stream(numbers).parallel();

2. 并行流的使用

并行处理示例

java

List<Integer> numbers = IntStream.rangeClosed(1, 1_000_000)

   .boxed()

   .collect(Collectors.toList());


// 顺序处理

long startSeq = System.currentTimeMillis();

long sumSeq = numbers.stream()

   .mapToLong(Integer::longValue)

   .sum();

long timeSeq = System.currentTimeMillis() - startSeq;


// 并行处理

long startPar = System.currentTimeMillis();

long sumPar = numbers.parallelStream()

   .mapToLong(Integer::longValue)

   .sum();

long timePar = System.currentTimeMillis() - startPar;


System.out.printf("顺序处理: %d ms, 结果: %d%n", timeSeq, sumSeq);

System.out.printf("并行处理: %d ms, 结果: %d%n", timePar, sumPar);

System.out.printf("加速比: %.2f倍%n", (double)timeSeq / timePar);

3. 并行流的注意事项

状态共享问题

java

// 错误示例:共享可变状态

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

List<Integer> result = Collections.synchronizedList(new ArrayList<>());


numbers.parallelStream()

   .forEach(n -> {

       // 可能产生竞态条件

       result.add(n * 2);

   });


// 正确做法:使用线程安全的收集器

List<Integer> safeResult = numbers.parallelStream()

   .map(n -> n * 2)

   .collect(Collectors.toList());

顺序敏感性操作

java

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);


// 顺序敏感操作:并行可能产生错误结果

List<Integer> sorted = numbers.parallelStream()

   .sorted() // 并行排序仍然正确

   .collect(Collectors.toList());


// 但某些操作顺序很重要

List<Integer> sequentialResult = numbers.stream()

   .limit(3)

   .collect(Collectors.toList()); // [1, 2, 3]


List<Integer> parallelResult = numbers.parallelStream()

   .limit(3)

   .collect(Collectors.toList()); // 结果不确定!

四、实战应用:复杂数据处理

1. 数据统计与分析

复杂数据聚合

java

class Order {

   String customer;

   double amount;

   String category;

   // getters

}


List<Order> orders = Arrays.asList(

   new Order("Alice", 100.0, "Electronics"),

   new Order("Bob", 200.0, "Books"),

   new Order("Alice", 150.0, "Electronics"),

   new Order("Charlie", 300.0, "Clothing"),

   new Order("Bob", 50.0, "Books")

);


// 按客户分组求总金额

Map<String, Double> totalByCustomer = orders.stream()

   .collect(Collectors.groupingBy(

       Order::getCustomer,

       Collectors.summingDouble(Order::getAmount)

   ));


// 按类别分组求平均金额

Map<String, Double> avgByCategory = orders.stream()

   .collect(Collectors.groupingBy(

       Order::getCategory,

       Collectors.averagingDouble(Order::getAmount)

   ));


// 多级分组:按类别再按客户

Map<String, Map<String, Double>> multiLevel = orders.stream()

   .collect(Collectors.groupingBy(

       Order::getCategory,

       Collectors.groupingBy(

           Order::getCustomer,

           Collectors.summingDouble(Order::getAmount)

       )

   ));

2. 文件处理

流式文件操作

java

// 读取文件并处理

try (Stream<String> lines = Files.lines(Paths.get("data.txt"))) {

   List<String> importantLines = lines

       .filter(line -> line.contains("ERROR"))

       .map(String::toUpperCase)

       .collect(Collectors.toList());

} catch (IOException e) {

   e.printStackTrace();

}


// 大文件处理:逐行处理避免内存溢出

try (Stream<String> lines = Files.lines(Paths.get("largefile.txt"))) {

   long errorCount = lines

       .parallel() // 并行处理大文件

       .filter(line -> line.contains("ERROR"))

       .count();

   System.out.println("错误行数: " + errorCount);

} catch (IOException e) {

   e.printStackTrace();

}

3. 数据库查询模拟

流式数据处理

java

// 模拟数据库查询结果

List<Employee> employees = Arrays.asList(

   new Employee("Alice", "IT", 5000),

   new Employee("Bob", "HR", 4000),

   new Employee("Charlie", "IT", 6000),

   new Employee("David", "Finance", 5500),

   new Employee("Eva", "HR", 4500)

);


// 部门薪资统计

Map<String, Double> avgSalaryByDept = employees.stream()

   .collect(Collectors.groupingBy(

       Employee::getDepartment,

       Collectors.averagingDouble(Employee::getSalary)

   ));


// 最高薪资员工

Optional<Employee> highestPaid = employees.stream()

   .max(Comparator.comparingDouble(Employee::getSalary));


// 部门员工分组

Map<String, List<Employee>> employeesByDept = employees.stream()

   .collect(Collectors.groupingBy(Employee::getDepartment));

五、性能优化与最佳实践

1. 流操作性能考虑

操作顺序优化

java

List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David", "Eva", "Frank");


// 不好的顺序:先排序再过滤

List<String> badOrder = names.stream()

   .sorted()                       // 排序所有元素

   .filter(name -> name.length() > 3) // 然后过滤

   .collect(Collectors.toList());


// 好的顺序:先过滤再排序

List<String> goodOrder = names.stream()

   .filter(name -> name.length() > 3) // 先减少数据量

   .sorted()                       // 然后排序较少的数据

   .collect(Collectors.toList());

原始类型流优化

java

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);


// 使用原始类型流避免装箱开销

int sum = numbers.stream()

   .mapToInt(Integer::intValue) // IntStream避免装箱

   .sum();


// 性能对比

long start1 = System.nanoTime();

int sum1 = numbers.stream().reduce(0, Integer::sum); // 有装箱

long time1 = System.nanoTime() - start1;


long start2 = System.nanoTime();

int sum2 = numbers.stream().mapToInt(Integer::intValue).sum(); // 无装箱

long time2 = System.nanoTime() - start2;


System.out.printf("有装箱: %d ns, 无装箱: %d ns, 提升: %.1f%%%n",

   time1, time2, (double)(time1 - time2) / time1 * 100);

2. 何时使用并行流

适合并行的场景

  • ✅ 大数据集处理(数万以上元素)
  • ✅ 计算密集型操作
  • ✅ 无状态、无顺序要求的操作
  • ✅ 独立的元素处理

不适合并行的场景

  • 🔴 小数据集(并行开销可能超过收益)
  • 🔴 顺序敏感操作(limit、findFirst等)
  • 🔴 有状态操作(共享可变状态)
  • 🔴 I/O密集型操作(线程阻塞)

六、总结:Stream API的最佳实践

1. 使用建议

代码可读性

java

// 好的写法:清晰的链式调用

List<String> result = employees.stream()

   .filter(emp -> emp.getSalary() > 5000)

   .map(Employee::getName)

   .sorted()

   .collect(Collectors.toList());


// 坏的写法:过长的链式操作

List<String> badResult = employees.stream().filter(e -> e.getSalary() > 5000 && e.getDepartment().equals("IT") || e.getJoinDate().isAfter(LocalDate.of(2020, 1, 1))).map(e -> e.getName().toUpperCase()).sorted((a, b) -> b.compareTo(a)).limit(10).collect(Collectors.toList());


// 改进:提取变量和方法

Predicate<Employee> highSalary = e -> e.getSalary() > 5000;

Predicate<Employee> itDepartment = e -> e.getDepartment().equals("IT");

Predicate<Employee> recentHire = e -> e.getJoinDate().isAfter(LocalDate.of(2020, 1, 1));


List<String> improvedResult = employees.stream()

   .filter(highSalary.and(itDepartment).or(recentHire))

   .map(Employee::getName)

   .map(String::toUpperCase)

   .sorted(Comparator.reverseOrder())

   .limit(10)

   .collect(Collectors.toList());

2. 性能最佳实践

  1. 优先使用基本类型流:IntStream、LongStream、DoubleStream
  2. 优化操作顺序:先filter后map,先减少数据量
  3. 避免重复计算:缓存中间结果
  4. 谨慎使用并行流:测试性能提升效果
  5. 使用收集器优化:选择合适的收集方式

七、面试高频问题

❓1. Stream和Collection有什么区别?

:Collection是存储数据的容器,Stream是处理数据的计算工具。Stream不存储数据,支持延迟执行和内部迭代,适合函数式编程。

❓2. 中间操作和终止操作有什么区别?

:中间操作返回新的Stream(延迟执行),终止操作产生结果或副作用(触发实际计算)。中间操作可以连接形成流水线。

❓3. 什么是延迟执行(Lazy Evaluation)?

:中间操作不会立即执行,只有在终止操作被调用时才会触发整个流水线的计算。这允许优化和短路操作。

❓4. 什么时候使用并行流?有什么注意事项?

:适合大数据集和计算密集型任务。需要注意线程安全、避免共享状态、顺序敏感性操作等问题。

❓5. map()和flatMap()有什么区别?

:map()进行一对一的转换,flatMap()进行一对多的转换并将结果展平。flatMap()常用于处理嵌套集合或Optional。


相关文章
|
9天前
|
缓存 安全 Java
Java反射机制:动态操作类与对象
Java反射机制是运行时动态操作类与对象的强大工具,支持获取类信息、动态创建实例、调用方法、访问字段等。它在框架开发、依赖注入、动态代理等方面有广泛应用,但也存在性能开销和安全风险。本文详解反射核心API、实战案例及性能优化策略,助你掌握Java动态编程精髓。
|
10天前
|
存储 SQL 缓存
Java字符串处理:String、StringBuilder与StringBuffer
本文深入解析Java中String、StringBuilder和StringBuffer的核心区别与使用场景。涵盖字符串不可变性、常量池、intern方法、可变字符串构建器的扩容机制及线程安全实现。通过性能测试对比三者差异,并提供最佳实践与高频面试问题解析,助你掌握Java字符串处理精髓。
|
存储 缓存 NoSQL
MySQL索引详解(一文搞懂)
索引是对数据库表中一列或多列的值进行排序的一种结构,使用索引可快速访问数据库表中的特定信息。
48983 17
MySQL索引详解(一文搞懂)
|
9天前
|
Java 编译器 测试技术
Java注解(Annotation)与元编程实践
本文深入讲解Java注解的原理与实战应用,涵盖内置注解、自定义注解、编译期与运行期处理机制,以及在依赖注入、Web框架和数据验证中的实际应用,助你掌握元编程核心技能。
|
机器学习/深度学习 自然语言处理 分布式计算
知识图谱(Knowledge Graph)之综述理解
知识图谱(Knowledge Graph)之综述理解
1278 0
知识图谱(Knowledge Graph)之综述理解
|
测试技术
Jmeter常用监听器详解
Jmeter常用监听器详解
Jmeter常用监听器详解
|
9天前
|
SQL Java 数据库连接
Java IO流(一):字节流与字符流基础
本文全面解析Java IO流,涵盖字节流、字符流及其使用场景,帮助开发者理解IO流分类与用途,掌握文件读写、编码转换、异常处理等核心技术,通过实战案例提升IO编程能力。
|
10天前
|
存储 缓存 安全
Java集合框架(二):Set接口与哈希表原理
本文深入解析Java中Set集合的工作原理及其实现机制,涵盖HashSet、LinkedHashSet和TreeSet三大实现类。从Set接口的特性出发,对比List理解去重机制,并详解哈希表原理、hashCode与equals方法的作用。进一步剖析HashSet的底层HashMap实现、LinkedHashSet的双向链表维护顺序特性,以及TreeSet基于红黑树的排序功能。文章还包含性能对比、自定义对象去重、集合运算实战和线程安全方案,帮助读者全面掌握Set的应用与选择策略。
87 23
|
11月前
|
XML 前端开发 Java
Spring,SpringBoot和SpringMVC的关系以及区别 —— 超准确,可当面试题!!!也可供零基础学习
本文阐述了Spring、Spring Boot和Spring MVC的关系与区别,指出Spring是一个轻量级、一站式、模块化的应用程序开发框架,Spring MVC是Spring的一个子框架,专注于Web应用和网络接口开发,而Spring Boot则是对Spring的封装,用于简化Spring应用的开发。
1790 0
Spring,SpringBoot和SpringMVC的关系以及区别 —— 超准确,可当面试题!!!也可供零基础学习
|
开发工具 git
成功解决:fatal: detected dubious ownership in repository at ‘E:/workspace/CSMarket‘。如何使用git工具通过命令行的形式
这篇文章分享了作者在使用Git工具初始化本地仓库时遇到的权限问题,提供了通过命令行解决Git仓库权限问题的方案,并介绍了如何使用Git命令行初始化项目、添加文件、提交以及关联远程仓库的步骤。
成功解决:fatal: detected dubious ownership in repository at ‘E:/workspace/CSMarket‘。如何使用git工具通过命令行的形式