一文详解 RxJava2 使用及实现原理

简介: RxJava—一个可以在JVM上运行的,基于观察者模式 实现异步操作的java库。其英文描述为:RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java

RxJava是什么?根据RxJava在GitHub上给出的描述:
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java

大致意思是:
RxJava—一个可以在JVM上运行的,基于观察者模式 实现异步操作的java库。

RxJava的作用:
就是异步RxJava的使用,可以使“逻辑复杂的代码”保持极强的阅读性。

Rxjava github地址

RxAndorid的作用:
Android中RxAndorid与RxJava配合使用; RxAndorid 封装了AndroidSchedulers.mainThread(),Android开发者使用过程中,可以轻松的将任务post Andorid主线程中,执行页面更新操作。

RxAndroid github地址

使用方式

1、Observable

  • Observable:被观察者
  • Observer:观察者,可接收Observable发送的数据

a、Rxjava 实现线程切换:

//
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        //1、“异步线程” 执行耗时操作
        //2、“执行完毕” 调用onNext触发回调,通知观察者
        e.onNext("1");
        e.onComplete();
    }
}).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // 订阅线程  订阅的那一刻在订阅线程中执行
            }

            @Override
            public void onNext(String value) {
                // “主线程”执行的方法
            }

            @Override
            public void onError(Throwable e) {
                // "主线程"执行的方法
            }

            @Override
            public void onComplete() {
                // "主线程"执行的方法
            }
        });

b、Rxjava 使用操作符

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        // IO 线程
        // 请求网络数据
        e.onNext("123456");
    }
}).map(new Function<String, Integer>() {
    @Override
    public Integer apply(String s) {
        // IO 线程
        // 网络数据解析(数据转化)
        //
        // throw new RequestFailException("获取网络请求失败");
        return 123;
    }
}).doOnNext(new Consumer<Integer>() {    //保存登录结果UserInfo
    @Override
    public void accept(@NonNull Integer bean) throws Exception {
        // IO 线程
        // 保存网络数据

    }
}).subscribeOn(Schedulers.io())   //IO线程
.observeOn(AndroidSchedulers.mainThread())  //主线程
.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer bean) throws Exception {
        // 更新UI
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(@NonNull Throwable throwable) throws Exception {
        // 错误 显示错误页面
    }
});

2、Flowable

Flowable是为了应对Backpressure产生的。
Flowable是一个被观察者,与Subscriber(观察者)配合使用

//
Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        //1、“异步线程” 执行耗时操作
        //2、“执行完毕” 调用onNext触发回调,通知观察者
        emitter.onNext(0);
        emitter.onComplete();
    }
    // 若消费者消费能力不足,则抛出MissingBackpressureException异常
}, BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                // 订阅时执行,发生在“订阅线程”
                // 这个方法是用来向生产者申请可以消费的事件数量
                // 这里表明消费者拥有Long.MAX_VALUE的消费能力
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                // “主线程”执行的方法
            }

            @Override
            public void onError(Throwable t) {
                // "主线程"执行的方法
            }

            @Override
            public void onComplete() {
                // "主线程"执行的方法
            }
        });

a、 Backpressure(背压)

Backpressure(背压)生产者的生产速度大于消费者的消费能力引起的问题。

在RxJava中有一种情况就是被观察者发送消息十分迅速以至于观察者不能及时的响应这些消息

例如:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        // “异步线程”中 生产者有无限的生产能力
        while (true){
            e.onNext(1);
        }
    }
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        // “主线程”中 消费者消费能力不足,从而造成事件无限堆积,最后导致OOM
        Thread.sleep(2000);
        System.out.println(integer);
    }
});

异步线程中 生产者有无限的生产能力;
主线程中 消费者消费能力不足,从而造成事件无限堆积,最后导致OOM。

上述的现象,有个专有的名词来来形容,即:Backpressure(背压)

b、Subscription.request(long n);

Subscription.request(long n) 方法是用来向生产者申请可以消费的事件数量

  • 当调用了request(long n)方法后,生产者便发送对应数量的事件供消费者消费;
  • 如果不显示调用request就表示消费能力为0

