Java 22 新增利器: 使用 Java Stream Gather 优雅地处理流中的状态

简介: 本文中我们分析了 什么 是 “流”,对比了 Java 上几种常见的 “流”库,引入和详细介绍了 Java 22 中的 Stream Gather API 。同时也简单分享了利用虚拟线程 如何简化 Stream map Concurrent操作符的实现。希望抛砖引玉和大家分享新的特性,共同进步。同时也希望大家都可以升级到新版本的 JDK,更好的赋能业务。

1. 背景


Java Stream 自从 Java 1.8 引入以来,迅速成为了各位开发者手中信手拈来的工具,大家日常在工作中谈论起来也是如数家珍。但由于 Java Stream 的操作符不足够的丰富,经常会遇到捉襟见肘的情况。对于此,大家可能会继而采用操作符更加丰富的库来作为替代。作为 Java 标准库的一部分,Oracle 的 Java 架构师们也一直以来都在探索如何更好的适应日益增长的用户需求和语言的可维护性,随着 Stream Gather API 的到来,这个情况得到了极大的改善,下面让我结合自身经验,和大家分享 Java Stream Gather API。


2. 什么 是 Stream ?


在编程语言生态中,Stream 是一个抽象的概念,代表了一组连续地对数据的处理的操作及流经其的数据,类似汽车生产流水线一样,下面用 “流”来指代。通常来说,”流“可以分为“有限流”和“无限流”。如 长江(滚滚长江东逝水) 可以看做一个”无限流“,沿途的各种水电站可以看做 “流”中的中间处理节点。通常来说,拆分细致的库可能会将流拆分为 “Source(源)”、“Flow(流处理)”、”Sink(终点)“三个部分,不过在 API 层不一定体现。如下所示:

image.png


这些 “Source”、“Flow”和 “Sink”的组合,最终形成了一个复杂的处理流图。也就是我们的代码业务逻辑。通常来说,在 Java 中,我们使用的标准库中的 Stream 则可以看做是一个最简单的单向流,如下图所示:

image.png

现在我们应该对什么 是 “流”有了一个清晰的认识了。通常我们如果不需要自己开发 Java Stream 的操作符,因此几乎接触不到 AbstractPipeline和 Sink这两个类,当然截止 Java 22, Java Stream 也没有足够的扩展点。


3. 各种库有什么不一样

在目前日常工作中,作为 Java 开发者,我们可能会用到多种面向 “流”的工具包,简单对比如下:

image.png


从上可以看到,虽然 Java 标准库的 Stream API 开箱即用,但是在很多特定场景下,我们也不得不选用其他的库;为了改善这一点,Viktor Klang 提出了 Stream Gather API,作为除 Stream Collector 之外的另一个扩展点,来弥补 Java Stream 的短板。


题外话:Viktor Klang 之前是 Akka 的开发团队的一员。


4. 为什么需要 Stream Gather API?


4.1 问题空间

Java Stream API 目前主要缺乏丰富的 操作符,而如果像其他的库一样添加很多的操作符到 Stream API,又会有较大的维护负担,如果希望只添加有限的操作符,却可以解决绝大部分的问题。则新的操作符需要支持下面的特性:

  1. 支持多种类型的数据转换:1 :1, 如 map 操作符1 :N, 如 filter ,flatmapN :1, 如 bufferN:M, 如 intersperse
  2. 支持 “有限流” 和 “无限流”
  3. 支持 “有状态” 和 “无状态”操作
  4. 支持“提前终止”和 “全量消费”
  5. 支持 “单线程顺序处理”和 “多线程并行处理”
  6. 支持感知 “结束”,如实现一个 fold 操作符


4.2 Stream Collector & Gather

Java Stream 目前仅提供的了一个扩展点:Collector , 其包含下面几个部分:

image.png


基于上面的说明,在处理过程中我们只会用到 accumulator 返回的 BiConsumerfinisher返回的 Function, 所以只能最终产生 1个值 (N :1)。比如我们的 Collectors.toList()只会在流结束的时候产生一个 List,所以 并不能满足我们实现 mapfilterbufferscanzipWithIndex等操作符的要求。为此,在 JEP 461应运而生,API 和 Stream Collector 有点类似,具体的对比如下:

