RxJava操作符详解--来看看你还记得多少

简介: RxJava已经出现很多个年头了,但是依然被很多公司使用,如果现在还对RxJava了解的不够透彻, 可以看这个系列对它的分析:相信看完后你对它会有个更全面的认识。 这个系列主要从下面几个方面来讲解: **RxJava基本操作符使用** **RxJava响应式编程是如何实现的** **RxJava的背压机制及Flowable是如何实现背压的** **RxJava的线程切换原理**

前言

RxJava已经出现很多个年头了,但是依然被很多公司使用,如果现在还对RxJava了解的不够透彻, 可以看这个系列对它的分析:相信看完后你对它会有个更全面的认识。 这个系列主要从下面几个方面来讲解: RxJava基本操作符使用 RxJava响应式编程是如何实现的 RxJava的背压机制及Flowable是如何实现背压的 RxJava的线程切换原理

关于RxJava的其他系列文章,可以点击下方链接

面试官:RxJava背压机制有了解么?

面试官:RxJava是如何做到响应式编程的?

使用rxjava创建一个rxbus事件处理框架

简介

本篇文章主要是针对RxJava基本操作符的讲解: RxJava核心之处就在于其使用的是响应式编程和操作符的多样式, 上面两个特点可以让一个原来很复杂的调用逻辑,使用几行代码就可以解决、 对线程的切换也只需要一个操作符就可以,再也不用满IDE去找Handler了 但是就是由于其使用的简单性以及操作符的多样性,就要求我们对它的操作符以及操作符背后的知识有一个更全面的了解。

PS:

**使用过程中p方法用来打印结果**
public <T> void p(T t){
    System.out.println(t);
}
**用getObserver来创建一个通用的Observer观察者**
public Observer<Object> getObserver(){
        return new Observer<Object>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                p("onSubscribe->"+d);
            }

            @Override
            public void onNext(@NonNull Object obj) {
                p("onNext->"+obj);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                p("onError->"+e.getMessage());

            }

            @Override
            public void onComplete() {
                p("onComplete->");
            }
        };
    }

操作符分类:

创建型

1.create

定义:

创建一个完全的被观察者

使用方式:
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    emitter.onComplete();
}).subscribe(getObserver());
打印结果
onSubscribe->null
onNext->1
onNext->2
onNext->3
onComplete->
  • 这里打印出了各个onNext操作,且调用了onComplete完成事件

2:empty() & never() & error()

定义:
  • empty():定义:创建后直接发出complete事件
  • error:定义:快速创建只发出error事件
  • never:定义:快速创建不发出任何事件
使用方式:
Observable.empty().subscribe(getObserver());
Observable.never().subscribe(getObserver());
Observable.error(() -> new Exception("error test")).subscribe(getObserver());
打印结果:
onComplete->
onError->error test
第一个empty什么都没打印:
第二个never打印出了onComplete,并直接退出
第三个error打印出了一个error信息:error test

3.just

定义:

直接发送传入的事件:最多十个

使用方式
Observable.just(1,2,3,4,5,6,7,8,9,10).subscribe(getObserver());
打印结果
onNext->1
onNext->2
onNext->3
onNext->4
onNext->5
onNext->6
onNext->7
onNext->8
onNext->9
onNext->10
onComplete->

发送完所有事件后,发送一个onComplete事件

4.fromArray

定义:

发送数组数据

使用方式
Object[] objects = new Object[]{1,2,3,4};
Observable.fromArray(objects).subscribe(getObserver());
打印结果
onNext->1
onNext->2
onNext->3
onNext->4
onComplete->

可以看到将数组的元素依次发出

5.fromIterator

定义

发送实现迭代器的接口事件,如list

使用方式
List<Object> list = Arrays.asList(1,2,3,4);
Observable.fromIterable(list).subscribe(getObserver());
打印结果
onNext->1
onNext->2
onNext->3
onNext->4
onComplete->
打印出了list中的每个元素

6.defer

定义

延迟到观察者订阅的时候再创建被观察者

使用方式
Observable.defer(() -> {
    p("call");
    return (ObservableSource<Object>) observer -> {
        p("subscribe");
        observer.onNext(1);
        observer.onNext(2);
        observer.onNext(3);
    };
}).subscribe(getObserver());
打印结果
call
subscribe
onNext->1
onNext->2
onNext->3
可以看到这个观察者是调用call方法后再创建被观察者ObservableSource<Object>

