基于 Kotlin Coroutine 实现的 EventBus

简介: 基于 Kotlin Coroutine 实现的 EventBus

一. 背景



这段时间接手了一个比较紧急的项目,它是一个运行在某开发板上的 Android 项目。


该项目采用的架构比较老,例如 RxJava 还在使用 1.x 的版本。起初看到源码,我内心是拒绝的。(这大半年来,我在使用 C++ 开发桌面端、 Java/Kotlin 开发后端,不过没关系。)好在该项目最近开发的部分功能采用 Kotlin 编写,那我开发的功能也打算使用 Kotlin。


二. RxJava 版本的 EventBus



两年前,我在写《RxJava 2.x 实战》的时候,写过一个 RxJava 2 版本的 EventBus,并且在实际的项目中验证过。


它还需要一个第三方库 RxRelay。RxRelay中的各个 Relay 既是 Observable 也是 Consumer 的 RxJava 类型,它们是一个没有 onComplete 和 onError 的Subject。所以不必要担心下游的触发的终止状态(onComplete 或 onError)。


RxRelay的Github地址:https://github.com/JakeWharton/RxRelay


RxBus 的源码:

package com.safframework.study.rxbus4;
import com.jakewharton.rxrelay2.PublishRelay;
import com.jakewharton.rxrelay2.Relay;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Scheduler;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
/**
 * Created by Tony Shen on 2017/6/14.
 */
public class RxBus {
    private Relay<Object> bus = null;
    private static RxBus instance;
    private final Map<Class<?>, Object> mStickyEventMap;
    //禁用构造方法
    private RxBus() {
        bus = PublishRelay.create().toSerialized();
        mStickyEventMap = new ConcurrentHashMap<>();
    }
    public static RxBus get() {
        return Holder.BUS;
    }
    public void post(Object event) {
        bus.accept(event);
    }
    public void postSticky(Object event) {
        synchronized (mStickyEventMap) {
            mStickyEventMap.put(event.getClass(), event);
        }
        bus.accept(event);
    }
    public <T> Observable<T> toObservable(Class<T> eventType) {
        return bus.ofType(eventType);
    }
    /**
     * 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
     */
    public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
        synchronized (mStickyEventMap) {
            Observable<T> observable = bus.ofType(eventType);
            final Object event = mStickyEventMap.get(eventType);
            if (event != null) {
                return observable.mergeWith(Observable.create(new ObservableOnSubscribe<T>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<T> e) throws Exception {
                        e.onNext(eventType.cast(event));
                    }
                }));
            } else {
                return observable;
            }
        }
    }
    public boolean hasObservers() {
        return bus.hasObservers();
    }
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext);
    }
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext, Consumer onError,
                                   Action onComplete, Consumer onSubscribe) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete, onSubscribe);
    }
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext, Consumer onError,
                                   Action onComplete) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete);
    }
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext, Consumer onError) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError);
    }
    public <T> Disposable register(Class<T> eventType, Consumer<T> onNext) {
        return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext);
    }
    public <T> Disposable register(Class<T> eventType, Consumer<T> onNext, Consumer onError,
                                   Action onComplete, Consumer onSubscribe) {
        return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError, onComplete, onSubscribe);
    }
    public <T> Disposable register(Class<T> eventType, Consumer<T> onNext, Consumer onError,
                                   Action onComplete) {
        return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError, onComplete);
    }
    public <T> Disposable register(Class<T> eventType, Consumer<T> onNext, Consumer onError) {
        return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError);
    }
    public <T> Disposable registerSticky(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext) {
        return toObservableSticky(eventType).observeOn(scheduler).subscribe(onNext);
    }
    public <T> Disposable registerSticky(Class<T> eventType, Consumer<T> onNext) {
        return toObservableSticky(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext);
    }
    public <T> Disposable registerSticky(Class<T> eventType, Consumer<T> onNext, Consumer onError) {
        return toObservableSticky(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext,onError);
    }
    /**
     * 移除指定eventType的Sticky事件
     */
    public <T> T removeStickyEvent(Class<T> eventType) {
        synchronized (mStickyEventMap) {
            return eventType.cast(mStickyEventMap.remove(eventType));
        }
    }
    /**
     * 移除所有的Sticky事件
     */
    public void removeAllStickyEvents() {
        synchronized (mStickyEventMap) {
            mStickyEventMap.clear();
        }
    }
    public void unregister(Disposable disposable) {
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
        }
    }
    private static class Holder {
        private static final RxBus BUS = new RxBus();
    }
}


