RxJava 主要做异步、网络的数据处理,强大之处就是对数据的处理了,而对于处理完后的数据处理是一样的都是观察者模式来通知,也可以把 RxJava 进一步封装出一个 EventBus(RxBus) 库,二者可以转换的。
//引入rxJava
implementation 'io.reactivex.rxjava2:rxjava:2.1.8'
//引入rxAndroid
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
//引入rxJava适配器,方便rxJava与retrofit的结合
implementation 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
//引入J神的rxrelay2,出现异常仍然可以处理
implementation 'com.jakewharton.rxrelay2:rxrelay:2.0.0'
直接上代码--->
1、创建RxBus
public class RxBus { private final Subject<Object> _bus; private static class RxBusHolder { private static final RxBus instance = new RxBus(); } private RxBus() { _bus = PublishSubject.create(); } public static synchronized RxBus getInstance() { return RxBusHolder.instance; } public void send(Object o) { _bus.onNext(o); } public <T> Observable<T> toObservable(Class<T> eventType) { return _bus.ofType(eventType); } }
2、创建用于传递消息的尸体
public class RxBean { String msgs; public RxBean(String msg) { msgs = msg; } public String getMsgs() { return msgs; } public void setMsgs(String msgs) { this.msgs = msgs; } }
3、发布消息
RxBus.getInstance().send(new RxBean("BasicPartyBuildingActivityRefresh"));
4、订阅消息
RxBus.getInstance().toObservable(RxBean.class).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<RxBean>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(RxBean rxBean) { if ("BasicPartyBuildingActivityRefresh".equals(rxBean.getMsgs())) { //要执行的代码 } } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });