通常情况下,如果我们想要使用 RxJava 首先会想到的是使用Observable,如果要考虑到Backpressure的情况,在 RxJava2.x 时代我们会使用Flowable。除了Observable和Flowable之外,在 RxJava2.x 中还有三种类型的Observables:Single、Completable、Maybe。
类型 | 描述 |
Observable<T> | 能够发射0或n个数据,并以成功或错误事件终止。 |
Flowable<T> | 能够发射0或n个数据,并以成功或错误事件终止。 支持Backpressure,可以控制数据源发射的速度。 |
Single<T> | 只发射单个数据或错误事件。 |
Completable | 它从来不发射数据,只处理 onComplete 和 onError 事件。可以看成是Rx的Runnable。 |
Maybe<T> | 能够发射0或者1个数据,要么成功,要么失败。有点类似于Optional |
从上面的表格可以看出,这五种被观察者类型中只有Flowable能支持Backpressure,如果有需要Backpressure的情况,还是必须要使用Flowable。
Single
从SingleEmitter的源码可以看出,Single 只有 onSuccess 和 onError 事件。
/** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */ package io.reactivex; import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.functions.Cancellable; /** * Abstraction over an RxJava {@link SingleObserver} that allows associating * a resource with it. * <p> * All methods are safe to call from multiple threads. * <p> * Calling onSuccess or onError multiple times has no effect. * * @param <T> the value type to emit */ public interface SingleEmitter<T> { /** * Signal a success value. * @param t the value, not null */ void onSuccess(@NonNull T t); /** * Signal an exception. * @param t the exception, not null */ void onError(@NonNull Throwable t); /** * Sets a Disposable on this emitter; any previous Disposable * or Cancellation will be unsubscribed/cancelled. * @param s the disposable, null is allowed */ void setDisposable(@Nullable Disposable s); /** * Sets a Cancellable on this emitter; any previous Disposable * or Cancellation will be unsubscribed/cancelled. * @param c the cancellable resource, null is allowed */ void setCancellable(@Nullable Cancellable c); /** * Returns true if the downstream cancelled the sequence. * @return true if the downstream cancelled the sequence */ boolean isDisposed(); }
其中,onSuccess()用于发射数据(在Observable/Flowable中使用onNext()来发射数据)。
而且只能发射一个数据,后面即使再发射数据也不会做任何处理。
Single的SingleObserver中只有onSuccess、onError,并没有onComplete。这是 Single 跟其他四种被观察者最大的区别。
Single.create(new SingleOnSubscribe<String>() { @Override public void subscribe(@NonNull SingleEmitter<String> e) throws Exception { e.onSuccess("test"); } }).subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { System.out.println(s); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { throwable.printStackTrace(); } });
上面的代码,由于Observer中有两个Consumer,还可以进一步简化成
Single.create(new SingleOnSubscribe<String>() { @Override public void subscribe(@NonNull SingleEmitter<String> e) throws Exception { e.onSuccess("test"); } }).subscribe(new BiConsumer<String, Throwable>() { @Override public void accept(String s, Throwable throwable) throws Exception { System.out.println(s); } });
Single 可以通过toXXX方法转换成Observable、Flowable、Completable以及Maybe。
Completable
Completable在创建后,不会发射任何数据。从CompletableEmitter的源码可以看到
/** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */ package io.reactivex; import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.functions.Cancellable; /** * Abstraction over an RxJava {@link CompletableObserver} that allows associating * a resource with it. * <p> * All methods are safe to call from multiple threads. * <p> * Calling onComplete or onError multiple times has no effect. */ public interface CompletableEmitter { /** * Signal the completion. */ void onComplete(); /** * Signal an exception. * @param t the exception, not null */ void onError(@NonNull Throwable t); /** * Sets a Disposable on this emitter; any previous Disposable * or Cancellation will be disposed/cancelled. * @param d the disposable, null is allowed */ void setDisposable(@Nullable Disposable d); /** * Sets a Cancellable on this emitter; any previous Disposable * or Cancellation will be disposed/cancelled. * @param c the cancellable resource, null is allowed */ void setCancellable(@Nullable Cancellable c); /** * Returns true if the downstream disposed the sequence. * @return true if the downstream disposed the sequence */ boolean isDisposed(); }
Completable 只有 onComplete 和 onError 事件,同时 Completable 并没有map、flatMap等操作符,它的操作符比起 Observable/Flowable 要少得多。
我们可以通过fromXXX操作符来创建一个Completable。这是一个Completable版本的Hello World。
Completable.fromAction(new Action() { @Override public void run() throws Exception { System.out.println("Hello World"); } }).subscribe();
Completable 经常会结合andThen操作符
Completable.create(new CompletableOnSubscribe() { @Override public void subscribe(@NonNull CompletableEmitter emitter) throws Exception { try { TimeUnit.SECONDS.sleep(1); emitter.onComplete(); } catch (InterruptedException e) { emitter.onError(e); } } }).andThen(Observable.range(1, 10)) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { System.out.println(integer); } });
在这里emitter.onComplete()执行完之后,表明Completable已经完全执行完毕,接下来是执行andThen里的操作。
打印结果如下:
1 2 3 4 5 6 7 8 9 10
在Completable中,andThen有多个重载的方法,正好对应了五种被观察者的类型。
Completable andThen(CompletableSource next) <T> Maybe<T> andThen(MaybeSource<T> next) <T> Observable<T> andThen(ObservableSource<T> next) <T> Flowable<T> andThen(Publisher<T> next) <T> Single<T> andThen(SingleSource<T> next)
Completable 也可以通过toXXX方法转换成Observable、Flowable、Single以及Maybe。
在网络操作中,如果遇到更新的情况,也就是Restful架构中的PUT操作,一般要么返回原先的对象要么只提示更新成功。下面两个接口使用了Retrofit,分别是用于获取短信验证码和更新用户信息,其中更新用户信息如果用PUT会更符合Restful的API。
/** * 获取短信验证码 * @param param * @return */ @POST("v1/user-auth") Completable getVerificationCode(@Body VerificationCodeParam param); /** * 用户信息更新接口 * @param param * @return */ @POST("v1/user-update") Completable update(@Body UpdateParam param);
在model类中大致会这样写。
/** * Created by Tony Shen on 2017/7/24. */ public class VerificationCodeModel extends HttpResponse { /** * 获取验证码 * @param activity * @param param * @return */ public Completable getVerificationCode(AppCompatActivity activity, VerificationCodeParam param) { return apiService .getVerificationCode(param) .compose(RxJavaUtils.<VerificationCodeModel>completableToMain()) .compose(RxLifecycle.bind(activity).<VerificationCodeModel>toLifecycleTransformer()); } }
特别要注意的是getVerificationCode返回的是Completable而不是Completable<T>。
获取验证码成功则给出相应地toast提示,失败可以做出相应地处理。
VerificationCodeModel model = new VerificationCodeModel(); model.getVerificationCode(RegisterActivity.this,param) .subscribe(new Action() { @Override public void run() throws Exception { showShort(RegisterActivity.this,"发送验证码成功"); } },new RxException<Throwable>(){ @Override public void accept(@NonNull Throwable throwable) throws Exception { throwable.printStackTrace(); ...... } });
获取手机验证码.jpeg
Maybe
Maybe 是 RxJava2.x 之后才有的新类型,可以看成是Single和Completable的结合。
Maybe创建之后,MaybeEmitter 和 SingleEmitter 一样并没有onNext()方法,同样需要通过onSuccess()方法来发射数据。
Maybe.create(new MaybeOnSubscribe<String>() { @Override public void subscribe(@NonNull MaybeEmitter<String> e) throws Exception { e.onSuccess("testA"); } }).subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { System.out.println("s="+s); } });
打印出来的结果是
s=testA
Maybe也只能发射0或者1个数据,即使发射多个数据,后面发射的数据也不会处理。
Maybe.create(new MaybeOnSubscribe<String>() { @Override public void subscribe(@NonNull MaybeEmitter<String> e) throws Exception { e.onSuccess("testA"); e.onSuccess("testB"); } }).subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { System.out.println("s="+s); } });
打印出来的结果仍然是
s=testA
跟第一次执行的结果是一致的。
如果MaybeEmitter先调用了onComplete(),即使后面再调用了onSuccess()也不会发射任何数据。
Maybe.create(new MaybeOnSubscribe<String>() { @Override public void subscribe(@NonNull MaybeEmitter<String> e) throws Exception { e.onComplete(); e.onSuccess("testA"); } }).subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { System.out.println("s="+s); } });
这次就没有打印任何数据了。
我们对上面的代码再做一下修改,在subscribe()中也加入onComplete(),看看打印出来的结果会是这样的?因为SingleObserver中是没有onComplete()方法。
Maybe.create(new MaybeOnSubscribe<String>() { @Override public void subscribe(@NonNull MaybeEmitter<String> e) throws Exception { e.onComplete(); e.onSuccess("testA"); } }).subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { System.out.println("s=" + s); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { } }, new Action() { @Override public void run() throws Exception { System.out.println("Maybe onComplete"); } });
这次打印的结果是
Maybe onComplete
通过查看Maybe相关的源码
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError, Action onComplete) { ObjectHelper.requireNonNull(onSuccess, "onSuccess is null"); ObjectHelper.requireNonNull(onError, "onError is null"); ObjectHelper.requireNonNull(onComplete, "onComplete is null"); return subscribeWith(new MaybeCallbackObserver<T>(onSuccess, onError, onComplete)); }
我们可以得到,Maybe在没有数据发射时候subscribe会调用MaybeObserver的onComplete()。如果Maybe有数据发射或者调用了onError(),是不会再执行MaybeObserver的onComplete()。
我们也可以将 Maybe 转换成Observable、Flowable、Single,只需相应地调用toObservable()、toFlowable()、toSingle()。
接下来我们再来看看 Maybe 跟 Retrofit 是怎样结合使用的?
下面的网络请求,最初返回的类型是Flowable,但是这个网络请求并不是一个连续事件流,我们只会发起一次 Post 请求返回数据并且只收到一个事件。因此,可以考虑将 onComplete() 可以跟 onNext() 合并。在这里,尝试我们将Flowable改成Maybe。
@POST("v1/contents") Maybe<ContentModel> loadContent(@Body ContentParam param);
在model类中,我们大致会这样写。
public class ContentModel extends HttpResponse { public List<ContentItem> data; /** * 获取内容 * @param fragment * @param param * @param cacheKey * @return */ public Maybe<ContentModel> getContent(Fragment fragment,ContentParam param,String cacheKey) { return apiService.loadContent(param) .compose(RxLifecycle.bind(fragment).<ContentModel>toLifecycleTransformer()) .compose(RxJavaUtils.<ContentModel>maybeToMain()) .compose(RxUtils.<ContentModel>toCacheTransformer(cacheKey,App.getInstance().cache)); } ...... }
其中,maybeToMain()方法是用Kotlin编写的工具方法,这些工具方法由Kotlin来编写会显得比较简单和清晰,特别是lambda表达式更加直观。
@JvmStatic fun <T> maybeToMain(): MaybeTransformer<T, T> { return MaybeTransformer{ upstream -> upstream.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) } }
最后是真正地使用model类,如果网络请求成功则将数据展示到recyclerview上,如果失败也会做出相应地处理。
model.getContent(this,param,cacheKey) .subscribe(new Consumer<ContentModel>() { @Override public void accept(@io.reactivex.annotations.NonNull ContentModel model) throws Exception { adapter = new NewsAdapter(mContext, model); recyclerview.setAdapter(adapter); spinKitView.setVisibility(View.GONE); } }, new RxException<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { throwable.printStackTrace(); spinKitView.setVisibility(View.GONE); ...... } });
获取内容的request.jpeg
获取内容的response.jpeg
总结
RxJava 有五种不同类型的被观察者,合理地使用它们能够写出更简洁优雅的代码。这些被观察者在一定程度上也能够作一些相互转换。值得注意的是,只有Flowable是支持Backpressure的,其余四种都不支持。