响应式编程 - Flow 的理解
之前说过 FLow 是 Java 9 中引入的响应式编程的抽象概念,对应的类就是:
java.util.concurrent.Flow
Flow 是一个概念类,其中定义了三个接口供实现。这三个接口分别是:Publisher
, Subscriber
和 Subscription
。
//标注是一个FunctionalInterface,因为只有一个抽象方法 @FunctionalInterface public static interface Publisher<T> { public void subscribe(Subscriber<? super T> subscriber); } public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); } public static interface Subscription { public void request(long n); public void cancel(); }
Publisher
是负责生成 item 的,其中的subscribe
方法就是注册Subscriber
进去,用于消费。注册成功后,会调用Subscriber
的onSubscribe
方法,传Subscription
进来。这个Subscription
里面的 request 用于请求Publisher
发送多少 item 过来,cancel 用于告诉Publisher
不要再发 item 过来了。每次Publisher
有 item 生成并且没有超过Subscription
request 的个数限制,onNext
方法会被调用用于发送这个 item。当有异常发生时,onError
就会被调用。当Publisher
判断不会有新的 item 或者异常发生的时候,就会调用onComplete
告诉Subscriber
消费完成了。大体上就是这么个流程。
Project Reactor 就是Flow
的一种实现。并且在Flow
这个模型的基础上,参考了 Java 8 Stream 的接口功能设计,加入了流处理的机制。
Project Reactor - Flux
如何实现Flow
的接口
Flux就是一串相同类型数据的流,他包括并且会发射 0~n 个对象,例如:
Flux<String> just = Flux.just("1", "2", "3");
这样,我们就生成了一个包含三个字符串的Flux流(底层实现实际上就是FluxArray,这个我们以后会说的)
然后,我们按照之前 Flow 里面提到的流程,先进行简单的 subscribe
Flux.just("test1", "test2", "test3") //打印详细流日志 .log() //订阅消费 .subscribe(System.out::println);
运行代码,我们会看到日志输出:
07:08:13.816 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 07:08:13.822 [main] INFO reactor.Flux.Array.1 - | request(unbounded) 07:08:13.823 [main] INFO reactor.Flux.Array.1 - | onNext(test1) test1 07:08:13.823 [main] INFO reactor.Flux.Array.1 - | onNext(test2) test2 07:08:13.823 [main] INFO reactor.Flux.Array.1 - | onNext(test3) test3 07:08:13.824 [main] INFO reactor.Flux.Array.1 - | onComplete()
这些日志很清楚的说明了subscribe
究竟是如何工作的:
- 首先在
subscribe
的同时,onSubscribe
首先被调用 - 然后调用
request(unbounded)
,这里request
代表请求多少个数据,unbounded
代表请求无限个,就是所有的数据 - 对于每个数据对象,调用
onNext
方法:onNext(test1),onNext(test2),onNext(test3) - 在最后完成的时候,
onComplete
会被调用,如果说遇到了异常,那么onError
会被调用,就不会调用onComplete
了 这些方法其实都是Subscriber
的方法,Subscriber
是Flux的订阅者,配置订阅者如何消费以及消费的具体操作。
Subscriber<String> subscriber = new Subscriber<String>() { //在订阅成功的时候,如何操作 @Override public void onSubscribe(Subscription subscription) { //取最大数量的元素个数 subscription.request(Long.MAX_VALUE); } //对于每个元素的操作 @Override public void onNext(String o) { System.out.println(o); } //在发生错误的时候 @Override public void onError(Throwable throwable) { log.error("error: {}", throwable.getMessage(), throwable); } //在完成的时候,发生错误不算完成 @Override public void onComplete() { log.info("complete"); } }; Flux.just("test1", "test2", "test3") //打印详细流日志 .log() //订阅消费 .subscribe(subscriber);
运行后,日志是:
07:28:27.227 [main] INFO reactor.Flux.Array.2 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 07:28:27.227 [main] INFO reactor.Flux.Array.2 - | request(unbounded) 07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onNext(test1) test1 07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onNext(test2) test2 07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onNext(test3) test3 07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onComplete() 07:28:27.235 [main] INFO com.test.TestMonoFlux - complete
subscribe还有如下几个api:
//在不需要消费,只需要启动Flux中间处理的话,用这个 subscribe(); //相当于: new Subscriber() { @Override public void onSubscribe(Subscription subscription) { //取最大数量的元素个数 subscription.request(Long.MAX_VALUE); } @Override public void onNext(Object o) { } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { } }; //指定消费者消费 subscribe(Consumer<? super T> consumer); //相当于: new Subscriber() { @Override public void onSubscribe(Subscription subscription) { //取最大数量的元素个数 subscription.request(Long.MAX_VALUE); } @Override public void onNext(Object o) { consumer.accept(o); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { } }; //指定消费者,还有异常处理者 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); //相当于: new Subscriber() { @Override public void onSubscribe(Subscription subscription) { //取最大数量的元素个数 subscription.request(Long.MAX_VALUE); } @Override public void onNext(Object o) { consumer.accept(o); } @Override public void onError(Throwable throwable) { errorConsumer.accept(throwable); } @Override public void onComplete() { } }; //指定消费者,异常处理着还有完成的时候的要执行的操作 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); //相当于: new Subscriber() { @Override public void onSubscribe(Subscription subscription) { //取最大数量的元素个数 subscription.request(Long.MAX_VALUE); } @Override public void onNext(Object o) { consumer.accept(o); } @Override public void onError(Throwable throwable) { errorConsumer.accept(throwable); } @Override public void onComplete() { completeConsumer.run(); } }; //指定Subscriber所有需要的元素 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer); //相当于: new Subscriber() { @Override public void onSubscribe(Subscription subscription) { subscriptionConsumer.accept(subscription); } @Override public void onNext(Object o) { consumer.accept(o); } @Override public void onError(Throwable throwable) { errorConsumer.accept(throwable); } @Override public void onComplete() { completeConsumer.run(); } };
这样,就和之前所说的Flow
的设计对应起来了。