Java Stream API:集合操作与并行处理

简介: Stream API 是 Java 8 提供的集合处理工具,通过声明式编程简化数据操作。它支持链式调用、延迟执行和并行处理,能够高效实现过滤、转换、聚合等操作,提升代码可读性和性能。

💡 摘要:你是否曾为复杂的集合操作编写冗长的循环代码?是否想知道如何利用多核处理器加速数据处理?是否对函数式编程的声明式风格感到好奇?

别担心,Stream API是Java 8引入的现代集合处理工具,它能让你用更简洁的代码实现更强大的功能。

本文将带你从Stream的基本概念讲起,理解流与集合的根本区别。然后深入流的操作类型,学习中间操作和终止操作的用法。

接着通过实战案例展示Stream在数据过滤、转换、聚合等方面的强大能力。最后重点探索并行流的使用技巧和性能优化。从顺序处理到并行计算,从简单操作到复杂流水线,让你全面掌握Stream API的精髓。文末附性能对比和面试高频问题,助你写出更高效的集合处理代码。

一、Stream API概述:为什么需要流?

1. 传统集合处理的痛点

命令式编程的缺点

java

List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David", "Anna", "Andrew");


// 传统方式:找出以A开头的名字,转换为大写,排序,收集到列表

List<String> result = new ArrayList<>();

for (String name : names) {

   if (name.startsWith("A")) {

       result.add(name.toUpperCase());

   }

}

Collections.sort(result);

System.out.println(result);


// 问题分析:

// 🔴 代码冗长:需要多个步骤和临时集合

// 🔴 意图模糊:业务逻辑被实现细节淹没

// 🔴 难以并行:手动实现并行处理复杂易错

2. Stream的解决方案

声明式编程的优势

java

List<String> result = names.stream()

   .filter(name -> name.startsWith("A"))  // 过滤

   .map(String::toUpperCase)              // 转换

   .sorted()                              // 排序

   .collect(Collectors.toList());         // 收集


System.out.println(result);


// 优势分析:

// ✅ 代码简洁:链式调用表达业务逻辑

// ✅ 意图清晰:每个操作明确表达做什么

// ✅ 易于并行:只需调用parallel()方法

3. Stream vs Collection

根本区别

特性 Collection Stream
存储 存储数据 不存储数据,只是数据的视图
操作 外部迭代(用户控制循环) 内部迭代(Stream控制循环)
数据处理 eager(急切执行) lazy(延迟执行)
可重用性 可多次使用 一次性使用(终端操作后关闭)
并行能力 需要手动实现 内置并行支持

二、Stream操作类型:中间操作与终止操作

1. 操作分类图解

text

Stream源 → 中间操作 → 中间操作 → ... → 终止操作 → 结果

          (filter)   (map)           (collect)

         

特点:延迟执行,遇到终止操作才触发处理

2. 中间操作(Intermediate Operations)

常用的中间操作

filter() - 过滤

java

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);


// 过滤偶数

List<Integer> evens = numbers.stream()

   .filter(n -> n % 2 == 0)

   .collect(Collectors.toList()); // [2, 4, 6, 8, 10]


// 多重过滤

List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David", "Anna");

List<String> result = names.stream()

   .filter(name -> name.length() > 3)    // 长度大于3

   .filter(name -> name.contains("a"))   // 包含字母a

   .collect(Collectors.toList());        // [Alice, Charlie, David, Anna]

map() - 转换

java

// 字符串转换为长度

List<String> words = Arrays.asList("Java", "Stream", "API");

List<Integer> lengths = words.stream()

   .map(String::length)

   .collect(Collectors.toList()); // [4, 6, 3]


// 对象属性提取

class Person {

   String name;

   int age;

   // getters

}


List<Person> people = Arrays.asList(

   new Person("Alice", 25),

   new Person("Bob", 30),

   new Person("Charlie", 35)

);


List<String> names = people.stream()

   .map(Person::getName)

   .collect(Collectors.toList()); // [Alice, Bob, Charlie]

flatMap() - 扁平化转换

java

// 将多个列表合并为一个

List<List<String>> listOfLists = Arrays.asList(

   Arrays.asList("Apple", "Banana"),

   Arrays.asList("Orange", "Grape"),

   Arrays.asList("Peach", "Pear")

);


List<String> flatList = listOfLists.stream()

   .flatMap(List::stream)

   .collect(Collectors.toList()); // [Apple, Banana, Orange, Grape, Peach, Pear]


// 拆分字符串为单词