该版本 RxBus 支持异常处理和  Sticky 事件。唯一的缺点是,不支持 Backpressure。


三. Kotlin Coroutine 版本的 EventBus



既然有了之前的 RxBus,为何要重新写一个呢?


首先,我们目前的项目并没有采用 EventBus。但是,我写的某一个 Service 需要跟 Activities 通信。我想偷懒,当然采用 EventBus 会比较简单。但是,我们的 RxJava 版本还在用 1.x!!


幸好,我们用了 Kotlin,部分代码还用了 Coroutine,于是我想到了使用 Coroutine 的 Channel 来实现 EventBus。


Channel 可以实现协程之间的数据通信。Kotlin 的 Channel 与 Java 的 BlockingQueue 类似。BlockingQueue 的 put 和 take 操作,相当于 Channel 的 send 和 receive 操作,但是 BlockingQueue 是阻塞操作而 Channel 都是挂起操作。


EventBus 用于注册普通事件、Sticky 事件,事件的发布等等。

package com.safframework.eventbus
import android.util.Log
import kotlinx.coroutines.*
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.CoroutineContext
/**
 *
 * @FileName:
 *          com.safframework.eventbus.EventBus
 * @author: Tony Shen
 * @date: 2019-08-24 23:28
 * @version: V1.0 <描述当前版本功能>
 */
