前言:
RxJava已经出现很多个年头了,但是依然被很多公司使用,如果现在还对RxJava了解的不够透彻, 可以看这个系列对它的分析:相信看完后你对它会有个更全面的认识。 这个系列主要从下面几个方面来讲解: RxJava基本操作符使用 RxJava响应式编程是如何实现的 RxJava的背压机制及Flowable是如何实现背压的 **RxJava的线程切换原理
关于RxJava的其他系列文章,可以点击下方链接
使用rxjava创建一个rxbus事件处理框架
今天我们使用RxJava来封装一个RxBus的事件订阅框架
简介
想下平时Activity或者Fragment之间的通讯是通过什么方式进行的?使用Handler或者回调来完成? 这种方式的缺点很明显代码会很杂乱,要满Ide去找Handler或者回调在哪,而且可能会引发内部溢出等问题,头疼啊 那么今天就教大家使用RxJava来封装一个RxBus,大大简化Android组件之间的通讯
封装过程
我们对封装过程进行步骤化:
1.首先考虑使用全局的单例类RxBus,使用PublishSubject为被观察者
private static volatile RxBus rxbus;
private final Subject<Object> subject = PublishSubject.create().toSerialized();
public static RxBus getRxbus(){
if(rxbus==null){
rxbus = new RxBus();
}
return rxbus;
}
这里为什么使用PublishSubjec: PublishSubject与普通的Subject最大的不同就是其可以先订阅事件,然后在某一时刻手动调用方法来触发事件
2.将subject转换为带背压机制的Flowable,并通过类名过滤
private <T> Flowable<T> getObservable(Class<T> type){
return subject.toFlowable(BackpressureStrategy.BUFFER).ofType(type);
}
这里使用类名对事件进行过滤,用户可以对需要的事件使用对应的类型进行过滤即可。 转换为背压主要是防止短时间并发事件太多导致异常,关于背压机制可以参考下面这篇: 面试官:RxJava背压机制有了解么?
3.创建一个内部订阅接口:
public <T> Disposable doSubscribe(Class<T> type, Consumer<T> onNext, Consumer<Throwable> onError){
return getObservable(type)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(onNext,onError);
}
- 这里我们使用subscribeOn(Schedulers.io()),将订阅事件放在io线程上,防止订阅事件占用主线程执行效率
- 使用observeOn(Schedulers.computation())将事件执行放在computation线程上,这个computation线程是一个高效的执行计算线程,非主线程
- 并注册了一个onNext和onError方法,对异常进行了监听
4.创建一个内部发送接口post
public void post(Object o){
subject.onNext(o);
}
5.将disposes注册到map中
private void addSubscription(Object o, Disposable disposable) {
String key = o.getClass().getName();
if(disposableMap.get(key)!=null){
disposableMap.get(key).add(disposable);
}else{
CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(disposable);
disposableMap.put(key,compositeDisposable);
}
}
- 这里使用了组合模式,对多个相同的key可以有多个分支,就是为了对同一个Class可以注册多个监听
6.解除订阅关系
public void unSubscribe(Object o) {
String key = o.getClass().getName();
if(!disposableMap.containsKey(key)){
return;
}
if(disposableMap.get(key)!=null){
disposableMap.get(key).dispose();
}
disposableMap.remove(key);
}
- 解除订阅的时候需要清理所有注册的观察者
7.创建两个对外接口:订阅和发送
//订阅成功后,将订阅信息注册到map中,使用了CompositeDisposable,为了一个观察者对应多个被观察者
public void postEvent(Object object) {
RxBus.getRxbus().post(object);
}
public <M> void addSubscription(Class<M> eventType, Consumer<M> action) {
Disposable disposable = RxBus.getRxbus().doSubscribe(eventType, action, null);
RxBus.getRxbus().addSubscription(this, disposable);
}
8.测试
public void testRxbus1() {
RxBus.getRxbus().addSubscription(String.class, s -> p(s), throwable -> p(throwable));
RxBus.getRxbus().addSubscription(Integer.class, s -> p(s), throwable -> p(throwable));
sleep(1);
RxBus.getRxbus().postEvent("s1");
RxBus.getRxbus().postEvent("s2");
RxBus.getRxbus().postEvent(123);
RxBus.getRxbus().postEvent('a');
}
打印结果:
s1
s2
123
- 看到我们这里注册了一个String和Integer的观察者类型 传入的String和Integer事件都正常打印了,但是Char类型的没有打印,因为没有注册Char类型观察者
完整代码如下:
package com.android.yuhb.test;
import java.util.HashMap;
import java.util.Map;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
public class RxBus {
private static volatile RxBus rxbus;
private final Subject<Object> subject = PublishSubject.create().toSerialized();
public static RxBus getRxbus(){
if(rxbus==null){
rxbus = new RxBus();
}
return rxbus;
}
Map<String, CompositeDisposable> disposableMap = new HashMap<>();
private <T> Flowable<T> getObservable(Class<T> type){
return subject.toFlowable(BackpressureStrategy.BUFFER).ofType(type);
}
private void addSubscription(Object o, Disposable disposable) {
String key = o.getClass().getName();
if(disposableMap.get(key)!=null){
disposableMap.get(key).add(disposable);
}else{
CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(disposable);
disposableMap.put(key,compositeDisposable);
}
}
public void unSubscribe(Object o) {
String key = o.getClass().getName();
if(!disposableMap.containsKey(key)){
return;
}
if(disposableMap.get(key)!=null){
disposableMap.get(key).dispose();
}
disposableMap.remove(key);
}
public void post(Object o){
subject.onNext(o);
}
public <T> Disposable doSubscribe(Class<T> type, Consumer<T> onNext, Consumer<Throwable> onError){
return getObservable(type)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(onNext,onError);
}
public void postEvent(Object object) {
RxBus.getRxbus().post(object);
}
public <M> void addSubscription(Class<M> eventType, Consumer<M> action) {
Disposable disposable = RxBus.getRxbus().doSubscribe(eventType, action, null);
RxBus.getRxbus().addSubscription(this, disposable);
}
public <M> void addSubscription(Class<M> eventType, Consumer<M> action, Consumer<Throwable> error) {
Disposable disposable = RxBus.getRxbus().doSubscribe(eventType, action, error);
RxBus.getRxbus().addSubscription(this, disposable);
}
}
总结
以上就是我们RxBus封装的一个思路,有问题欢迎指正,知识共享,码字不易,喜欢的就关注下, 后期会推出更多Android体系课知识
关于RxJava的其他系列文章,可以点击下方链接
使用rxjava创建一个rxbus事件处理框架