Java8 Lambda实现源码解析

简介: Java8的lambda应该大家都比较熟悉了,本文主要从源码层面探讨一下lambda的设计和实现。


基础示例与解析


先看下面的示例代码:

  static class A {
        @Getter
        private String a;

        @Getter
        private Integer b;

        public A(String a, Integer b) {
            this.a = a;
            this.b = b;
        }
    }

    public static void main(String[] args) {
        List<Integer> ret = Lists.newArrayList(new A("a", 1), new A("b", 2), new A("c", 3)).stream()
            .map(A::getB)
            .filter(b -> b >= 2)
            .collect(Collectors.toList());
        System.out.println(ret);
    }

上面代码中,其实主要就是几步:


  1. ArrayList.stream
  2. .map
  3. .filter
  4. .collect

一步步来看,ArrayList.stream 实际调用的是Collector.stream方法:


    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }


spliterator()方法生成的是 IteratorSpliterator 对象,spliterator的意思就是可以split的iterator,这个主要是用于lambda中的parallelStream中的并行操作,上面的例子中由于调用的是stream,所以parallel=false。


StreamSupport.stream最后生成的是一个ReferencePipeline.Head对象:

   public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

Head类是从ReferencePipeline派生的,表示lambda的pipeline中的头节点。


有了这个Head对象之后,在它之上调用.map,实际上就是调用了基类ReferencePipeline.map方法:


    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

返回的是一个StatelessOp,表示一个无状态的算子,这个类也是ReferencePipeline的子类,可以看到它的构造函数,第一个参数this,表示把Head对象作为StatelessOp对象的upstream,也就是它的上游。StatelessOp.opWrapSink方法先不讲,后面会讲到。


接着调用StatelessOp.filter方法,也还是会回到ReferencePipeline.filter方法:

 public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

可以看到,仍然生成的是一个StatelessOp对象,只是它的upstream变了而已。


最后调用StatelessOp.collect,继续回到ReferencePipeline.collect方法:

 public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            container = collector.supplier().get();
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        }
        else {
            container = evaluate(ReduceOps.makeRef(collector));
        }
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }

在前面几步,.map, .filter方法其实都只是创建StatelessOp对象,但是到collect就不一样了,了解spark/flink的就知道,collect其实是个action/sink,调用了collect,就会真实地触发这个stream上各个operator的执行。这也就是我们经常听到的lazy execution,所有的操作,只有碰到action的算子才会开始执行。


之前讲到这个stream的parallel=false,所以上面的实际执行逻辑是:


A container = evaluate(ReduceOps.makeRef(collector));
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }

在进入evaluate方法之前,先看一下ReduceOps.makeRef(collector),它实际上就是基于Collectors.toList生成的CollectorImpl实例包装了一层,返回了一个 TerminalOp对象(实际是ReduceOp)。

 public static <T, I> TerminalOp<T, I>
    makeRef(Collector<? super T, I, ?> collector) {
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        BiConsumer<I, ? super T> accumulator = collector.accumulator();
        BinaryOperator<I> combiner = collector.combiner();
        class ReducingSink extends Box<I>
                implements AccumulatingSink<T, I, ReducingSink> {
            @Override
            public void begin(long size) {
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                accumulator.accept(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }

            @Override
            public int getOpFlags() {
                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                       ? StreamOpFlag.NOT_ORDERED
                       : 0;
            }
        };
    }

上面代码可以看到,基本也就是直接调用了collector的实现,稍微需要注意的是,ReducingSink从Box派生,Box的意思就是盒子,它里面有个state成员,表示一个计算的状态。ReducingSink就是通过这个state,进行combine, accumulate操作(实际就是一个List)。


回到evaluate方法,它实际调用了:


terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));

这里this就是最后阶段的ReferencePipeline,即StatelessOp,这里我们称它为 ReferencePipeline$2,即经过两个算子操作的pipeline。


sourceSpliterator 则会取到sourceStage的spliterator,即最上面Head的spliterator。


ReduceOp.evaluateSequential:

   public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<P_IN> spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
        }

helper即ReferencePipeline$2,这里makeSink即上面返回的ReducingSink重载的方法。


ReferencePipeline.wrapAndCopyInto,在其父类AbstractPipeline中实现:

    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;

wrapSink代码:

 final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

可以看到,这里就是将pipeline从后至前,分别调用每个pipeline的opWrapSink方法,就是一个责任链的模式。opWrapSink可以看上面map的opWrapSink的filter的opWrapSink实现,map的很简单,直接调用mapper.apply,实际上就是A::getB方法,filter的也很简单,调用的是 predicate.test 方法。


