05-流式操作:使用 Flux 和 Mono 构建响应式数据流(下)

简介: 05-流式操作:使用 Flux 和 Mono 构建响应式数据流

3 通过动态方法创建 Flux

动态创建 Flux 所采用的就是以编程的方式创建数据序列,最常用的就是 generate() 方法和 create() 方法。

generate() 方法

generate() 方法生成 Flux 序列依赖于  Reactor 所提供的 SynchronousSink 组件,定义如下。

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)

SynchronousSink 组件包括 next()、complete() 和 error() 这三个核心方法。从 SynchronousSink 组件的命名上就能知道它是一个同步的 Sink 组件,也就是说元素的生成过程是同步执行的。

next() 方法只能最多被调用一次。使用 generate() 方法创建 Flux 的示例代码如下。

Flux.generate(sink -> {
    sink.next("javaedge");
    sink.complete();
}).subscribe(System.out::println);

运行该段代码,会在系统控制台上得到“javaedge”。我们在这里调用了一次 next() 方法,并通过 complete() 方法结束了这个数据流。如果不调用 complete() 方法,那么就会生成一个所有元素均为“javaedge”的无界数据流。


这个示例非常简单,但已经具备了动态创建一个 Flux 序列的能力。如果想要在序列生成过程中引入状态,那么可以使用如下所示的 generate() 方法重载。

Flux.generate(() -> 1, (i, sink) -> {
            sink.next(i);
            if (i == 5) {
                sink.complete();
            }
            return ++i;
}).subscribe(System.out::println);

引入一个代表中间状态的变量 i,然后根据 i 的值来判断是否终止序列。显然,以上代码的执行效果会在控制台中输入 1 到 5 这 5 个数字。

create()

create() 方法与 generate() 方法比较类似,但它使用的是一个 FluxSink 组件,定义如下。

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)

FluxSink 除了 next()、complete() 和 error() 这三个核心方法外,还定义了背压策略,并且可以在一次调用中产生多个元素。使用 create() 方法创建 Flux 的示例代码如下。

Flux.create(sink -> {
            for (int i = 0; i < 5; i++) {
                sink.next("javaedge" + i);
            }
            sink.complete();
}).subscribe(System.out::println);

运行该程序,我们会在系统控制台上得到从“javaedge0”到“javaedge4”的 5 个数据。通过 create() 方法创建 Flux 对象的方式非常灵活,在本专栏中会有多种场景用到这个方法。


以上就是通过Flux 对象创建响应式流的方法,此外,还可以通过 Mono 对象来创建响应式流,我们一起来看一下。

4 通过 Mono 对象创建响应式流

可认为它是 Flux 的一种特例,所以很多创建 Flux 的方法同样适用。针对静态创建 Mono 的场景,前面给出的 just()、empty()、error() 和 never() 等方法同样适用。除了这些方法之外,比较常用的还有 justOrEmpty() 等方法。


justOrEmpty() 方法会先判断所传入的对象中是否包含值,只有在传入对象不为空时,Mono 序列才生成对应的元素,该方法示例代码如下。

Mono.justOrEmpty(Optional.of("javaedge"))
  .subscribe(System.out::println);

另一方面,如果要想动态创建 Mono,我们同样也可以通过 create() 方法并使用 MonoSink 组件,示例代码如下。

Mono.create(sink ->
sink.success("javaedge")).subscribe(System.out::println);

5 订阅响应式流

可通过 subscribe() 添加相应的订阅逻辑。调用 subscribe() 方法时可指定需要处理的消息通知类型。

Flux 和 Mono 提供了一批非常有用的 subscribe() 方法重载方法,大大简化订阅的开发例程。这些重载方法包括:

//订阅流的最简单方法,忽略所有消息通知
subscribe();
 
//对每个来自 onNext 通知的值调用 dataConsumer,但不处理 onError 和 onComplete 通知
subscribe(Consumer<T> dataConsumer);
 
//在前一个重载方法的基础上添加对 onError 通知的处理
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer);
 
//在前一个重载方法的基础上添加对 onComplete 通知的处理
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer,
Runnable completeConsumer);
 
//这种重载方法允许通过请求足够数量的数据来控制订阅过程
subscribe(Consumer<T> dataConsumer, Consumer<Throwable> errorConsumer,
Runnable completeConsumer, Consumer<Subscription> subscriptionConsumer);
 
//订阅序列的最通用方式,可以为我们的 Subscriber 实现提供所需的任意行为
subscribe(Subscriber<T> subscriber);


Reactor 中的消息通知类型有三种,即:

  • 正常消息
  • 错误消息
  • 完成消息

通过上述 subscribe() 重载方法,可以:

  • 只处理其中包含的正常消息
  • 也可同时处理错误消息和完成消息

如下代码示例展示同时处理正常和错误消息的实现方法。

