RxJava中几个常用但初学介绍不多的方法介绍

简介: 自己学习RxJava的知识点的总结,看了很多篇文章,到现在也算是略有心得;推荐大家如果想了解的可以看看这篇文章,RxJava综合教程(系列版)不过再详细的讲解还是会有说的不到的地方,比如我最近在做安卓的课设,坑的老师,一开始说没有,最后几天在说,真的是加班加点的干。

自己学习RxJava的知识点的总结,看了很多篇文章,到现在也算是略有心得;推荐大家如果想了解的可以看看这篇文章,RxJava综合教程(系列版)

不过再详细的讲解还是会有说的不到的地方,比如我最近在做安卓的课设,坑的老师,一开始说没有,最后几天在说,真的是加班加点的干。不过也算是总结吧,之前一段时间的学习知识点也都在这里有所涉及,不过毕竟简单。git地址在此:欢迎大家指点一个小项目

这里很多东西没有实现,不过我最近学习的RxJava和Retrofit用到了。可惜对于MVP不够熟悉,所以在时间很赶的情况下只能抛弃了。我把我遇到的不熟悉的知识点放在这大家可以瞅一瞅:

disable:rxjava虽然好用,但是总所周知,容易遭层内存泄漏。也就说在订阅了事件后没有及时取阅,导致在activity或者fragment销毁后仍然占用着内存,无法释放。而disposable便是这个订阅事件,可以用来取消订阅。但是在什么时候取消订阅呢?我知道有两种方式:使用CompositeDisposable看源码,CompositeDisposable的介绍很简单

A disposable container that can hold onto multiple other disposables and offers O(1) add and removal complexity.

一个disposable的容器,可以容纳多个disposable,添加和去除的复杂度为O(1)。这里需要注意的是在该类的addAll方法有这么一句注释也就是说,如果这个CompositeDisposable容器已经是处于dispose的状态,那么所有加进来的disposable都会被自动切断。所以说可以创建一个BaseActivity,用CompositeDisposable来管理订阅事件disposable,然后在acivity销毁的时候,调用compositeDisposable.dispose()就可以切断所有订阅事件,防止内存泄漏。

(2)在oError和onComplete后调用disposable.dispose();,ObservableCreate的静态类CreateEmitter就是这种方式实现的。同时也可以看到,onError和onComplete不可以同时调用的原因:每次掉用过onError或onComplete其中一个方法后,就会掉用dispose()方法,此时订阅取消,自然也就不能掉用另一个方法了;    说实话一开始好像没人给我说disable到底是什么东西,之后我在其他人的项目上看到了使用,封装到Base中,所以我想怎么都要搞懂才行啊,不然出去装都装不起来。所以我就搜索了一下。

Consumer我印象(只是印象)第一次看到这个是在学习kotlin的时候,我git了一个项目,看到里面的订阅方法有两个{ },说实话当时我并不懂,不过两天我看了鸿洋大神的git项目,它封装了一些RxJava和Retrofit的基本用法,我在里面又看到了这个我才知道具体真面目,所以上网搜了一些才明白

Consumer是简易版的Observer,他有多重重载,可以自定义你需要处理的信息,我这里调用的是只接受onNext消息的方法,

* 他只提供一个回调接口accept,由于没有onError和onCompete,无法再 接受到onError或者onCompete之后,实现函数回调。

* 无法回调,并不代表不接收,他还是会接收到onCompete和onError之后做出默认操作,也就是监听者(Consumer)不在接收

doOnNext官方介绍:

The doOnNext operator is much like doOnEach(Action1) except that the Action that you pass it as a parameter does not accept a Notification but instead simply accepts the emitted item.

可以这么理解:do系列的作用是side effect,当onNext发生时,它被调用,不改变数据流。doOnNext()允许我们在每次输出一个元素之前做一些额外的事情。使用doOnNext()来调试在flatMap()里使用doOnError()作为错误处理。使用doOnNext()去保存/缓存网络结果


 map And flatMap:   两个方法都是对对象进行转换map (比如可以把String转换为一个实体类型等,参数是 Funcation,可以重写apply方法)flatMap (同样是转换,不过他是对一串事件的转换,比如一个学生拥有很多课程)他们的参数都是有两个 (第一个参数是传入的参数类型)第二个参数是需要转换成什么样的参数类型)注意flatMap的第二个参数需要在外层包裹Observalbe,因为他还需要进行处理上游每发送一个事件, flatMap都将创建一个新的水管, 然后发送转换之后的新的事件, 下游接收到的就是这些新的水管发送的数据. 这里需要注意的是, flatMap并不保证事件的顺序, 也就是图中所看到的, 并不是事件1就在事件2的前面.如果需要保证顺序则需要使用concatMap.


img_dbc9789f0dcf73d315b3cf3955d5ddb7.png
示例

RxJava用法多种多样,其多样性体现在Obserable(被观察者)的创建上。我们先以最基础的Obserable(被观察者)的创建为例介绍RxJava的使用:Observable的创建:Observableobservable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmittere) throws Exception { //执行一些其他操作 //............. //执行完毕,触发回调,通知观察者 e.onNext("我来发射数据"); } });Observer的创建:

使用create( )创建Observable最基本的创建方式。可以看到,这里传入了一个 ObservableOnSubscribe对象作为参数,它的作用相当于一个计划表,当 Observable被订阅的时候,ObservableOnSubscribe的subscribe()方法才会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Observer 将会被调用一次 onNext())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。


