Cold Observable 和 Hot Observable

简介: Cold Observable 和 Hot Observable

Observable的分类



Observable 有 Cold 和 Hot 之分。


image.png

hot&cold observable.jpg


Hot Observable 无论有没有 Subscriber 订阅,事件始终都会发生。当 Hot Observable 有多个订阅者时,Hot Observable 与订阅者们的关系是一对多的关系,可以与多个订阅者共享信息。


然而,Cold Observable 只有 Subscriber 订阅时,才开始执行发射数据流的代码。并且 Cold Observable 和 Subscriber 只能是一对一的关系,当有多个不同的订阅者时,消息是重新完整发送的。也就是说对 Cold Observable 而言,有多个Subscriber的时候,他们各自的事件是独立的。


如果上面的解释有点枯燥的话,那么下面会更加形象地说明 Cold 和 Hot 的区别:


Think of a hot Observable as a radio station. All of the listeners that are listening to it at this moment listen to the same song.

A cold Observable is a music CD. Many people can buy it and listen to it independently.

by Nickolay Tsvetinov


Cold Observable


Observable 的 just、creat、range、fromXXX 等操作符都能生成Cold Observable。

Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };
        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };
        Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread());
        observable.subscribe(subscriber1);
        observable.subscribe(subscriber2);
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


执行结果:

subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1
subscriber1: 2
   subscriber2: 2
   subscriber2: 3
subscriber1: 3
subscriber1: 4
   subscriber2: 4
   subscriber2: 5
subscriber1: 5
subscriber1: 6
   subscriber2: 6
subscriber1: 7
   subscriber2: 7
subscriber1: 8
   subscriber2: 8
subscriber1: 9
   subscriber2: 9


可以看出,subscriber1 和 subscriber2 的结果并不一定是相同的,二者是完全独立的。

尽管 Cold Observable 很好,但是对于某些事件不确定何时发生以及不确定 Observable 发射的元素数量,那还得使用 Hot Observable。比如:UI交互的事件、网络环境的变化、地理位置的变化、服务器推送消息的到达等等。


Cold Observable 如何转换成 Hot Observable?



1. 使用publish,生成 ConnectableObservable


使用 publish 操作符,可以让 Cold Observable 转换成 Hot Observable。它将原先的 Observable 转换成 ConnectableObservable。


来看看刚才的例子:

Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };
        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };
        Consumer<Long> subscriber3 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("      subscriber3: "+aLong);
            }
        };
        ConnectableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread()).publish();
        observable.connect();
        observable.subscribe(subscriber1);
        observable.subscribe(subscriber2);
        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        observable.subscribe(subscriber3);
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


注意,生成的 ConnectableObservable 需要调用connect()才能真正执行。


执行结果:

subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1
subscriber1: 2
   subscriber2: 2
      subscriber3: 2
subscriber1: 3
   subscriber2: 3
      subscriber3: 3
subscriber1: 4
   subscriber2: 4
      subscriber3: 4
subscriber1: 5
   subscriber2: 5
      subscriber3: 5
subscriber1: 6
   subscriber2: 6
      subscriber3: 6
subscriber1: 7
   subscriber2: 7
      subscriber3: 7
subscriber1: 8
   subscriber2: 8
      subscriber3: 8
subscriber1: 9
   subscriber2: 9
      subscriber3: 9
subscriber1: 10
   subscriber2: 10
      subscriber3: 10
subscriber1: 11
   subscriber2: 11
      subscriber3: 11


可以看到,多个订阅的 Subscriber 共享同一事件。


在这里,ConnectableObservable 是线程安全的。


2. 使用Subject/Processor


Subject 和 Processor 的作用是相同的。Processor 是 RxJava2.x 新增的类,继承自 Flowable 支持背压控制。而 Subject 则不支持背压控制。

Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };
        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };
        Consumer<Long> subscriber3 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("      subscriber3: "+aLong);
            }
        };
        Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread());
        PublishSubject<Long> subject = PublishSubject.create();
        observable.subscribe(subject);
        subject.subscribe(subscriber1);
        subject.subscribe(subscriber2);
        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        subject.subscribe(subscriber3);
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


执行结果跟上面使用 publish 操作符是一样的。


Subject 既是 Observable 又是 Observer(Subscriber)。这一点可以从 Subject 的源码上看到。