接下来到copyInto方法,到这里才会有真正的执行逻辑:

  final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

它会走入到这部分的逻辑中:


wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();

这里面最重要的是就是中间这行了,由于spliterator持有的Collection引用,是ArrayList,因此它会调用ArrayList.forEachRemaining方法:


public void forEachRemaining(Consumer<? super E> action) {
    // ...
    if ((i = index) >= 0 && (index = hi) <= a.length) {
       for (; i < hi; ++i) {
           @SuppressWarnings("unchecked") E e = (E) a[i];
           action.accept(e);
       }
       if (lst.modCount == mc)
           return;
   }
    // ...

这里的action参数,就是上面经过责任链封装的Sink(它也是Consumer的子类)。
而这里调用action.accept,就会通过责任链来一层层调用每个算子的accept,我们从map的accept开始:


@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
    return new Sink.ChainedReference<P_OUT, R>(sink) {
        @Override
        public void accept(P_OUT u) {
            downstream.accept(mapper.apply(u));
        }
    };
}

可以看到,它先调用mapper.apply,然后把结果直接传给downstream.accept,也就是调用filter的accept,接着来到ReducingSink.accept,也就是往state中添加一个结果元素,这样forEach执行完之后,结果自然就有了。


看完上面的流程,接下来看一下lambda里面部分类设计,首先来看一下Stream,它的基类是BaseStream,提供以下接口:


public interface BaseStream<T, S extends BaseStream<T, S>>
        extends AutoCloseable {
    /**
     * 返回stream中元素的迭代器
        */
    Iterator<T> iterator();

    /**
     * 返回stream中元素的spliterator,用于并行执行
     */
    Spliterator<T> spliterator();

    /**
     * 是否并行
     */
    boolean isParallel();

    /**
     * 返回串行的stream,即强制parallel=false
     */
    S sequential();

    /**
     * 返回并行的stream,即强制parallel=true
     */
    S parallel();

    // ...
}

直接继承此接口的,是如IntStream, LongStream,DoubleStream等,这些是在BaseStream基础上,提供了filter, map, mapToObj, distinct等算子的接口,但是这些算子,是限定类型的,如IntStream.filter, 它接受的就是 IntPredicate, 而不是常规的Predicate;map方法也是,接受的是 IntUnaryOperator。


IntStream, LongStream这些都是接口,也就是仅仅用来描述算子的。它们的实现都是基于Pipeline的,基类为 AbstractPipeline,它的几个关键成员变量:

 /**
      * 最顶上的pipeline,即Head
      */
    private final AbstractPipeline sourceStage;

    /**
     * 直接上游pipeline
     */
    private final AbstractPipeline previousStage;

    /**
     * 直接下游pipeline
     */
    @SuppressWarnings("rawtypes")
    private AbstractPipeline nextStage;

    /**
     * pipeline深度
     */
    private int depth;
    
    /**
     * head的spliterator
     */
    private Spliterator<?> sourceSpliterator;

     // ...

这个基类还提供了pipeline的基础实现,以及对BaseStream和PipelineHelper接口的实现,如evaluate, sourceStageSpliterator, wrapAndCopyInto, wrapSink等。


类似地,从AbstractPipeline派生的子类有:IntPipeline, LongPipeline, DoublePipeline, ReferencePipeline等。前面三种比较容易理解,提供的是基于原始类型的lambda操作(且都实现了对应的XXStream接口),而ReferencePipeline提供的是基于对象的lambda操作。


类层次如下:

image.png

注意这些子类,也都是abstract的,每一种pipeline下面,都有Head, StatelessOp, StatefulOp三个子类。分别用于描述pipeline的头节点,无状态中间算子,有状态中间算子。


Head是非抽象类,StatelessOp也是抽象类,它在map、filter、mapToObj等算子中,会动态创建它的匿名子类,并实现opWrapSink方法。


通过这种设计,除了collect之外,所有算子的返回结果都是Stream的子类,在IntPipeline中,map, flatMap, filter等都返回IntStream,即使它们的实现可能是StatelessOp, Head等,都对外提供了统一的接口。同时由于lambda中每个算子的实现是动态的,如最上面例子中A::getB, b -> b>=2等,那就通过每个算子重载 opWrapSink 方法来动态封装这些逻辑。


同时,通过将XXStream和XXPipeline分开的设计,可以保持Stream接口的简洁(对用户透出的接口)。否则如果将BaseStream做成抽象类,将AbstractPipeline相关的逻辑移到这里面,会导致Stream变得非常臃肿,在API层面用户使用的时候也会很困惑。