在异步调用时,RxJava中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为128,即只能缓存128个事件。
无论request()中传入的数字比128大或小,缓存池中在刚开始都会存入128个事件;当然如果本身并没有这么多事件需要发送,则不会存128个事件。

  • BackpressureStrategy.ERROR策略下,如果生产者生产的事件大于128个,缓存池便会溢出,从而抛出MissingBackpressureException异常;
  • BackpressureStrategy.BUFFER策略:将RxJava中默认的128个事件的缓存池换成一个更大的缓存池,这样,消费者通过request()即使传入一个很大的数字,生产者也会生产事件。但是这种方式比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM。总之BUFFER要慎用。
  • BackpressureStrategy.DROP策略:当消费者处理不了事件,则丢弃。消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
  • BackpressureStrategy.LATEST策略: LATEST与DROP功能基本一致。消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件。

源码阅读——简单例子 (一)

注:当前使用的源码版本 rxjava:2.1.9

从这段不涉及操作符和线程切换的简单例子开始:

// 创建观察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(String o) {

    }

    @Override
    public void onError(@NonNull Throwable e) {
        Log.d(TAG, "onError data is :" + e.toString());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete");
    }
};

// 创建被观察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
// 订阅
observable.subscribe(observer);

a、ObservableOnSubscribe.java

先看一下ObservableOnSubscribe.java这个类

public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

由代码可知 ObservableOnSubscribe是一个回调接口,回调方法中参数为ObservableEmitter,下边看一下ObservableEmitter 这个类。

ObservableEmitter.java

ObservableEmitter字面意思是被观察者发射器,看一下源码:

public interface ObservableEmitter<T> extends Emitter<T> {

    void setDisposable(@Nullable Disposable d);

    void setCancellable(@Nullable Cancellable c);

    boolean isDisposed();

    @NonNull
    ObservableEmitter<T> serialize();

    @Experimental
    boolean tryOnError(@NonNull Throwable t);
}

ObservableEmitter是对Emitter的扩展,而扩展的方法正是 RxJava2.0 之后引入的。提供了可中途取消等新能力,我们看 Emitter 源码:

public interface Emitter<T> {

    void onNext(@NonNull T value);

    void onError(@NonNull Throwable error);

    void onComplete();
}

Emitter字面意思是发射器,这里边的三个方法,大家都很熟悉了。其对应了以下这段代码:

new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
}

回调说完,下边我们来看Observable.create(ObservableOnSubscribe<T> source) 这段代码。

b、Observable.create(ObservableOnSubscribe source)

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
  • RxJavaPlugins 先忽略
  • 我们看到传入的ObservableOnSubscribe被用来创建ObservableCreate,其实ObservableCreate就是Observable的一个实现类

因此 Observable.create(ObservableOnSubscribe<T> source) 这段代码,实际是:

//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO线程中执行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
  • 这里我们知道:当 ObservableOnSubscribe.subscribe 方法被执行时,用户通过调用ObservableEmitter.onNext方法,将数据发送出去(发送给观察者)

下边我们看一下ObservableCreate 这个类

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // 省略部分代码 ...
}
  • ObservableOnSubscribe.subscribe 方法是在ObservableCreate.subscribeActual 方法中第四行中被执行了;subscribe方法中,用户通过调用ObservableEmitter.onNext方法,将数据发送出去;
  • subscribeActual方法第二行,调用了observer.onSubscribe(parent);方法。 订阅发生时,在订阅线程主动执行了observeronSubscribe方法;
  • CreateEmitter 是对ObservableCreate.subscribeActual(Observer<? super T> observer)方法传入的Observer的封装;
  • CreateEmitter的作用是任务取消时,可以不再回调其封装的观察者;observeronNext方法,由CreateEmitter.onNext方法调用;

Observable.create(ObservableOnSubscribe<T> source); 方法最终返回一个 ObservableCreate 对象。
下边看 observable.subscribe(observer); 方法

c、observable.subscribe(observer);

  • observable.subscribe(observer); 即 订阅发生的那一刻。
  • 这里 observable.subscribe(observer); 实际是ObservableCreate.subscribe(observer);

下边查看Observablesubscribe(observer)方法

Observable.subscribe(Observer observer)

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        // Observable的subscribe方法,实际执行的是subscribeActual方法
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);
        //
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
  • 调用 observable.subscribe(observer); 方法时,实际是调用了observable.subscribeActual(observer) 方法。
  • observableObservableCreate的引用,因此这里调用的是ObservableCreate.subscribeActual(observer) 方法。

