java8 Stream Pipelines 浅析

简介: 相信现在很多人都已经使用过java8提供的java.util.stream编程接口,下面我们就试着去了解Stream API

Stream的使用


首先我们看一下stream的基本使用方法:


ArrayList<String> list = Lists.newArrayList("America", "ABC", "CNN", "OK", "ASYNC");
        List<String> strings = list.stream().filter(e -> e.startsWith("A")).map(e -> e + " nice").collect(Collectors.toList());

最终我们会得到ArrayList中以A开头的字母加上“nice”的字符串List,如果放在jdk7里我们会这样写:


ArrayList<String> strings = Lists.newArrayList();
        for (String s : list) {
            if(s.startsWith("A")){
                String newStr = s + "nice";
                strings.add(newStr);
            }
        }

我试着去看源代码,发现Stream实质上就是这样执行我们的需求的。下面就说说我看到了什么。


Stream相关类的介绍


打开java.util.stream包,可以看到核心接口Stream类,顾名思义就是流水的意思,官方文档原话说的是

A sequence of elements supporting sequential and parallel aggregate operations.


image.png

Stream就是一个支持串行和并行的聚集操作的一系列元素。

定义了一些中间操作(Intermediate operations)结束操作(Terminal operations)

中间操作包括无状态(Stateless)操作比如:filter, map, flatMap等,有状态(Stateful)操作比如:distinct, sorted, limit等;

结束操作(Terminal operations)包括非短路操作(short-circuiting)比如:forEach, reduce, collect等和短路操作如:findFirst, findAny;

中间操作不是真正的操作而是一种操作的描述,只有执行到结束操作才会触发实际计算,在结束操作执行之前只是把中间操作记录了下来。无状态中间操作指元素的操作不受其他元素的影响,比如以某一Predicate去filter元素,元素和元素之前不互相影响。而有状态中间操作指的是元素和元素之间是有关联的,比如sorted,只有读取所有元素之后才能确定排序结果。

短路结束操作指的是不用处理所有元素才能返回结果,比如findFirst,只要找到第一个符合条件的元素即可返回结果。非短路结束操作则必须处理完所有元素才能返回结果。

Stream继承了BaseStream,定义了一些Stream的基本操作。


Pipeline记录操作


以上所说的操作需要被按顺序记录下来,这里就需要管道流水线Pipeline的概念来实现。

管道有一个基类PipelineHelper,他是执行Stream管道的一个helper,将Stream的所有信息收集到一个地方。

上面所说的操作其实都定义在PipelineHelper的一个子类ReferencePipeline中,包括Head(Source stage of a ReferencePipeline)StatelessOp(Base class for a stateless intermediate stage of a Stream.)StatefulOp(Base class for a stateful intermediate stage of a Stream.)静态内部类。

ReferencePipeline是描述中间操作管道流和源管道流的一个类,同时也实现了Stream接口


image.png


在Stream中使用stage(阶段)来描述一个完整的操作,而HeadStatelessOpStatefulOp这三个操作都是实例化的PipelineHelper,也就是stage。可以把stage理解为带管道的流(Stream with Pipeline)



image.png


在本文一开始的例子中,我们分析一下有几个stage,下图:


image.png


每一步Stream的方法调用都产生一个新的stage,在随后的分析中会发现,这些stage会以双向链表的方式链接,而每个stage都记录了每一个阶段的操作,这样我们就可以依赖这种数据结构来保存对数据源的所有操作了。


链接stage


stage的链接靠Sink来实现,我们先看一下Sink的接口,我们这里只看ChainedReference


image.png


ChainedReference包括:

  • begin:在遍历元素前调用,做好遍历准备
  • accept:遍历每个元素的时候调用,包含每个stage的操作和回掉函数
  • end:遍历结束后调用
  • cancellationRequested:是否能够尽早结束遍历,用于短路操作

每个stage都把操作实现在Sink里,上游stage调用下游stageaccept方法,达到按顺序执行每个操作的目的。


stage的自动执行


直接上代码

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }
                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

上面代码是Streamfilter方法,fiter是一个无状态操作,返回一个新的stage,还实现了AbstractPipeline.opWrapSink来返回stage实现的sink。这里filter的参数是一个predicate,在predicate.test返回true时调用下游的stage的sink的accept方法,这样整个操作流就连续执行下去了。


stage的双向链接


在说Stream自动执行之前,有必要说一说每个stage是怎么链接起来的。Stream在操作时产生的Operation类是如何用双向链表的结构来前后链接的?

在上面Stream.filter的源代码可以看到,filter返回了一个StatelessOp对象,构造函数接受了当前对象this为第一个参数,然后来看StatelessOp的代码:

abstract static class StatelessOp<E_IN, E_OUT>
            extends ReferencePipeline<E_IN, E_OUT> {
        /**
         * Construct a new Stream by appending a stateless intermediate
         * operation to an existing stream.
         *
         * @param upstream The upstream pipeline stage
         * @param inputShape The stream shape for the upstream pipeline stage
         * @param opFlags Operation flags for the new stage
         */
        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                    StreamShape inputShape,
                    int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }
        @Override
        final boolean opIsStateful() {
            return false;
        }
    }

可以看到StatelessOp实现了ReferencePipeline接口,在构造函数里调用了super(upstream, opFlags),而这个upstream(上游流)参数就是上面传入的this,下游流StatelessOpupstream就指向this了,这样就通过下游流的upstream链接上游流。目前每个操作之间还只是单链表。

那有人就会想了,下游流保存了上游流的引用,那上游流是怎么保存下游流的引用呢?这就要看最后的结束操作了,我们来看Stream.collect代码:

public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            container = collector.supplier().get();
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        }
        else {
            container = evaluate(ReduceOps.makeRef(collector));
        }
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }

这里我们只看串行操作的分支。filter返回了一个结束操作的计算结果。我们来看evaluate方法:

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

AbstractPipeline.evaluate方法接收了一个结束操作对象,我们只看串行操作:

public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<P_IN> spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
        }

继续看AbstractPipeline.wrapAndCopyInto

@Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }
    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

AbstractPipeline.wrapAndCopyInto接收了结束操作的sink,继续看AbstractPipeline.wrapSink:

@Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

从结束操作的sink开始,一层一层包装sink,最后第一个中间操作的sink在最外层,在每个操作的opWrapSink方法里返回的sink都维护了一个downstream指向后一个操作,这样,双向链表的结构就完成了。这样,我们在copyInto方法里调用beginacceptend的时候就会通过downstream一层一层的调用下去,最终在结束操作执行实际计算。


结束

Stream的基本原理就分析到这里,希望大家和我一起讨论学习。希望看不明白的同学可以向我提问,看过源码的同学欢迎指出错误!大家一起学习!

相关文章
|
20天前
|
安全 Java API
告别繁琐编码,拥抱Java 8新特性:Stream API与Optional类助你高效编程,成就卓越开发者!
【8月更文挑战第29天】Java 8为开发者引入了多项新特性,其中Stream API和Optional类尤其值得关注。Stream API对集合操作进行了高级抽象,支持声明式的数据处理,避免了显式循环代码的编写;而Optional类则作为非空值的容器,有效减少了空指针异常的风险。通过几个实战示例,我们展示了如何利用Stream API进行过滤与转换操作,以及如何借助Optional类安全地处理可能为null的数据,从而使代码更加简洁和健壮。
52 0
|
6天前
|
Java API C++
Java 8 Stream Api 中的 peek 操作
本文介绍了Java中`Stream`的`peek`操作,该操作通过`Consumer&lt;T&gt;`函数消费流中的每个元素,但不改变元素类型。文章详细解释了`Consumer&lt;T&gt;`接口及其使用场景,并通过示例代码展示了`peek`操作的应用。此外,还对比了`peek`与`map`的区别,帮助读者更好地理解这两种操作的不同用途。作者为码农小胖哥,原文发布于稀土掘金。
Java 8 Stream Api 中的 peek 操作
|
6天前
|
Java C# Swift
Java Stream中peek和map不为人知的秘密
本文通过一个Java Stream中的示例,探讨了`peek`方法在流式处理中的应用及其潜在问题。首先介绍了`peek`的基本定义与使用,并通过代码展示了其如何在流中对每个元素进行操作而不返回结果。接着讨论了`peek`作为中间操作的懒执行特性,强调了如果没有终端操作则不会执行的问题。文章指出,在某些情况下使用`peek`可能比`map`更简洁,但也需注意其懒执行带来的影响。
Java Stream中peek和map不为人知的秘密
|
18天前
|
Java API
Java 8新特性:Lambda表达式与Stream API的深度解析
【7月更文挑战第61天】本文将深入探讨Java 8中的两个重要特性:Lambda表达式和Stream API。我们将首先介绍Lambda表达式的基本概念和语法,然后详细解析Stream API的使用和优势。最后,我们将通过实例代码演示如何结合使用Lambda表达式和Stream API,以提高Java编程的效率和可读性。
|
16天前
|
Java
盘点java8 stream中隐藏的函数式接口
`shigen`是一位坚持更新文章的博客作者,记录成长历程,分享认知见解,留住感动瞬间。本文介绍了函数式接口的概念及其在Java中的应用,包括`Comparator`、`Runnable`、`Callable`等常见接口,并详细讲解了`Function`、`Predicate`、`Consumer`、`Supplier`和`Comparator`等函数式接口的使用方法及应用场景,展示了如何利用这些接口简化代码并提高编程效率。**个人IP:shigen**,与shigen一起,每天进步一点点!
28 0
盘点java8 stream中隐藏的函数式接口
|
1月前
|
Java API 开发者
|
1月前
|
存储 算法 Oracle
19 Java8概述(Java8概述+lambda表达式+函数式接口+方法引用+Stream+新时间API)
19 Java8概述(Java8概述+lambda表达式+函数式接口+方法引用+Stream+新时间API)
55 8
|
1月前
|
自然语言处理 Java API
"告别Java8 Stream噩梦,JDFrame神器来袭!让你的代码简洁如诗,效率翻倍,编程新体验等你尝鲜!"
【8月更文挑战第11天】Java 8的Stream API以强大的函数式编程能力革新了集合数据处理方式,但其抽象概念和复杂的链式调用让不少开发者望而却步。为此,JDFrame框架应运而生,通过直观易懂的操作符简化Stream使用,减少代码量并提高效率。
41 3
|
1月前
|
数据可视化 IDE Java
Java8的Stream流太难用了?看看JDFrame带来的革新体验
【8月更文挑战第6天】在Java开发者的日常工作中,Java 8引入的Stream API无疑是一个革命性的特性,它极大地简化了集合(Collection)的处理方式,使得数据操作更加声明式、函数式。然而,对于初学者或是从早期Java版本迁移过来的开发者而言,Stream API的复杂性和抽象性可能会成为一道门槛。今天,我们就来探讨如何通过JDFrame这样的工具或框架,以及掌握一些高效学习策略,让Java Stream的使用变得更加得心应手。
100 5
|
1月前
|
存储 Java API
探索Java中的Stream API: 提升数据处理的效率与优雅
在Java的海洋中,Stream API如同一股清流,为数据处理注入了新的活力。本文将深入探讨Stream API的核心概念、操作以及它如何改变我们编写和理解代码的方式。通过实际案例,我们将揭示这一现代编程范式如何简化集合处理,提高代码的可读性与性能。