Reactor:深入理解reactor core

简介: Reactor:深入理解reactor core

目录



简介



上篇文章我们简单的介绍了Reactor的发展史和基本的Flux和Mono的使用,本文将会进一步挖掘Reactor的高级用法,一起来看看吧。


自定义Subscriber



之前的文章我们提到了4个Flux的subscribe的方法:


Disposable subscribe(); 
Disposable subscribe(Consumer<? super T> consumer); 
Disposable subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); 
Disposable subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); 
Disposable subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer);


这四个方法,需要我们使用lambda表达式来自定义consumer,errorConsumer,completeSonsumer和subscriptionConsumer这四个Consumer。


写起来比较复杂,看起来也不太方便,我们考虑一下,这四个Consumer是不是和Subscriber接口中定义的4个方法是一一对应的呢?


public static interface Subscriber<T> {
        public void onSubscribe(Subscription subscription);
        public void onNext(T item);
        public void onError(Throwable throwable);
        public void onComplete();
    }


对的,所以我们有一个更加简单点的subscribe方法:


public final void subscribe(Subscriber<? super T> actual)


这个subscribe方法直接接收一个Subscriber类。从而实现了所有的功能。


自己写Subscriber太麻烦了,Reactor为我们提供了一个BaseSubscriber的类,它实现了Subscriber中的所有功能,还附带了一些其他的方法。


我们看下BaseSubscriber的定义:


public abstract class BaseSubscriber<T> implements CoreSubscriber<T>, Subscription,
                                                   Disposable


注意,BaseSubscriber是单次使用的,这就意味着,如果它首先subscription到Publisher1,然后subscription到Publisher2,那么将会取消对第一个Publisher的订阅。


因为BaseSubscriber是一个抽象类,所以我们需要继承它,并且重写我们需要自己实现的方法。


下面看一个自定义的Subscriber:


public class CustSubscriber<T> extends BaseSubscriber<T> {
    public void hookOnSubscribe(Subscription subscription) {
        System.out.println("Subscribed");
        request(1);
    }
    public void hookOnNext(T value) {
        System.out.println(value);
        request(1);
    }
}


BaseSubscriber中有很多以hook开头的方法,这些方法都是我们可以重写的,而Subscriber原生定义的on开头的方法,在BaseSubscriber中都是final的,都是不能重写的。


我们看一个定义:


@Override
  public final void onSubscribe(Subscription s) {
    if (Operators.setOnce(S, this, s)) {
      try {
        hookOnSubscribe(s);
      }
      catch (Throwable throwable) {
        onError(Operators.onOperatorError(s, throwable, currentContext()));
      }
    }
  }


可以看到,它内部实际上调用了hook的方法。


上面的CustSubscriber中,我们重写了两个方法,一个是hookOnSubscribe,在建立订阅的时候调用,一个是hookOnNext,在收到onNext信号的时候调用。


在这些方法中,给了我们足够的自定义空间,上面的例子中我们调用了request(1),表示再请求一个元素。


其他的hook方法还有: hookOnComplete, hookOnError, hookOnCancel 和 hookFinally。


Backpressure处理



我们之前讲过了,reactive stream的最大特征就是可以处理Backpressure。


什么是Backpressure呢?就是当consumer处理过不来的时候,可以通知producer来减少生产速度。


我们看下BaseSubscriber中默认的hookOnSubscribe实现:


protected void hookOnSubscribe(Subscription subscription){
    subscription.request(Long.MAX_VALUE);
  }


可以看到默认是request无限数目的值。 也就是说默认情况下没有Backpressure。


通过重写hookOnSubscribe方法,我们可以自定义处理速度。


除了request之外,我们还可以在publisher中限制subscriber的速度。


public final Flux<T> limitRate(int prefetchRate) {
    return onAssembly(this.publishOn(Schedulers.immediate(), prefetchRate));
  }


在Flux中,我们有一个limitRate方法,可以设定publisher的速度。


比如subscriber request(100),然后我们设置limitRate(10),那么最多producer一次只会产生10个元素。


创建Flux



接下来,我们要讲解一下怎么创建Flux,通常来讲有4种方法来创建Flux。


使用generate


第一种方法就是最简单的同步创建的generate.


先看一个例子:


public void useGenerate(){
        Flux<String> flux = Flux.generate(
                () -> 0,
                (state, sink) -> {
                    sink.next("3 x " + state + " = " + 3*state);
                    if (state == 10) sink.complete();
                    return state + 1;
                });
        flux.subscribe(System.out::println);
    }


输出结果:


3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30


上面的例子中,我们使用generate方法来同步的生成元素。


generate接收两个参数:


public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)


第一个参数是stateSupplier,用来指定初始化的状态。


