已经超过一个月没有写文章了,原因无非就是工作太忙。最近终于恢复以前的节奏,任务开始正常了起来。忙里偷闲,写一写人们写烂了的RxJava。这篇文章主要分析RxJava事件的产生以及变化的原理,Ok,let's go!
0. 前言
本次源码分析使用的是RxJava2,版本2.1.14。RxJava1和RxJava2区别还是很大的,今天去github上看了下,RxJava1在三月底就停止更新了。
1. 关于RxJava
套用扔物线大神的话就是:a library for composing asynchronous and event-based programs by using observable sequences.(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)
。就简单说这句吧,详细还是看大神的文章吧。
2. Observable创建
在使用RxJava时,通常使用Observable.just(T t...)/Observable.create(ObservableOnSubscribe<T> source)
来创建Observable
对象,看其源码如下:
Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// 判空
ObjectHelper.requireNonNull(source, "source is null");
// 通过RxJavaPlugins
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
RxJavaPlugins.java
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
// 这个与hook有关,默认为null,所以返回值即传入值
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
整个创建过程其实就是返回了一个ObservableCreate
对象,该对象如下:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 代理模式的运用
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
// 调用了ObservableOnSubscribe的subscribe方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}
ObservableCreate
作为Observable
的子类,实现了其subscribeActual
方法,创建时需要传入接口ObservableOnSubscribe
,该接口作为连接了观察者与被观察者。通过实现其subscribe
方法,来通知观察者事件发生。
当然,我们也可以看到在subscribeActual
方法中通过使用CreateEmitter
作为观察者的代理类,用于控制观察这事件是否需要通知。
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
// 传入值不能为null
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 如果没有被处理则调用
if (!isDisposed()) {
observer.onNext(t);
}
}
......
}
CreateEmitter
作为Observer
的代理类,通知观察者时需要判断传入参数是否为空以及是否被处理来判断是否调用Observer
的onNext(T t)
方法。
同理,我们也可以知道通过just(T... t)
也是通过这种方式创建,这里不详细讲了。
3. subscribe订阅
订阅方法作为观察者和被观察者连接的桥梁,通过该方法,我们可以获得被观察者的事件发送,以及接受该事件做出的响应。
Observable.java
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// hook相关略过
observer = RxJavaPlugins.onSubscribe(this, observer);
// 判空
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// 调用了Observable中的抽象方法subscribeActual,该方法的在ObservableCreate中实现
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
......
throw npe;
}
}
订阅方法更加的简单,调用了其抽象方法subscribeActual
,该方法在ObservableCreate
中实现。我们再看下subscribeActual
代码:
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 代理模式的运用
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
// 调用了ObservableOnSubscribe的subscribe方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
到这里整个事件发送就完全走通了:
- 通过
create
方法创建Observable
,传入匿名内部类ObservableOnSubscribe
。 - 调用
ObservableOnSubscribe
的subscribe
方法,通过实现类中来发送数据。 - 通过
Observable
的subscribe
方法来将Observer
订阅,在ObservableOnSubscribe
调用了Observer
的代理类CreateEmitter
的方法来通知Observer
的事件。
emmm,用更简单的话说就是ObservableOnSubscribe
的subscribe
方法调用了接口Observer
的方法。
4. map变换
前面讲了关于Observable
的创建以及事件的发送,这些都只是基本操作。RxJava的强大是对事件流的各种操作,比如过滤、变化以及线程切换等。这里我们看看我们常用的map(Function<? super T, ? extends R> mapper)
来分析一下吧!
Observable.java
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
// 创建了ObservableMap,这里作为一个新的Observable对象返回,返回的是转换后的泛型R
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
ObservableMap.java
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
// 创建过程是需要传入第一个Observable以及一个Function接口
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
// 调用subscribe时需要重写的方法
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
// 转换的Observer
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
// 需要将实际的Observer传入
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
......
U v;
try {
// 这里调用了Function的apply方法,将T转成U
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 将转换后的U发送给Observer接收
actual.onNext(v);
}
......
}
}
OK,源码就这些,代码量很少,理解起来也很容易。这里再总结下:
- 创建新的
Observable
对象ObservableMap
,并将当前Observable
对象传入。 - 调用新的
Observable
对象的subscribe
方法,在ObservableMap
中调用source.subscribe(new MapObserver<T, U>(t, function))
。 -
MapObserver
实现了Observer
接口,在其onNext
方法中调用了Function
的apply()
方法,并且最终调用传入的Observer
的onNext()
方法。
如下图所示:
5. 线程切换
RxJava的强大之处不只是体现在各个操作符,也体现在线程的切换。RxJava默认工作在当前线程中,如果需要发送事件产生在新的线程,接收并处理事件在另一个线程怎么办?RxJava给我们提供了两个方法subscribeOn()
和observerOn()
。subscribeOn()
是指事件产生的线程,该方法只会在第一次设置有效。而observerOn()
是指定事件处理的的线程,每调用一次就会切换线程。下面还是分开来说吧。
5.1 subscribeOn
分析
上面说到subscribeOn
作用于事件产生,并且只有第一个设置有效,我我们分析下源码:
Observable.java
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
ObservableSubscribeOn.java
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
// 这里还是老套路
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
// 新的Observer,最为原始Observer的代理
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
......
@Override
public void onNext(T t) {
actual.onNext(t);
}
......
}
// 创建了subscribe任务,这里实现了runnable接口
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// source是上面传入的Observable对象
source.subscribe(parent);
}
}
}
上面的步骤已经很熟悉了,通过该方法创建了一个新的Observable
对象,最终执行subscribe()
方法时调用了scheduler.scheduleDirect(new SubscribeTask(parent))
,SubscribeTask
作为Runnable
的实现类,所以我们最终会调用run()
方法。其实到这里我们也就清楚为什么subscribeOn
只有第一个设置有效了——不管创建多少个新的Observable
对象,最终还会调用第一个Observable
对象的subscribe
方法,而该方法工作在该线程中!
我们接着看到底是怎么进行线程切换的(这里使用Schedulers.newThread()
)为例:
Schedulers.java
@NonNull
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
static {
......
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
NewThreadScheduler.java
public final class NewThreadScheduler extends Scheduler {
final ThreadFactory threadFactory;
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}
NewThreadWorker.java
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
// 线程池,可以实现循环或延迟任务。
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
// 创建方法
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable run) {
return schedule(run, 0, null);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 创建一个新的Runnable,最终仍然调用传入的run方法,这里不去看了
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
......
}
通过上面的代码,我们可以看到最终通过线程池调用SubscribeTask
的run
方法,从而达到线程切换的目的。
5.2 observerOn
实现
observerOn
实现关于observerOn
的实现其实和subscribeOn
很像,只不过一个是在新的线程中调用source.subscribe(parent)
方法;而observerOn
是在新的线程调用相应的onNext\onError
等方法,所以我们没调用一次observerOn
就会切换一次线程,并且下面的操作都会工作在紧挨着该操作的observerOn
所指定的线程中。
6. 总结
RxJava的基本原理就是这些,RxJava使用的较多的模式就是代理模式。整个源码不难(没有包括背压以及高级用法),用心看仔细梳理就能很好理解。我所认为的理解可能就是看懂了,然后可以根据它来仿写出一个相似的RxJava,这样我觉得是真正的掌握了。RxJava, Out!