import io.reactivex.*;
import io.reactivex.annotations.*;
/**
 * Represents an Observer and an Observable at the same time, allowing
 * multicasting events from a single source to multiple child Subscribers.
 * <p>All methods except the onSubscribe, onNext, onError and onComplete are thread-safe.
 * Use {@link #toSerialized()} to make these methods thread-safe as well.
 *
 * @param <T> the item value type
 */
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    /**
     * Returns true if the subject has any Observers.
     * <p>The method is thread-safe.
     * @return true if the subject has any Observers
     */
    public abstract boolean hasObservers();
    /**
     * Returns true if the subject has reached a terminal state through an error event.
     * <p>The method is thread-safe.
     * @return true if the subject has reached a terminal state through an error event
     * @see #getThrowable()
     * &see {@link #hasComplete()}
     */
    public abstract boolean hasThrowable();
    /**
     * Returns true if the subject has reached a terminal state through a complete event.
     * <p>The method is thread-safe.
     * @return true if the subject has reached a terminal state through a complete event
     * @see #hasThrowable()
     */
    public abstract boolean hasComplete();
    /**
     * Returns the error that caused the Subject to terminate or null if the Subject
     * hasn't terminated yet.
     * <p>The method is thread-safe.
     * @return the error that caused the Subject to terminate or null if the Subject
     * hasn't terminated yet
     */
    @Nullable
    public abstract Throwable getThrowable();
    /**
     * Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
     * onComplete methods, making them thread-safe.
     * <p>The method is thread-safe.
     * @return the wrapped and serialized subject
     */
    @NonNull
    public final Subject<T> toSerialized() {
        if (this instanceof SerializedSubject) {
            return this;
        }
        return new SerializedSubject<T>(this);
    }
}


当 Subject 作为 Subscriber 时,它可以订阅目标 Cold Observable 使对方开始发送事件。同时它又作为Observable 转发或者发送新的事件,让 Cold Observable 借助 Subject 转换为 Hot Observable。


注意,Subject 并不是线程安全的,如果想要其线程安全需要调用toSerialized()方法。(在RxJava1.x的时代还可以用 SerializedSubject 代替 Subject,但是在RxJava2.x以后SerializedSubject不再是一个public class)


然而,很多基于 EventBus 改造的 RxBus 并没有这么做,包括我以前也写过这样的 RxBus :( 。这样的做法是非常危险的,因为会遇到并发的情况。


Hot Observable 如何转换成 Cold Observable?



1. ConnectableObservable的refCount操作符


reactivex官网的解释是


make a Connectable Observable behave like an ordinary Observable

image.png

RefCount.png


RefCount操作符把从一个可连接的 Observable 连接和断开的过程自动化了。它操作一个可连接的Observable,返回一个普通的Observable。当第一个订阅者订阅这个Observable时,RefCount连接到下层的可连接Observable。RefCount跟踪有多少个观察者订阅它,直到最后一个观察者完成才断开与下层可连接Observable的连接。


如果所有的订阅者都取消订阅了,则数据流停止。如果重新订阅则重新开始数据流。

Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };
        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };
        ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread()).publish();
        connectableObservable.connect();
        Observable<Long> observable = connectableObservable.refCount();
        Disposable disposable1 = observable.subscribe(subscriber1);
        Disposable disposable2 = observable.subscribe(subscriber2);
        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        disposable1.dispose();
        disposable2.dispose();
        System.out.println("重新开始数据流");
        disposable1 = observable.subscribe(subscriber1);
        disposable2 = observable.subscribe(subscriber2);
        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


执行结果:

subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1
重新开始数据流
subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1

如果不是所有的订阅者都取消了订阅,只取消了部分。部分的订阅者重新开始订阅,则不会从头开始数据流。

Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };
        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };
        Consumer<Long> subscriber3 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("      subscriber3: "+aLong);
            }
        };
        ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread()).publish();
        connectableObservable.connect();
        Observable<Long> observable = connectableObservable.refCount();
        Disposable disposable1 = observable.subscribe(subscriber1);
        Disposable disposable2 = observable.subscribe(subscriber2);
        observable.subscribe(subscriber3);
        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        disposable1.dispose();
        disposable2.dispose();
        System.out.println("subscriber1、subscriber2 重新订阅");
        disposable1 = observable.subscribe(subscriber1);
        disposable2 = observable.subscribe(subscriber2);
        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


执行结果:

subscriber1: 0
   subscriber2: 0
      subscriber3: 0
subscriber1: 1
   subscriber2: 1
      subscriber3: 1
subscriber1、subscriber2 重新订阅
      subscriber3: 2
subscriber1: 2
   subscriber2: 2
      subscriber3: 3
subscriber1: 3
   subscriber2: 3
      subscriber3: 4
subscriber1: 4
   subscriber2: 4


在这里,subscriber1和subscriber2先取消了订阅,subscriber3并没有取消订阅。之后,subscriber1和subscriber2又重新订阅。最终subscriber1、subscriber2、subscriber3的值保持一致。


2. Observable的share操作符


share操作符封装了publish().refCount()调用,可以看其源码。

/**
     * Returns a new {@link ObservableSource} that multicasts (shares) the original {@link ObservableSource}. As long as
     * there is at least one {@link Observer} this {@link ObservableSource} will be subscribed and emitting data.
     * When all subscribers have disposed it will dispose the source {@link ObservableSource}.
     * <p>
     * This is an alias for {@link #publish()}.{@link ConnectableObservable#refCount()}.
     * <p>
     * ![](http://upload-images.jianshu.io/upload_images/2613397-81dcef165b69aca2.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code share} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @return an {@code ObservableSource} that upon connection causes the source {@code ObservableSource} to emit items
     *         to its {@link Observer}s
     * @see <a href="http://reactivex.io/documentation/operators/refcount.html">ReactiveX operators documentation: RefCount</a>
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<T> share() {
        return publish().refCount();
    }


总结



理解了 Hot Observable 和 Cold Observable 的区别才能够写出更好Rx代码。同理,也能理解Hot & Cold Flowable。再者,在其他语言的Rx版本中包括 RxSwift、RxJS 等也存在 Hot Observable 和 Cold Observable 这样的概念。

相关文章
|
7月前
|
缓存 监控 JavaScript
methods、computed、watch它们的差异与区别
在Vue中,Methods、Watch和Computed是三种用于处理数据和响应数据变化的不同方式。
124 0
|
JavaScript
[Vue warn] Error in callback for immediate watcher “data“ “TypeError Cannot read property ‘reduce
[Vue warn] Error in callback for immediate watcher “data“ “TypeError Cannot read property ‘reduce
426 0
|
JavaScript 前端开发 API
Js 异步处理演进,Callback=>Promise=>Observer
异步调用就像是接水管,相互缠绕的管道越多,就越容易漏水。如何将水管巧妙连通,使整个系统有足够的弹性,需要去认真思考 🤔 对于 JavaScript 异步的理解,不少人感到过困惑:Js 是单线程的,如何做到异步的呢?实际上,Js 引擎通过混用 2 种内存数据结构:栈和队列,来实现的。栈与队列的交互也就是大家所熟知的 Js 事件循环~~
|
JavaScript 前端开发 定位技术
Observable学习笔记
Observable学习笔记
196 0
Observable学习笔记
|
XML JavaScript 前端开发
你不知道的 DOM 变动观察器:Mutation observer
你不知道的 DOM 变动观察器:Mutation observer
197 0
Rxjs Observable.pipe 传入多个 operators 的执行逻辑分析
Rxjs Observable.pipe 传入多个 operators 的执行逻辑分析
Rxjs Observable.pipe 传入多个 operators 的执行逻辑分析
成功解决AttributeError: 'GradientBoostingRegressor' object has no attribute 'staged_decision_function'
成功解决AttributeError: 'GradientBoostingRegressor' object has no attribute 'staged_decision_function'
|
前端开发 JavaScript NoSQL
5. 创建 Observable
第一个示例 注册事件监听器的常规写法。 var button = document.querySelector('button'); button.addEventListener('click', () => console.
986 0
6. Observable 和 数组的区别
Observable 和 数组都有filter, map 等运算操作operators,具体的区别是什么? 主要是两点: 延迟运算 渐进式取值 延迟运算 延迟运算很好理解,所有 Observable 一定会等到订阅后才开始对元素做运算,如果没有订阅就不会有运算的行为 var source = Rx.
863 0
|
JavaScript
Inject reducer arbitrarily rather than top level for redux store to replace reducer
When writing a big react redux application or any SPA using redux, there is a need to split code with webpack and load reducers asynchronously. The way to load reducer asynchronously is utilizing red
1467 0