image.png

上面最大的变化在于,integrator finisher都可以产生 1 : N 地产生元素 R。这个可选的 0 ... N个 R 通过DownStream来传递给下游。如果直接在方法的返回值中返回,则仅可以表达 0 ...  1个 R 的情况。DownStream的定义了两个核心方法,比较类似于 Reactor-coreEmitter

image.png

integrator 主要用于结合当前的状态 A 和 输入的元素 T,利用了 DownStream的 能力来产生 可选的 N 个 R。核心的方法如下:

image.png

结合目前的信息,我们可以发现整个 Gather API 的设计其实还挺复杂的,因为引入了太多的中间接口名称,同时支持了单线程和多线程等情况,并且有一些隐藏的约束。下面让我们庖丁解牛,细细道来。


5. 分析和对比


5.1 Stream Gather API 拆解

结合上面的信息和其他类库中的经验,我们其实知道,“流”的任意中间处理过程都可以看做一个函数, 如下图所示:

image.png

而上面的整个 GatherAPI 其实是在这个基础上做的变体。具体来说:

  1. supplier:产生最初的 State
  2. integrator + downstream:转换 和 传递值;

finisher主要用于终止信号处理, combiner主要用于并行流。这样一看整个API 就简化很多了。在实现特定的 操作符的时候,我们可以只关心特定部分。比如下面分别实现 map和  filter操作符(无状态)。

  1. map操作符无状态, 1 :1 地产生元素
  2. filter 操作符无状态, 1 : 0 ... 1 地产生元素



    public static <T, R> Gatherer<T, ?, R> map(final Function<T, R> mapper) {
        return Gatherer.of((notused, element, downstream) -> downstream.push(mapper.apply(element)));
    }

    public static <T> Gatherer<T, ?, T> filter(final Predicate<T> predicate) {
        return Gatherer.of((notused, element, downstream) -> {
            if (predicate.test(element)) {
                return downstream.push(element);
            }
            return true;
        });
    }


5.2 和其他的类库对比


类似 gather 这样的操作符,也存在其他类库中,比如 reactor-core 、mutiny 和 pekko-stream, 比如以 pekko-stream 为例,和 gather最相近是 statefulMap操作符, 该操作符在别的 函数式流 库中一般叫做 mapAccumulate. 具体的定义如下:


 
  def statefulMap[S, T](
      create: function.Creator[S],
      f: function.Function2[S, Out, Pair[S, T]],
      onComplete: function.Function[S, Optional[T]]): javadsl.Flow[In, T, Mat]

操作符


和 Stream Gather 做一个简单的对比:

image.png

statefulMapgather分别实现 zipWithIndex如下:

  public static <T> Source<Pair<T, Integer>, NotUsed> zipWithIndex(final Source<T, NotUsed> source) {
        return source.statefulMap(
            () -> 0,
            (state, element) -> Pair.create(state + 1, Pair.create(element, state)),
            notused -> Optional.empty()
        );
    }

    public static <T> Stream<Pair<T, Integer>> zipWithIndex(final Stream<T> stream) {
        class State {
            int index;
        }
        return stream.gather(Gatherer.ofSequential(
            State::new,
            (state, element, downstream) -> downstream.push(Pair.create(element, state.index++))
        ));
    }

需要注意的是,在 GatherAPI 的设计中, 由 initializer返回的 状态 A在整个生命周期中使用,如果我们的操作符是带状态的,那么需要将状态放在一个 可变类 中,如上面实现中的 class State。这个和函数式的(返回新的不可变状态)的方式不太一样。这样做的一个优势就是避免生产小对象 Pair(State, Ouput).  缺点则是会有很多这样的局部类。其他的重要的约束还有:不要将State对象传递给多个线程并发修改、传递给 integratorState 需要是 initializer返回的 State 等。至于并行化的支持这里不再展开,简单来说就是 Java  Stream 是通过 combiner合并聚合状态来完成的。无独有偶,在有个 statefulMapAsync的PR 中也有类似的思考。


5.3 自带的 gathers 分析