7.timer

定义:

快速创建一个被观察者,并在延迟多少时间后发出一个事件0

使用方式:
Observable.timer(5,TimeUnit.SECONDS).subscribe(getObserver());
打印结果
onNext->0
onComplete->
在延迟5s钟后发出一个0的事件给下游

8.interval

定义

每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。

使用方式
Observable.interval(2,TimeUnit.SECONDS).subscribe(getObserver());
打印结果
onNext->0
onNext->1
onNext->2
onNext->3
onNext->4
...
这里每隔2s发出一次事件,事件以0开始递归+1

9.intervalRange

定义

和interval一样,只是可以指定发送的起始值和发送的事件个数和发送的延迟时间

/**
参数:
start:起始值
count:一起有几个值
initialDelay:延迟多久发送第一个事件
period:每隔多久发送一个事件
unit:时间单位
**/
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit);
使用方式
Observable.intervalRange(0,5,0,1,TimeUnit.SECONDS).subscribe(getObserver());
打印结果
onNext->0
onNext->1
onNext->2
onNext->3
onNext->4
onComplete->
这里起始时间为0,0s后发送第一个事件,每隔1s发送一个事件,每个事件值递增,最多发出5个事件后结束

range:

定义: 快速创建被观察者,连续发送一个时间事件序列,可指定范围,每次加1
/**
start:初始事件值
count:事件个数
每次事件递增1
**/
public static Observable<Integer> range(final int start, final int count);
使用方式:
Observable.range(1,5).subscribe(getObserver());
打印结果:
onNext->1
onNext->2
onNext->3
onNext->4
onNext->5
onComplete->
从1开始连续打印了5个数,每个数值递增1

变换

map:

定义:将事件作为参数传入一个函数内部,将返回值作为新的事件值发出,有点类似于C语言中的函数指针
/**
传入一个方法作为参数
**/
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper);
使用方法:
Observable.range(1,5).map(new Function<Integer, Object>() {
    @Override
    public Object apply(@NonNull Integer integer) throws Exception {
        return integer+10;
    }
}).subscribe(getObserver());
打印结果:
onNext->11
onNext->12
onNext->13
onNext->14
onNext->15
onComplete->

事件值从1-5经过map后变为11-15

flatMap:

定义:将事件转换为多个子事件,并合并发出,无序

//也是只传入一个Function public final Observable flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {

使用方法:
Observable.range(1,5).flatMap(new Function<Integer, ObservableSource<?>>() {
    @Override
    public ObservableSource<?> apply(@NonNull Integer integer) throws Exception {

        return new ObservableSource<Object>() {
            @Override
            public void subscribe(@NonNull Observer<? super Object> observer) {
                observer.onNext(integer+"发出子事件1");
                observer.onNext(integer+"发出子事件2");
                observer.onComplete();
            }
        };
    }

}).subscribe(getObserver());
打印结果:
onNext->1发出子事件1
onNext->1发出子事件2
onNext->2发出子事件1
onNext->2发出子事件2
onNext->3发出子事件1
onNext->3发出子事件2
onNext->4发出子事件1
onNext->4发出子事件2
onNext->5发出子事件1
onNext->5发出子事件2
onComplete->

这里看到每个事件都分裂为了两个子事件,并将所有的子事件合并为了一个新的事件集合发出,合并后的事件集合可能顺序不一定一致
如果有onComplete事件,则只会在最后发出,这也是合并后的结果

flatMapIterable:

定义:每个子事件可以分裂为一个事件的集合(实现Iterable接口),最后合并为一个事件集合发出
public final <U> Observable<U> flatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper)
使用方式:
Observable.range(1,3).flatMapIterable(new Function<Integer, Iterable<?>>() {
    @Override
    public Iterable<?> apply(@NonNull Integer integer) throws Exception {
        List list = Arrays.asList(integer+1,integer+2,integer+3);
        return list;
    }
}).subscribe(getObserver());
打印结果:
onNext->2
onNext->3
onNext->4
onNext->3
onNext->4
onNext->5
onNext->4
onNext->5
onNext->6
onComplete->
看到这里的每个事件分裂为了三个子事件,最后合并成一个集合发出

concatMap:

