一. 背景
这段时间接手了一个比较紧急的项目,它是一个运行在某开发板上的 Android 项目。
该项目采用的架构比较老,例如 RxJava 还在使用 1.x 的版本。起初看到源码,我内心是拒绝的。(这大半年来,我在使用 C++ 开发桌面端、 Java/Kotlin 开发后端,不过没关系。)好在该项目最近开发的部分功能采用 Kotlin 编写,那我开发的功能也打算使用 Kotlin。
二. RxJava 版本的 EventBus
两年前,我在写《RxJava 2.x 实战》的时候,写过一个 RxJava 2 版本的 EventBus,并且在实际的项目中验证过。
它还需要一个第三方库 RxRelay。RxRelay中的各个 Relay 既是 Observable 也是 Consumer 的 RxJava 类型,它们是一个没有 onComplete 和 onError 的Subject。所以不必要担心下游的触发的终止状态(onComplete 或 onError)。
RxRelay的Github地址:https://github.com/JakeWharton/RxRelay
RxBus 的源码:
package com.safframework.study.rxbus4; import com.jakewharton.rxrelay2.PublishRelay; import com.jakewharton.rxrelay2.Relay; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.Scheduler; import io.reactivex.android.schedulers.AndroidSchedulers; import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.functions.Action; import io.reactivex.functions.Consumer; /** * Created by Tony Shen on 2017/6/14. */ public class RxBus { private Relay<Object> bus = null; private static RxBus instance; private final Map<Class<?>, Object> mStickyEventMap; //禁用构造方法 private RxBus() { bus = PublishRelay.create().toSerialized(); mStickyEventMap = new ConcurrentHashMap<>(); } public static RxBus get() { return Holder.BUS; } public void post(Object event) { bus.accept(event); } public void postSticky(Object event) { synchronized (mStickyEventMap) { mStickyEventMap.put(event.getClass(), event); } bus.accept(event); } public <T> Observable<T> toObservable(Class<T> eventType) { return bus.ofType(eventType); } /** * 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者 */ public <T> Observable<T> toObservableSticky(final Class<T> eventType) { synchronized (mStickyEventMap) { Observable<T> observable = bus.ofType(eventType); final Object event = mStickyEventMap.get(eventType); if (event != null) { return observable.mergeWith(Observable.create(new ObservableOnSubscribe<T>() { @Override public void subscribe(@NonNull ObservableEmitter<T> e) throws Exception { e.onNext(eventType.cast(event)); } })); } else { return observable; } } } public boolean hasObservers() { return bus.hasObservers(); } public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext) { return toObservable(eventType).observeOn(scheduler).subscribe(onNext); } public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext, Consumer onError, Action onComplete, Consumer onSubscribe) { return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete, onSubscribe); } public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext, Consumer onError, Action onComplete) { return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete); } public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext, Consumer onError) { return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError); } public <T> Disposable register(Class<T> eventType, Consumer<T> onNext) { return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext); } public <T> Disposable register(Class<T> eventType, Consumer<T> onNext, Consumer onError, Action onComplete, Consumer onSubscribe) { return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError, onComplete, onSubscribe); } public <T> Disposable register(Class<T> eventType, Consumer<T> onNext, Consumer onError, Action onComplete) { return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError, onComplete); } public <T> Disposable register(Class<T> eventType, Consumer<T> onNext, Consumer onError) { return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError); } public <T> Disposable registerSticky(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext) { return toObservableSticky(eventType).observeOn(scheduler).subscribe(onNext); } public <T> Disposable registerSticky(Class<T> eventType, Consumer<T> onNext) { return toObservableSticky(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext); } public <T> Disposable registerSticky(Class<T> eventType, Consumer<T> onNext, Consumer onError) { return toObservableSticky(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext,onError); } /** * 移除指定eventType的Sticky事件 */ public <T> T removeStickyEvent(Class<T> eventType) { synchronized (mStickyEventMap) { return eventType.cast(mStickyEventMap.remove(eventType)); } } /** * 移除所有的Sticky事件 */ public void removeAllStickyEvents() { synchronized (mStickyEventMap) { mStickyEventMap.clear(); } } public void unregister(Disposable disposable) { if (disposable != null && !disposable.isDisposed()) { disposable.dispose(); } } private static class Holder { private static final RxBus BUS = new RxBus(); } }
该版本 RxBus 支持异常处理和 Sticky 事件。唯一的缺点是,不支持 Backpressure。
三. Kotlin Coroutine 版本的 EventBus
既然有了之前的 RxBus,为何要重新写一个呢?
首先,我们目前的项目并没有采用 EventBus。但是,我写的某一个 Service 需要跟 Activities 通信。我想偷懒,当然采用 EventBus 会比较简单。但是,我们的 RxJava 版本还在用 1.x!!
幸好,我们用了 Kotlin,部分代码还用了 Coroutine,于是我想到了使用 Coroutine 的 Channel 来实现 EventBus。
Channel 可以实现协程之间的数据通信。Kotlin 的 Channel 与 Java 的 BlockingQueue 类似。BlockingQueue 的 put 和 take 操作,相当于 Channel 的 send 和 receive 操作,但是 BlockingQueue 是阻塞操作而 Channel 都是挂起操作。
EventBus 用于注册普通事件、Sticky 事件,事件的发布等等。
package com.safframework.eventbus import android.util.Log import kotlinx.coroutines.* import java.util.concurrent.ConcurrentHashMap import kotlin.coroutines.CoroutineContext /** * * @FileName: * com.safframework.eventbus.EventBus * @author: Tony Shen * @date: 2019-08-24 23:28 * @version: V1.0 <描述当前版本功能> */ val UI: CoroutineDispatcher = Dispatchers.Main object EventBus: CoroutineScope { private val TAG = "EventBus" private val job = SupervisorJob() override val coroutineContext: CoroutineContext = Dispatchers.Default + job private val contextMap = ConcurrentHashMap<String, MutableMap<Class<*>, EventData<*>>>() private val mStickyEventMap = ConcurrentHashMap<Class<*>, Any>() @JvmStatic fun <T> register( contextName: String, eventDispatcher: CoroutineDispatcher = UI, eventClass: Class<T>, eventCallback: (T) -> Unit ) { val eventDataMap = if (contextMap.containsKey(contextName)) { contextMap[contextName]!! } else { val eventDataMap = mutableMapOf<Class<*>, EventData<*>>() contextMap[contextName] = eventDataMap eventDataMap } eventDataMap[eventClass] = EventData(this, eventDispatcher, eventCallback) } @JvmStatic fun <T> register( contextName: String, eventDispatcher: CoroutineDispatcher = UI, eventClass: Class<T>, eventCallback: (T) -> Unit, eventFail:(Throwable)->Unit ) { val eventDataMap = if (contextMap.containsKey(contextName)) { contextMap[contextName]!! } else { val eventDataMap = mutableMapOf<Class<*>, EventData<*>>() contextMap[contextName] = eventDataMap eventDataMap } eventDataMap[eventClass] = EventData(this, eventDispatcher, eventCallback, eventFail) } @JvmStatic fun <T> registerSticky( contextName: String, eventDispatcher: CoroutineDispatcher = UI, eventClass: Class<T>, eventCallback: (T) -> Unit ) { val eventDataMap = if (contextMap.containsKey(contextName)) { contextMap[contextName]!! } else { val eventDataMap = mutableMapOf<Class<*>, EventData<*>>() contextMap[contextName] = eventDataMap eventDataMap } eventDataMap[eventClass] = EventData(this, eventDispatcher, eventCallback) val event = mStickyEventMap[eventClass] event?.let { postEvent(it) } } @JvmStatic fun <T> registerSticky( contextName: String, eventDispatcher: CoroutineDispatcher = UI, eventClass: Class<T>, eventCallback: (T) -> Unit, eventFail:(Throwable)->Unit ) { val eventDataMap = if (contextMap.containsKey(contextName)) { contextMap[contextName]!! } else { val eventDataMap = mutableMapOf<Class<*>, EventData<*>>() contextMap[contextName] = eventDataMap eventDataMap } eventDataMap[eventClass] = EventData(this, eventDispatcher, eventCallback, eventFail) val event = mStickyEventMap[eventClass] event?.let { postEvent(it) } } @JvmStatic fun post(event: Any, delayTime: Long = 0) { if (delayTime > 0) { launch { delay(delayTime) postEvent(event) } } else { postEvent(event) } } @JvmStatic fun postSticky(event: Any) { mStickyEventMap[event.javaClass] = event } @JvmStatic fun unregisterAllEvents() { Log.i(TAG,"unregisterAllEvents()") coroutineContext.cancelChildren() for ((_, eventDataMap) in contextMap) { eventDataMap.values.forEach { it.cancel() } eventDataMap.clear() } contextMap.clear() } @JvmStatic fun unregister(contextName: String) { Log.i(TAG,"$contextName") val cloneContexMap = ConcurrentHashMap<String, MutableMap<Class<*>, EventData<*>>>() cloneContexMap.putAll(contextMap) val map = cloneContexMap.filter { it.key == contextName } for ((_, eventDataMap) in map) { eventDataMap.values.forEach { it.cancel() } eventDataMap.clear() } contextMap.remove(contextName) } @JvmStatic fun <T> removeStickyEvent(eventType: Class<T>) { mStickyEventMap.remove(eventType) } private fun postEvent(event: Any) { val cloneContexMap = ConcurrentHashMap<String, MutableMap<Class<*>, EventData<*>>>() cloneContexMap.putAll(contextMap) for ((_, eventDataMap) in cloneContexMap) { eventDataMap.keys .firstOrNull { it == event.javaClass || it == event.javaClass.superclass } ?.let { key -> eventDataMap[key]?.postEvent(event) } } } }
EventData 通过 channel 实现真正的发送、消费事件。
package com.safframework.eventbus import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.launch import java.lang.Exception /** * * @FileName: * com.safframework.eventbus.EventData * @author: Tony Shen * @date: 2019-08-25 00:20 * @version: V1.0 <描述当前版本功能> */ data class EventData<T>( val coroutineScope: CoroutineScope, val eventDispatcher: CoroutineDispatcher, val onEvent: (T) -> Unit, val exception: ((Throwable)->Unit)? = null ) { private val channel = Channel<T>() init { coroutineScope.launch { channel.consumeEach { // 消费者循环地消费消息 launch(eventDispatcher) { if (exception!=null) { try{ onEvent(it) } catch (e:Exception) { exception.invoke(e) } } else { onEvent(it) } } } } } fun postEvent(event: Any) { if (!channel.isClosedForSend) { coroutineScope.launch { channel.send(event as T) } } else { println("Channel is closed for send") } } fun cancel() { channel.cancel() } }
EventBus github 地址:https://github.com/fengzhizi715/EventBus
该版本的 EventBus 跟 RxBus 的功能基本一致。上述 github 地址中,包含有 demo 介绍了 EventBus 的具体使用,其实也跟 RxBus 的使用一致。
题外话,最近还抽空优化了另外一个跟 Coroutines 相关的项目:https://github.com/fengzhizi715/Lifecycle-Coroutines-Extension
四. 总结
该版本的 EventBus 是给不使用 RxBus 或者其他版本 EventBus 提供了另一种选择。当然,该项目未来还有优化的空间。