1.定义
RxJava
是一个 基于事件流、实现异步操作的库
2.作用
用于实现异步操作,类似于 Android
中的 AsyncTask
、Handler+
new Thread的作用
3. 特点
由于 RxJava
的使用方式是:基于事件流的链式调用,所以使得 RxJava
:
- 逻辑简洁
- 实现优雅
- 使用简单
更重要的是,随着程序逻辑的复杂性提高,它依然能够保持简洁 & 优雅
Rxjava
原理 基于 一种扩展的观察者模式Rxjava
的扩展观察者模式中有4个角色:
角色 | 作用 | 类比 |
被观察者(Observable) | 产生事件 | 顾客 |
观察者(Observer) | 接收事件,并给出响应动作 | 厨房 |
订阅(Subscribe) | 连接 被观察者 & 观察者 | 服务员 |
事件(Event) | 被观察者 & 观察者 沟通的载体 | 菜式 |
4.使用
导包
implementation 'io.reactivex.rxjava2:rxjava:2.1.3' implementation 'io.reactivex.rxjava2:rxandroid:2.0.1' implementation 'com.jakewharton.rxbinding2:rxbinding:2.2.0'
1.定义被观察者
三种方式:Observable.create、Observable.just、Observable.fromArray
public void buyFood(){ Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception { e.onNext("potato"); e.onNext("tomato"); e.onNext("noodles"); e.onComplete(); } }); Observable<String> observable1 = Observable.just("potato","tomato","noodles"); String [] foods = new String[]{"potato","tomato","noodles"}; Observable<String> observable2 = Observable.fromArray(foods); }
2.定义观察者
使用observer或者subscriber
Observer<String> mObserver = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull String s) { } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } };
Subscriber<String> mStringSubscriber = new Subscriber<String>() { @Override public void onSubscribe(Subscription s) { } @Override public void onNext(String s) { } @Override public void onError(Throwable t) { } @Override public void onComplete() { } };
特别注意:2种方法的区别,即Subscriber 抽象类与Observer 接口的区别
// 相同点:二者基本使用方式完全一致(实质上,在RxJava的 subscribe 过程中,Observer总是会先被转换成Subscriber再使用)
// 不同点:Subscriber抽象类对 Observer 接口进行了扩展,新增了两个方法:
// 1\. onStart():在还未响应事件前调用,用于做一些初始化工作
// 2\. unsubscribe():用于取消订阅。在该方法被调用后,观察者将不再接收 & 响应事件
// 调用该方法前,先使用 isUnsubscribed() 判断状态,确定被观察者Observable是否还持有观察者Subscriber的引用,如果引用不能及时释放,就会出现内存泄露
3.订阅
observable.subscribe(observer);
5.优雅实现
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception { e.onNext("potato"); e.onNext("tomato"); e.onNext("noodles"); e.onComplete(); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull String s) { } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } });
其他方法
observeOn:观察者的执行线程
observeOn 作用于该操作符之后直到出现新的observeOn操作符
subscribeOn:被观察者的执行线程
subscribeOn 作用于该操作符之前的 Observable 的创建操符作以及 doOnSubscribe 操作符 ,换句话说就是 doOnSubscribe 以及 Observable 的创建操作符总是被其之后最近的 subscribeOn 控制
实例
//将字符串转换成double类型 String path = "12.3"; public void test(){ //输入事件 Observable.just(path) //对事件进行处理 .map(new Function<String, Double>() { @Override public Double apply(@NonNull String s) throws Exception { return Double.parseDouble(s); } }) //指定被观察者执行线程 .subscribeOn(Schedulers.io()) //指定观察者执行线程 .observeOn(AndroidSchedulers.mainThread()) //订阅观察者 .subscribe( new Observer<Double>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Double aDouble) { } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } }); }
Rxjava中的Scheduler
相当于线程控制器,Rxjava通过它来指定每一段代码应该运行在什么样的线程。Rxjava提供了5种调度器:
- .io()
- .computation()
- .immediate()
- .newThread()
- .trampoline()
另外,Android还有一个专用的AndroidSchedulers.mainThread()
指定操作将在Android主线程运行。Rxjava通过subscribeOn()
和observeOn()
两个方法来对线程进行控制,subscribeOn()
指定subscribe()
时间发生的线程,也就是事件产生的线程,observeOn()
指定Subscriber
做运行的线程,也就是消费事件的线程。
Schedulers.io() 用于I/O操作,比如:读写文件,数据库,网络交互等等。行为模式和newThread()
差不多,重点需要注意的是线程池是无限制的,大量的I/O调度操作将创建许多个线程并占用内存
Schedulers.computation()计算工作默认的调度器,这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。
Schedulers.immediate() 这个调度器允许你立即在当前线程执行你指定的工作。这是默认的Scheduler
。
Schedulers.newThread() 它为指定任务启动一个新的线程。
Schedulers.trampoline()
当我们想在当前线程执行一个任务时,并不是立即,我们可以用trampoline()
将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。
RXView 防抖
private void test3() { RxView.clicks(控件).throttleFirst(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Object>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Object o) { Log.e(TAG, "onNext: 响应事件....." ); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } });
RxJava篇(应用场景) - 简书 (jianshu.com)