RxJava的Single、Completable以及Maybe

简介: RxJava的Single、Completable以及Maybe

通常情况下,如果我们想要使用 RxJava 首先会想到的是使用Observable,如果要考虑到Backpressure的情况,在 RxJava2.x 时代我们会使用Flowable。除了Observable和Flowable之外,在 RxJava2.x 中还有三种类型的Observables:Single、Completable、Maybe。


类型 描述
Observable<T> 能够发射0或n个数据,并以成功或错误事件终止。
Flowable<T> 能够发射0或n个数据,并以成功或错误事件终止。 支持Backpressure,可以控制数据源发射的速度。
Single<T> 只发射单个数据或错误事件。
Completable 它从来不发射数据,只处理 onComplete 和 onError 事件。可以看成是Rx的Runnable。
Maybe<T> 能够发射0或者1个数据,要么成功,要么失败。有点类似于Optional


从上面的表格可以看出,这五种被观察者类型中只有Flowable能支持Backpressure,如果有需要Backpressure的情况,还是必须要使用Flowable。


Single



从SingleEmitter的源码可以看出,Single 只有 onSuccess 和 onError 事件。

/**
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */
package io.reactivex;
import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Cancellable;
/**
 * Abstraction over an RxJava {@link SingleObserver} that allows associating
 * a resource with it.
 * <p>
 * All methods are safe to call from multiple threads.
 * <p>
 * Calling onSuccess or onError multiple times has no effect.
 *
 * @param <T> the value type to emit
 */
public interface SingleEmitter<T> {
    /**
     * Signal a success value.
     * @param t the value, not null
     */
    void onSuccess(@NonNull T t);
    /**
     * Signal an exception.
     * @param t the exception, not null
     */
    void onError(@NonNull Throwable t);
    /**
     * Sets a Disposable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param s the disposable, null is allowed
     */
    void setDisposable(@Nullable Disposable s);
    /**
     * Sets a Cancellable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param c the cancellable resource, null is allowed
     */
    void setCancellable(@Nullable Cancellable c);
    /**
     * Returns true if the downstream cancelled the sequence.
     * @return true if the downstream cancelled the sequence
     */
    boolean isDisposed();
}


其中,onSuccess()用于发射数据(在Observable/Flowable中使用onNext()来发射数据)。

而且只能发射一个数据,后面即使再发射数据也不会做任何处理。


Single的SingleObserver中只有onSuccess、onError,并没有onComplete。这是 Single 跟其他四种被观察者最大的区别。

Single.create(new SingleOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull SingleEmitter<String> e) throws Exception {
                e.onSuccess("test");
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println(s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                throwable.printStackTrace();
            }
        });


上面的代码,由于Observer中有两个Consumer,还可以进一步简化成

Single.create(new SingleOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull SingleEmitter<String> e) throws Exception {
                e.onSuccess("test");
            }
        }).subscribe(new BiConsumer<String, Throwable>() {
            @Override
            public void accept(String s, Throwable throwable) throws Exception {
                System.out.println(s);
            }
        });


Single 可以通过toXXX方法转换成Observable、Flowable、Completable以及Maybe。


Completable



Completable在创建后,不会发射任何数据。从CompletableEmitter的源码可以看到

/**
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */
package io.reactivex;
import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Cancellable;
/**
 * Abstraction over an RxJava {@link CompletableObserver} that allows associating
 * a resource with it.
 * <p>
 * All methods are safe to call from multiple threads.
 * <p>
 * Calling onComplete or onError multiple times has no effect.
 */
public interface CompletableEmitter {
    /**
     * Signal the completion.
     */
    void onComplete();
    /**
     * Signal an exception.
     * @param t the exception, not null
     */
    void onError(@NonNull Throwable t);
    /**
     * Sets a Disposable on this emitter; any previous Disposable
     * or Cancellation will be disposed/cancelled.
     * @param d the disposable, null is allowed
     */
    void setDisposable(@Nullable Disposable d);
    /**
     * Sets a Cancellable on this emitter; any previous Disposable
     * or Cancellation will be disposed/cancelled.
     * @param c the cancellable resource, null is allowed
     */
    void setCancellable(@Nullable Cancellable c);
    /**
     * Returns true if the downstream disposed the sequence.
     * @return true if the downstream disposed the sequence
     */
    boolean isDisposed();
}


Completable 只有 onComplete 和 onError 事件,同时 Completable 并没有map、flatMap等操作符,它的操作符比起 Observable/Flowable 要少得多。


我们可以通过fromXXX操作符来创建一个Completable。这是一个Completable版本的Hello World。

Completable.fromAction(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("Hello World");
            }
        }).subscribe();


