前言
RxJava已经出现很多个年头了,但是依然被很多公司使用,如果现在还对RxJava了解的不够透彻, 可以看这个系列对它的分析:相信看完后你对它会有个更全面的认识。 这个系列主要从下面几个方面来讲解: RxJava基本操作符使用 RxJava响应式编程是如何实现的 RxJava的背压机制及Flowable是如何实现背压的 **RxJava的线程切换原理
关于RxJava的其他系列文章,可以点击下方链接
面试官:RxJava是如何做到响应式编程的?
今天我从源码角度来分析下Rxjava的订阅和事件分发和线程切换过程,看他是如何实现响应式编程的
我们先放一张网络上找到的神图:可以结合源码分析仔细品味
简介:
对于源码分析:我们从一段代码来分析 下面是一段经典代码:
Disposable mDisposable = Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Object> emitter) throws Exception {
emitter.onNext("1");
emitter.onComplete();
}
}).map(new Function<Object, Object>() {
@Override
public Object apply(@NonNull Object o) throws Exception {
return null;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
}
});
mDisposable.dispose();
下面我们将其分为5个步骤进行分析:
1.创建Observable 2.订阅Observer 3.订阅事件线程切换 4.分发事件线程切换 5.解除订阅dispose
过程源码分析
1.创建Observable
1.Observable.create:-->new ObservableCreate<T>(source)
//上面代码创建了一个ObservableCreate 继承Observable
2.Observable.create.map(fun)-->new ObservableMap<T, R>(this, mapper),
//也继承Observable
//这里的this指的是第一步创建的ObservableCreate
//此时第二步获取的ObservableMap持有第一步的ObservableCreate
3.Observable.create.map(fun).subscribeOn(io)-->new ObservableSubscribeOn<T>(this, scheduler)
//同步骤2,这里的this指的是步骤2中的ObservableMap,此时持有第二步中的ObservableMap对象
4.Observable.create.map(fun).subscribeOn(io).observerOn(io)-->new ObservableObserveOn<T>(this, scheduler)
//同步骤3,这里的this指的是步骤3中的ObservableSubscribeOn,此时持有第3步中的ObservableSubscribeOn对象
- 根据以上分析可以看出在调用subscribe提交被观察者之前,都是对前面的被观察者的封装,使用的是装饰器模式
- 整个链路使用的是责任链模式。按责任划分可以理解为观察者Observer和被观察者Observable模式
下面是Observable装饰的过程
内部关系:
An(A(n-1)(A(n-2)(..A2(A1))):
2.订阅Observer
1.调用subscribe方法的时候:subscribe(onNext)->subscribe(LambdaObserver)->subscribeActual(LambdaObserver); 这里回调的是第n个被观察者的subscribeActual,假设按上面创建流程则上一个被观察者是ObservableObserveOn
ObservableObserveOn.java
protected void subscribeActual(Observer<? super T> observer) {//记住这里的observer是LambdaObserver
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
这里回调ObservableSubscribeOn的subscribeActual方法,传入的是下游的ObserveOnObserver
2.继续来看2.ObservableSubscribeOn.subscribeActual(B)()->
ObservableSubscribeOn.java
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
- 可以看到这里对传下来的LambdaObserver做了一层封装到SubscribeOnObserver对象parent中
这里调用了scheduler.scheduleDirect(new SubscribeTask(parent)),其实是线程切换操作,具体操作到SubscribeTask的run方法
SubscribeTask.java public void run() { source.subscribe(parent); }
这里的source是上游的被观察者ObservableMap-->
ObservableMap.java
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
一样将下游传过来SubscribeOnObserver封装到了MapObserver中 同理这里的source是上游的ObservableCreate,
ObservableCreate.java
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);//这里是订阅的时候返回的dispose参数,可以使用他终止下面的流程
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
一样将下游传过来MapObserver封装到了CreateEmitter中
这里调用的source就是最初的新建的ObservableOnSubscribe
ObservableOnSubscribe.java public void subscribe(@NonNull ObservableEmitter<Object> emitter) throws Exception { emitter.onNext("1"); emitter.onComplete(); }
emitter为下游的CreateEmitter 上面分析可知,每层也是对下游传过来的观察者Observer进行了封装,和创建Observable类似
使用下面连接方式表示观察者之间的关系:
B1(B2(B3(B4(..Bn-1(Bn-))))) //这里的B1=LambdaObserver
3.订阅事件线程切换
在调用subscribeOn创建的ObservableSubscribeOn中:
@Override
public void subscribeActual(final Observer<? super T> s) {
...
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
这里调用了scheduler.scheduleDirect,其实就是开启一个线程池进行处理,切换到指定的线程中,所以可以看出,之后调用的
ObservableMap的subscribe操作其实都是运行在这个切换后的线程中
直到遇到调用observerOn操作。下面在onNext线程中分析
来看分发事件onNext操作:
//在订阅步骤中走到了emitter.onNext("1");
CreateEmitter.java
public void onNext(T t) {
...
if (!isDisposed()) {
observer.onNext(t);
}
}
//调用了observer.onNext,这里的observer是下游的MapObserver
MapObserver.java
public void onNext(T t) {
...
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
...
}
actual.onNext(v);
}
//调用了传入的function,将返回值做了新的值传入下一个观察者中,
//由2中可知,下个Observer 是SubscribeOnObserver
SubscribeOnObserver.java
public void onNext(T t) {
actual.onNext(t);
}
//直接调用下游的Observer,因为任务主要是切换订阅线程
//这里的下游观察者是:ObserveOnObserver,注意这个方法是调用了observerOn指定分发线程后订阅的观察者,前面分析中没有这个,是为了减少分析步骤
//ObserveOnObserver留到下个事件分发线程切换来讲解
下面图表示了线程之间切换操作关系:
3.分发事件线程切换
继续3中的分析
ObserveOnObserver.java
public void onNext(T t) {
...
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);//这里将异步操作放到queue中,可以引出背压的概念,缓存事件用
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
这里根据worker对象,切换到对应的线程执行onNext操作
下面的onNext操作都会执行在这个线程上,直到再次调用obServerOn也会切换到新的线程
- 通过上面的分析可知,observerOn可以执行多次
下面是源码内部线程切换过程:
5.解除订阅dispose
调用mDisposable.dispose()->
根据之前分析这个mDisposable是最后执行subscribe操作中创建的LambdaObserver
Observable.java
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
...
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
...
return ls;
}
LambdaObserver.java
public void dispose() {
DisposableHelper.dispose(this);
}
DisposableHelper.java
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
将field置为DISPOSED:解除订阅关系
这里的current = 上游传下来的dispose:依次向上游调用dispose解除订阅
注意dispose也是用了流模式,每个下游的dispose都持有上游的dispose引用,这个dispose表示每层的观察者Observer
最后来看下Rxjava内部线程池的实现方式
static final class SingleHolder {
static final Scheduler DEFAULT = new SingleScheduler();
}
static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
- 在Schedulers内部创建了几个Scheduler的实现对象, 根据传入的线程方法类型,返回对应的Scheduler对象,如IOScheduler,内部会创建一个CachedWorkerPool对象,而CachedWorkerPool内部有个get方法会创建一个ThreadWorker,后面会用到
在订阅的时候,会调用scheduler.scheduleDirect(new SubscribeTask(parent),这个scheduler = IoScheduler
执行父类Scheduler的scheduleDirect
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();//1
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);//2
w.schedule(task, delay, unit);//3
return task;
}
1.这里创建了一个Worker:
IoScheduler.java
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
2.将源task封装到一个新的DisposeTask中
3.调用1中创建的EventLoopWorker的schedule方法
EventLoopWorker.java
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);//5
}
4.这个threadWorker是在EventLoopWorker构造方法中生成
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
//来看CachedWorkerPool的get方法
CachedWorkerPool.java
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
//可以看到创建了一个ThreadWorker,跟踪ThreadWorker的构造方法
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
//这里直接执行父类NewThreadWorker的构造方法
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);//6
}
//可以看到这里创建了一个SchedulerThreadPool的线程池,周期性线程池,5中会用到
5.继续3中的操作调用ThreadWorker的scheduleActual
这个方法在父类NewThreadWorker中实现了
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
...
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
...
}
return sr;
}
- 执行了executor.submit方法:executor在4中分析过是SchedulerPoolFactory
- 调用其submit接口,将任务放到线程池中去执行,这样就实现了线程切换操作
同理调用observerOn接口设置next线程切换也是用到一个道理,这里就不再描述
线程池原理:
总结
以上就是我对RxJava源码内部流程和结构的分析,希望对大家有帮助
喜欢的关注下,后期会推出更多Android涉及的各方面知识。
关于RxJava的其他系列文章,可以点击下方链接
面试官:RxJava是如何做到响应式编程的?