目前 Stream Gather 提供了几个默认的 Gather实现,比如 foldscanwindowFixedmapConcurrent, 其中 fold比较简单,也有 Collectors.reduce作为对偶实现。最有有意思的是 mapConcurrent,其可以用于指定最大并行度地执行异步转换,类似于 Pekko-Stream 的 mapAsync。其中需要注意的是,在这个实现中,利用到了 虚拟线程、信号量,并且没有用到 CompletableFuture。更加简单的实现了相同的功能。

  • mapConcurrent

    
    public static <T, R> Gatherer<T,?,R> mapConcurrent(
            final int maxConcurrency, //限制:最大并行度
            final Function<? super T, ? extends R> mapper) //虚拟线程中执行的转换

整个实现非常的简单



public static <T, R> Gatherer<T,?,R> mapConcurrent(
            final int maxConcurrency,
            final Function<? super T, ? extends R> mapper) {
        class State {
            final ArrayDeque<Future<R>> window =
                    new ArrayDeque<>(Math.min(maxConcurrency, 16));
            //使用信号量,不需要复杂的判断逻辑
            final Semaphore windowLock = new Semaphore(maxConcurrency);

            final boolean integrate(T element,
                                    Downstream<? super R> downstream) {
                if (!downstream.isRejecting())
                    createTaskFor(element);
                return flush(0, downstream);
            }

            final void createTaskFor(T element) {
                //阻塞等待permit,这里不是虚拟线程
                windowLock.acquireUninterruptibly();

                var task = new FutureTask<R>(() -> {
                    try {
                        return mapper.apply(element);
                    } finally {
                        //处理完成后释放信号量permit
                        windowLock.release();
                    }
                });

                var wasAddedToWindow = window.add(task);
                //使用虚拟线程来执行具体的任务
                Thread.startVirtualThread(task);
            }

            final boolean flush(long atLeastN,
                                Downstream<? super R> downstream) {
                //....省略很多代码,将结果值推送给下一个处理节点
              downstream.push(current.get());
            }
        }

        return Gatherer.ofSequential(
                State::new,
                Integrator.<State, T, R>ofGreedy(State::integrate),
                (state, downstream) -> state.flush(Long.MAX_VALUE, downstream)
        );
    }

我们可以看到:利用 虚拟线程 来并发的执行 mapper, 并结合 信号量来实现 maxConcurrency的限制。整个实现非常简单,感兴趣的同学可以对比下 Reactor-coreflatmap和 Pekko-Stream 中 mapAsync的实现。

  • fold

相比之下,Gather 的 fold 实现就比较简单,如下所示

  
    public static <T, R> Gatherer<T, ?, R> fold(
            Supplier<R> initial,
            BiFunction<? super R, ? super T, ? extends R> folder) {
        class State {
            R value = initial.get(); //初始状态,记录聚合结果值
            State() {}
        }
        return Gatherer.ofSequential(
                State::new,
                Integrator.ofGreedy((state, element, downstream) -> {
                    state.value = folder.apply(state.value, element);
                    return true;
                }),
                //流处理结束,返回结果值给流的下一个处理节点
                (state, downstream) -> downstream.push(state.value)
        );
    }


同样将状态保持在了 局部的 State类中,并且在结束时,调用了 finisher返回的方法,将最终的值推送给了流的下一个处理节点。因为是 fold方法不一定满足 结合律,所以上面使用的是 Gatherer.ofSequential, 来保证串行执行。同时,Stream Gather  API 也支持多个 gather 之间组合,相当于其他库中的 fuse ,继而提高性能。


6. 未来展望和小结


随着 Stream Gather API 、虚拟线程的引入,补齐了 Stream API 在扩展性上的短板,同时降低了复杂度。未来一定会有一个开源的库来扩展各种各样的 Gather 操作符, 比如 zipWithIndexzipWithNextmapConcatthrottle等。稍有遗憾的是由于 Java 目前不支持类似 Kotlin 的扩展方法,所以在 Stream API  DSL 的最终呈现上还不能是 Stream.zipWithIndex而是 Stream.gather(MyGathers.zipWithIndex)。希望在 下一个LTS来临之前 Java 能够补齐扩展方法这个短板,方便我们更优雅的设计 DSL 扩展。


来源|阿里云开发者公众号

作者|虎鸣


作者介绍
目录