关于Rxjava的简单使用

简介: 本篇只是讲一下Rxjava的简单入门使用,想要详解的请移步其他博主文章,关于RxJava详解的文章网上一大堆,本片文章内容适合小白学习。

本篇只是讲一下Rxjava的简单入门使用,想要详解的请移步其他博主文章,关于RxJava详解的文章网上一大堆,本片文章内容适合小白学习。

首先理解什么是RxJava,官方概念是RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。关注的重点是两个字,那就是异步。

RxJava的特点就是可以非常简便的实现异步调用,可以在逻辑复杂的代码逻辑中以比较轻易的方式实现异步调用。随着逻辑的复杂,需求的更改,代码可依然能保持极强的阅读性,在深入的使用过程中一定对这点深有体会。

在这里我们在理解两个概念之间的关系,分别是RxJava和RxAndroid,很多人肯定会问,这两个之间有什么关联,其实就是Java虚拟机跟Android虚拟机是同一个道理。RxAndroid是RxJava的一个针对Android平台的扩展,主要用于 Android 开发。

简单的说就是,RxJava在Java平台就叫RxJava,扩展到Android平台上,就叫RxAndroid。就像Java虚拟机扩展到Android平台上就叫Android虚拟机,同一个东西在不同平台上拥有不同的名字,类似于这个道理。

开始学习Rxjava前,我要知道一点,RxJava是基于观察者模式设计的一种实现异步操作的一个库。那么要了解来个类,Observable(观察者)和Subscriber(订阅者)。在 RxJava 上,一个 Observable 是一个发出数据流或者事件的类,Subscriber 是一个对这些发出的 items (数据流或者事件)进行处理(采取行动)的类。一个 Observable 的标准流发出一个或多个 item,然后成功完成或者出错。一个 Observable 可以有多个 Subscribers,并且通过 Observable 发出的每一个 item,该 item 将会被发送到 Subscriber.onNext() 方法来进行处理。一旦 Observable 不再发出 items,它将会调用 Subscriber.onCompleted() 方法,或如果有一个出错的话 Observable 会调用 Subscriber.onError() 方法。

下面先看Observable的创建

Observable ob = Observable.create(new Observable.OnSubscribe() {
   @Override
   public void call(Subscriber subscriber) {
       subscriber.onNext(1);
       subscriber.onNext(2);
       subscriber.onNext(3);
       subscriber.onCompleted();
   }
});

在看Subscriber的创建

Subscriber sb= new Subscriber() {
   @Override
   public void onCompleted() {
       System.out.println("Complete!");
   }
   @Override
   public void onError(Throwable e) {
   }
   @Override
   public void onNext(Integer value) {
       System.out.println("onNext: " + value);
   }
};

那么如果将ob和sb进行关联起来呢,答案就是订阅,使用subscribe()方法

ob.subscribe(sb);

以上就完成了一个完整的订阅流程,ob会依次通过Subscriber.onNext()发送数据“1”、“2”,“3”,sb会分别在onNext()中接收这三个数据,当ob调用了Subscriber.onCompleted()表示发送完成,此时sb会回调onCompleted()方法,自此整个订阅结束,但是当ob发动数据出错时,会回调sb的onError()方法。

RxJava最让人喜欢的还有一点是代码结构清晰,上面的代码可以连起来,变成下面的样子

Observable.create(new Observable.OnSubscribe() {
   @Override
   public void call(Subscriber subscriber) {
       subscriber.onNext(1);
       subscriber.onNext(2);
       subscriber.onNext(3);
       subscriber.onCompleted();
   }
}).subscribe(new Subscriber() {
   @Override
   public void onCompleted() {
       System.out.println("Complete!");
   }
   @Override
   public void onError(Throwable e) {
   }
   @Override
   public void onNext(Integer value) {
       System.out.println("onNext: " + value);
   }
});