List<String> sentences = Arrays.asList(

   "Hello world",

   "Java Stream API",

   "Functional programming"

);


List<String> words = sentences.stream()

   .flatMap(sentence -> Arrays.stream(sentence.split(" ")))

   .collect(Collectors.toList()); // [Hello, world, Java, Stream, API, Functional, programming]

distinct() - 去重

java

List<Integer> numbers = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4);

List<Integer> distinctNumbers = numbers.stream()

   .distinct()

   .collect(Collectors.toList()); // [1, 2, 3, 4]

sorted() - 排序

java

List<String> names = Arrays.asList("Charlie", "Alice", "Bob");

List<String> sortedNames = names.stream()

   .sorted()

   .collect(Collectors.toList()); // [Alice, Bob, Charlie]


// 自定义排序

List<String> customSorted = names.stream()

   .sorted((s1, s2) -> s2.compareTo(s1)) // 逆序

   .collect(Collectors.toList()); // [Charlie, Bob, Alice]


// 多字段排序

List<Person> sortedPeople = people.stream()

   .sorted(Comparator.comparing(Person::getAge).thenComparing(Person::getName))

   .collect(Collectors.toList());

limit() 和 skip() - 分页

java

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);


// 分页操作:每页3条数据,获取第2页

List<Integer> page2 = numbers.stream()

   .skip(3)        // 跳过前3个

   .limit(3)       // 取3个

   .collect(Collectors.toList()); // [4, 5, 6]

3. 终止操作(Terminal Operations)

collect() - 收集

java

// 收集到List

List<String> list = stream.collect(Collectors.toList());


// 收集到Set

Set<String> set = stream.collect(Collectors.toSet());


// 收集到Map

Map<String, Integer> map = people.stream()

   .collect(Collectors.toMap(Person::getName, Person::getAge));


// 连接字符串

String joined = names.stream()

   .collect(Collectors.joining(", ")); // "Alice, Bob, Charlie"


// 分组

Map<Integer, List<Person>> peopleByAge = people.stream()

   .collect(Collectors.groupingBy(Person::getAge));


// 分区

Map<Boolean, List<Person>> partitioned = people.stream()

   .collect(Collectors.partitioningBy(p -> p.getAge() > 30));

forEach() - 遍历

java

// 遍历处理

names.stream()

   .filter(name -> name.startsWith("A"))

   .forEach(System.out::println); // 输出所有以A开头的名字


// 注意:forEach是终止操作,执行后流关闭

reduce() - 归约

java

// 求和

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

int sum = numbers.stream()

   .reduce(0, Integer::sum); // 15


// 最大值

Optional<Integer> max = numbers.stream()

   .reduce(Integer::max); // Optional[5]


// 字符串连接

List<String> words = Arrays.asList("Hello", "World", "!");

Optional<String> combined = words.stream()

   .reduce((s1, s2) -> s1 + " " + s2); // Optional["Hello World !"]

匹配操作

java

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);


// 任意匹配

boolean anyEven = numbers.stream().anyMatch(n -> n % 2 == 0); // true


// 全部匹配  

boolean allEven = numbers.stream().allMatch(n -> n % 2 == 0); // false


// 无匹配

boolean noneNegative = numbers.stream().noneMatch(n -> n < 0); // true

查找操作

java

Optional<Integer> firstEven = numbers.stream()

   .filter(n -> n % 2 == 0)

   .findFirst(); // Optional[2]


Optional<Integer> anyEven = numbers.stream()

   .filter(n -> n % 2 == 0)

   .findAny(); // 可能是Optional[2]或Optional[4]

三、并行流:利用多核处理能力

1. 创建并行流

多种创建方式

java

List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David");


// 从集合创建并行流

Stream<String> parallelStream1 = names.parallelStream();


// 将顺序流转换为并行流

Stream<String> parallelStream2 = names.stream().parallel();


// 并行数组流

int[] numbers = {1, 2, 3, 4, 5};

IntStream parallelIntStream = Arrays.stream(numbers).parallel();

2. 并行流的使用

并行处理示例

java

List<Integer> numbers = IntStream.rangeClosed(1, 1_000_000)

   .boxed()

   .collect(Collectors.toList());


// 顺序处理

long startSeq = System.currentTimeMillis();

long sumSeq = numbers.stream()

   .mapToLong(Integer::longValue)

   .sum();

long timeSeq = System.currentTimeMillis() - startSeq;


// 并行处理

long startPar = System.currentTimeMillis();

