我们知道,subscribeOn() 方法通过接收一个 Scheduler 参数,来指定对数据的处理运行在特定的线程调度器 Scheduler 上。若多次执行 subscribeOn() 方法,则只有最初的一次起作用。
subscribeOn() 方法只有第一次调用才有效,需要满足以下条件:
- 被观察者必须是 Cold Observable。
- 被观察者多次调用 subscribeOn() 之后,并不意味着线程只会切换一次,而是线程多次切换之后,最终切换到第一次设置的线程。
所以, subscribeOn() 方法的调用并非一直有效。本文会通过列举一些事例,分析其失效的原因。
一. 创建 Observable 后内部使用了多线程发射数据
使用 RxJava 创建 Observable 后,假如内部使用了多线程发射数据,会带来什么影响呢?
RxJava 会通过 Scheduler、subscribeOn() 来管理线程,但只有在不手动更改线程的情况下,它才会这样做。
通常情况下,RxJava 发射的数据会在同一个线程上,但是稍作一些变化,发射的数据来自不同的线程会怎样呢?
public static void main(String[] args) { Observable.create(emitter -> { emitter.onNext(1); new Thread("main") { @Override public void run() { emitter.onNext(2); emitter.onNext(3); emitter.onNext(4); emitter.onComplete(); } }.start(); }) .subscribeOn(Schedulers.io()) .map(integer -> { log(integer + " - I want this happen on an io thread"); return integer + ""; }) .subscribe(s -> log("Consume: "+s)); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } public static void log(String msg) { System.out.println("Current Thread Name:"+Thread.currentThread().getName() + ", "+ msg); }
执行结果:
Current Thread Name:RxCachedThreadScheduler-1, 1 - I want this happen on an io thread Current Thread Name:RxCachedThreadScheduler-1, Consume: 1 Current Thread Name:main, 2 - I want this happen on an io thread Current Thread Name:main, Consume: 2 Current Thread Name:main, 3 - I want this happen on an io thread Current Thread Name:main, Consume: 3 Current Thread Name:main, 4 - I want this happen on an io thread Current Thread Name:main, Consume: 4
上述执行结果表明,除了 emitter 发射的 1 是在 io 线程中执行的,其余的数字都是在
main 线程中运行的。
一旦 create 操作符中的 emitter 发射了数值,甚至在新的线程发射了值,RxJava 还是会很高兴地接受这些数值并将它们进一步传递给流。此时 RxJava 没有改变线程,是因为 subscribeOn() 方法已经完成了工作,订阅已经在其他线程上进行了。这时,没有理由 RxJava 会再次更改线程。所以,会看到上述的运行结果。
二. Hot Observable 对 subscribeOn() 调用造成的影响
2.1 特殊的创建操作符 just
just 是一个比较“特殊”的创建操作符,just 的作用是将单个数据转换为发射这个单个数据的 Observable。just 类似于 fromXXX,但是 fromXXX 会将数组或 Iterable 的数据取出然后逐个发射,而 just 只是简单地原样发射,将数组或 Iterable 当作单个数据。另外,just 创建的不是一个 Cold Observable。
下面以 just、fromCallable 为例:
public static void main(String[] args) { System.out.println("from Just"); Observable justObservable = Observable.just(new Random().nextInt()); justObservable.subscribe(System.out::println); justObservable.subscribe(System.out::println); System.out.println("\nfrom Callable"); Observable callableObservable = Observable.fromCallable(() -> new Random().nextInt()); callableObservable.subscribe(System.out::println); callableObservable.subscribe(System.out::println); }
执行结果:
from Just 1208207476 1208207476 from Callable 774558265 1432625739
我们不难发现,上述执行结果中 just 操作符创建的 Observable 即使被订阅多次,所产生的值依然保持不变。该值是从 Observable 外部生成的,而 Observable 仅将其存储以供以后使用。
另外,使用 just 操作符时,不需要 subscribe 订阅也会立即执行。
public static void main(String[] args) { System.out.println("from just"); Observable.just(getRandomInteger()); } public static Integer getRandomInteger() { System.out.println("generating Integer"); return new Random().nextInt(); }
执行结果:
from just generating Integer
上述代码,没有进行订阅也执行了打印“generating Integer”,而 Cold Observable 必须使用 subscribe() 才会生效。这就相当于 just 可以立即执行,而 fromCallable 是延迟执行。
通过比较 just 和 fromCallable 操作符,接下来我们可以总结 Hot Observable 和 Cold Observable 之间的区别。
Hot Observable | Cold Observable |
在外部生成 Observable | 在内部生成 Observable |
发生订阅之前创建 Observable | 发生订阅之后创建 Observable |
2.2 just 和 fromCallable 分别调用 subscribeOn() 会怎样?
public static void main(String[] args) { Observable.just(getRandomInteger("from just")) .subscribeOn(Schedulers.io()) .subscribe(s -> log("Consume just: " + s)); Observable.fromCallable(() -> getRandomInteger("from callable")) .subscribeOn(Schedulers.io()) .subscribe(s -> log("Consume fromCallable: " + s)); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } public static Integer getRandomInteger(String prefix) { log(prefix + " generating Integer"); return new Random().nextInt(); } public static void log(String msg) { System.out.println("Current Thread Name:"+Thread.currentThread().getName() + ", "+ msg); }
执行结果:
Current Thread Name:main, from just generating Integer Current Thread Name:RxCachedThreadScheduler-1, Consume just: 147620150 Current Thread Name:RxCachedThreadScheduler-2, from callable generating Integer Current Thread Name:RxCachedThreadScheduler-2, Consume fromCallable: -1120243490
使用 just 操作符时,getRandomInteger() 函数在 main 函数中运行。而使用 fromCallable 时,getRandomInteger() 函数是在 io 线程中运行。
因为 Hot Observable 是在订阅之前就创建了 Observable,所以使用 just 操作符后,getRandomInteger() 函数的调用并没有受到 subscribeOn() 的影响。
当然,在最后 subscribe() 中他们都切换到了 io 线程。
三. Subject 是一种特殊的存在,对 subscribeOn() 调用也会造成影响
我们先来介绍一下什么是 Subject?Subject 和 Processor 的作用是相同的。Processor 是 RxJava 2.x 新增的类,是 Reactive Stream 标准库中的接口,它继承自 Flowable 支持背压控制。而 Subject 则不支持背压控制。
举个 Subject 使用的例子:
Consumer<Long> subscriber1 = new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { System.out.println("subscriber1: "+aLong); } }; Consumer<Long> subscriber2 = new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { System.out.println(" subscriber2: "+aLong); } }; Consumer<Long> subscriber3 = new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { System.out.println(" subscriber3: "+aLong); } }; Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() { @Override public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception { Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation()) .take(Integer.MAX_VALUE) .subscribe(e::onNext); } }).observeOn(Schedulers.newThread()); PublishSubject<Long> subject = PublishSubject.create(); observable.subscribe(subject); subject.subscribe(subscriber1); subject.subscribe(subscriber2); try { Thread.sleep(20L); } catch (InterruptedException e) { e.printStackTrace(); } subject.subscribe(subscriber3); try { Thread.sleep(100L); } catch (InterruptedException e) { e.printStackTrace(); }
执行结果:
subscriber1: 0 subscriber2: 0 subscriber1: 1 subscriber2: 1 subscriber1: 2 subscriber2: 2 subscriber3: 2 subscriber1: 3 subscriber2: 3 subscriber3: 3 subscriber1: 4 subscriber2: 4 subscriber3: 4 subscriber1: 5 subscriber2: 5 subscriber3: 5 subscriber1: 6 subscriber2: 6 subscriber3: 6 subscriber1: 7 subscriber2: 7 subscriber3: 7 subscriber1: 8 subscriber2: 8 subscriber3: 8 subscriber1: 9 subscriber2: 9 subscriber3: 9 subscriber1: 10 subscriber2: 10 subscriber3: 10 subscriber1: 11 subscriber2: 11 subscriber3: 11
可以看到,多个订阅的 subscriber(或者说观察者)共享同一事件。
Subject 的特殊性在于它既是 Observable 又是 Observer(Subscriber)。从 Subject 的源码上看到,继承自 Observable 实现 Observer。
当 Subject 作为观察者时,它可以订阅目标 Cold Observable 使对方开始发送事件。同时它又作为 Observable 转发或者发送新的事件,让 Cold Observable 借助 Subject 转换为 Hot Observable。
Subject 并不是线程安全的,如果想要其线程安全需要调用
toSerialized()
方法。(在 RxJava 1.x 的时代还可以用 SerializedSubject 代替 Subject,但是在 RxJava 2.x 以后 SerializedSubject 不再是一个 public class)
RxJava 的官网称 Subject 可以看成是一个桥梁或者代理。Subject 包含四种类型分别是 AsyncSubject、BehaviorSubject、ReplaySubject 和 PublishSubject。
用一句话分别介绍四种 Subject 的特性:
Subject | 发射行为 |
AsyncSubject | 不论订阅发生在什么时候,只会发射最后一个数据 |
BehaviorSubject | 发送订阅之前一个数据和订阅之后的全部数据 |
ReplaySubject | 不论订阅发生在什么时候,都发射全部数据 |
PublishSubject | 发送订阅之后的全部数据 |
下面我们以 2 两种常见的 Subject 为例,来看看他们调用 subscribeOn() 方法后会不会起作用?
3.1 PublishSubject
Observer 只接收 PublishSubject 被订阅之后发送的数据。如果 PublishSubject 在订阅之前,已经执行了 onComplete() 方法,则无法发射数据。
下面的例子使用 PublishSubject 创建一个数据流,稍后向其发送值。 PublishSubject 使用 subscribeOn() 切换到不同的线程池,并不会起作用。
public static void main(String[] args) { PublishSubject<Integer> subject = PublishSubject.create(); subject.subscribeOn(Schedulers.io()) .doOnNext(i-> log("value: "+ i+" - I want this happen on an io thread")).subscribe(); subject.subscribeOn(Schedulers.newThread()) .doOnNext(i-> log("value: "+ i+" - I want this happen on a new thread")).subscribe(); subject.subscribeOn(Schedulers.computation()) .doOnNext(i-> log("value: "+ i+" - I want this happen on a computation thread")).subscribe(); try { Thread.sleep(20); subject.onNext(1); Thread.sleep(20); subject.onNext(2); Thread.sleep(20); subject.onNext(3); Thread.sleep(20); subject.onComplete(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void log(String msg) { System.out.println("Current Thread Name:"+Thread.currentThread().getName() + ", "+ msg); }
执行结果:
Current Thread Name:main, value: 1 - I want this happen on an io thread Current Thread Name:main, value: 1 - I want this happen on a new thread Current Thread Name:main, value: 1 - I want this happen on a computation thread Current Thread Name:main, value: 2 - I want this happen on an io thread Current Thread Name:main, value: 2 - I want this happen on a new thread Current Thread Name:main, value: 2 - I want this happen on a computation thread Current Thread Name:main, value: 3 - I want this happen on an io thread Current Thread Name:main, value: 3 - I want this happen on a new thread Current Thread Name:main, value: 3 - I want this happen on a computation thread
从执行结果上 subscribeOn() 并没有起作用,所有的操作都是在主线程中运行。如果想达到切换线程的效果,需要让 Subject 使用 observeOn() 替换 subscribeOn() 。
3.2 BehaviorSubject
Observer 会接收到 BehaviorSubject 被订阅之前的最后一个数据,再接收订阅之后发射过来的数据。如果 BehaviorSubject 被订阅之前没有发送任何数据,则会发送一个默认数据。
由于 BehaviorSubject 的这个特性,subscribeOn() 变得很微妙了,它可能会影响到线程的切换。例如:
public static void main(String[] args) { BehaviorSubject<Integer> subject = BehaviorSubject.create(); subject.subscribeOn(Schedulers.io()) .doOnNext(i-> log("value: "+ i+" - I want this happen on an io thread")).subscribe(); subject.onNext(1); subject.subscribeOn(Schedulers.newThread()) .doOnNext(i-> log("value: "+ i+" - I want this happen on a new thread")).subscribe(); subject.subscribeOn(Schedulers.computation()) .doOnNext(i-> log("value: "+ i+" - I want this happen on a computation thread")).subscribe(); try { Thread.sleep(20); subject.onNext(2); Thread.sleep(20); subject.onNext(3); Thread.sleep(20); subject.onComplete(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void log(String msg) { System.out.println("Current Thread Name:"+Thread.currentThread().getName() + ", "+ msg); }
执行结果:
Current Thread Name:main, value: 1 - I want this happen on an io thread Current Thread Name:RxNewThreadScheduler-1, value: 1 - I want this happen on a new thread Current Thread Name:RxComputationThreadPool-1, value: 1 - I want this happen on a computation thread Current Thread Name:main, value: 2 - I want this happen on an io thread Current Thread Name:main, value: 2 - I want this happen on a new thread Current Thread Name:main, value: 2 - I want this happen on a computation thread Current Thread Name:main, value: 3 - I want this happen on an io thread Current Thread Name:main, value: 3 - I want this happen on a new thread Current Thread Name:main, value: 3 - I want this happen on a computation thread
从执行结果上看,这段代码在3个线程上运行。
当我们的 subject 发射第一个值时,第一个观察者已经被订阅。由于订阅代码在我们调用 onNext() 时已经完成,因此订阅调度程序没有任何作用。在这种情况下,当我们调用 onNext() 它类似于 PublishSubject 的工作方式。
第二和第三个观察者都在初始 onNext() 之后订阅。这是 BehaviorSubject 特性,对于任何新的订阅,它将重播最后一个发射的数据。因此,对于这两个观察者来说,BehaviorSubject 已缓存了这个发射的值(1),并将其作为预订的一部分发出。这样,将尊重订阅调度程序,并在它提供的线程上通知观察者。
所有后续的发射的值都发生在订阅之后,因此,值再次与 onNext() 在同一线程上发出,类似于 PublishSubject 的工作方式。
四. timer、interval 等函数其实有默认的 Scheduler
RxJava 的某些操作符,例如:timer、interval、buffer、debounce、delay 等都支持 Scheduler ,例如:
@CheckReturnValue @SchedulerSupport(SchedulerSupport.COMPUTATION) @NonNull public static Observable<Long> timer(long delay, @NonNull TimeUnit unit) { return timer(delay, unit, Schedulers.computation()); } @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) @NonNull public static Observable<Long> timer(long delay, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableTimer(Math.max(delay, 0L), unit, scheduler)); }
通常情况下,我们不指定 Scheduler,就是使用默认的 ComputationScheduler,所以即使 subscribeOn() 指定了某个 Scheduler,也不会起作用。
public static void main(String[] args) { Observable.timer(5, TimeUnit.MILLISECONDS) .subscribeOn(Schedulers.io()) .subscribe(s -> log("Consume: " + s)); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } public static void log(String msg) { System.out.println("Current Thread Name:"+Thread.currentThread().getName() + ", "+ msg); }
执行结果:
Current Thread Name:RxComputationThreadPool-1, Consume: 0
因此,执行的结果运行在 computation() 线程上也不奇怪。
只有,在 timer 函数中指定 Scheduler,观察者才会运行在相应的线程。
public static void main(String[] args) { Observable.timer(5, TimeUnit.MILLISECONDS,Schedulers.io()) .subscribe(s -> log("Consume: " + s)); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } public static void log(String msg) { System.out.println("Current Thread Name:"+Thread.currentThread().getName() + ", "+ msg); }
五. 总结
RxJava 用好不易,很多东西需要深究其源码。
本文介绍了几种方式,RxJava 即使调用了 subscribeOn() 方法,线程切换也不会起作用。任何细微使用线程切换的地方,都需要非常注意。虽然有时它不会使应用程序奔溃,但是它们可能会造成一些意外的结果。尚书有云:“不矜细行,终累大德。为山九仞,功亏一篑”。我们在学习和使用过程中也不能忽视任何细节。