我们又回到 ObservableCreate 这个类的subscribeActual方法

ObservableCreate.java

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    //  subscribeActual 方法在 订阅发生的那一刻被调用 既 observable.subscribe(observer);时被调用
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 若中途任务取消,通过CreateEmitter 可终止对observer中方法onNext 、onError 等的回调
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 订阅发生时,执行 观察者的onSubscribe(Disposable d) 方法
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // 省略部分代码 ...
}
  • subscribeActual 方法在 订阅发生的那一刻被调用的;在 observable.subscribe(observer); 时被调用;
  • observer.onSubscribe(parent); 订阅发生时,在订阅线程回调observeronSubscribe方法
  • subscribeActual 方法中,传入的Observer会被包装成一个CreateEmitter;若中途任务取消,通过CreateEmitter 可终止对observer中方法onNext 、onError 等的回调;

subscribeActual 中第二行代码 observer.onSubscribe(parent);

observer.onSubscribe(parent); 订阅发生时,执行 观察者的onSubscribe(Disposable d) 方法,这里回到了以下代码

// 创建观察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.d(TAG, "onSubscribe");
    }
    // ... 省略onNext、onError、onComplete
};
  • 这里传入的参数为 new CreateEmitter<T>(observer) ,其实现了Disposable接口,若任务取消,则不回调传入的观察者observer 对应的onNext 、onError、onComplete 等方法

subscribeActual 中第四行代码 source.subscribe(parent);

source.subscribe(parent);ObservableOnSubscribe.subscribe(new CreateEmitter<T>(observer));

代码最终回到ObservableOnSubscribesubscribe :

new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
}
  • subscribe中,调用到 CreateEmitter 类的onNext 、onComplete、onError 方法,将数据发送CreateEmitter中的观察者

到此,“这段不涉及操作符和线程切换的简单例子” 的代码跟踪结束。

源码阅读——线程切换 (二)

注:当前使用的源码版本 rxjava:2.1.9

从这段线程切换的简单例子开始:

// 创建观察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        // 订阅线程  订阅的那一刻在订阅线程中执行
    }

    @Override
    public void onNext(String o) {
        // Android 主线程中执行
    }

    @Override
    public void onError(@NonNull Throwable e) {
        // Android 主线程中执行
    }

    @Override
    public void onComplete() {
        // Android 主线程中执行
    }
};

// 创建被观察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO线程中执行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
// 被观察者 IO 线程
observable = observable.subscribeOn(Schedulers.io());
// 观察者  Android主线程
observable = observable.observeOn(AndroidSchedulers.mainThread());
// 订阅
observable.subscribe(observer);

先来个我总结的RxJava2的整个代码执行流程:
QQ20211202-104355.png

a、Observable.create(ObservableOnSubscribe source)

源码阅读——简单例子 (一) 中我们了解到了Observable.create(ObservableOnSubscribe<T> source)实际是 如下代码:

//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO线程中执行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
  • ObservableCreate 中含有一个subscribeActual(observer) 方法,用于执行传入观察者的observer.onSubscribe方法,和间接调用 观察者的onNext、onComplete 等方法;

ObservableCreate

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // 省略部分代码 ...
}
  • subscribeActual方法第二行,调用了传入的观察者的observer.onSubscribe(parent);方法; 订阅发生时,在订阅线程主动执行了observeronSubscribe方法;
  • subscribeActual方法第四行,调用了传入的观察者的observer.subscribe 方法;subscribe方法中,用户通过调用CreateEmitter.onNext方法,将数据发送出去;
  • CreateEmitter 是对ObservableCreate.subscribeActual(Observer<? super T> observer)方法传入的Observer的封装;
  • CreateEmitter的作用是任务取消时,可以不再回调其封装的观察者;observeronNext方法,由CreateEmitter.onNext方法调用;

下边查看observable.subscribeOn(Schedulers.io())相关代码

注:
ObservableEmitterCreateEmitter的引用,是对Observer的进一步封装。CreateEmitter在执行onNext时,如果任务取消,则不再回调ObserveronNext方法。

b、observable.subscribeOn(Schedulers.io())

下边我们查看Observable 类的subscribeOn(Scheduler scheduler)方法

