46 优先选择Stream中无副作用的函数
Stream最重要的是把将计算结构构造成一系列变型,其中每个阶段的结果尽可能接近前一阶段结果的纯函数(pure function)。纯函数的结果仅取决于其输入的函数:它不依赖于任何可变状态,也不更新任何状态。为了实现这一点,Stream操作的任何中间操作和终结操作都应该是没有副作用的。
如下代码,将单词出现的频率打印出来:
Map<String, Long> freq = new HashMap<>(); try (Stream<String> words = new Scanner(file).tokens()) { words.forEach(word -> { freq.merge(word.toLowerCase(), 1L, Long::sum); }); }
实际上这根本不是Stream代码,只不过是伪装成Stream的迭代代码。可读性也差,forEach里面逻辑太多了,正确的应该是这么写:
Map<String, Long> freq; try (Stream<String> words = new Scanner(file).tokens()) { freq = words .collect(groupingBy(String::toLowerCase, counting())); }
所以forEach 操作应仅用于报告Stream计算的结果,而不是进行计算
对于初学者来说,可以忽略Collector接口,将其看做是黑盒对象即可,这个黑盒可以将Stream的元素合并到单个集合里。
有三个这样的Collector:toList()、toSet() 和 toCollection(collectionFactory)。基于此,我们可以从频率表中提取排名前10的单词列表:
List<String> topTen = freq.keySet().stream() .sorted(comparing(freq::get).reversed()) .limit(10) .collect(toList());
注意上述代码用的是.collect(toList()),而不是.collect(Collectors.toList()),这是因为静态导入了Collectors所有成员,也是一种提高代码可读性的手段。
接下来介绍Collector中比较重要的三个方法:
1. toMap(keyMapper、valueMapper)
它接受两个函数:一个将Stream元素映射到键,另一个将它映射到值。例如下面将枚举的字符串形式映射到枚举本身:
private static final Map<String, Operation> stringToEnum = Stream.of(values()).collect(toMap(Object::toString, e -> e));
还有带三个参数的toMap,假设有一个Stream代表不同艺术家(artists)的专辑(albums),可以得到每个歌唱家最畅销的那一张专辑,用map来存储:
Map<Artist, Album> topHits = albums.collect(toMap(Album::artist, a->a, maxBy(comparing(Album::sales))));
比较器使用静态工厂方法maxBy,它是从BinaryOperator import进来的。此方法将Comparator<T> 转换为BinaryOperator<T>,用于计算指定比较器产生的最大值。
对于`toMap`,阿里巴巴开发规约也专门做了要求:
2. groupingBy
该方法返回Collector,基于分类函数(classifier function)将元素分类,返回值是一个map,value是存储了每个类别的所有元素的List
words.collect(groupingBy(word -> alphabetize(word)))
上面代码就返回的是collect,key是alphabetize(word),value是word列表
还有传入两个参数的groupingBy,传入counting()作为下游收集器,这样会生成一个映射,将每个类别与该类别中的元素数量关联起来
Map<String, Long> freq = words.collect(groupingBy(String::toLowerCase, counting()));
3. groupingByConcurrent
该方法提供了提供了groupingBy所有三种重载的变体,支持并发安全性,最终返回的也是ConcurrentHashMap实例
4. joining
它仅对 CharSequence 实例(如字符串)的Stream进行操作。可以传入一个参数CharSequence delimiter作为分界符。如果传入一个逗号作为分隔符,Collector就会返回一个用逗号隔开的字符串
47 Stream要优先用Collection作为返回类型
先说结论:
在编写会返回一系列元素的方法时,某些程序员可能希望将它们作为 Stream 处理,其他程序员可能希望使用迭代方式(Iterable)。
如何做到兼顾呢?
如果可以返回集合,就返回集合
如果集合中已经有元素,或者元素数量不多,就返回一个标准集合,比如 ArrayList
否则,就需要自定义集合,如下文将提到的幂集
如果不能返回集合,则返回Stream或Iterable
如果想要用for-each循环遍历返回序列的话,必须将方法引用转换成合适的Iterable类型:
for (ProcessHandle ph : (Iterable<ProcessHandle>)ProcessHandle.allProcesses()::iterator)
但是上面的代码在实际使用时过于杂乱、不清晰。解决方案就是写一个适配器:
public static <E> Iterable<E> iterableOf(Stream<E> stream) { return stream::iterator; }
有了这个适配器,就可以使用 for-each 语句迭代任何Sream了:
for (ProcessHandle p : iterableOf(ProcessHandle.allProcesses())) { // Process the process }
想要利用Stream pipeline处理序列的程序员,如果API只提供了Iterable的话,我们需要手动将Iterable转Stream:
public static <E> Stream<E> streamOf(Iterable<E> iterable) { return StreamSupport.stream(iterable.spliterator(), false); }
Collection接口是Iterable的子类型,有一个stream方法,因此提供了迭代和stream访问。所以Collection或适当的子类型通常是公共序列返回方法的最佳返回类型。
如果返回的序列很大,可以考虑实现一个专用的集合。例如想要返回一个指定集合的幂集
例如{a,b,c} 的幂集为 {{},{a},{b},{c},{a,b},{a,c},{b,c},{a,b, c}}
技巧是,使用幂集中每个元素的索引作为位向量(bit vector),在索引中的第 n 位,表示源集合中是否存在第 n 位元素。
public class PowerSet { public static final <E> Collection<Set<E>> of(Set<E> s) { List<E> src = new ArrayList<>(s); if (src.size() > 30) throw new IllegalArgumentException("Set too big " + s); return new AbstractList<Set<E>>() { @Override public int size() { return 1 << src.size(); // 2 to the power srcSize } @Override public boolean contains(Object o) { return o instanceof Set && src.containsAll((Set)o); } @Override public Set<E> get(int index) { Set<E> result = new HashSet<>(); for (int i = 0; index != 0; i++, index >>= 1) if ((index & 1) == 1) result.add(src.get(i)); return result; } }; } }
为了在 AbstractCollection 上编写 Collection 实现,除了 Iterable 所需的方法之外,只需要实现两种方法:contains 和 size。
48 谨慎使用Stream并行
下面的代码逻辑是求所有2 ^ p - 1数字里为素数的数字
public static void main(String[] args) { primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE)) .filter(mersenne -> mersenne.isProbablePrime(50)) .limit(20) .forEach(System.out::println); } static Stream<BigInteger> primes() { return Stream.iterate(TWO, BigInteger::nextProbablePrime); }
TWO.pow(p.intValueExact()).subtract(ONE))表示2 ^ p - 1
mersenne.isProbablePrime(50)表示mersenne是否可能是素数,传入的数字表示可能性阈值
如果添加一个parallel()的话,原意是想要提速,但实际结果是根本不打印任何内容,CPU使用率却很高。
原因是:Stream不知道如何去并行这个pipeline。如果源头是Stream.iterate,或者使用了中间操作limit,并行则不太可能提高其性能。
默认的并行策略在处理limit的不可预知性时,每查找到一个素数时,所花费的时间都等于计算所有之前元素总和的时间,所以不要任意地并行Stream pipeline。
并行能带来性能收益的应用场景在于ArrayList、HashMap、HashSet 和 ConcurrentHashMap、数组、int 范围和long 范围。
这些数据结构有共同的特点:
1. 可以精确、轻松地分割成任意大小的子范围
Stream类库里用来执行分工任务的是spliterator,由 spliterator 方法在Stream 和Iterable 上返回。
2. 在顺序处理时提供了较好的引用局部性(localityof reference)
时间局部性是指,被引用一次的储存器位置,在接下来的时间会经常被引用
空间局部性是指,被引用一次的储存器位置,在接下来的时间,他旁边的储存位置也会被引用
引用局部性非常重要:没有它,线程会出现闲置,需要等待数据从内存转移到处理器的缓存中。
并行pipeline效率会受限的场景: 终止操作里做了复杂的运算
并行的最佳终止操作是用Stream的reduce方法,将所有pipeline产生的元素合并到一起,或者事先打包min、max、count和sum这类方法。短路操作anyMatch、allMatch 和 noneMatch 也可以支持并行。由Stream的collect 方法执行的操作,不适合并行性,因为组合集合的开销非常大。
如果是编写自己的Stream、Iterable 或Collection,并且希望获得良好的并行性能,则必须重写 spliterator 方法并广泛测试性能。编写高质量的 spliterator 很困难,不建议这么做!
并行Stream不仅可能降低性能,它会导致不正确的结果和不可预知的行为
对于一个高效的可拆分的源Stream、一个可并行化或简单的终止操作、互不干扰干扰的函数对象,由于他们本身就性能很好地,所以也无法从并行化中获得良好的加速效果。
切记: 并行化Stream是严格的性能优化,必须在更改前后进行测试性能。
通常,程序中的所有并行Stream pipeline都在公共fork-join池中运行。只要有一个pipeline不正常,都会损害到系统里其他不相关部分的性能。
当然,在某些条件下,给Stream pipeline添加parallel调用,确实可以基于CPU核实现性能倍增,典型的就是机器学习和数据处理领域。
比如下面这个数学计算的代码就能通过并行来提速:
static long pi(long n) { return LongStream.rangeClosed(2, n) .parallel() .mapToObj(BigInteger::valueOf) .filter(i -> i.isProbablePrime(50)) .count(); }