一. Stream的特性
Stream是Java 8新增的接口,Stream可以认为是一个高级版本的 Iterator。它代表着数据流,流中的数据元素的数量可以是有限的,也可以是无限的。
Stream跟Iterator的差别是
- 无存储:Stream是基于数据源的对象,它本身不存储数据元素,而是通过管道将数据源的元素传递给操作。
- 函数式编程:对Stream的任何修改都不会修改背后的数据源,比如对Stream执行filter操作并不会删除被过滤的元素,而是会产生一个不包含被过滤元素的新的Stream。
- 延迟执行:Stream的操作由零个或多个中间操作(intermediate operation)和一个结束操作(terminal operation)两部分组成。只有执行了结束操作,Stream定义的中间操作才会依次执行,这就是Stream的延迟特性。
- 可消费性:Stream只能被“消费”一次,一旦遍历过就会失效。就像容器的迭代器那样,想要再次遍历必须重新生成一个新的Stream。
二. Java 8新增的函数式接口
Stream的操作是建立在函数式接口的组合之上的。Java8中新增的函数式接口都在java.util.function包下。这些函数式接口可以有多种分类方式。
Java 8函数式接口的分类.png
Java 8函数式接口第二种分类.png
2.1 Function
Function是从T到R的一元映射函数。将参数T传递给一个函数,返回R。即R = Function(T)
@FunctionalInterface public interface Function<T, R> { /** * Applies this function to the given argument. * * @param t the function argument * @return the function result */ R apply(T t); /** * Returns a composed function that first applies the {@code before} * function to its input, and then applies this function to the result. * If evaluation of either function throws an exception, it is relayed to * the caller of the composed function. * * @param <V> the type of input to the {@code before} function, and to the * composed function * @param before the function to apply before this function is applied * @return a composed function that first applies the {@code before} * function and then applies this function * @throws NullPointerException if before is null * * @see #andThen(Function) */ default <V> Function<V, R> compose(Function<? super V, ? extends T> before) { Objects.requireNonNull(before); return (V v) -> apply(before.apply(v)); } /** * Returns a composed function that first applies this function to * its input, and then applies the {@code after} function to the result. * If evaluation of either function throws an exception, it is relayed to * the caller of the composed function. * * @param <V> the type of output of the {@code after} function, and of the * composed function * @param after the function to apply after this function is applied * @return a composed function that first applies this function and then * applies the {@code after} function * @throws NullPointerException if after is null * * @see #compose(Function) */ default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) { Objects.requireNonNull(after); return (T t) -> after.apply(apply(t)); } /** * Returns a function that always returns its input argument. * * @param <T> the type of the input and output objects to the function * @return a function that always returns its input argument */ static <T> Function<T, T> identity() { return t -> t; } }
Function默认实现了3个default方法,分别是compose、andThen和identity。
方法名 | 对应函数 | 描述 |
compose | V=Function(ParamFunction(T)) | 它体现了嵌套关系 |
andThen | V= ParamFunction(Function(T)) | 转换了嵌套的顺序 |
identity | Function(T)=T | 传递自身的函数调用 |
compose和andThen对于两个函数f和g来说,f.compose(g)等价于g.andThen(f)。
2.2 Predicate
Predicate是一个谓词函数,主要作为一个谓词演算推导真假值存在,返回布尔值的函数。Predicate等价于一个Function的boolean型返回值的子集。
@FunctionalInterface public interface Predicate<T> { /** * Evaluates this predicate on the given argument. * * @param t the input argument * @return {@code true} if the input argument matches the predicate, * otherwise {@code false} */ boolean test(T t); /** * Returns a composed predicate that represents a short-circuiting logical * AND of this predicate and another. When evaluating the composed * predicate, if this predicate is {@code false}, then the {@code other} * predicate is not evaluated. * * <p>Any exceptions thrown during evaluation of either predicate are relayed * to the caller; if evaluation of this predicate throws an exception, the * {@code other} predicate will not be evaluated. * * @param other a predicate that will be logically-ANDed with this * predicate * @return a composed predicate that represents the short-circuiting logical * AND of this predicate and the {@code other} predicate * @throws NullPointerException if other is null */ default Predicate<T> and(Predicate<? super T> other) { Objects.requireNonNull(other); return (t) -> test(t) && other.test(t); } /** * Returns a predicate that represents the logical negation of this * predicate. * * @return a predicate that represents the logical negation of this * predicate */ default Predicate<T> negate() { return (t) -> !test(t); } /** * Returns a composed predicate that represents a short-circuiting logical * OR of this predicate and another. When evaluating the composed * predicate, if this predicate is {@code true}, then the {@code other} * predicate is not evaluated. * * <p>Any exceptions thrown during evaluation of either predicate are relayed * to the caller; if evaluation of this predicate throws an exception, the * {@code other} predicate will not be evaluated. * * @param other a predicate that will be logically-ORed with this * predicate * @return a composed predicate that represents the short-circuiting logical * OR of this predicate and the {@code other} predicate * @throws NullPointerException if other is null */ default Predicate<T> or(Predicate<? super T> other) { Objects.requireNonNull(other); return (t) -> test(t) || other.test(t); } /** * Returns a predicate that tests if two arguments are equal according * to {@link Objects#equals(Object, Object)}. * * @param <T> the type of arguments to the predicate * @param targetRef the object reference with which to compare for equality, * which may be {@code null} * @return a predicate that tests if two arguments are equal according * to {@link Objects#equals(Object, Object)} */ static <T> Predicate<T> isEqual(Object targetRef) { return (null == targetRef) ? Objects::isNull : object -> targetRef.equals(object); } }
Predicate的默认方法是and、negate、or。
2.3 Consumer
Consumer是从T到void的一元函数,接受一个入参但不返回任何结果的操作。
@FunctionalInterface public interface Consumer<T> { /** * Performs this operation on the given argument. * * @param t the input argument */ void accept(T t); /** * Returns a composed {@code Consumer} that performs, in sequence, this * operation followed by the {@code after} operation. If performing either * operation throws an exception, it is relayed to the caller of the * composed operation. If performing this operation throws an exception, * the {@code after} operation will not be performed. * * @param after the operation to perform after this operation * @return a composed {@code Consumer} that performs in sequence this * operation followed by the {@code after} operation * @throws NullPointerException if {@code after} is null */ default Consumer<T> andThen(Consumer<? super T> after) { Objects.requireNonNull(after); return (T t) -> { accept(t); after.accept(t); }; } }
Consumer的默认方法是andThen。
2.4 Supplier
Supplier是表示结果的供应者。
@FunctionalInterface public interface Supplier<T> { /** * Gets a result. * * @return a result */ T get(); }
Supplier的用法:
Supplier<String> supplier = new Supplier<String>() { @Override public String get() { return "hello suppiler"; } }; System.out.println(supplier.get());
或者:
Supplier<User> userSupplier = User::new; userSupplier.get(); // new User
Java 8新增了CompletableFuture,它的很多方法的入参都用到了Supplier。
三. Stream用法
Stream操作.png
3.1 Stream的创建
Java 8有多种方式来创建Stream:
- 通过集合的stream()方法或者parallelStream()
- 使用流的静态方法,比如Stream.of(Object[]), IntStream.range(int, int) 或者 Stream.iterate(Object, UnaryOperator)。
- 通过Arrays.stream(Object[])方法。
- BufferedReader.lines()从文件中获得行的流。
- Files类的操作路径的方法,如list、find、walk等。
- 随机数流Random.ints()。
- 其它一些类提供了创建流的方法,如BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence), 和 JarFile.stream()。
其实最终都是依赖底层的StreamSupport类来完成Stream创建。
3.2 中间操作
中间操作又可以分为无状态的(Stateless)和有状态的(Stateful),无状态中间操作是指元素的处理不受前面元素的影响,而有状态的中间操作必须等到所有元素处理之后才知道最终结果。
Stream的中间操作只是一种标记,只有执行了结束操作才会触发实际计算。
熟悉RxJava、Scala的同学可以看到,Stream中间操作的各个方法在RxJava、Scala中都可以找到熟悉的身影。
3.3 结束操作
3.3.1 短路操作
短路操作是指不用处理全部元素就可以返回结果。短路操作必须一个元素处理一次。
3.3.1 非短路操作
非短路操作可以批量处理数据,但是需要处理完全部元素才会返回结果。
四. 并行流
在创建Stream时,默认是创建串行流。但是可以使用parallelStream()来创建并行流或者parallel()将串行流转换成并行流。并行流也可以通过sequential()转换成串行流。
Java 8 Stream的并行流,本质上还是使用Fork/Join模型。
五. 总结
在Java开发中,如果使用了Java 8,那么强烈建议使用Stream。因为Stream的每个操作都可以依赖Lambda表达式,它是一种声明式的数据处理方式,并且Stream提高了数据处理效率和开发效率。