Observable.java

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    // 生成一个ObservableSubscribeOn对象
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
  • 继续忽略RxJavaPlugins
  • 最终返回一个ObservableSubscribeOn 对象

这里Observable observable = observableCreate.subscribeOn(Schedulers.io())代码实际是

ObservableSubscribeOn observable = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())
  • 因此 observable.subscribeOn(Schedulers.io()) 返回的是一个ObservableSubscribeOn 的引用

下边查看ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    // ... 省略部分代码
}

看一下ObservableSubscribeOn中的subscribeActual 方法

  • subscribeActual 方法第二行代码中,执行了传入ObserveronSubscribe 方法;
  • subscribeActual 方法第三行: 在 scheduler 对应的IO线程中,执行observableCreatesubscribe 方法,传入参数为SubscribeOnObserver,即:IO线程中 执行observableCreate.subscribe(new SubscribeOnObserver(observer));

因此,无论ObservableSubscribeOn.subscribeActual(observer)在哪个线程中被调用observableCreate.subscribe(new SubscribeOnObserver<T>(observer))均在IO线程中执行,因此观察者的e.onNext("hello"); e.onComplete(); 亦在IO线程中执行;

c、observable.observeOn(AndroidSchedulers.mainThread())

下边我们查看Observable 类的observeOn(Scheduler scheduler)方法

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}
// 
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

这里可以看到 Observable observable = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread())实际是:

ObservableObserveOn observable = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);

因此 ,observable.observeOn(AndroidSchedulers.mainThread()) 返回的是ObservableObserveOn 的引用。

下边查看ObservableObserveOn

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    // ... 省略部分代码
}

看一下ObservableObserveOn中的subscribeActual 方法

  • subscribeActual 方法第五行代码,实际为observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
  • ObserveOnObserver 的作用是在ObserveOnObserveronNext方法被实行时;将observeronNext方法post到 Android主线程中;

d、observable.subscribe(observer)

  • 我们知道Observablesubscribe(Observer<? super T> observer)方法,实际调用到了ObservablesubscribeActual(Observer<? super T> observer) 方法;
  • 而这里的observable 实际是ObservableObserveOn的引用;

因此,observable.subscribe(observer)实际执行的是observableObserveOn.subscribeActual(observer)

到这里,我们 线程切换 (二) 的小例子变换为了以下代码:

// 创建观察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        // 订阅线程  订阅的那一刻在订阅线程中执行
    }

    @Override
    public void onNext(String o) {
        // Android 主线程中执行
    }

    @Override
    public void onError(@NonNull Throwable e) {
        // Android 主线程中执行
    }

    @Override
    public void onComplete() {
        // Android 主线程中执行
    }
};
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO线程中执行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
//
ObservableSubscribeOn observableSubscribeOn = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())
//
ObservableObserveOn observableObserveOn = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);
//
observableObserveOn.subscribeActual(observer);

下边我们查看observableObserveOn.subscribeActual(observer)

ObservableObserveOn.java

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        // source 为 observableSubscribeOn
        super(source);
        // scheduler 为AndroidSchedulers.mainThread()
        this.scheduler = scheduler;
        // false
        this.delayError = delayError;
        // 128
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // AndroidSchedulers.mainThread() 为 HandlerScheduler,因此会走到else部分代码
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        }
        // 代码会走到else 部分
         else {
            Scheduler.Worker w = scheduler.createWorker();
            // source 为 observableSubscribeOn
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    // ... 省略部分代码
}
  • subscribeActual 方法中,AndroidSchedulers.mainThread()HandlerScheduler ,因此 if 中的判断语句直接忽略,直接走到代码的 else 部分。
  • subscribeActual 方法中,将观察者observer封装成了ObserveOnObserver;并且调用observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))
  • observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))实际是
ObserveOnObserver observeOnObserver = new ObserveOnObserver<T>(observer, w, delayError, bufferSize)
// 1、“订阅线程中” —— 执行onSubscribe, 实际执行的是observer的onSubscribe方法
observeOnObserver.onSubscribe(new SubscribeOnObserver<T>(observeOnObserver));
// 2、“IO程中” —— 执行subscribe ;IO线程 subscribe方法中,用户主动调用ObserveOnObserver的onNext、onError、onComplete方法,将数据发出去
observableCreate.subscribe(new SubscribeOnObserver<T>(observeOnObserver))
  • 用户调用SubscribeOnObserveronNext 是将数据发送出去
  • SubscribeOnObserver.onNext调用了observeOnObserver.onNext
  • observeOnObserver.onNext通过HandlerSchedulerobserver.onNext、observer.onError、observer.onComplete 等方法post到Android主线程中执行。

