前言:经过前面两篇文章对RxJava2源码的分析,我们已经对RxJava2的基本流程及操作符的原理有了一定程度的认识。这篇文章将在前面两篇文章的基础上,对RxJava2的线程调度进行分析,建议先阅读前面两篇的文章,再阅读本文。
注:文章内容过多,建议在空闲时阅读。
相关文章
示例代码
为了更好的理解RxJava2的线程调度原理,不被其他的代码所干扰,这里就只贴出与线程调度有关的代码,如下
private void threadScheduleCode() { Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.e("wizardev", "上游所在的线程: "+Thread.currentThread().getName()); Thread.sleep(2*1000); emitter.onNext("wizardev"); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.e("wizardev", "onSubscribe: "+Thread.currentThread().getName() ); } @Override public void onNext(String s) { Log.e("wizardev", "接收到上游发射的数据为: " + s); Log.e("wizardev", "下游所在的线程: "+ Thread.currentThread().getName()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }
可以看下执行这段代码后打印的日志,如下
可以发现上游和下游确实不在同一个线程中,那么RxJava2是怎么进行线程切换的呢?想知道答案,请继续阅读本文。
本文要解决的问题
本文要解决的问题其实就一个,就是RxJava2是如何进行线程调度的?但是,围绕着这个问题又会有两个小的问题需要解决:
- subscribeOn是怎样将要处理的数据放到到工作线程的?
- observeOn是怎样将工作线程切换到主线程的?
为了能够更容易理解线程调度的原理,这里对源码分析的顺序将会按照代码的执行顺序进行分析。
subscribeOn方法分析
因为前面的文章已经分析过了create
方法,所以就直接分析subscribeOn
这个方法,直接上源码,如下
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
有了前面分析源码的经验,可以知道,subscribeOn
方法其实就是返回了ObservableSubscribeOn类的实例并将上游的ObservableCreate和subscribeOn
方法的参数注入到了它的构造方法中。 继续看下ObservableSubscribeOn类的源码,如下
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } //... //省略部分源码 }
从源码中可以看到,这里分别将ObservableCreate类的实例以及subscribeOn
方法的参数即Schedulers.io()
作为了ObservableSubscribeOn类的成员变量。 好了,上面的这些就是执行
subscribeOn(Schedulers.io())
这句代码所做的事情了,下面来看下
observeOn(AndroidSchedulers.mainThread())
这句代码所做的事情。