Mono.just(“javaedge”)
         .concatWith(Mono.error(new IllegalStateException()))
         .subscribe(System.out::println, System.err::println);

以上代码的执行结果如下所示,我们得到了一个“javaedge”,同时也获取了 IllegalStateException。

javaedge 
java.lang.IllegalStateException

时候我们不想直接抛出异常,而是希望采用一种

容错策略

返回一个默认值

就可以采用如下方式。

Mono.just(“javaedge”)
          .concatWith(Mono.error(new IllegalStateException()))
          .onErrorReturn(“default”)
          .subscribe(System.out::println);

以上代码的执行结果如下所示,当产生异常时我们使用 onErrorReturn() 方法返回一个默认值“default”。

javaedge 
default

另外一种容错策略

通过 switchOnError() 方法使用另外的流来产生元素,以下代码演示了这种策略,执行结果与上面的示例一致。

Mono.just(“javaedge”)
         .concatWith(Mono.error(new IllegalStateException()))
         .switchOnError(Mono.just(“default”))
         .subscribe(System.out::println);

我们可以充分利用 Lambda 表达式来使用 subscribe() 方法,例如下面这段代码。

Flux.just("javaedge1", "javaedge2", "javaedge3").subscribe(data -> System.out.println("onNext:" + data), err -> {
        }, () -> System.out.println("onComplete"));

这段代码的执行效果如下所示,可以看到,我们分别对 onNext 通知和 onComplete 通知进行了处理。

onNext:javaedge1
onNext:javaedge2
onNext:javaedge3
onComplete

总结

本文介绍了如何创建 Flux 和 Mono 对象,以及如何订阅响应式流的系统方法。想要创建响应式流,可以利用 Reactor 框架所提供的各种工厂方法来达到静态创建的效果,同时也可以使用更加灵活的编程式方式来实现动态创建。而针对订阅过程,Reactor 框架也提供了一组面向不同场景的 subscribe 方法。

FAQ

在 Reactor 中,通过编程的方式动态创建 Flux 和 Mono 有哪些方法?

一旦我们创建了 Flux 和 Mono 对象,就可以使用操作符来操作这些对象从而实现复杂的数据流处理。下一讲,我们就要引入 Reactor 框架所提供的各种操作符来达成这一目

目录
相关文章
|
2月前
|
开发框架 JavaScript 前端开发
服务端渲染框架
服务端渲染框架
|
3月前
|
存储 JavaScript 前端开发
探索React状态管理:Redux的严格与功能、MobX的简洁与直观、Context API的原生与易用——详细对比及应用案例分析
【8月更文挑战第31天】在React开发中,状态管理对于构建大型应用至关重要。本文将探讨三种主流状态管理方案:Redux、MobX和Context API。Redux采用单一存储模型,提供预测性状态更新;MobX利用装饰器语法,使状态修改更直观;Context API则允许跨组件状态共享,无需第三方库。每种方案各具特色,适用于不同场景,选择合适的工具能让React应用更加高效有序。
79 0
|
3月前
|
运维 Serverless 对象存储
函数计算产品使用问题之如何使用Flask框架支持Stream模式
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
6月前
|
存储 Kubernetes 负载均衡
k8s 数据流向 与 核心概念详细介绍
k8s 数据流向 与 核心概念详细介绍
05-流式操作:使用 Flux 和 Mono 构建响应式数据流(上)
05-流式操作:使用 Flux 和 Mono 构建响应式数据流
279 0
|
前端开发 JavaScript UED
数据绑定(Data Binding):实现实时数据交互的前端神器
数据绑定是现代前端开发中的一个重要概念,它允许开发人员将数据模型与用户界面保持同步,实现实时的数据交互。在本博客中,我们将深入研究数据绑定的概念、类型和用法,以及为什么它对构建响应式Web应用程序至关重要。
633 0
|
存储 Java 数据处理
响应式流的核心机制——背压机制
响应式流的核心机制——背压机制
185 0
|
前端开发 JavaScript 搜索推荐
流式服务器端渲染
流式服务器端渲染(Streaming Server-Side Rendering)是一项在Web开发中备受关注的技术,它结合了服务器端渲染(SSR)和流式数据传输的优势。通过将渲染的HTML内容逐步发送给客户端,流式SSR可以显著加快页面加载速度,提供更好的用户体验,并对搜索引擎优化产生积极影响。本篇博文将深入探讨流式SSR的工作原理、优点和缺点,以及适用的场景。此外,我们还将介绍在一些知名项目中如何应用流式SSR技术。无论是初学者还是有经验的开发人员,本篇博文都将帮助你快速掌握流式SSR的概念和应用,为你的Web项目带来更好的性能和用户体验。让我们一起深入研究这一令人兴奋的前端技术吧!
820 0
|
NoSQL Shell Linux
如何使用 Flupy 构建数据处理管道
如何使用 Flupy 构建数据处理管道
161 0