创建Pipeline的地方,则统一收口到了StreamSupport类中,这是一个大的工厂类。虽然ArrayList, Arrays等类中都提供了stream的方法,但是最后都统一调用了StreamSupport里来创建Pipeline的实例,通常也就是创建 XXPipeline.Head对象,然后通过这个对象进行其他lambda算子的添加。


双流concat的场景示例及解析

接下来看一个相对比较复杂的例子,双流concat的场景,代码如下:

  static class Mapper1 implements IntUnaryOperator {

        @Override
        public int applyAsInt(int operand) {
            return operand * operand;
        }
    }
    
    static class Filter1 implements IntPredicate {

        @Override
        public boolean test(int value) {
            return value >= 2;
        }
    }

    static class Mapper2 implements IntUnaryOperator {

        @Override
        public int applyAsInt(int operand) {
            return operand + operand;
        }
    }

    static class Filter2 implements IntPredicate {

        @Override
        public boolean test(int value) {
            return value >= 10;
        }
    }
    
    static class Mapper3 implements IntUnaryOperator {

        @Override
        public int applyAsInt(int operand) {
            return operand * operand;
        }
    }

    static class Filter3 implements IntPredicate {

        @Override
        public boolean test(int value) {
            return value >= 10;
        }
    }

    public static void main(String[] args) {
        IntStream s1 = Arrays.stream(new int[] {1, 2, 3})
            .map(new Mapper1())
            .filter(new Filter1());

        IntStream s2 = Arrays.stream(new int[] {4, 5, 6})
            .map(new Mapper2())
            .filter(new Filter2());

        IntStream s3 = IntStream.concat(s1, s2)
            .map(new Mapper3())
            .filter(new Filter3());
        int sum = s3.sum();
    }

上面代码中,先分别创建两个IntStream:s1, s2。然后进行concat操作,生成s2,最后调用sum操作做reduce。


代码分析还是从sink开始,reduce跟前面的collect类似,实际会基于s3这个stream, 在AbstractPipeline.evaluate方法中执行:


terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));

这里terminalOp即为sum这个ReduceOp,sourceSpliterator为Streams.ConcatSpliterator,也即调用s3这个pipeline的wrapAndCopyInto方法:

  final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

这里的wrapSink,就会将s3中的算子与最后的reduce串在一起,大致如下:
Head(concated s1 + s2 stream) -> Mapper3 -> Filter3 -> ReduceOp(sum)


到目前为止,我们还只看到s3的逻辑,那么s1和s2两个stream的mapper和filter逻辑在哪里呢,接着看下面的copyInto方法:

final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
            // ...

上面讲到,这里的spliterator是Streams.ConcatSpliterator对象,看下Streams.ConcatSpliterator.forEachRemaining实现:

    public void forEachRemaining(Consumer<? super T> consumer) {
            if (beforeSplit)
                aSpliterator.forEachRemaining(consumer);
            bSpliterator.forEachRemaining(consumer);
        }

这里就区分出了两个不同的流,对每个流的spliterator分别调用forEachRemaining方法,这里的spliterator是IntWrappingSpliterator, 它是对s1/s2的一个封装,它有两个关键成员:

   // 包装的原始pipeline
        final PipelineHelper<P_OUT> ph;

        // 原始pipeline的spliterator
        Spliterator<P_IN> spliterator;