定义:将事件转换为多个子事件,并合并发出,有序,和flatmap区别就是这里是有序的
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
使用方式:
Observable.range(1,3).concatMap(new Function<Integer, ObservableSource<?>>() {
    @Override
    public ObservableSource<?> apply(@NonNull Integer integer) throws Exception {
        return new ObservableSource<Object>() {
            @Override
            public void subscribe(@NonNull Observer<? super Object> observer) {
                observer.onNext(integer+"发出子事件1");
                observer.onNext(integer+"发出子事件2");
                observer.onComplete();
            }
        };
    }
}).subscribe(getObserver());
打印结果:
onNext->1发出子事件1
onNext->1发出子事件2
onNext->2发出子事件1
onNext->2发出子事件2
onNext->3发出子事件1
onNext->3发出子事件2
onComplete->
这里发出的事件都是有序的,而flatMap不保证有序

buffer:

定义:每几个事件组合为一个集合事件再发出
public final Observable<List<T>> buffer(int count)
使用方式:
 Observable.range(1,10).buffer(3).subscribe(getObserver());
打印结果:
onNext->[1, 2, 3]
onNext->[4, 5, 6]
onNext->[7, 8, 9]
onNext->[10]
onComplete->

每三个事件组合为了一个集合事件,最后不足3个事件的归到一个集合中

组合:

concat()与concatArray():

定义:concat最多组合四个,concatArray组合无限个,按组合先后顺序发送,追加
public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2);
public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources) {

使用方式

Observable.concat(Observable.just(1,2,3),Observable.just(1,2,3)).subscribe(getObserver());
Observable.concatArray(Observable.just(1,2,3),Observable.just(1,2,3),Observable.just(1,2,3)).subscribe(getObserver());
打印结果:
concat:
onNext->1
onNext->2
onNext->3
onNext->1
onNext->2
onNext->3
onComplete->
concatArray:
onNext->1
onNext->2
onNext->3
onNext->1
onNext->2
onNext->3
onNext->1
onNext->2
onNext->3
onComplete->

merge和mergeArray:

定义:按时间顺序组合,merge最多可以使用4组元素,而mergeArray没有限制
使用方式:
Observable.mergeArray(
        Observable.intervalRange(1,3,0,1000,TimeUnit.MILLISECONDS),
        Observable.intervalRange(1,3,500,10000,TimeUnit.MILLISECONDS))
        .subscribe(getObserver());
打印结果:
onNext->1
onNext->1
onNext->2
onNext->2
onNext->3
onNext->3
onComplete->

第二个观察者是延迟500s后发送数据,而每次发送数据间隔为1s那可以知道发送顺序应该是
发送顺序
第一个被观察者:1   2   3
第二个被观察者:  1   2   3 
根据实际顺序组合应该是1->1->2->2->3->3和打印结果一致

startWith()与startWithArray():

定义:在Observable提交结果之前添加数据
使用方式:
Observable.just(1,2,3,4).startWith(0).subscribe(getObserver());
打印结果:
onNext->0
onNext->1
onNext->2
onNext->3
onNext->4
onComplete->



调用startWith后在所有事件前增加了一个0元素
Observable.just(1,2,3,4).startWithArray(10,20,30,40).subscribe(getObserver());
打印结果:
onNext->10
onNext->20
onNext->30
onNext->40
onNext->1
onNext->2
onNext->3
onNext->4
onComplete->

调用startWithArray后在前面增加了10,20,30,40事件

合并

zip:

定义:按照顺序对位进行合并,最终合并数量等于数据最少的那个
/*
sources:被观察者集合
zipper:合并策略
可以传入一个合并的策略
*/
public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper) 
使用方式:
List list = Arrays.asList(Observable.intervalRange(1, 3, 0, 1000, TimeUnit.MILLISECONDS),
                Observable.intervalRange(1, 3, 500, 1000, TimeUnit.MILLISECONDS));
Observable.zip(list, new Function<Object[], Object>() {
    @Override
    public Object apply(@NonNull Object[] objects) throws Exception {
        Long o = 0L;
        for (int i = 0; i < objects.length; i++) {
            o += (Long) objects[i];
        }
        return o;
    }
}).subscribe(getObserver());
打印结果:
onNext->2
onNext->4
onNext->6
onComplete->
看到结果是以发送的顺序进行合并的:每个组合中的对应位置上的元素进行组合
发送顺序:
第一个被观察者:1   2   3
第二个被观察者:  1   2   3 
根据实际顺序合并应该是
1和1:2
2和2:4
3和3:6
和打印结果一致

