前言:上一篇文章RxJava2源码分析(一):基本流程分析,是对RxJava2基本流程的分析,有了上一篇的基础,这篇就再深入一点,开始分析一下RxJava2操作符的原理。
为了方便理解RxJava2操作符的原理,这里选择最常用的map
操作符来讲解操作符的原理,示例代码如下
private void basicUseRxJava() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } }).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return "我是数字" + integer; } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { Log.e("wizardev", "onNext: " + s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }
内容回顾
因为这一篇的内容是在上一篇的基础上进行讲解的,所以在讲解操作符之前,先回顾一下前一篇主要的知识,如下:
- Observable执行的create方法后返回的是ObservableCreate实例。
- create方法的参数,实际是注入到ObservableCreate类中,作为它的成员变量。
- 调用Observable的subscribe方法最终调用的是ObservableCreate类中的subscribeActual方法。
操作符分析
同样,这里分析源码的顺序依然按照代码的执行顺序,create
方法前文已经分析过了,这里就直接看map
方法,map
方法的代码如下
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
有了前一篇的经验,可以直接从这个方法中得出结论即map
方法返回的是ObservableMap实例,同时将map
方法的参数及Observable自身注入了其构造方法中。
现在看下ObservableMap类的源码,如下
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { super(source);//调用了其父类的构造方法 this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } //省略部分无关代码 //... }
从上面的代码中可以看出,ObservableMap
继承至AbstractObservableWithUpstream
,继续进入AbstractObservableWithUpstream
类中看下源码,如下
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> { protected final ObservableSource<T> source; AbstractObservableWithUpstream(ObservableSource<T> source) { this.source = source; } @Override public final ObservableSource<T> source() { return source; } }
从上面的一段代码,可以知道AbstractObservableWithUpstream
类其实就是Observable
类的装饰类,这个类的作用就是将实例化的Observable
注入进来,作为其成员变量。分析到这里可以得出这几个类的关系如下
注:ObservableMap中的成员变量function
就是我们写的这段代码
new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return "我是数字" + integer; } }
好了,到这里算是将map
方法所做的事情分析完了,下面来看实例代码的最后一个方法subscribe
.
subscribe方法分析
通过上一篇文章可以知道subscribe
方法实际调用的是Observable子类的subscribeActual
方法,而这里调用subscribe
方法的类是ObservableMap
,所以这里调用的就是ObservableMap
类的subscribeActual
方法。现在来看下ObservableMap类的subscribeActual
方法的源码,如下(为了分析方便,这里将与subscribeActual方法有关的代码一起贴了出来)
final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); }
从上面的代码可以发现ObservableMap
类的subscribeActual
方法又调用了其上游的subscribe
方法,为了便于理解这里解释一下文中所说的上游下游
文中说的“上游”及“下游”其实是相对而言的,这里的“上游”是靠近Observable的,如示例代码中的subscribe是最下游,map是其上游,而map操作符又是crate方法的下游。
这里上游的subscribe
方法就是ObservableCreate调用的subscribe
方法,实际就是调用ObservableCreate的subscribeActual
方法,接着就是前一篇讲过的流程了。
结论
分析到现在可以得出以下结论
subscribe
方法的调用流程是从下往上的,就是从下游往上游分别调用其subscribe
方法。
为了方便理解,这里我用时序图表示subscribe
的调用顺序,如下图
接收数据流程分析
上面的内容分析了RxJava2基本流程加入操作符后的subscribe
方法的执行顺序,接着就来看下,数据的接收顺序。经过上面的分析可以知道最终调用的是ObservableCreate类的subscribeActual
方法,这里与前一篇文章subscribeActual
方法不同的就是subscribeActual
方法的参数改变了,这里的参数是MapObserver
类的实例,再来看下ObservableCreate类的subscribeActual
方法的源码,如下
//这里的参数实际是MapObserver类的实例 protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { //这句代码的最终调用的就是MapObserver类的onNext方法。 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
一些重要的内容已经在上面的代码中进行了注释,可能这句source.subscribe(parent);
代码,不好理解,这里就来解释一下这句代码,由上一篇文章可知,这里的source
就是示例代码中的new ObservableOnSubscribe()...
实例,这里就是调用了这个实例的subscribe
方法,而这个方法中的代码就是调用了其参数的onNext
方法,**最终调用的就是MapObserver类的onNext方法。**现在,来看下MapObserver类的onNext方法的代码,如下
@Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { downstream.onNext(null); return; } U v; try { //1、 v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } //2、 downstream.onNext(v); }
主要来看下
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.")
这句代码,这句代码中的mapper就是注入到ObservableMap中的成员变量function,详细内容可以查看源码。调用的apply方法,就是示例代码中的这句代码
@Override public String apply(Integer integer) throws Exception { return "我是数字" + integer; }
接着,可以发现又调用了“2”处的代码downstream.onNext(v);
,这句代码中的downstream
就是示例代码中最下游的subscribe
方法中的参数即是下面的代码
new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { Log.e("wizardev", "onNext: " + s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }
所以这句downstream.onNext(v);
调用的就是new Observer()
匿名类中的onNext
方法。关于downstream
的是何时初始化的,可以从MapObserver的父类BasicFuseableObserver类中知晓。
结论
分析到这里,又可以得出一些结论
- 关于数据的处理上从上游到下游一级级的处理的。
- 在MapObserver类中的
onNext
方法,首先调用的是function
中的apply
方法,然后再调用下游的onNext
方法并将处理后的参数传入。
总结
通过分析map
操作符,可以知道订阅方法(subscribe)是从下游到上游进行订阅的,而数据的发射是从上游到下游进行的。这两个特性不仅仅是map
操作符的特性,对其他的操作符同样适用。为了讲清楚这两个特性,本文就选了比较具有代表性的map
操作符,如果想了解其他操作符的原理,就顺着这两个特性分析就行了。
授之以鱼,不如授之以渔。本文的目的就是解释清楚操作符的思想及原理,理解了这种思想及原理,分析其他的操作符也就不在话下了。