带你读《2022技术人的百宝黑皮书》——如何避免写重复代码:善用抽象和组合(3)https://developer.aliyun.com/article/1339967?groupCode=taobaotech
实现 buffer
Source.fromJavaStream(() -> IntStream.rangeClosed(1, 10)) .statefulMap( () -> new ArrayList<Integer>(3), (list, element) -> { list.add(element); if (list.size() == 3) { return Pair.create(new ArrayList<Integer>(3), Collections.unmodifiableList(list)); } else { return Pair.create(list, Collections.<Integer>emptyList()); } }, listOnComplete -> Optional.ofNullable(listOnComplete)) .filterNot(List::isEmpty) .runForeach(System.out::println, system); // prints List(1, 2, 3) List(4, 5, 6) List(7, 8, 9) List(10)
更复杂的例子:处理资源
在前面看了如何实现 zipWithIndex 、bufferUntilChanged 之后,让我们进一步看看如何优雅和安全地处理资源。在任何的编程语言和框架中,资源的处理都是非常基础但是又很棘手的事项。在Java7 中首次引入了 try-with-re- sources 语法,对资源处理进行了一定程度的简化,而在反应式流中,我们又应该如何的操作呢?这里我们可以分为两种情况:
- 针对流中的每个元素都创建一个新的资源,使用这个资源,关闭这个资源。
- 针对整个流创建一个资源,并在处理流中的每个元素时使用这个资源,并在流的生命周期结束后,关闭这个资源。
因为资源通常开销较大且需要妥善管理,所以在开发过程中,我们更容易遇到的是 第2种情况,即资源的创建和销毁和流的生命周期进行了绑定。反应式流中的资源管理,还有更多的细节需要考虑:
- 资源的初始化和关闭需要支持并发安全;反应式流可以被多次物化,被多个下游订阅者订阅和处理,并且以任意的顺序进行取消订阅,需要在各种情况下(上游完成、下游取消、处理异常等)等情况下妥善的创建和销毁资源。
- 在流生命周期的各个阶段安全地创建和销毁资源;比如:即使在创建资源或者销毁资源的时触发了异常,也不会对同一个资源关闭多次。
- 支持异步从而提高资源使用的效率。
- 感知流的生命周期,支持在关闭资源时提供可选的值给到下游以标识流的结束,比如处理文件时,使用一个特殊的标识符标识文件的结尾。
综合上面的这些诉求,对应的代码就会变得很复杂,大家可以给自己一点时间思考一下:如果是自己独立实现类似的操作需要做出那些努力呢?而在现实的开发过程中,我们遇到的述求很多时候并非一起提出,而时随着迭代接踵而至,那么如果当初的代码编写的不是很易于扩展,拥有良好的测试,则可能按下葫芦浮起瓢。
比如在 reactor-core中就有如下的using 操作符:
public static <T, D> Mono<T> using( Callable<? extends D> resourceSupplier, Function<? super D, ? extends Mono<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup) {...}
resourceSupplier 针对每个订阅者,创建一个资源sourceSupplier 结合创建的资源,产生对应的元素resourceCleanup 取消订阅或者流完成时,清理对应的资源
在 reactor-core 中,对应的底层实现为 MonoUsing 共 360 行代码,而要实现我们想要的逻辑,我们还需要和另一个流进行合并,即这里的 using 类似于 unfoldResource 。那么有没有可能使用更加简单的方案来进行实现呢? 答案是肯定的,和前面的几个操作符一样,我们可以使用 statefulMap 来实现mapWithResource ,思维过程如下:
- using / mapWithResource 的生命周期管理 和 statefulMap 的 create 和 onComplete 方法对应,针对资源,onComplete 方法可以被命名为更加贴切的 release / close / cleanUp。
- 在流中使用的资源,我们可以认为是一个状态,只不过这个状态在流的整个生命周期中不再变化,一直是create 方法中返回的 Resource 。
- 在关闭资源时,我们可以通过返回一个 Optional> 来返回一个可选的值。
- 对并发资源的异步处理,则可以通过返回一个 CompletionStage> 而非 Out 来实现,在 using 方法中, 我们返回的是一个 Mono
经过上面的思维过程,我们不难得出这个流上的方法的声明可以为:
public <R, In, Out> mapWithResource( Supplier<? extends R> create, BiFunction<? super R, ? super In, ? extends Out> function, Function<? super R, ? extends Optional<? extends Out> close) {...}
resourceSupplier 针对每个订阅者/每次物化,创建一个资源function 使用create 中创建的资源处理流中的每个元素
close 在流关闭的同时关闭资源,并再向下游提供一个可选的值
具体的的实现这里留空,感兴趣的小伙伴可以结合前面的例子进行实现。下面我们看一下如何使用这个 mapWi- thResource 方法,从而加深大家的理解。
带你读《2022技术人的百宝黑皮书》——如何避免写重复代码:善用抽象和组合(5)https://developer.aliyun.com/article/1339963?groupCode=taobaotech