reduce:

定义:把观察者需要的事件合并为一个事件,合并关系1和2得到12,12和3得到123,123和4得到1234...
使用方式:
Observable.just(1,2,3,4,5).reduce((integer, integer2) -> integer+integer2).subscribe(integer -> p(integer));
打印结果:

15 = 1+2+3+4+5 和猜测一致

collect:

定义:将被观察者发送的数据收集到一个容器中。
使用方式:
Observable.just(1,2,3,4,5).collect((Callable<ArrayList>) () -> new ArrayList<Integer>(), (o, integer) -> {
    o.add(integer);
}).subscribe((Consumer<Object>) o -> p(o));
打印结果:
[1, 2, 3, 4, 5]
将数据放到了一个list集合中

count:

定义:统计事件数量
 Observable.just(1,2,3,4,5).count().subscribe(aLong -> p(aLong));
打印结果:5

延迟

delay:

定义:延迟一定时间Observable(被观察者)再发送事件,delay()有个多个重载的方法:
使用方式:
Observable.just(1,2,3,4,5).delay(3,TimeUnit.SECONDS);
打印结果:

延迟了3s才开始发送事件

do操作符

定义:1.doOnNext()与doOnEach()与doAfterNext()
doOnNext:next方法前执行
doAfterNext:next方法后执行
doOnEach:每个事件方法都会执行
使用方式:
Observable.just(1,2,3).doOnNext(integer -> { p("doOnNext"+integer); }).doAfterNext(integer -> { p("doAfterNext"+integer); }).doOnEach(integerNotification -> { p("doOnEach"+integerNotification.getValue()); }).subscribe(getObserver());
打印结果:
doOnNext1
doOnEach1
onNext->1
doAfterNext1

doOnNext2
doOnEach2
onNext->2
doAfterNext2

doOnNext3
doOnEach3
onNext->3
doAfterNext3

doOnEachnull
onComplete->

可以看到在每个方法前:doOnNext每次Next事件前都会执行一次,doAfterNext在每次Next事件后会执行一次,而doOnEach会在执行任何事件都会打印一次

2.doOnError与doAfterTerinate:

doOnError:发送错误事件执行
doAfterTerinate:终止后执行
使用方法:
Observable.just(1,2,3).flatMap(integer -> (ObservableSource<Object>) observer -> {
    observer.onNext(integer);
    if(integer==2){
        observer.onError(new Throwable("test error"));
    }
}).doOnError(throwable -> {
    p(throwable.getMessage());
}).doAfterTerminate(() -> {
    p("终止了");
}).subscribe(getObserver());
打印结果:
onNext->1
onNext->2
test error
onError->test error
终止了

结果可以看到doOnError后,只要发出error事件都会打印出来,doAfterTerminate是在每个事件完成后包括异常或者正常完成都会回调

3.doOnSubscribe()与doOnComplete()与doFinally()

doOnSubscribe:有观察者订阅时执行 doOnComplete:事件完成时执行 doFinally:事件最后执行,不管是error还是complete

使用方式:
Observable.just(1,2,3).doOnComplete(() -> {
    p("doOnComplete");
}).doFinally(() -> {
    p("doFinally");
}).doOnSubscribe(disposable -> {
    p("disposable");
}).subscribe(getObserver());
打印结果:
disposable
onNext->1
onNext->2
onNext->3
doOnComplete
onComplete->
doFinally

结果看到在订阅的时候打印了disposable,在next事件结束后onComplete前打印了doOnComplete,在最后调用了doFinally

错误处理操作符

onErrorReturn:

定义:发生错误事件时,发出一个onNext的事件和原事件类型一致, 并回调onCompleted事件
使用方式:
Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    emitter.onError(new Throwable("test error"));
}).onErrorReturn((Function<Throwable, Integer>) throwable -> {
    p("onErrorReturn"+throwable.getMessage());
    return 10;
}).subscribe(getObserver());
打印结果:
onNext->1
onNext->2
onNext->3
onErrorReturntest error
onNext->10
onComplete->
这里在发送error的时候,再次发出了一个onNext->10事件,并回调了onComplete方法

