RxJava2源码分析(一):基本流程分析2

简介: 基本流程分析

subscribe方法分析

  分析完了create方法,接着来分析subscribe方法,其方法代码如下

 public final void subscribe(Observer<? super T> observer) {
     //1、判空
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //2、Hook方法,实质就是observer
            observer = RxJavaPlugins.onSubscribe(this, observer);
//判空
            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
      //4、重点,
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

这里重点看下“4”处, 这里调用了ObeservablesubscribeActual方法,可以看下Obeservable类中的这个方法,如下

 protected abstract void subscribeActual(Observer<? super T> observer);

这个方法是抽象的,实际调用的是它子类中的方法,通过上文的分析,我们知道ObservableCreateObeservable类的子类,所以,这里调用的实际就是ObservableCreate类中的subscribeActual方法。现在,我们再看下这个方法中的代码,如下

  @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //1、实例化CreateEmitter
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //2、回调方法
        observer.onSubscribe(parent);
        try {
            //3、回调方法
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

我们一步步的分析这个方法中的代码,先看“1”处的代码,这里实例化了CreateEmitter这个类,在实例化的同时将observer传了进去。看下CreateEmitter这个类的代码,如下

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
//...省略部分代码
        final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }
        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
       //...省略部分代码
    }

通过上面的代码,可以发现CreateEmitter这个类实现了ObservableEmitter这个接口,而这个接口是ObservableOnSubscribe接口中subscribe方法的参数,是不是发现什么了?现在继续往下看,看下“2”处的代码,这里回调了ObserveronSubscribe方法,分析到这里,可以得出下面的结论

onSubscribe()回调所在的线程是ObservableCreate执行subscribe()所在的线程,和subscribeOn()/observeOn()无关!

重点来了,这里看下“3”处的代码,还记得source是谁吗?**它就是执行Observable.create方法时,我们注入给ObservableCreate类的成员变量,是ObservableOnSubscribe接口的实例。**这里调用的subscribe方法,实际就是下面代码的subscribe方法,

 public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }

这段代码中的subscribe方法的参数实质就是CreateEmitter,调用的onNext方法就是CreateEmitter类中的onNext方法。继续看下CreateEmitter类中的onNext方法,代码如下

  @Override
        public void onNext(T t) {
            //1、判断传入的参数是否为null
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                //2、调用Observer中的onNext方法
                observer.onNext(t);
            }
        }

分析到这里,就可以得出以下结论了

subscribe方法中发射器所调用的onNext方法,如果代码没有出错的话,最终调用的就是Observer中的onNext方法。

分析CreateEmitter中的其他方法,还可以知道为什么Observer中的onErroronComplete方法只有一个会回调的原因了,原因就是无论调用的是哪一个方法都会调用dispose()方法取消订阅。

结论

  对Observable.subscribe方法的分析可以得出以下结论

  1. subscribe方法最终调用了ObservableCreate类中的subscribeActual方法。
  2. subscribeActual方法中,实例化了发射器,并开始发射数据。
  3. subscribe方法中发射器所调用的onNext方法,如果代码没有出错的话,最终调用的就是Observer接口中的onNext方法。

总结

  通过对RxJava基本流程的源码分析,是不是对RxJava的原理有了更清晰的认识呢?分析完之后,我们再看下这张图,是不是感觉现在看起来就明白多了呢?

结束语

  想要了解一些开源库的原理,我们必须要阅读其源码,只有从源码中才能得到想要的答案,才能对库的原理有更清晰的认识。

  再说下,阅读开源库的注意事项,阅读源码时,我们最好带着问题来阅读,阅读前先有个目标,比如我这次阅读要搞懂什么问题,然后再开始阅读,不然就会很容易在茫茫代码中迷失。还有就是不要想着每句代码都搞懂,搞懂与自己想要获取的答案有关的代码即可。

相关文章
|
2月前
|
前端开发 网络协议 Java
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
148 0
|
缓存 Java 调度
为了更好的使用OKHttp—架构与源码分析
为了更好的使用OKHttp—架构与源码分析
为了更好的使用OKHttp—架构与源码分析
|
Java 程序员
Rxjava实战笔记 | Rxjava的基本使用解析(同步结合示例)
Rxjava实战笔记 | Rxjava的基本使用解析(同步结合示例)
|
设计模式 uml
Rxjava源码解析笔记 | Rxjava概述 & 传统观察者设计模式源码解析
Rxjava源码解析笔记 | Rxjava概述 & 传统观察者设计模式源码解析
|
存储
关于RxJava在业务上的一些思考
关于RxJava在业务上的一些思考
91 0
关于RxJava在业务上的一些思考
|
存储 算法 测试技术
源码分析 RateLimiter SmoothBursty 实现原理(文末附流程图)
源码分析 RateLimiter SmoothBursty 实现原理(文末附流程图)
源码分析 RateLimiter SmoothBursty 实现原理(文末附流程图)