这就是RxJava非常受人欢迎的链式结构,是不是看起来很简洁清晰,这种结构在逻辑复杂的情况下更加能体验出优势。看了链式结构后可能会有人会问,为什么是被观察者订阅观察者呢,不应该是观察者订阅被观察者才对的吗?有这个问题是什么正常的,其实原因是为了顺应这种链式结构,试想一下,如果是sb.subscribe(ob),那上面的链式结构不就断了么,那就是观察者,被观察者分开写,然后再用subscribe()方法订阅,无法形成链式结构了。

以上其实就是平时Rxjava的基本内容了,但是呢,RxJava的出现就是为了方便,所以还提供了一些操作符,例如上面的例子,发送三个数1、2、3,调用了三次subscriber.onNext()方法,其实用操作符just就可以替代,上面的例子可以简化为如下:

Observable.just(1, 2 ,3).subscribe(new Subscriber() {
   @Override
   public void onCompleted() {
       System.out.println("Complete!");
   }
   @Override
   public void onError(Throwable e) {}
   @Override
   public void onNext(Integer value) {
       System.out.println("onNext: " + value);
   }
});

just(1,2,3)相当于分别调用subscriber.onNext(1);subscriber.onNext(2);subscriber.onNext(3);

在看操作符filter;filter可以理解为一次筛选,之后返回true才做回调subscriber.onNext()方法。比如下面的例子,打印奇数,先看代码

Observable.just(1, 2, 3, 4, 5, 6) // add more numbers
       .filter(new Func1() {
           @Override
           public Boolean call(Integer value) {
               return value % 2 == 1;
           }
       })
       .subscribe(new Subscriber() {
           @Override
           public void onCompleted() {
               System.out.println("Complete!");
           }
           @Override
           public void onError(Throwable e) {
           }
           @Override
           public void onNext(Integer value) {
               System.out.println("onNext: " + value);
           }
       });

just()发出的数据,先经过filter()的筛选后,符合条件的才会回调onNext()方法被打印出来。

再比如map操作符,上面例子我们再多做一部操作,对筛选出来的奇数计算平方根

Observable.just(1, 2, 3, 4, 5, 6) // add more numbers
       .filter(new Func1() {
           @Override
           public Boolean call(Integer value) {
               return value % 2 == 1;
           }
       })
       .map(new Func1() {
           @Override
           public Double call(Integer value) {
               return Math.sqrt(value);
           }
       })
       .subscribe(new Subscriber() { // notice Subscriber type changed to 
           @Override
           public void onCompleted() {
               System.out.println("Complete!");
           }
 
           @Override
           public void onError(Throwable e) {
           }
 
           @Override
           public void onNext(Double value) {
               System.out.println("onNext: " + value);
           }
       });

上面的例子其实就是实现了对输出数据的一个数据类型转换,因为计算平方根我们完全可以在onNext()中去计算的,注意我们发出的数据是1、2、3、4、5,是Integer类型,求平方根结果是Double类型,在需要返回值的请况下,我们可能就需要进行一次类型强制转化,但是通过map操作符后,大家可以主要到,onNext()方法的参数类型已经是Double类型了,已经帮我们自动转化好了。这种在数据类型比较复杂的情况下就更加能体现出优势了,比如我发送的数据是String,我最后需要得到的结果是一个实体类。

大家都只要,在Android开发中,子线程是不能更新UI的,而网络请求都是在子线程中进行的,获取到数据后还得进行一次线程切换,切换到主线程去更新UI,其实Android本身已经提供很多方法可以实现上述说的需求了,比如异步任务,比如service等,但是异步任务容易造成内存泄漏,service控制不好容易浪费资源。而且代码上也要写比较多的代码,结构不简洁。下面我们来看Rxjava是如何来实现的。

从网络获取数据,getDataFromNetwork(),这个方法我们假如在请求网络数据,这个方法有返回值,我们就假设返回一个String类型,String data= getDataFromNetwork()。从网络拿到data后我们去更新TextView文本内容,那么上述通过RxJava可以这样子实现

