你是否了解 RxJava 的 Disposable ?

简介: 你是否了解 RxJava 的 Disposable ?

关于 Disposable

任何订阅者模式的代码,都需要注意注册注销的配对出现,否则会出现内存泄漏。

RxJava2 提供了 Disposable( RxJava1 中是 Subscription),在适当时机取消订阅、截断数据流。当在 Android 中使用时尤其要注意,避免内存泄露。

private CompositeDisposable compositeDisposable = new CompositeDisposable();
@Override 
public void onCreate() {
    compositeDisposable.add(backendApi.loadUser()
      .subscribe(this::displayUser, this::handleError));
}
@Override public void onDestroy() {
    compositeDisposable.clear();
}

上面例子展示了在 Activity 等 LifecycleOwner 中的一般做法:使用 CompositeDisposable 收集所有的 Disposable 句柄,而后在 onDestroy 中调用 clear 统一注销。

clear 最终调用的是各个 Disposable 的 dispose 方法:

public interface Disposable {
  void dispose();
  boolean isDisposed();
}

当然,除了手动调用 dispose,也有一些自动框架可供使用, 如 RxLifecycle 、uber 的 AutoDispose 等, 但最终都要调用到 Disposable 的 dispose() 。


dispose 实现原理

先看一段代码:

Disposable disposable = Observable.create(
  (ObservableOnSubscribe<Integer>) observableEmitter -> {
    for (int i = 1; i <= 3; i++) {
      observableEmitter.onNext(i);
    }
  })
  .takeUntil(integer -> integer < 3)
  .subscribe();

当调用 disposable.dispose(); 时,代码如何执行?

先卖个关子,文章最后揭晓答案

Disposable 是一个 Observer

调用 Observable.subscribe(...) 返回的 Disposable 本质是一个 LambdaObserver

public final Disposable subscribe(
    @NonNull Consumer<? super T> onNext,
    @NonNull Consumer<? super Throwable> onError,
    @NonNull Action onComplete) {
  LambdaObserver<T> ls = new LambdaObserver<>(
      onNext, onError, onComplete,
      Functions.emptyConsumer());
  subscribe(ls);
  return ls; //return as a Disposable 
}

LambdaObserver 集众多接口于一身

public final class LambdaObserver<T> extends AtomicReference<Disposable>
        implements Observer<T>, Disposable
  • 首先,是一个 Observer,被subscribe()后,通过onNext发射数据;
  • 其次,是一个 Disposable,对外提供 dispose 方法;
  • 最后,通过 AtomicReference,确保 dispose 线程安全的执行
@Override
public void dispose() {
  DisposableHelper.dispose(this);
}
public static boolean dispose(AtomicReference<Disposable> field) {
    Disposable current = field.get();
    Disposable d = DISPOSED;
    if (current != d) {
        current = field.getAndSet(d);
        if (current != d) {
            if (current != null) {
                current.dispose();
            }
            return true;
        }
    }
    return false;
}

原子地设置 DISPOSED, 确保 AtomicReference 中的 Disposable 的 dispose 一定被调用,有且仅有一次。

onSubscribe 中传递 Disposable

AtomicReference 的 value 是在 Observer.onSubscribe 中被赋值的:


@Override
public void onSubscribe(Disposable d) {
  if (DisposableHelper.setOnce(this, d)) { //设置 value
    try {
      onSubscribe.accept(this);
    } catch (Throwable ex) {
                ...
    }
  }
}

那么 Observer.onSubscribe 又是何时被调用呢?

RxJava 的操作符都是一个 Observable 实现。操作符链式调用的本质就创建 Observable 并通过 subscribe 依次订阅。 subscribe 内部会用 subscribeActual ,这是每个操作符都必须实现的方法。

看一下 Observabel.createsubscribeActual

调用 Observer.onSubscrie(), 将当前 Disposable 作为 parent 传递给下游

protected void subscribeActual(Observer<? super T> observer) {
  CreateEmitter<T> parent = new CreateEmitter<>(observer); // CreateEmitter是一个Diposable
  observer.onSubscribe(parent); // Observer.onSubscrie()
  try {
        source.subscribe(parent);
  } catch (Throwable ex) {
        ...
    }
}

Observer 关联上下游

createsubscribe 这样的终端操作符以外,大部分的操作符的 Observer 结构如下:

/** The downstream subscriber. */
protected final Observer<? super R> downstream;
/** The upstream subscription. */
protected Disposable upstream;
public final void onSubscribe(Disposable d) {
  ...
  this.upstream = d;
  downstream.onSubscribe(this);
  ...
}
public void dispose() {
  upstream.dispose();
}
  • Observer 持有上下游对象:upstream 和 downstream
  • onSubscribe 向下递归调用
  • dipose 向上递归调用

在链式订阅中,向下游订阅 Observer 的同时,也关联了上游的 Disposable(Observer)

我们在最下端调用 subscribe 时,调用链上的 Observer 会建立上下游关联,当我们在下游调用 dispose 时,最终会递归调用到顶端(create)的 dispose