所以就走到了

 IntWrappingSpliterator.foreachMaining方法中:

     public void forEachRemaining(IntConsumer consumer) {
            if (buffer == null && !finished) {
                Objects.requireNonNull(consumer);
                init();

                ph.wrapAndCopyInto((Sink.OfInt) consumer::accept, spliterator);
                finished = true;
            }
            // ...

可以看到,又调用了原始pipeline的wrapAndCopyInto方法中,而这里的consumer即为上面s3的逻辑。这样又递归回到了:


AbstractPipeline.wrapAndCopyInto -> AbstractPipeline.wrapSink-> AbstractPipeline.copyInto


方法中,而在这时的wrapSink中,现在的pipeline就是s1/s2了,这时就会对s1/s2下面的所有算子,调用AbstractPipeline.opWrapSink串联起来,以s1为例就变成:


Head(array[1,2,3]) -> Mapper1 -> Filter1 -> Mapper3 -> Filter3 -> ReduceOp(sum)


这样s1流跟s3流就串起来执行完成了,然后就是s2和s3流串起来执行。


作者 | 王逸(卫乐)

来源 | 阿里云开发者公众号


相关文章
|
9天前
|
搜索推荐 算法 Java
2025 年互联网大厂校园招聘 JAVA 工程师笔试题及备考要点解析
本文针对互联网大厂校招Java工程师笔试题进行解析,涵盖基础知识、面向对象编程、数据结构与算法、异常处理及集合框架等核心内容。从数据类型、运算符到流程控制语句,从类与对象、继承多态到数组链表、排序算法,再到异常捕获与集合框架应用,结合实际案例深入剖析,助你系统掌握考点,提升应试能力。资源链接:[点此获取](https://pan.quark.cn/s/14fcf913bae6)。
34 9
|
8天前
|
SQL Java 数据库连接
java 校招需要准备哪些内容及关键要点解析
这是一篇针对Java校招准备的详细指南,涵盖六大核心板块:扎实的Java基础知识(如数据类型、面向对象编程、集合框架)、数据库相关知识(SQL操作与管理工具)、Java开发框架(Spring、Spring Boot、MyBatis)、其他重要知识(多线程编程、网络编程、数据结构与算法)、项目经验准备以及面试技巧。文章结合技术方案与应用实例,帮助应届生全面掌握校招所需技能,从理论到实践全面提升竞争力。资源地址:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
27 1
|
8天前
|
算法 Java 关系型数据库
校招 Java 面试基础题目解析及学习指南含新技术实操要点
本指南聚焦校招Java面试,涵盖Java 8+新特性、多线程与并发、集合与泛型改进及实操项目。内容包括Lambda表达式、Stream API、Optional类、CompletableFuture异步编程、ReentrantLock与Condition、局部变量类型推断(var)、文本块、模块化系统等。通过在线书店系统项目,实践Java核心技术,如书籍管理、用户管理和订单管理,结合Lambda、Stream、CompletableFuture等特性。附带资源链接,助你掌握最新技术,应对面试挑战。
28 2
|
9天前
|
SQL Java 数据库连接
阿里腾讯互联网公司校招 Java 面试题总结及答案解析
本文总结了阿里巴巴和腾讯等互联网大厂的Java校招面试题及答案,涵盖Java基础、多线程、集合框架、数据库、Spring与MyBatis框架等内容。从数据类型、面向对象特性到异常处理,从线程安全到SQL优化,再到IOC原理与MyBatis结果封装,全面梳理常见考点。通过详细解析,帮助求职者系统掌握Java核心知识,为校招做好充分准备。资源链接:[点击下载](https://pan.quark.cn/s/14fcf913bae6)。
27 2
|
9天前
|
Java 数据库连接 API
互联网大厂校招 JAVA 工程师笔试题解析及常见考点分析
本文深入解析互联网大厂校招Java工程师笔试题,涵盖基础知识(数据类型、流程控制)、面向对象编程(类与对象、继承与多态)、数据结构与算法(数组、链表、排序算法)、异常处理、集合框架、Java 8+新特性(Lambda表达式、Stream API)、多线程与并发、IO与NIO、数据库操作(JDBC、ORM框架MyBatis)及Spring框架基础(IoC、DI、AOP)。通过技术方案讲解与实例演示,助你掌握核心考点,提升解题能力。
49 2
|
9天前
|
设计模式 算法 Java
2025 春季校招 Java 研发笔试题详细解析及高效学习指南
本指南专为2025春季校招Java研发岗位笔试设计,涵盖Java 17+新特性(如模式匹配、文本块、记录类和密封类)、现代技术栈(Spring Boot 3、响应式编程、Stream API增强)以及算法与数据结构实战。同时深入解析Spring Data JPA、事务管理、性能优化等内容,并结合实际案例讲解常见算法题解与设计模式应用。资源包含核心知识点、面试题及笔试技巧,助力高效备考。下载地址:[链接](https://pan.quark.cn/s/14fcf913bae6)。
24 1
|
8天前
|
存储 算法 Java
校招 java 面试基础题目及解析
本文围绕Java校招面试基础题目展开,涵盖平台无关性、面向对象特性(封装、继承、多态)、数据类型、关键字(static、final)、方法相关(重载与覆盖)、流程控制语句、数组与集合、异常处理等核心知识点。通过概念阐述和代码示例,帮助求职者深入理解并掌握Java基础知识,为校招面试做好充分准备。文末还提供了专项练习建议及资源链接,助力提升实战能力。
57 0
|
3月前
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
332 29
|
3月前
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
96 4
|
3月前
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~

推荐镜像

更多
  • DNS