第二个参数是一个generator,用来消费SynchronousSink,并生成新的状态。


上面的例子中,我们每次将state+1,一直加到10。


然后使用subscribe来将所有的生成元素输出。


使用create



Flux也提供了一个create方法来创建Flux,create可以是同步也可以是异步的,并且支持多线程操作。


因为create没有初始的state状态,所以可以用在多线程中。


create的一个非常有用的地方就是可以将第三方的异步API和Flux关联起来,举个例子,我们有一个自定义的EventProcessor,当处理相应的事件的时候,会去调用注册到Processor中的listener的一些方法。


interface MyEventListener<T> {
        void onDataChunk(List<T> chunk);
        void processComplete();
    }


我们怎么把这个Listener的响应行为和Flux关联起来呢?


public void useCreate(){
        EventProcessor myEventProcessor = new EventProcessor();
        Flux<String> bridge = Flux.create(sink -> {
            myEventProcessor.register(
                    new MyEventListener<String>() {
                        public void onDataChunk(List<String> chunk) {
                            for(String s : chunk) {
                                sink.next(s);
                            }
                        }
                        public void processComplete() {
                            sink.complete();
                        }
                    });
        });
    }


使用create就够了,create接收一个consumer参数:


public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)


这个consumer的本质是去消费FluxSink对象。


上面的例子在MyEventListener的事件中对FluxSink对象进行消费。


使用push



push和create一样,也支持异步操作,但是同时只能有一个线程来调用next, complete 或者 error方法,所以它是单线程的。


使用Handle



Handle和上面的三个方法不同,它是一个实例方法。


它和generate很类似,也是消费SynchronousSink对象。


Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);


不同的是它的参数是一个BiConsumer,是没有返回值的。


看一个使用的例子:


public void useHandle(){
        Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
                .handle((i, sink) -> {
                    String letter = alphabet(i);
                    if (letter != null)
                        sink.next(letter);
                });
        alphabet.subscribe(System.out::println);
    }
    public String alphabet(int letterNumber) {
        if (letterNumber < 1 || letterNumber > 26) {
            return null;
        }
        int letterIndexAscii = 'A' + letterNumber - 1;
        return "" + (char) letterIndexAscii;
    }
相关文章
|
1月前
|
Java
Reactor模式
通过一个具体的Java代码示例展示了如何在NIO框架下实现Reactor模式,用于处理网络IO事件,包括事件的接收、分发和处理。
36 4
Reactor模式
|
6月前
|
监控 安全 Linux
reactor的原理与实现
前情回顾 网络IO,会涉及到两个系统对象:   一个是用户空间调用的进程或线程   一个是内核空间的内核系统 如果发生IO操作read时,会奖励两个阶段:
71 1
|
6月前
|
监控 Java 应用服务中间件
Reactor反应器模式
在Java的OIO编程中,最初和最原始的网络服务器程序使用一个while循环,不断地监听端口是否有新的连接,如果有就调用一个处理函数来处理。这种方法最大的问题就是如果前一个网络连接的处理没有结束,那么后面的连接请求没法被接收,于是后面的请求统统会被阻塞住,服务器的吞吐量就太低了。 为了解决这个严重的连接阻塞问题,出现了一个即为经典模式:Connection Per Thread。即对于每一个新的网络连接都分配一个线程,每个线程都独自处理自己负责的输入和输出,任何socket连接的输入和输出处理不会阻塞到后面新socket连接的监听和建立。早期版本的Tomcat服务器就是这样实现的。
|
网络协议 数据处理
Reactor模式(二)
Reactor模式
82 0
|
设计模式 网络协议 数据处理
Reactor模式(一)
Reactor模式
120 0
|
编解码 弹性计算 缓存
Netty源码和Reactor模型
Netty源码和Reactor模型
103 0
|
设计模式 存储 监控
|
设计模式 Java 程序员
Netty线程模型 - Reactor 模式
我相信有很多人会对这个Reactor模式比较陌生,但是Netty这个名字大家都会比较熟悉,即使没有学习使用过,也会对它有所耳闻,它可以说是Java高性能网络编程的代名词。Reactor模式就是Netty线程模型设计的核心,本文我们就以Reactor模式入手,探究一下经典的设计。
179 0
|
编解码 监控 网络协议
基于NIO的三种Reactor模式
基于NIO的三种Reactor模式
196 0
|
Java 调度
【Netty】反应器 Reactor 模式 ( 单反应器 Reactor 单线程 | 单反应器 Reactor 多线程 )
【Netty】反应器 Reactor 模式 ( 单反应器 Reactor 单线程 | 单反应器 Reactor 多线程 )
184 0
【Netty】反应器 Reactor 模式 ( 单反应器 Reactor 单线程 | 单反应器 Reactor 多线程 )