Completable 经常会结合andThen操作符

Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(@NonNull CompletableEmitter emitter) throws Exception {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    emitter.onComplete();
                } catch (InterruptedException e) {
                    emitter.onError(e);
                }
            }
        }).andThen(Observable.range(1, 10))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                System.out.println(integer);
            }
        });


在这里emitter.onComplete()执行完之后,表明Completable已经完全执行完毕,接下来是执行andThen里的操作。


打印结果如下:

1
2
3
4
5
6
7
8
9
10


在Completable中,andThen有多个重载的方法,正好对应了五种被观察者的类型。

Completable       andThen(CompletableSource next)
<T> Maybe<T>      andThen(MaybeSource<T> next)
<T> Observable<T> andThen(ObservableSource<T> next)
<T> Flowable<T>   andThen(Publisher<T> next)
<T> Single<T>     andThen(SingleSource<T> next)


Completable 也可以通过toXXX方法转换成Observable、Flowable、Single以及Maybe。


在网络操作中,如果遇到更新的情况,也就是Restful架构中的PUT操作,一般要么返回原先的对象要么只提示更新成功。下面两个接口使用了Retrofit,分别是用于获取短信验证码和更新用户信息,其中更新用户信息如果用PUT会更符合Restful的API。

/**
     * 获取短信验证码
     * @param param
     * @return
     */
    @POST("v1/user-auth")
    Completable getVerificationCode(@Body VerificationCodeParam param);
    /**
     * 用户信息更新接口
     * @param param
     * @return
     */
    @POST("v1/user-update")
    Completable update(@Body UpdateParam param);


在model类中大致会这样写。

/**
 * Created by Tony Shen on 2017/7/24.
 */
public class VerificationCodeModel extends HttpResponse {
    /**
     * 获取验证码
     * @param activity
     * @param param
     * @return
     */
    public Completable getVerificationCode(AppCompatActivity activity, VerificationCodeParam param) {
        return apiService
                .getVerificationCode(param)
                .compose(RxJavaUtils.<VerificationCodeModel>completableToMain())
                .compose(RxLifecycle.bind(activity).<VerificationCodeModel>toLifecycleTransformer());
    }
}


特别要注意的是getVerificationCode返回的是Completable而不是Completable<T>。


获取验证码成功则给出相应地toast提示,失败可以做出相应地处理。

VerificationCodeModel model = new VerificationCodeModel();
model.getVerificationCode(RegisterActivity.this,param)
           .subscribe(new Action() {
                      @Override
                      public void run() throws Exception {
                              showShort(RegisterActivity.this,"发送验证码成功");
                      }
            },new RxException<Throwable>(){
                      @Override
                     public void accept(@NonNull Throwable throwable) throws Exception {
                              throwable.printStackTrace();
                              ......
                     }
            });


image.png

获取手机验证码.jpeg


Maybe



Maybe 是 RxJava2.x 之后才有的新类型,可以看成是Single和Completable的结合。


Maybe创建之后,MaybeEmitter 和 SingleEmitter 一样并没有onNext()方法,同样需要通过onSuccess()方法来发射数据。

Maybe.create(new MaybeOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull MaybeEmitter<String> e) throws Exception {
                e.onSuccess("testA");
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("s="+s);
            }
        });


打印出来的结果是

s=testA


Maybe也只能发射0或者1个数据,即使发射多个数据,后面发射的数据也不会处理。

Maybe.create(new MaybeOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull MaybeEmitter<String> e) throws Exception {
                e.onSuccess("testA");
                e.onSuccess("testB");
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("s="+s);
            }
        });


打印出来的结果仍然是

s=testA


跟第一次执行的结果是一致的。


如果MaybeEmitter先调用了onComplete(),即使后面再调用了onSuccess()也不会发射任何数据。

Maybe.create(new MaybeOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull MaybeEmitter<String> e) throws Exception {
                e.onComplete();
                e.onSuccess("testA");
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("s="+s);
            }
        });


这次就没有打印任何数据了。


我们对上面的代码再做一下修改,在subscribe()中也加入onComplete(),看看打印出来的结果会是这样的?因为SingleObserver中是没有onComplete()方法。

Maybe.create(new MaybeOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull MaybeEmitter<String> e) throws Exception {
                e.onComplete();
                e.onSuccess("testA");
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("s=" + s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("Maybe onComplete");
            }
        });


这次打印的结果是

Maybe onComplete


通过查看Maybe相关的源码

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError,
            Action onComplete) {
        ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        return subscribeWith(new MaybeCallbackObserver<T>(onSuccess, onError, onComplete));
    }


我们可以得到,Maybe在没有数据发射时候subscribe会调用MaybeObserver的onComplete()。如果Maybe有数据发射或者调用了onError(),是不会再执行MaybeObserver的onComplete()。


我们也可以将 Maybe 转换成Observable、Flowable、Single,只需相应地调用toObservable()、toFlowable()、toSingle()。