long sumPar = numbers.parallelStream()

   .mapToLong(Integer::longValue)

   .sum();

long timePar = System.currentTimeMillis() - startPar;


System.out.printf("顺序处理: %d ms, 结果: %d%n", timeSeq, sumSeq);

System.out.printf("并行处理: %d ms, 结果: %d%n", timePar, sumPar);

System.out.printf("加速比: %.2f倍%n", (double)timeSeq / timePar);

3. 并行流的注意事项

状态共享问题

java

// 错误示例:共享可变状态

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

List<Integer> result = Collections.synchronizedList(new ArrayList<>());


numbers.parallelStream()

   .forEach(n -> {

       // 可能产生竞态条件

       result.add(n * 2);

   });


// 正确做法:使用线程安全的收集器

List<Integer> safeResult = numbers.parallelStream()

   .map(n -> n * 2)

   .collect(Collectors.toList());

顺序敏感性操作

java

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);


// 顺序敏感操作:并行可能产生错误结果

List<Integer> sorted = numbers.parallelStream()

   .sorted() // 并行排序仍然正确

   .collect(Collectors.toList());


// 但某些操作顺序很重要

List<Integer> sequentialResult = numbers.stream()

   .limit(3)

   .collect(Collectors.toList()); // [1, 2, 3]


List<Integer> parallelResult = numbers.parallelStream()

   .limit(3)

   .collect(Collectors.toList()); // 结果不确定!

四、实战应用:复杂数据处理

1. 数据统计与分析

复杂数据聚合

java

class Order {

   String customer;

   double amount;

   String category;

   // getters

}


List<Order> orders = Arrays.asList(

   new Order("Alice", 100.0, "Electronics"),

   new Order("Bob", 200.0, "Books"),

   new Order("Alice", 150.0, "Electronics"),

   new Order("Charlie", 300.0, "Clothing"),

   new Order("Bob", 50.0, "Books")

);


// 按客户分组求总金额

Map<String, Double> totalByCustomer = orders.stream()

   .collect(Collectors.groupingBy(

       Order::getCustomer,

       Collectors.summingDouble(Order::getAmount)

   ));


// 按类别分组求平均金额

Map<String, Double> avgByCategory = orders.stream()

   .collect(Collectors.groupingBy(

       Order::getCategory,

       Collectors.averagingDouble(Order::getAmount)

   ));


// 多级分组:按类别再按客户

Map<String, Map<String, Double>> multiLevel = orders.stream()

   .collect(Collectors.groupingBy(

       Order::getCategory,

       Collectors.groupingBy(

           Order::getCustomer,

           Collectors.summingDouble(Order::getAmount)

       )

   ));

2. 文件处理

流式文件操作

java

// 读取文件并处理

try (Stream<String> lines = Files.lines(Paths.get("data.txt"))) {

   List<String> importantLines = lines

       .filter(line -> line.contains("ERROR"))

       .map(String::toUpperCase)

       .collect(Collectors.toList());

} catch (IOException e) {

   e.printStackTrace();

}


// 大文件处理:逐行处理避免内存溢出

try (Stream<String> lines = Files.lines(Paths.get("largefile.txt"))) {

   long errorCount = lines

       .parallel() // 并行处理大文件

       .filter(line -> line.contains("ERROR"))

       .count();

   System.out.println("错误行数: " + errorCount);

} catch (IOException e) {

   e.printStackTrace();

}

3. 数据库查询模拟

流式数据处理

java

// 模拟数据库查询结果

List<Employee> employees = Arrays.asList(

   new Employee("Alice", "IT", 5000),

   new Employee("Bob", "HR", 4000),

   new Employee("Charlie", "IT", 6000),

   new Employee("David", "Finance", 5500),

   new Employee("Eva", "HR", 4500)

);


// 部门薪资统计

Map<String, Double> avgSalaryByDept = employees.stream()

   .collect(Collectors.groupingBy(

       Employee::getDepartment,

       Collectors.averagingDouble(Employee::getSalary)

   ));


// 最高薪资员工

Optional<Employee> highestPaid = employees.stream()

   .max(Comparator.comparingDouble(Employee::getSalary));


// 部门员工分组

Map<String, List<Employee>> employeesByDept = employees.stream()

   .collect(Collectors.groupingBy(Employee::getDepartment));

五、性能优化与最佳实践

1. 流操作性能考虑

操作顺序优化

java

List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David", "Eva", "Frank");


// 不好的顺序:先排序再过滤