再看takeUntil的例子

根据上述分析,在回顾一下最初 takeUntil 的例子。

前面说过所有的操作符都是 Observable:

  • takeUntil 对应的Observable: ObservableTakeUntilPredicate;
  • create 对应的Observable: ObservableCreate

subscribe 调用链如下:

image.png

当我们调用 dispose 方法时,通过引用链递会最终调用到 CreateEmitter 的 dispose。

由于 CreateEmitter 将 AtomicReference 的 value 设为 DISPOSED 后续,onNext 中判断状态,当为 DISPOSED 时,数据流停止发射

@Override
public void onNext(T t) {
    if (!isDisposed()) { //是否为DISPOSED
        observer.onNext(t);
    }
}

关于onComplete

通过下面的测试发现当 onComplete 调用后会,会自动调用 dispose。

@Test 
public void testDisposed(){
  boolean isDisposed = false;
    TestObserver<Integer> to = Observable.<Integer>create(subscriber -> {
        subscriber.setDisposable(new Disposable() {
            @Override
            public boolean isDisposed() {
                return isDisposed;
            }
            @Override
            public void dispose() {
                isDisposed = true;
            }
        });
        subscriber.onComplete();
    }).test();
    to.assertComplete();
    assertTrue(isDisposed);
}

果然,ObservableEmitteronComplete 中调用了 dispose:

 public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

关于内存泄漏

调用dipose确实可以终止数据流,但是不等于没有内存泄露。

查看 ObservableCreate 的源码可知,dispose只是简单地设置了 DISPOSED 状态,Observe 中关联的上下游对象并没有释放。所以当订阅了静态的 Observable 时,无法避免内存泄漏。

但是当订阅一个 Subject 时,dispose 确实可以有效释放对象,避免内存泄漏:

public void dispose() {
  if (super.tryDispose()) {
    parent.remove(this); //对象删除
  }
}

关于 dispose 的实时性

前面分析知道,对于终端操作符 createsubscribe 等,其 Observer 在 dispose 时会标记当前状态为 DISPOSED。但对于其他操作符的 dispose 只是递归向上调用 parent 的 dispose 而已,并没有 DISPOSED 状态的设置,也就不会拦截发射中的数据。

调用dispose后,RxJava数据流不一定会立即停止,大部分操作符在调用 dispose 后,数据依然会发射给下游

关于 dispose 的实时性测试,下文可供参考

medium.com/stepstone-t…

目录
相关文章
|
7月前
|
缓存 调度 数据库
RxJava基础操作符和高级操作符
RxJava基于观察者模式,通过Observable、Observer、Subscription和Disposable四大核心实现响应式编程。支持冷热数据流,提供丰富操作符进行变换、过滤与组合,并借助Scheduler实现线程调度,Flowable解决背压问题,Subject用于状态共享,适用于异步事件处理与复杂数据流管理。
286 0
|
Java Android开发
Android系统 获取用户最后操作时间回调实现和原理分析
Android系统 获取用户最后操作时间回调实现和原理分析
679 0
|
大数据 Java C语言
什么是CPU密集型、IO密集型?
CPU密集型(CPU-bound) CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。
4677 0
|
人工智能 自然语言处理 IDE
通义灵码--我的编程好伙伴,让我晋升编程大神,从安装到使用
作为一名大数据开发工程师,我在编程过程中使用了通义灵码的@workspace与@terminal工具,这些工具极大地提高了我的工作效率,使我在处理新项目代码和实现新需求时更加得心应手。通过这些工具,我能够在不离开IDE的情况下快速解决问题,生成代码,优化现有代码,并通过智能问答功能获取即时帮助。通义灵码不仅支持多种主流编程语言,还提供了丰富的功能,如代码生成、单元测试生成、代码优化等,显著提升了我的编程体验。强烈推荐给所有希望提高编程效率的开发者。
|
算法 Java Android开发
Android rxjava和LiveData中的内存泄漏
Android rxjava和LiveData中的内存泄漏
487 0
|
网络协议 网络安全 API
Http和Socks的区别?
HTTP 和 SOCKS 协议各有其优势和应用场景。在选择使用哪种协议时,应根据具体需求和应用环境做出决定。HTTP 适用于 Web 服务相关的通信,而 SOCKS 则更适用于需要通用代理功能和复杂网络环境的场景。了解它们的区别和特点,有助于在不同的网络应用中做出最佳选择。
819 1
|
JavaScript API
vue3父子组件相互调用方法详解
vue3父子组件相互调用方法详解
10075 10
|
人工智能 算法 C++
C++在游戏开发中的应用与挑战
C++在游戏开发中扮演关键角色,常用于引擎开发、游戏逻辑与算法实现,以及跨平台和网络游戏开发。尽管面临学习曲线陡峭、内存管理复杂、跨平台兼容性及与其他技术集成的挑战,但通过学习和掌握现代C++特性,开发者能创造高效优质的游戏。
|
安全
什么是跨域,为什么会跨域?
什么是跨域,为什么会跨域?
917 1