关于 Disposable
任何订阅者模式的代码,都需要注意注册与注销的配对出现,否则会出现内存泄漏。
RxJava2 提供了 Disposable
( RxJava1 中是 Subscription
),在适当时机取消订阅、截断数据流。当在 Android 中使用时尤其要注意,避免内存泄露。
private CompositeDisposable compositeDisposable = new CompositeDisposable(); @Override public void onCreate() { compositeDisposable.add(backendApi.loadUser() .subscribe(this::displayUser, this::handleError)); } @Override public void onDestroy() { compositeDisposable.clear(); }
上面例子展示了在 Activity
等 LifecycleOwner 中的一般做法:使用 CompositeDisposable
收集所有的 Disposable 句柄,而后在 onDestroy
中调用 clear
统一注销。
clear
最终调用的是各个 Disposable 的 dispose
方法:
public interface Disposable { void dispose(); boolean isDisposed(); }
当然,除了手动调用 dispose,也有一些自动框架可供使用, 如 RxLifecycle
、uber 的 AutoDispose
等, 但最终都要调用到 Disposable 的 dispose() 。
dispose 实现原理
先看一段代码:
Disposable disposable = Observable.create( (ObservableOnSubscribe<Integer>) observableEmitter -> { for (int i = 1; i <= 3; i++) { observableEmitter.onNext(i); } }) .takeUntil(integer -> integer < 3) .subscribe();
当调用 disposable.dispose();
时,代码如何执行?
先卖个关子,文章最后揭晓答案
Disposable 是一个 Observer
调用 Observable.subscribe(...)
返回的 Disposable 本质是一个 LambdaObserver
public final Disposable subscribe( @NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError, @NonNull Action onComplete) { LambdaObserver<T> ls = new LambdaObserver<>( onNext, onError, onComplete, Functions.emptyConsumer()); subscribe(ls); return ls; //return as a Disposable }
LambdaObserver
集众多接口于一身
public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable
- 首先,是一个 Observer,被
subscribe()
后,通过onNext
发射数据; - 其次,是一个 Disposable,对外提供
dispose
方法; - 最后,通过
AtomicReference
,确保 dispose 线程安全的执行
@Override public void dispose() { DisposableHelper.dispose(this); } public static boolean dispose(AtomicReference<Disposable> field) { Disposable current = field.get(); Disposable d = DISPOSED; if (current != d) { current = field.getAndSet(d); if (current != d) { if (current != null) { current.dispose(); } return true; } } return false; }
原子地设置 DISPOSED
, 确保 AtomicReference
中的 Disposable 的 dispose 一定被调用,有且仅有一次。
onSubscribe 中传递 Disposable
AtomicReference
的 value 是在 Observer.onSubscribe
中被赋值的:
@Override public void onSubscribe(Disposable d) { if (DisposableHelper.setOnce(this, d)) { //设置 value try { onSubscribe.accept(this); } catch (Throwable ex) { ... } } }
那么 Observer.onSubscribe
又是何时被调用呢?
RxJava 的操作符都是一个 Observable 实现。操作符链式调用的本质就创建 Observable 并通过 subscribe 依次订阅。 subscribe 内部会用 subscribeActual ,这是每个操作符都必须实现的方法。
看一下 Observabel.create
的 subscribeActual
:
调用 Observer.onSubscrie()
, 将当前 Disposable 作为 parent 传递给下游
protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<>(observer); // CreateEmitter是一个Diposable observer.onSubscribe(parent); // Observer.onSubscrie() try { source.subscribe(parent); } catch (Throwable ex) { ... } }
Observer 关联上下游
除 create
、subscribe
这样的终端操作符以外,大部分的操作符的 Observer 结构如下:
/** The downstream subscriber. */ protected final Observer<? super R> downstream; /** The upstream subscription. */ protected Disposable upstream; public final void onSubscribe(Disposable d) { ... this.upstream = d; downstream.onSubscribe(this); ... } public void dispose() { upstream.dispose(); }
- Observer 持有上下游对象:upstream 和 downstream
- onSubscribe 向下递归调用
- dipose 向上递归调用
在链式订阅中,向下游订阅 Observer 的同时,也关联了上游的 Disposable(Observer)
我们在最下端调用 subscribe
时,调用链上的 Observer 会建立上下游关联,当我们在下游调用 dispose 时,最终会递归调用到顶端(create)的 dispose
再看takeUntil的例子
根据上述分析,在回顾一下最初 takeUntil
的例子。
前面说过所有的操作符都是 Observable:
takeUntil
对应的Observable:ObservableTakeUntilPredicate
;create
对应的Observable:ObservableCreate
subscribe 调用链如下:
当我们调用 dispose 方法时,通过引用链递会最终调用到 CreateEmitter
的 dispose。
由于 CreateEmitter 将 AtomicReference 的 value 设为 DISPOSED
后续,onNext
中判断状态,当为 DISPOSED 时,数据流停止发射
@Override public void onNext(T t) { if (!isDisposed()) { //是否为DISPOSED observer.onNext(t); } }
关于onComplete
通过下面的测试发现当 onComplete
调用后会,会自动调用 dispose。
@Test public void testDisposed(){ boolean isDisposed = false; TestObserver<Integer> to = Observable.<Integer>create(subscriber -> { subscriber.setDisposable(new Disposable() { @Override public boolean isDisposed() { return isDisposed; } @Override public void dispose() { isDisposed = true; } }); subscriber.onComplete(); }).test(); to.assertComplete(); assertTrue(isDisposed); }
果然,ObservableEmitter
的 onComplete
中调用了 dispose:
public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } }
关于内存泄漏
调用dipose确实可以终止数据流,但是不等于没有内存泄露。
查看 ObservableCreate
的源码可知,dispose只是简单地设置了 DISPOSED 状态,Observe 中关联的上下游对象并没有释放。所以当订阅了静态的 Observable 时,无法避免内存泄漏。
但是当订阅一个 Subject 时,dispose 确实可以有效释放对象,避免内存泄漏:
public void dispose() { if (super.tryDispose()) { parent.remove(this); //对象删除 } }
关于 dispose 的实时性
前面分析知道,对于终端操作符 create
、subscribe
等,其 Observer 在 dispose 时会标记当前状态为 DISPOSED
。但对于其他操作符的 dispose 只是递归向上调用 parent 的 dispose 而已,并没有 DISPOSED 状态的设置,也就不会拦截发射中的数据。
调用dispose后,RxJava数据流不一定会立即停止,大部分操作符在调用 dispose 后,数据依然会发射给下游
关于 dispose 的实时性测试,下文可供参考