Observable.create(new Observable.OnSubscribe() {
   @Override
   public void call(Subscriber subscriber) {
       String data = getDataFromNetwork();
       subscriber.onNext(data);
   }
}).subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Subscriber() {
     @Override
     public void onCompleted() {
         System.out.println("Complete!");
     }
     @Override
     public void onError(Throwable e) {
     }
     @Override
     public void onNext(String value) {
         textView.setText(value);
     }
});

是不是很简单,上面的代码就实现全部内容了,这里有三个重点的地方,我们从网络获取到数据data后,需要通过subscriber.onNext(data);将数据发送出来,onNext()才能回调。另外两个重点是subscribeOn(Schedulers.io())和observeOn(AndroidSchedulers.mainThread()),subscribeOn()方法其实就是设置网络请求在哪个线程中去实现,这里传入的参数是Schedulers.io(),表示网络请求在I/O线程中去完成,这样子耗时操作没有在主线程,也就不是阻塞主线程,UI不会出现卡顿。再看observeOn(),表示获取到的数据需要再哪个线程中去使用,这里传入的参数是AndroidSchedulers.mainThread(),代表主线程,总结就是,获取数据的时候,先切换到I/O线程去获取数据,获取数据后再切换回主线程,既然回到了主线程,那就可以去更新UI了。

是不是很简单,是不是很心动,其实就算是RxJava的冰山一角,还有很多更强大更简单的功能,但是对于小白来说,先了解到这些已经差不多了,有时间再去深入了解即可。

相关文章
|
Java 程序员
Rxjava实战笔记 | Rxjava的基本使用解析(同步结合示例)
Rxjava实战笔记 | Rxjava的基本使用解析(同步结合示例)
|
JSON Java 数据格式
rxjava2+retrofit2 简介
rxjava2+retrofit2 简介
110 0
|
Java 数据库 UED
RxJava的简介
RxJava的简介
312 0
RxJava的简介
|
安全 Android开发
详解 RxJava 的 Disposable
RxJava2 的 Disposable,可以在适当时机取消订阅、截断数据流,避免 Android 中的内存泄露。
1225 0
|
前端开发 NoSQL Java
使用 Kotlin + WebFlux/RxJava 2 实现响应式以及尝试正式版本的协程
使用 Kotlin + WebFlux/RxJava 2 实现响应式以及尝试正式版本的协程
841 0
使用 Kotlin + WebFlux/RxJava 2 实现响应式以及尝试正式版本的协程
|
Java 调度 安全
Rxjava深入理解之自己动手编写Rxjava
Demo的源码地址在 mini-rxjava, 有兴趣的可以下载源码来看. 从观察者模式说起 观察者模式,是我们在平时使用的比较多的一种设计模式.观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。
1181 0
RxJava2学习笔记(3)
接上回继续,今天来学习下zip(打包)操作 一、zip操作 @Test public void zipTest() { Observable.zip(Observable.
1111 0
|
iOS开发
RxJava2学习笔记(2)
上一篇已经熟悉了Observable的基本用法,但是如果仅仅只是“生产-消费”的模型,这就体现不出优势了,java有100种办法可以玩这个:) 一、更简单的多线程 正常情况下,生产者与消费者都在同一个线程里处理,参考下面的代码: final long start = System.
1016 0
RxJava2学习笔记(1)
作为github上star数极高的响应式编程java扩展类库,rxjava是啥就不多说了,网上能查到一堆介绍,下面是一些学习记录: 前提依赖: compile 'io.reactivex.rxjava2:rxjava:2.1.9' 一、Observable 1.1 hello world rxjava中的核心思路是“生产者-消费者”模型,生产者的java类通常用xxxEmitter命名,字面意思:发射器,可以想象为一个机关枪,一直biu biu biu的向外发射信息,另一端则是靶子(也就是消费者),在不停的接收。
1355 0