使用rxjava创建一个rxbus事件处理框架

简介: RxJava已经出现很多个年头了,但是依然被很多公司使用,如果现在还对RxJava了解的不够透彻, 可以看这个系列对它的分析:相信看完后你对它会有个更全面的认识。 这个系列主要从下面几个方面来讲解: **RxJava基本操作符使用** **RxJava响应式编程是如何实现的** **RxJava的背压机制及Flowable是如何实现背压的** **RxJava的线程切换原理

前言:

RxJava已经出现很多个年头了,但是依然被很多公司使用,如果现在还对RxJava了解的不够透彻, 可以看这个系列对它的分析:相信看完后你对它会有个更全面的认识。 这个系列主要从下面几个方面来讲解: RxJava基本操作符使用 RxJava响应式编程是如何实现的 RxJava的背压机制及Flowable是如何实现背压的 **RxJava的线程切换原理

关于RxJava的其他系列文章,可以点击下方链接

面试官:RxJava背压机制有了解么?

面试官:RxJava是如何做到响应式编程的? - 掘金

使用rxjava创建一个rxbus事件处理框架

RxJava操作符详解--来看看你还记得多少 - 掘金

今天我们使用RxJava来封装一个RxBus的事件订阅框架

简介

想下平时Activity或者Fragment之间的通讯是通过什么方式进行的?使用Handler或者回调来完成? 这种方式的缺点很明显代码会很杂乱,要满Ide去找Handler或者回调在哪,而且可能会引发内部溢出等问题,头疼啊 那么今天就教大家使用RxJava来封装一个RxBus,大大简化Android组件之间的通讯

封装过程

我们对封装过程进行步骤化:

1.首先考虑使用全局的单例类RxBus,使用PublishSubject为被观察者

private static volatile RxBus rxbus;
private final Subject<Object> subject  = PublishSubject.create().toSerialized();
public static RxBus getRxbus(){
    if(rxbus==null){
        rxbus = new RxBus();
    }
    return rxbus;
}

这里为什么使用PublishSubjec: PublishSubject与普通的Subject最大的不同就是其可以先订阅事件,然后在某一时刻手动调用方法来触发事件

2.将subject转换为带背压机制的Flowable,并通过类名过滤

private <T> Flowable<T> getObservable(Class<T> type){
    return subject.toFlowable(BackpressureStrategy.BUFFER).ofType(type);
}

这里使用类名对事件进行过滤,用户可以对需要的事件使用对应的类型进行过滤即可。 转换为背压主要是防止短时间并发事件太多导致异常,关于背压机制可以参考下面这篇: 面试官:RxJava背压机制有了解么?

3.创建一个内部订阅接口:

public <T> Disposable doSubscribe(Class<T> type, Consumer<T> onNext, Consumer<Throwable> onError){
    return getObservable(type)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe(onNext,onError);
}
  • 这里我们使用subscribeOn(Schedulers.io()),将订阅事件放在io线程上,防止订阅事件占用主线程执行效率
  • 使用observeOn(Schedulers.computation())将事件执行放在computation线程上,这个computation线程是一个高效的执行计算线程,非主线程
  • 并注册了一个onNext和onError方法,对异常进行了监听

4.创建一个内部发送接口post

public void post(Object o){
    subject.onNext(o);
}

5.将disposes注册到map中

private void addSubscription(Object o, Disposable disposable) {
    String key = o.getClass().getName();
    if(disposableMap.get(key)!=null){
        disposableMap.get(key).add(disposable);
    }else{
        CompositeDisposable compositeDisposable = new CompositeDisposable();
        compositeDisposable.add(disposable);
        disposableMap.put(key,compositeDisposable);
    }
}
  • 这里使用了组合模式,对多个相同的key可以有多个分支,就是为了对同一个Class可以注册多个监听

6.解除订阅关系

public void unSubscribe(Object o) {
    String key = o.getClass().getName();
    if(!disposableMap.containsKey(key)){
        return;
    }
    if(disposableMap.get(key)!=null){
        disposableMap.get(key).dispose();
    }
    disposableMap.remove(key);
}
  • 解除订阅的时候需要清理所有注册的观察者

7.创建两个对外接口:订阅和发送

//订阅成功后,将订阅信息注册到map中,使用了CompositeDisposable,为了一个观察者对应多个被观察者
public void postEvent(Object object) {
    RxBus.getRxbus().post(object);
}

public  <M> void addSubscription(Class<M> eventType, Consumer<M> action) {
    Disposable disposable = RxBus.getRxbus().doSubscribe(eventType, action, null);
    RxBus.getRxbus().addSubscription(this, disposable);
}

8.测试

public void testRxbus1() {
    RxBus.getRxbus().addSubscription(String.class, s -> p(s), throwable -> p(throwable));
    RxBus.getRxbus().addSubscription(Integer.class, s -> p(s), throwable -> p(throwable));
    sleep(1);
    RxBus.getRxbus().postEvent("s1");
    RxBus.getRxbus().postEvent("s2");
    RxBus.getRxbus().postEvent(123);
    RxBus.getRxbus().postEvent('a');
}