List<String> badOrder = names.stream()

   .sorted()                       // 排序所有元素

   .filter(name -> name.length() > 3) // 然后过滤

   .collect(Collectors.toList());


// 好的顺序:先过滤再排序

List<String> goodOrder = names.stream()

   .filter(name -> name.length() > 3) // 先减少数据量

   .sorted()                       // 然后排序较少的数据

   .collect(Collectors.toList());

原始类型流优化

java

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);


// 使用原始类型流避免装箱开销

int sum = numbers.stream()

   .mapToInt(Integer::intValue) // IntStream避免装箱

   .sum();


// 性能对比

long start1 = System.nanoTime();

int sum1 = numbers.stream().reduce(0, Integer::sum); // 有装箱

long time1 = System.nanoTime() - start1;


long start2 = System.nanoTime();

int sum2 = numbers.stream().mapToInt(Integer::intValue).sum(); // 无装箱

long time2 = System.nanoTime() - start2;


System.out.printf("有装箱: %d ns, 无装箱: %d ns, 提升: %.1f%%%n",

   time1, time2, (double)(time1 - time2) / time1 * 100);

2. 何时使用并行流

适合并行的场景

  • ✅ 大数据集处理(数万以上元素)
  • ✅ 计算密集型操作
  • ✅ 无状态、无顺序要求的操作
  • ✅ 独立的元素处理

不适合并行的场景

  • 🔴 小数据集(并行开销可能超过收益)
  • 🔴 顺序敏感操作(limit、findFirst等)
  • 🔴 有状态操作(共享可变状态)
  • 🔴 I/O密集型操作(线程阻塞)

六、总结:Stream API的最佳实践

1. 使用建议

代码可读性

java

// 好的写法:清晰的链式调用

List<String> result = employees.stream()

   .filter(emp -> emp.getSalary() > 5000)

   .map(Employee::getName)

   .sorted()

   .collect(Collectors.toList());


// 坏的写法:过长的链式操作

List<String> badResult = employees.stream().filter(e -> e.getSalary() > 5000 && e.getDepartment().equals("IT") || e.getJoinDate().isAfter(LocalDate.of(2020, 1, 1))).map(e -> e.getName().toUpperCase()).sorted((a, b) -> b.compareTo(a)).limit(10).collect(Collectors.toList());


// 改进:提取变量和方法

Predicate<Employee> highSalary = e -> e.getSalary() > 5000;

Predicate<Employee> itDepartment = e -> e.getDepartment().equals("IT");

Predicate<Employee> recentHire = e -> e.getJoinDate().isAfter(LocalDate.of(2020, 1, 1));


List<String> improvedResult = employees.stream()

   .filter(highSalary.and(itDepartment).or(recentHire))

   .map(Employee::getName)

   .map(String::toUpperCase)

   .sorted(Comparator.reverseOrder())

   .limit(10)

   .collect(Collectors.toList());

2. 性能最佳实践

  1. 优先使用基本类型流:IntStream、LongStream、DoubleStream
  2. 优化操作顺序:先filter后map,先减少数据量
  3. 避免重复计算:缓存中间结果
  4. 谨慎使用并行流:测试性能提升效果
  5. 使用收集器优化:选择合适的收集方式

七、面试高频问题

❓1. Stream和Collection有什么区别?

:Collection是存储数据的容器,Stream是处理数据的计算工具。Stream不存储数据,支持延迟执行和内部迭代,适合函数式编程。

❓2. 中间操作和终止操作有什么区别?

:中间操作返回新的Stream(延迟执行),终止操作产生结果或副作用(触发实际计算)。中间操作可以连接形成流水线。

❓3. 什么是延迟执行(Lazy Evaluation)?

:中间操作不会立即执行,只有在终止操作被调用时才会触发整个流水线的计算。这允许优化和短路操作。

❓4. 什么时候使用并行流?有什么注意事项?

:适合大数据集和计算密集型任务。需要注意线程安全、避免共享状态、顺序敏感性操作等问题。

❓5. map()和flatMap()有什么区别?

:map()进行一对一的转换,flatMap()进行一对多的转换并将结果展平。flatMap()常用于处理嵌套集合或Optional。