onErrorResumeNext:

定义:发生错误事件时,发出一个事件源,事件源可以再次发送一个ObservableSource
使用方式:
Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    emitter.onError(new Throwable("test error"));
}).onErrorResumeNext((Function<Throwable, ObservableSource<? extends Integer>>) throwable -> {
    p("onErrorResumeNext"+throwable.getMessage());
    return new ObservableSource<Integer>() {
        @Override
        public void subscribe(@NonNull Observer<? super Integer> observer) {
            observer.onNext(10);
            observer.onNext(20);
            observer.onComplete();
        }
    };
}).subscribe(getObserver());
打印结果:
onNext->1
onNext->2
onNext->3
onErrorResumeNexttest error
onNext->10
onNext->20
onComplete->
在发送error异常时,又在新的ObservableSource中发送了一个10和20,并回调onComplete方法
onErrorResumeNext()与onErrorReturn()类似:区别在于 onErrorReturn(): 返回的 是一个与源Observable相同类型的结果(onNext()) onErrorResumeNext():返回的是与源Observable相同类型的新的Observable,也就是说可以返回多个相同类型的Observable。可以返回多个相同类型的Observable。

retry:

定义:当被观察者发生错误或者异常时,重新执行原逻辑,可以指定次数
使用方式:
Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onError(new Throwable("test error"));
}).retry(2).subscribe(getObserver());
打印结果:
onNext->1
onNext->2
onNext->1
onNext->2
onNext->1
onNext->2
onError->test error
可以看到在发生异常事件后,会重复执行之前的事件两次

retryWhen():

定义:在源被观察者Observable出现错误或者异常时,通过回调新的Observable来判断是否重新尝试执行源Observable的逻辑, 如果第二个Observable没有出现错误或者异常,就会重新尝试执行源Observable的逻辑,否则会直接回调订阅者的onError()方法。
使用方式:
Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onError(new Throwable("test error"));
}).retryWhen(throwableObservable -> new ObservableSource<Object>() {
    @Override
    public void subscribe(@NonNull Observer<? super Object> observer) {
        observer.onNext(1);
        observer.onError(new Throwable("eo"));
    }
}).subscribe(getObserver());
打印结果:
onNext->1
onNext->2
onError->eo
看到在新的被观察者数据元中出现了error则不会发出重试情况

repeat():

定义:表示重复,可设置数字,但是如果出现异常则不会再发送事件
使用方式:
Observable.just(1,2,3).repeat(2).subscribe(getObserver());
打印结果:
onNext->1
onNext->2
onNext->3
onNext->1
onNext->2
onNext->3
onComplete->
重复做了两次

repeatWhen():

定义:repeatWhen()是指有条件,重复性发送原来的被观察者Observable事件;在回调方法中创建新的Observable,通过新的observable是否重新订阅和发送事件。
Observable.just(1,2,3).repeatWhen(objectObservable -> new ObservableSource<Object>() {
    @Override
    public void subscribe(@NonNull Observer<? super Object> observer) {
//                        observer.onNext(1);
//                        observer.onNext(2);
        observer.onError(new Throwable("eo"));
    }
}).subscribe(getObserver());

打印结果:

onError->eo:这里只打印出了错误信息

我们在使用下面方式:
Observable.just(1,2,3).repeatWhen(objectObservable -> new ObservableSource<Object>() {
    @Override
    public void subscribe(@NonNull Observer<? super Object> observer) {
        observer.onNext(1);
//      observer.onNext(2);
        observer.onError(new Throwable("eo"));
    }
}).subscribe(getObserver());
**打印结果:**
onNext->1
onNext->2
onNext->3
onError->eo
打印出了一次1,2,3
**再次添加一个onNext看看:**
Observable.just(1,2,3).repeatWhen(objectObservable -> new ObservableSource<Object>() {
    @Override
    public void subscribe(@NonNull Observer<? super Object> observer) {
        observer.onNext(1);
        observer.onNext(2);
        observer.onError(new Throwable("eo"));
    }
}).subscribe(getObserver());
打印结果:
onNext->1
onNext->2
onNext->3
onNext->1
onNext->2
onNext->3
onError->eo
看到打印出了两次123,123说明这个操作符是根据我们新的ObservableSource中发送的next次数来重复发送123,当遇到onError,直接退出

repeatUtil:重复直到什么状态停止

使用方式:
Observable.just(1,2,3).repeatUntil(new BooleanSupplier() {
            int count =0;
            @Override
            public boolean getAsBoolean() throws Exception {
                return ++count>2?true:false;
            }
        }).subscribe(getObserver());
打印结果:
onNext->1
onNext->2
onNext->3
onNext->1
onNext->2
onNext->3
onNext->1
onNext->2
onNext->3
onComplete->
这里repeatUntil内部在count大于2的情况下返回true,就不在重复,说明BooleanSupplier这个类会被重复调用

过滤:

filter:根据条件过滤

使用方式:
Observable.just(1,2,3).filter(integer -> integer==2?false:true).subscribe(getObserver());
打印结果:

onNext->1 onNext->3 onComplete-> 打印出了1和3,而2因为在filter内部返回为false,则会被过滤了

ofType:

定义:根据类型过滤

使用方式:
Observable.just(1,2,3,"s1","s2",4).ofType(String.class).subscribe(getObserver());
打印结果:
onNext->s1
onNext->s2
onComplete->
看到把除了String类型外的都过滤掉了

elementAt:

定义:根据索引来,可以在超出范围时,使用默认值
Observable.just(1,2,3,"s1","s2",4).elementAt(4,"default value").subscribe((Consumer<Serializable>) serializable -> p(serializable));
打印结果:
s2
可以看到只获取了index对应的数值

distinct

定义:去重
使用方式:
Observable.just(1,2,3,4,1,2,3).distinct().subscribe(getObserver());
打印结果:
onNext->1
onNext->2
onNext->3
onNext->4
onComplete->
看到只打印了1,2,3,4去掉了重复的数字

debounce:

定义:只接受超时时间后的数据
Observable.intervalRange(1,5,0,2,TimeUnit.SECONDS).debounce(1,TimeUnit.SECONDS).subscribe(getObserver());
打印结果:
onNext->1
onNext->2
onNext->3
onNext->4
onNext->5
onComplete->
这里debounce设置的超时时间为1s,而intervalRange内部的事件都是在延迟2s后发出,所以打出了所有超出2s的时间1,2,3,4,5

first:

定义:第一个事件的值,可以用elementAt或者take(1)代替
使用方式:
Observable.just(1,2,3,4,5).first(2).subscribe(integer -> p(integer));
打印结果:1

last:最后一个和first对应

使用方式:
Observable.just(1,2,3,4,5).last(2).subscribe(integer -> p(integer));
打印结果:5

skip:跳过前面几个

使用方式:
Observable.just(1,2,3,4,5).skip(3).subscribe(integer -> p(integer));
打印结果:
4,5
前面3个跳过了

take:将事件的前面几个事件发出和skip相反

takeLast:将事件后面几个发出

使用方式:
Observable.just(1,2,3,4,5).take(3).subscribe(integer -> p(integer));
打印结果:1,2,3

其他操作符

groupBy:按某个方式排序

使用方式:
Observable.just(1,2,3,"s1","s2","s3",4).groupBy((Function<Serializable, Object>) serializable -> {
    if(serializable.getClass().getName().equals(String.class.getName())){
        return "string";
    }
    return "integer";
}).subscribe(objectSerializableGroupedObservable -> {
    String key = (String) objectSerializableGroupedObservable.getKey();
    if("string".equals(key)){
        objectSerializableGroupedObservable.subscribe((Consumer<Serializable>) serializable -> {
           p("String:"+serializable);
        });
    }else{
        objectSerializableGroupedObservable.subscribe((Consumer<Serializable>) serializable -> {
            p("Integer:"+serializable);
        });
    }
});
对传入的数据通过Class类型判断,分为string和integer两个key
然后在观察者中对key进行判断就可以将事件分组出来

cast:强制转换

使用方式:
Observable.just(new Bean(1,2)).cast(BaseBean.class).subscribe(getObserver());
打印结果:
onNext->com.android.yuhb.test.RxJava$Bean@45c8e616
onComplete->
打印出了Bean类型的数据,

这里有个点:父类不能强制转换为子类,子类可以强制转换为父类

scan:通过遍历源Observable(被观察者)产生的结果,依次每个结果按照指定的规则进行计算,计算后的结果作为下一项迭代的参数,

使用方式:
Observable.just(1,2,3,4).scan((integer, integer2) -> integer+integer2).subscribe(getObserver());
打印结果:
onNext->1
onNext->3
onNext->6
onNext->10
onComplete->
这里第一个事件发出,此时scan数据为0+1
第二个事件发出:此时scan数据等于0+1 +2 = 3
第三个事件发出:此时scan数据等于0+1+2 +3 = 6
第四个事件发出;此时scan数据等于0+1+2+3 +4 = 10
和打印结果一致
总结scan:前一个数据和后一个数据的和 作为下一个事件的前一个操作数

join:合并同一时间的值,

使用方式:
Observable<Long> o1 = Observable.interval(0,1000,TimeUnit.MILLISECONDS).map(aLong -> aLong*5).take(5);
Observable<Long> o2 = Observable.interval(500,1000,TimeUnit.MILLISECONDS).map(aLong -> aLong*10).take(5);
o1.join(o2, aLong -> {
    //使Observable延时600毫秒执行(控制observable1的生命周期)
    return Observable.just(aLong).delay(600,TimeUnit.MILLISECONDS);
}, aLong -> {
    //使Observable延时600毫秒执行(控制observable2的生命周期)
    return Observable.just(aLong).delay(600,TimeUnit.MILLISECONDS);
}, (o, o21) -> {
    pl("o1:"+o+" o2:"+ o21);
    return o+ o21;
}).subscribe(new Consumer() {
    @Override
    public void accept(Object o) throws Exception {
        p(o);
    }
});
打印结果:
o1:0 o2:0 0
o1:5 o2:0 5
o1:5 o2:10 15
o1:10 o2:10 20
o1:10 o2:20 30
o1:15 o2:20 35
o1:15 o2:30 45
o1:20 o2:30 50
o1:20 o2:40 60
看下面的两个事件的时间线:
0  5  10  15  20
  0 10  20  30  40 
同一时间事件碰撞列表:
o1:o2

0:0
5:0
5:10
10:10
10:20
15:20
15:30
20:30
20:40

这个列表和打印结果是一致的,说明我们的猜测是正确的

以上就是我对各个操作符的一些使用和解析,如果需要了解内部原理的可以查看内部源代码

关于RxJava的其他系列文章,可以点击下方链接

面试官:RxJava背压机制有了解么?

面试官:RxJava是如何做到响应式编程的?

使用rxjava创建一个rxbus事件处理框架

相关文章
|
Java 编译器
【JavaSE专栏66】使用instanceof关键字,验证显隐式类型转换
【JavaSE专栏66】使用instanceof关键字,验证显隐式类型转换
|
3月前
|
C++
C++(十九)new/delete 重载
本文介绍了C++中`operator new/delete`重载的使用方法,并通过示例代码展示了如何自定义内存分配与释放的行为。重载`new`和`delete`可以实现内存的精细控制,而`new[]`和`delete[]`则用于处理数组的内存管理。不当使用可能导致内存泄漏或错误释放。
|
7月前
|
存储 编译器 C语言
操作符精讲——这些操作符你还记得几个?
操作符精讲——这些操作符你还记得几个?
|
Java API
想要精通Java8编程?学会Predicate接口,轻松搞定条件判断!
想要精通Java8编程?学会Predicate接口,轻松搞定条件判断!
73 0
RxJava2源码分析(二):操作符原理分析
RxJava2源码分析(二):操作符原理分析
RxJava2源码分析(二):操作符原理分析
|
SQL Java
Java奇技淫巧:new对象后加大括号{},最后顺便做了个小框架?
Java奇技淫巧:new对象后加大括号{},最后顺便做了个小框架?
258 0
|
存储 JavaScript 前端开发
ES2020 系列:可选链 "?." 为啥出现,我们能用它来干啥?
ES2020 系列:可选链 "?." 为啥出现,我们能用它来干啥?
136 0
ES2020 系列:可选链 "?." 为啥出现,我们能用它来干啥?
|
Java
Lambda基本使用方法(二)——再看不懂我找不到女朋友
Lambda基本使用方法(二)——再看不懂我找不到女朋友
91 0
|
Java
JavaSE面试题03:方法的参数传递机制
JavaSE面试题03:方法的参数传递机制
87 0
JavaSE面试题——方法的参数传递机制
JavaSE面试题——方法的参数传递机制
JavaSE面试题——方法的参数传递机制