e、整体流程图如下

最后总结一下RxJava2的整个执行流程:

QQ20211202-104355.png

参考

手把手教你使用 RxJava 2.0(一)
RxJava2 源码解析(一)
RxJava2 源码解析——流程

= THE END =

文章首发于公众号”CODING技术小馆“,如果文章对您有帮助,可关注我的公众号。
文章首发于公众号”CODING技术小馆“,如果文章对您有帮助,可关注我的公众号。
文章首发于公众号”CODING技术小馆“,如果文章对您有帮助,可关注我的公众号。

目录
相关文章
|
17天前
|
XML Java 编译器
Java注解的底层源码剖析与技术认识
Java注解(Annotation)是Java 5引入的一种新特性,它提供了一种在代码中添加元数据(Metadata)的方式。注解本身并不是代码的一部分,它们不会直接影响代码的执行,但可以在编译、类加载和运行时被读取和处理。注解为开发者提供了一种以非侵入性的方式为代码提供额外信息的手段,这些信息可以用于生成文档、编译时检查、运行时处理等。
53 7
|
XML 安全 数据库连接
温故知新-源码分析篇
温故知新-源码分析篇
50 0
RxJava2源码分析(二):操作符原理分析
RxJava2源码分析(二):操作符原理分析
RxJava2源码分析(二):操作符原理分析
|
Java Android开发
面试官:RxJava是如何做到响应式编程的?
RxJava已经出现很多个年头了,但是依然被很多公司使用,如果现在还对RxJava了解的不够透彻, 可以看这个系列对它的分析:相信看完后你对它会有个更全面的认识。 这个系列主要从下面几个方面来讲解: **RxJava基本操作符使用** **RxJava响应式编程是如何实现的** **RxJava的背压机制及Flowable是如何实现背压的** **RxJava的线程切换原理
|
算法 Java
【Java技术指南】「并发编程专题」Fork/Join框架基本使用和原理探究(原理篇)
【Java技术指南】「并发编程专题」Fork/Join框架基本使用和原理探究(原理篇)
169 0
【Java技术指南】「并发编程专题」Fork/Join框架基本使用和原理探究(原理篇)
|
设计模式 uml
Rxjava源码解析笔记 | Rxjava概述 & 传统观察者设计模式源码解析
Rxjava源码解析笔记 | Rxjava概述 & 传统观察者设计模式源码解析
|
Java 数据库
Java线程池的简单使用
对于资源池的技术,相信大家早就接触过,比如数据库连接池,常见的有c3p0、dbcp等等,而线程也有对应的池子,称为线程池。
158 0
Java线程池的简单使用
|
消息中间件 存储 JavaScript
一文详解 JsBridge 实现原理
JsBridge主要用于“JS与Native的通信”,众所周知Android 4.2以下的WebView存在addJavascriptInterface漏洞的问题,为解决这一问题国内很多知名大厂很早就在各自项目中集成了JsBridge。
1063 0
一文详解 JsBridge 实现原理
|
存储 Java 数据库
Java集合源码分析之开篇
初衷 Java集合是我们使用最频繁的工具,也是面试的热点,但我们对它的理解仅限于使用上,而且大多数情况没有考虑过其使用规范。本系列文章将跟随源码的思路,分析实现的每个细节,以期在使用时避免各种不规范的坑。在这里,我们会惊艳于开发者优秀的设计,也会感激先辈们付出的艰辛努力,更重要的是知其所以然,少犯错误,写出优秀的代码。 许多人对集合类的理解是暴力的,当需要保存对象时就使用ArrayList,当需要保存键值对时就使用HashMap,当需要不可重复时就使用HashSet,等等。而且使用方式也比较单一:
215 0
|
设计模式 Java 数据处理
反应式编程 RxJava 设计原理解析
本篇文章主要聚焦对RxJava中几种主要的设计模式的理解,通过梳理Observable的相关类图以及讲解这些类之间的关系,让大家能够更清晰的理解RxJava中事件驱动的工作原理。
1101 0