接下来我们再来看看 Maybe 跟 Retrofit 是怎样结合使用的?


下面的网络请求,最初返回的类型是Flowable,但是这个网络请求并不是一个连续事件流,我们只会发起一次 Post 请求返回数据并且只收到一个事件。因此,可以考虑将 onComplete() 可以跟 onNext() 合并。在这里,尝试我们将Flowable改成Maybe。

@POST("v1/contents")
    Maybe<ContentModel> loadContent(@Body ContentParam param);


在model类中,我们大致会这样写。

public class ContentModel extends HttpResponse {
    public List<ContentItem> data;
    /**
     * 获取内容
     * @param fragment
     * @param param
     * @param cacheKey
     * @return
     */
    public Maybe<ContentModel> getContent(Fragment fragment,ContentParam param,String cacheKey) {
        return apiService.loadContent(param)
                .compose(RxLifecycle.bind(fragment).<ContentModel>toLifecycleTransformer())
                .compose(RxJavaUtils.<ContentModel>maybeToMain())
                .compose(RxUtils.<ContentModel>toCacheTransformer(cacheKey,App.getInstance().cache));
    }
    ......
}


其中,maybeToMain()方法是用Kotlin编写的工具方法,这些工具方法由Kotlin来编写会显得比较简单和清晰,特别是lambda表达式更加直观。

@JvmStatic
    fun <T> maybeToMain(): MaybeTransformer<T, T> {
        return MaybeTransformer{
            upstream ->
            upstream.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
        }
    }


最后是真正地使用model类,如果网络请求成功则将数据展示到recyclerview上,如果失败也会做出相应地处理。

model.getContent(this,param,cacheKey)
                .subscribe(new Consumer<ContentModel>() {
                    @Override
                    public void accept(@io.reactivex.annotations.NonNull ContentModel model) throws Exception {
                        adapter = new NewsAdapter(mContext, model);
                        recyclerview.setAdapter(adapter);
                        spinKitView.setVisibility(View.GONE);
                    }
                }, new RxException<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        throwable.printStackTrace();
                        spinKitView.setVisibility(View.GONE);
                        ......
                    }
                });


image.png

获取内容的request.jpeg


image.png

获取内容的response.jpeg


总结



RxJava 有五种不同类型的被观察者,合理地使用它们能够写出更简洁优雅的代码。这些被观察者在一定程度上也能够作一些相互转换。值得注意的是,只有Flowable是支持Backpressure的,其余四种都不支持。

相关文章
|
6月前
|
设计模式 Java 容器
【设计模式】JAVA Design Patterns——Async Method Invocation(异步方法调用模式)
【设计模式】JAVA Design Patterns——Async Method Invocation(异步方法调用模式)
【UVM源码学习】uvm_event
【UVM源码学习】uvm_event
331 0
【UVM源码学习】uvm_event
|
C++
读书笔记 effective c++ Item 49 理解new-handler的行为
1. new-handler介绍 当操作符new不能满足内存分配请求的时候,它就会抛出异常。很久之前,它会返回一个null指针,一些旧的编译器仍然会这么做。你仍然会看到这种旧行为,但是我会把关于它的讨论推迟到本条款结束的时候。
817 0
|
缓存 C++ Kotlin
Kotlin Coroutines Flow 系列(二) Flow VS RxJava2
Kotlin Coroutines Flow 系列(二) Flow VS RxJava2
224 0
Kotlin Coroutines Flow 系列(二) Flow VS RxJava2
|
Java API
RxJava 之 ParallelFlowable
RxJava 之 ParallelFlowable
273 0
RxJava 之 ParallelFlowable
|
Java API Kotlin
Kotlin Coroutines Flow 系列(一) Flow 基本使用
Kotlin Coroutines Flow 系列(一) Flow 基本使用
329 0
Kotlin Coroutines Flow 系列(一) Flow 基本使用
|
Java 大数据 Spring
Java高阶编程——RxBus 开源,基于 RxJava 的 event bus
介绍 RxBus 是一个发布/订阅模式的事件总线,用法和 EventBus 一样简单。RxBus 基于 RxJava 开发,除了拥有和 EventBus 一样简单的事件总线机制之外,还拥有 RxJava 的丰富特性。
1522 0
|
Android开发 Kotlin Java
Architecture -- LiveData
1). 简介 LiveData是一个可观察的数据持有者类。 与常规observable不同,LiveData是生命周期感知的,这意味着它尊重其他应用程序组件的生命周期,例如活动,片段或服务。
1045 0
RxJava/RxAndroid:ConnectableObservable &amp; replay(int bufferSize)
RxJava/RxAndroid:ConnectableObservable & replay(int bufferSize) import android.
942 0
RxJava/RxAndroid : interval
RxJava/RxAndroid : interval import android.support.v7.app.AppCompatActivity; import android.
876 0