val UI: CoroutineDispatcher = Dispatchers.Main
object EventBus: CoroutineScope {
    private val TAG = "EventBus"
    private val job = SupervisorJob()
    override val coroutineContext: CoroutineContext = Dispatchers.Default + job
    private val contextMap = ConcurrentHashMap<String, MutableMap<Class<*>, EventData<*>>>()
    private val mStickyEventMap = ConcurrentHashMap<Class<*>, Any>()
    @JvmStatic
    fun <T> register(
        contextName: String,
        eventDispatcher: CoroutineDispatcher = UI,
        eventClass: Class<T>,
        eventCallback: (T) -> Unit
    ) {
        val eventDataMap = if (contextMap.containsKey(contextName)) {
            contextMap[contextName]!!
        } else {
            val eventDataMap = mutableMapOf<Class<*>, EventData<*>>()
            contextMap[contextName] = eventDataMap
            eventDataMap
        }
        eventDataMap[eventClass] = EventData(this, eventDispatcher, eventCallback)
    }
    @JvmStatic
    fun <T> register(
        contextName: String,
        eventDispatcher: CoroutineDispatcher = UI,
        eventClass: Class<T>,
        eventCallback: (T) -> Unit,
        eventFail:(Throwable)->Unit
    ) {
        val eventDataMap = if (contextMap.containsKey(contextName)) {
            contextMap[contextName]!!
        } else {
            val eventDataMap = mutableMapOf<Class<*>, EventData<*>>()
            contextMap[contextName] = eventDataMap
            eventDataMap
        }
        eventDataMap[eventClass] = EventData(this, eventDispatcher, eventCallback, eventFail)
    }
    @JvmStatic
    fun <T> registerSticky(
        contextName: String,
        eventDispatcher: CoroutineDispatcher = UI,
        eventClass: Class<T>,
        eventCallback: (T) -> Unit
    ) {
        val eventDataMap = if (contextMap.containsKey(contextName)) {
            contextMap[contextName]!!
        } else {
            val eventDataMap = mutableMapOf<Class<*>, EventData<*>>()
            contextMap[contextName] = eventDataMap
            eventDataMap
        }
        eventDataMap[eventClass] = EventData(this, eventDispatcher, eventCallback)
        val event = mStickyEventMap[eventClass]
        event?.let {
            postEvent(it)
        }
    }
    @JvmStatic
    fun <T> registerSticky(
        contextName: String,
        eventDispatcher: CoroutineDispatcher = UI,
        eventClass: Class<T>,
        eventCallback: (T) -> Unit,
        eventFail:(Throwable)->Unit
    ) {
        val eventDataMap = if (contextMap.containsKey(contextName)) {
            contextMap[contextName]!!
        } else {
            val eventDataMap = mutableMapOf<Class<*>, EventData<*>>()
            contextMap[contextName] = eventDataMap
            eventDataMap
        }
        eventDataMap[eventClass] = EventData(this, eventDispatcher, eventCallback, eventFail)
        val event = mStickyEventMap[eventClass]
        event?.let {
            postEvent(it)
        }
    }
    @JvmStatic
    fun post(event: Any, delayTime: Long = 0) {
        if (delayTime > 0) {
            launch {
                delay(delayTime)
                postEvent(event)
            }
        } else {
            postEvent(event)
        }
    }
    @JvmStatic
    fun postSticky(event: Any) {
        mStickyEventMap[event.javaClass] = event
    }
    @JvmStatic
    fun unregisterAllEvents() {
        Log.i(TAG,"unregisterAllEvents()")
        coroutineContext.cancelChildren()
        for ((_, eventDataMap) in contextMap) {
            eventDataMap.values.forEach {
                it.cancel()
            }
            eventDataMap.clear()
        }
        contextMap.clear()
    }
    @JvmStatic
    fun unregister(contextName: String) {
        Log.i(TAG,"$contextName")
        val cloneContexMap = ConcurrentHashMap<String, MutableMap<Class<*>, EventData<*>>>()
        cloneContexMap.putAll(contextMap)
        val map = cloneContexMap.filter { it.key == contextName }
        for ((_, eventDataMap) in map) {
            eventDataMap.values.forEach {
                it.cancel()
            }
            eventDataMap.clear()
        }
        contextMap.remove(contextName)
    }
    @JvmStatic
    fun <T> removeStickyEvent(eventType: Class<T>) {
        mStickyEventMap.remove(eventType)
    }
    private fun postEvent(event: Any) {
        val cloneContexMap = ConcurrentHashMap<String, MutableMap<Class<*>, EventData<*>>>()
        cloneContexMap.putAll(contextMap)
        for ((_, eventDataMap) in cloneContexMap) {
            eventDataMap.keys
                .firstOrNull { it == event.javaClass || it == event.javaClass.superclass }
                ?.let { key -> eventDataMap[key]?.postEvent(event) }
        }
    }
}


EventData 通过 channel 实现真正的发送、消费事件。

package com.safframework.eventbus
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import java.lang.Exception
/**
 *
 * @FileName:
 *          com.safframework.eventbus.EventData
 * @author: Tony Shen
 * @date: 2019-08-25 00:20
 * @version: V1.0 <描述当前版本功能>
 */
data class EventData<T>(
    val coroutineScope: CoroutineScope,
    val eventDispatcher: CoroutineDispatcher,
    val onEvent: (T) -> Unit,
    val exception: ((Throwable)->Unit)? = null
) {
    private val channel = Channel<T>()
    init {
        coroutineScope.launch {
            channel.consumeEach { // 消费者循环地消费消息
                launch(eventDispatcher) {
                    if (exception!=null) {
                        try{
                            onEvent(it)
                        } catch (e:Exception) {
                            exception.invoke(e)
                        }
                    } else {
                        onEvent(it)
                    }
                }
            }
        }
    }
    fun postEvent(event: Any) {
        if (!channel.isClosedForSend) {
            coroutineScope.launch {
                channel.send(event as T)
            }
        } else {
            println("Channel is closed for send")
        }
    }
    fun cancel() {
        channel.cancel()
    }
}