打印结果:

s1
s2
123
  • 看到我们这里注册了一个String和Integer的观察者类型 传入的String和Integer事件都正常打印了,但是Char类型的没有打印,因为没有注册Char类型观察者

完整代码如下:

package com.android.yuhb.test;

import java.util.HashMap;
import java.util.Map;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

public class RxBus {
    private static volatile RxBus rxbus;
    private final Subject<Object> subject  = PublishSubject.create().toSerialized();
    public static RxBus getRxbus(){
        if(rxbus==null){
            rxbus = new RxBus();
        }
        return rxbus;
    }
    Map<String, CompositeDisposable> disposableMap = new HashMap<>();
    private <T> Flowable<T> getObservable(Class<T> type){
        return subject.toFlowable(BackpressureStrategy.BUFFER).ofType(type);
    }
    private void addSubscription(Object o, Disposable disposable) {
        String key = o.getClass().getName();
        if(disposableMap.get(key)!=null){
            disposableMap.get(key).add(disposable);
        }else{
            CompositeDisposable compositeDisposable = new CompositeDisposable();
            compositeDisposable.add(disposable);
            disposableMap.put(key,compositeDisposable);
        }
    }
    public void unSubscribe(Object o) {
        String key = o.getClass().getName();
        if(!disposableMap.containsKey(key)){
            return;
        }
        if(disposableMap.get(key)!=null){
            disposableMap.get(key).dispose();
        }
        disposableMap.remove(key);
    }

    public void post(Object o){
        subject.onNext(o);
    }
    public <T> Disposable doSubscribe(Class<T> type, Consumer<T> onNext, Consumer<Throwable> onError){
        return getObservable(type)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.computation())
                .subscribe(onNext,onError);
    }

    public void postEvent(Object object) {
        RxBus.getRxbus().post(object);
    }

    public  <M> void addSubscription(Class<M> eventType, Consumer<M> action) {
        Disposable disposable = RxBus.getRxbus().doSubscribe(eventType, action, null);
        RxBus.getRxbus().addSubscription(this, disposable);
    }

    public <M> void addSubscription(Class<M> eventType, Consumer<M> action, Consumer<Throwable> error) {
        Disposable disposable = RxBus.getRxbus().doSubscribe(eventType, action, error);
        RxBus.getRxbus().addSubscription(this, disposable);
    }

}

总结

以上就是我们RxBus封装的一个思路,有问题欢迎指正,知识共享,码字不易,喜欢的就关注下, 后期会推出更多Android体系课知识

关于RxJava的其他系列文章,可以点击下方链接

面试官:RxJava背压机制有了解么?

面试官:RxJava是如何做到响应式编程的? - 掘金

使用rxjava创建一个rxbus事件处理框架

RxJava操作符详解--来看看你还记得多少 - 掘金

相关文章
|
7月前
|
安全 数据处理 C++
【Qt 底层之事件驱动系统】深入理解 Qt 事件机制:主事件循环与工作线程的交互探究,包括 QML 的视角
【Qt 底层之事件驱动系统】深入理解 Qt 事件机制:主事件循环与工作线程的交互探究,包括 QML 的视角
1539 3
|
设计模式 缓存 JavaScript
JavaScript 简单实现观察者模式和发布-订阅模式
JavaScript 简单实现观察者模式和发布-订阅模式
62 0
|
3月前
|
前端开发 JavaScript
React的事件与原生事件的执行顺序?
React的事件与原生事件的执行顺序?
|
7月前
|
Java 开发者 UED
Java 异步和事件驱动编程:探索响应式模式
【4月更文挑战第27天】在现代软件开发中,异步和事件驱动编程是提高应用性能和响应性的关键策略。Java 提供了多种机制来支持这些编程模式,使开发者能够构建高效、可扩展的应用程序。
132 4
|
7月前
|
安全 调度 Python
什么是Python中的事件驱动编程?如何使用`asyncio`模块实现异步事件处理?
【2月更文挑战第4天】【2月更文挑战第9篇】什么是Python中的事件驱动编程?如何使用`asyncio`模块实现异步事件处理?
155 0
|
7月前
|
前端开发 JavaScript UED
React事件和原生事件的执行顺序
React事件和原生事件的执行顺序
80 0
|
前端开发 Java Maven
响应式编程实战(08)-WebFlux,使用注解编程模式构建异步非阻塞服务
响应式编程实战(08)-WebFlux,使用注解编程模式构建异步非阻塞服务
185 0
|
前端开发 JavaScript API
RxJS系列01:响应式编程与异步
RxJS系列01:响应式编程与异步
200 0
|
JavaScript 前端开发
三连弹!原生实现异步处理利器 —— Observable
本篇带来用原生实现 Observable,一探内部究竟!!
|
数据库连接
Yii2.0框架中如何进行事件处理?它支持哪些事件?
Yii2.0框架中如何进行事件处理?它支持哪些事件?
210 0