这里用了一个sample操作符, 简单做个介绍, 这个操作符每隔指定的时间就从上游中取出一个事件发送给下游.这里我们让它每隔2秒取一个事件给下游, 来看看这次的运行结果吧:

.sample(2, TimeUnit.SECONDS)  //sample 取样,接收两个参数

(这里肯定有很多种用法)不过我看的介绍是用在处理背压上面;两种方式,一种是控制上游发送的数据量,一种是控制时间。这是其中一种,如果不明白可以看我开头提到的系列文章,里面都会有介绍。

Scheduler:创建线程异步任务的方法


img_b214fbea7f6c1f75e9e2ae1820780103.png
示例

有关背压的处理官方给我们提供了一个Flowable,相对应的是Subscriber

之前我们所的上游和下游分别是Observable和Observer, 这次不一样的是上游变成了Flowable, 下游变成了Subscriber,但是水管之间的连接还是通过subscribe(), 我们来看看最基本的用法吧:我们注意到这次和Observable有些不同. 首先是创建Flowable的时候增加了一个参数, 这个参数是用来选择背压,也就是出现上下游流速不均衡的时候应该怎么处理的办法, 这里我们直接BackpressureStrategy.ERROR这种方式,这种方式会在出现上下游流速不均衡的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException.另外的一个区别是在下游的onSubscribe方法中传给我们的不再是Disposable了, 而是Subscription, 它俩有什么区别呢,首先它们都是上下游中间的一个开关, 之前我们说调用Disposable.dispose()方法可以切断水管,同样的调用Subscription.cancel()也可以切断水管, 不同的地方在于Subscription增加了一个void request(long n)方法,


最后是我自己随便写的总结(学习RxJava的)

注意: 只有当上游和下游建立连接之后, 上游才会开始发送事件. 也就是调用了subscribe() 方法之后才开始发送事件.

ObservableEmitter 和 Disposable

ObservableEmitter: Emitter是发射器的意思,那就很好猜了,这个就是用来发出事件的,它可以发出三种类型的事件,

通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。

但是,请注意,并不意味着你可以随意乱七八糟发射事件,需要满足一定的规则:

上游可以发送无限个onNext, 下游也可以接收无限个onNext.

当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送,

而下游收到onComplete事件之后将不再继续接收事件.

当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.

上游可以不发送onComplete或onError.

最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError,

也不能先发一个onComplete, 然后再发一个onError, 反之亦然

注: 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制,

如果你的代码逻辑中违背了这个规则, 并不一定会导致程序崩溃. 比如发送多个onComplete是可以正常运行的,

依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.

==========================================================================================================

介绍了ObservableEmitter, 接下来介绍Disposable, 这个单词的字面意思是一次性用品,用完即可丢弃的.

那么在RxJava中怎么去理解它呢, 对应于上面的水管的例子, 我们可以把它理解成两根管道之间的一个机关,

当调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件.

注意: 调用dispose()并不会导致上游不再继续发送事件, 上游会继续发送剩余的事件.

filter的参数是Predicate的接口对象

目录
相关文章
RxJava2源码分析(二):操作符原理分析
RxJava2源码分析(二):操作符原理分析
RxJava2源码分析(二):操作符原理分析
|
Java Android开发
面试官:RxJava是如何做到响应式编程的?
RxJava已经出现很多个年头了,但是依然被很多公司使用,如果现在还对RxJava了解的不够透彻, 可以看这个系列对它的分析:相信看完后你对它会有个更全面的认识。 这个系列主要从下面几个方面来讲解: **RxJava基本操作符使用** **RxJava响应式编程是如何实现的** **RxJava的背压机制及Flowable是如何实现背压的** **RxJava的线程切换原理
|
设计模式 缓存 算法
带你手撸一个Kotlin版的EventBus
EventBus的优点有很多(现在来看也并不是优点):代码简洁,是一种发布订阅设计模式(观察者设计模式),简化了组件之间的通讯,分离了事件的发送者和接收者,而且可以随意切换线程,避免了复杂的和易错的依赖关系和生命周期问题
363 0
带你手撸一个Kotlin版的EventBus
|
Java 程序员
Rxjava实战笔记 | Rxjava的基本使用解析(同步结合示例)
Rxjava实战笔记 | Rxjava的基本使用解析(同步结合示例)
|
Java Android开发
关于Rxjava的简单使用
本篇只是讲一下Rxjava的简单入门使用,想要详解的请移步其他博主文章,关于RxJava详解的文章网上一大堆,本片文章内容适合小白学习。
168 1
Livedata源码详细解析-面试这么讲就ok
Livedata源码详细解析-面试这么讲就ok
289 0
Livedata源码详细解析-面试这么讲就ok
|
安全 Java Android开发
MVP+Retrofit+RxJava简单事例
MVP+Retrofit+RxJava简单事例
154 0
MVP+Retrofit+RxJava简单事例
|
IDE 测试技术 API
聊聊我的源码阅读方法
本次代码阅读的项目来自 500lines 的子项目 web-server。 500 Lines or Less不仅是一个项目,也是一本同名书,有源码,也有文字介绍。这个项目由多个独立的章节组成,每个章节由领域大牛试图用 500 行或者更少(500 or less)的代码,让读者了解一个功能或需求的简单实现。
161 0
聊聊我的源码阅读方法