EventBus github 地址:https://github.com/fengzhizi715/EventBus

该版本的 EventBus 跟 RxBus 的功能基本一致。上述 github 地址中,包含有 demo 介绍了 EventBus 的具体使用,其实也跟 RxBus 的使用一致。


题外话,最近还抽空优化了另外一个跟 Coroutines 相关的项目:https://github.com/fengzhizi715/Lifecycle-Coroutines-Extension


四. 总结



该版本的 EventBus 是给不使用 RxBus 或者其他版本 EventBus 提供了另一种选择。当然,该项目未来还有优化的空间。

相关文章
|
Kotlin
Kotlin | 实现数据类(data)深拷贝
在Kotlin中,data数据类默认的copy方法实现的是浅拷贝,但我们有时候需要实现深拷贝。 在kotlin中,实现就比较容易了。
779 0
Kotlin | 实现数据类(data)深拷贝
|
7月前
|
JavaScript Java Android开发
kotlin安卓在Jetpack Compose 框架下跨组件通讯EventBus
**EventBus** 是一个Android事件总线库,简化组件间通信。要使用它,首先在Gradle中添加依赖`implementation &#39;org.greenrobot:eventbus:3.3.1&#39;`。然后,可选地定义事件类如`MessageEvent`。在活动或Fragment的`onCreate`中注册订阅者,在`onDestroy`中反注册。通过`@Subscribe`注解方法处理事件,如`onMessageEvent`。发送事件使用`EventBus.getDefault().post()`。
|
API 开发者 Kotlin
来个面试题,看看你对 kotlin coroutine掌握得如何?
来个面试题,看看你对 kotlin coroutine掌握得如何?
158 0
|
设计模式 Java 编译器
Kotlin协程(Coroutine)
Kotlin协程(Coroutine)
123 0
|
设计模式 Kotlin
Kotlin设计模式实现之装饰者模式(Decorator)
装饰者模式(Decorator):在不改变对象自身的基础上,动态地给一个对象添加一些额外的职责。与继承相比,装饰者是一种更轻便灵活的做法。若要扩展功能,装饰者提供了比继承更有弹性的替代方法。
208 1
Kotlin设计模式实现之装饰者模式(Decorator)
|
设计模式 缓存 算法
带你手撸一个Kotlin版的EventBus
EventBus的优点有很多(现在来看也并不是优点):代码简洁,是一种发布订阅设计模式(观察者设计模式),简化了组件之间的通讯,分离了事件的发送者和接收者,而且可以随意切换线程,避免了复杂的和易错的依赖关系和生命周期问题
375 0
带你手撸一个Kotlin版的EventBus
|
设计模式 算法 Kotlin
Kotlin设计模式实现之策略模式
Kotlin设计模式实现之策略模式
197 0
Kotlin设计模式实现之策略模式
|
存储 Kotlin
数据结构 | 二分搜索树及它的各种操作(kotlin实现)
在开始之前,应该先讲一下什么是二叉树。
126 0
数据结构 | 二分搜索树及它的各种操作(kotlin实现)
|
存储 C语言 Kotlin
重学数据结构-使用Kotlin实现链表及其他扩展
很简单,链表不像数组那样,不需要我们主动扩容,我们只需要类似递归一样,一层套一层即可,即node1持有node2的引用,node2持有node3…,相应的每次插入我们只需要更改头结点即可,当node-x持有的下一个node引用为null时,我们也可以判定,此时为链表尾节点。
279 0
|
算法 Kotlin
数据结构 | 使用Kotlin实现栈与队列
Last In First Out(LIFO) 后进先出 栈也是一种线性数据结构
695 0