相关文章
|
20天前
|
Java
Java的CAS机制深度解析
CAS(Compare-And-Swap)是并发编程中的原子操作,用于实现多线程环境下的无锁数据同步。它通过比较内存值与预期值,决定是否更新值,从而避免锁的使用。CAS广泛应用于Java的原子类和并发包中,如AtomicInteger和ConcurrentHashMap,提升了并发性能。尽管CAS具有高性能、无死锁等优点,但也存在ABA问题、循环开销大及仅支持单变量原子操作等缺点。合理使用CAS,结合实际场景选择同步机制,能有效提升程序性能。
|
Java Android开发 Windows
用Jetpack Compose Desktop做一个推箱子小游戏,演示键盘事件绑定的方式
做Windows桌面游戏是少不了与键盘交互的,不过其实并非我们做Windows桌面应用才需要小游戏,如果要做安卓机顶盒APP,也是要监听键盘的,只不过那是遥控器的键盘,方式其实也是一样的。
469 0
用Jetpack Compose Desktop做一个推箱子小游戏,演示键盘事件绑定的方式
|
27天前
|
缓存 安全 Java
Java反射机制:动态操作类与对象
Java反射机制是运行时动态操作类与对象的强大工具,支持获取类信息、动态创建实例、调用方法、访问字段等。它在框架开发、依赖注入、动态代理等方面有广泛应用,但也存在性能开销和安全风险。本文详解反射核心API、实战案例及性能优化策略,助你掌握Java动态编程精髓。
|
27天前
|
Java 编译器 测试技术
Java注解(Annotation)与元编程实践
本文深入讲解Java注解的原理与实战应用,涵盖内置注解、自定义注解、编译期与运行期处理机制,以及在依赖注入、Web框架和数据验证中的实际应用,助你掌握元编程核心技能。
|
19天前
|
SQL 算法 Java
MySQL分库分表:应对海量数据的策略
本文深入解析MySQL分库分表策略与实战技巧,涵盖分片键选择、算法对比、数据迁移、全局ID生成及跨分片查询处理等内容,助你构建可扩展的海量数据架构,提升系统性能与可用性。
|
26天前
|
SQL Java 数据库连接
Java IO流(一):字节流与字符流基础
本文全面解析Java IO流,涵盖字节流、字符流及其使用场景,帮助开发者理解IO流分类与用途,掌握文件读写、编码转换、异常处理等核心技术,通过实战案例提升IO编程能力。
|
2月前
|
传感器 算法 数据挖掘
Python时间序列平滑技术完全指南:6种主流方法原理与实战应用
时间序列数据分析中,噪声干扰普遍存在,影响趋势提取。本文系统解析六种常用平滑技术——移动平均、EMA、Savitzky-Golay滤波器、LOESS回归、高斯滤波与卡尔曼滤波,从原理、参数配置、适用场景及优缺点多角度对比,并引入RPR指标量化平滑效果,助力方法选择与优化。
442 0
|
7月前
|
JSON API 数据格式
阿里巴巴商品详情接口(阿里巴巴 API 系列)
在电商开发中,获取阿里巴巴商品详情信息对数据分析、竞品研究等至关重要。通过调用其商品详情接口,开发者可获取标题、价格、图片、描述等数据,满足多种业务需求。接口采用HTTPS协议,支持GET/POST请求,返回JSON格式数据。示例代码展示了如何使用Python的requests库进行接口请求,需传递商品ID和访问令牌。实际应用时,请依据官方文档调整参数并确保安全性。
249 10
|
11月前
|
XML 前端开发 Java
Spring,SpringBoot和SpringMVC的关系以及区别 —— 超准确,可当面试题!!!也可供零基础学习
本文阐述了Spring、Spring Boot和Spring MVC的关系与区别,指出Spring是一个轻量级、一站式、模块化的应用程序开发框架,Spring MVC是Spring的一个子框架,专注于Web应用和网络接口开发,而Spring Boot则是对Spring的封装,用于简化Spring应用的开发。
2201 0
Spring,SpringBoot和SpringMVC的关系以及区别 —— 超准确,可当面试题!!!也可供零基础学习
|
存储 人工智能
深度解析RAG大模型知识冲突,清华西湖大学港中文联合发布
【7月更文挑战第27天】清华大学、西湖大学与香港中文大学联合发布的论文深入探讨了RAG(Retrieval-Augmented Generation)大模型在处理信息时遇到的知识冲突问题及其解决方案。RAG模型通过结合预训练语言模型与外部知识库生成准确内容,但会面临上下文记忆、上下文间及内部记忆冲突。研究提出了基于上下文感知的记忆管理、多上下文推理及知识选择权衡等方法来缓解这些问题。尽管取得了进展,但在计算资源需求、解决方案效果验证及模型鲁棒性等方面仍有挑战待克服。[论文](https://arxiv.org/abs/2403.08319)
407 3

热门文章

最新文章