消息总线
在Android开发中,跨页面传递数据(尤其是跨多个页面传递数据)是一个很常见的操作,可以通过Handler、接口回调等方式进行传递,但这几种方式都不太优雅,消息总线传递数据的方式相比更优雅。
消息总线最大的优势就是解耦,避免了类与类之间强耦合,通常消息总线有以下几种实现方式:
- EventBus:https://github.com/greenrobot/EventBus
- RxBus : 基于RxJava实现的消息总线
- LiveDataBus:基于
Jetpack
中的LiveData
实现,也是本文主要介绍的实现方式。
EventBus
EventBus
整体思想如下:
EventBus基于发布/订阅模式,发布者和订阅者是一对多的关系,发布者只有一个,订阅者可以有多个,他们之间都是通过EventBus这个调度中心来进行数据处理与传递。其中发布者将数据传递到调度中心,然后调度中心会找到该发布者对应的订阅者,并将数据依次传递到订阅者,从而完成了数据的传递;如果没有订阅者,那么也就不会传递数据了。整个过程发布者和订阅者不需要知道彼此的存在,即数据传递过程是解耦的。
RxBus
RxBus
本身是依赖RxJava
的强大功能实现的。RxJava
中有一个Subject
,是一种特殊的存在,它既是Observable
,又是Observer
,可以将其看做一个桥梁或代理。Subject
有以下四种:
- AsyncSubject: 无论订阅发生在什么时候,Observer只会接收AsyncSubject发送的在onComplete()之前的最后一个数据,且onComplete()是必须要调用的。
- BehaviorSubject:Observer会先接收BehaviorSubject被订阅之前的最后一个事件,然后接收订阅之后发送的所有事件。
- PublishSubject: Observer只接收PublishSubject被订阅之后发送的事件。
- ReplaySubject:无论subscribe订阅是何时开始的,Observer会接收ReplaySubject发送的所有事件。
具体使用方式可以参考:RxJava中关于Subject和Processor的使用
可以通过Subject
来实现一个消息总线,因为不是本文的重点介绍,就不再贴代码了,可以自行搜索其实现方式。
LiveDataBus
LiveDataBus
是基于LiveData
实现的,上篇文章中详细介绍了其用法及优点:
- 确保界面符合数据状态
LiveData 遵循观察者模式。当数据发生变化时,LiveData 会通知 Observer 对象,那么Observer回调的方法中就可以进行UI更新,即数据驱动。 - 不会发生内存泄漏
观察者会绑定到 Lifecycle 对象,并在其关联的生命周期遭到销毁(如Activity进入ONDESTROY状态)后进行自我清理。 - 不会因 Activity 停止而导致崩溃
如果观察者的生命周期处于非活跃状态(如返回栈中的 Activity),则它不会接收任何 LiveData 事件。 - 不再需要手动处理生命周期
界面组件只是观察相关数据,不会停止或恢复观察。LiveData 将自动管理所有这些操作,因为它在观察时可以感知相关的生命周期状态变化。 - 数据始终保持最新状态
如果生命周期变为非活跃状态,它会在再次变为活跃状态时接收最新的数据。例如,曾经在后台的 Activity 会在返回前台后立即接收最新的数据。 - 配置更改时自动保存数据
如果由于配置更改(如设备旋转)而重新创建了 Activity 或 Fragment,它会立即接收最新的可用数据。 - 共享资源
使用单例模式扩展 LiveData 对象以封装系统服务,以便在应用中共享它们。LiveData 对象连接到系统服务一次,然后需要相应资源的任何观察者只需观察 LiveData 对象。
原理
- 消息:发布者发送,订阅者接收。消息可以是基本类型,也可以是自定义类型的消息。
- 消息通道:
LiveData
扮演了消息通道的角色,不同的消息通道用不同的名字区分,名字是String
类型的,可以通过名字获取到一个LiveData
消息通道。 - 消息总线: 消息总线通过单例实现,不同的消息通道存放在一个
HashMap
中。 - 订阅:订阅者通过
get()
获取消息通道,然后调用observe()
订阅这个通道的消息。 - 发布:发布者通过
get()
获取消息通道,然后调用setValue()
发布消息。
图片来源:LiveData实现消息总线
LiveData实现消息总线的优势
相比于EventBus
、RxBus
,使用LiveData
实现消息总线有下面几个优势:
EventBus、RxBus、LiveDataBus
都需要对事件进行注册、解注册。不同于EventBus、RxBus
手动解注册,LiveData
可以自动管理生命周期,所以也能实现自动解注册,避免忘记解注册而导致内存泄漏。LiveData
实现简单,其为Jetpack
中重要的一员,且为官方推出,支持更好LiveData
相比于EventBus、RxBus
,类更少,包更小。
LiveData实现消息总线存在的隐患
LiveData默认是粘性消息
上一篇介绍LiveData的文章Android Jetpack系列之LiveData 中,我们也看到了LiveData
发送的消息为粘性消息,即先发布后订阅也能收到消息,再把订阅observe()
的逻辑贴出来:
@MainThread
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
assertMainThread("observe");
if (owner.getLifecycle().getCurrentState() == DESTROYED) {
//如果当前观察者处于DESTROYED状态,直接返回
return;
}
//将LifecycleOwner、Observer包装成LifecycleBoundObserver
LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
//ObserverWrapper是LifecycleBoundObserver的父类
ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
//如果mObservers中存在该Observer且跟传进来的LifecycleOwner不同,直接抛异常,一个Observer只能对应一个LifecycleOwner
if (existing != null && !existing.isAttachedTo(owner)) {
throw new IllegalArgumentException("Cannot add the same observer"
+ " with different lifecycles");
}
//如果已经存在Observer且跟传进来的LifecycleOwner是同一个,直接返回
if (existing != null) {
return;
}
//通过Lifecycle添加观察者
owner.getLifecycle().addObserver(wrapper);
}
最后执行addObserver()
后,内部通过LifecycleRegistry
添加Observer
,进而会执行到onStateChanged()
方法,该方法辗转又调用到dispatchingValue
方法(setValue/postValue最终也会调用到该方法),接着会调用到我们最关心的considerNotify():
void dispatchingValue(@Nullable ObserverWrapper initiator) {
if (mDispatchingValue) {
mDispatchInvalidated = true;
return;
}
mDispatchingValue = true;
do {
mDispatchInvalidated = false;
if (initiator != null) {
//2、通过observe()的方式会调用这里
considerNotify(initiator);
initiator = null;
} else {
//1、通过setValue/postValue的方式会调用这里,遍历所有观察者并进行分发
for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator =
mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
considerNotify(iterator.next().getValue());
if (mDispatchInvalidated) {
break;
}
}
}
} while (mDispatchInvalidated);
mDispatchingValue = false;
}
private void considerNotify(ObserverWrapper observer) {
if (!observer.mActive) {
//观察者不在活跃状态 直接返回
return;
}
//如果是observe(),则是在STARTED、RESUMED状态时活跃;如果是ObserveForever(),则认为一直是活跃状态
if (!observer.shouldBeActive()) {
observer.activeStateChanged(false);
return;
}
//Observer中的Version必须小于LiveData中的Version,防止重复发送
if (observer.mLastVersion >= mVersion) {
return;
}
observer.mLastVersion = mVersion;
//回调Observer的onChange方法并接收数据
observer.mObserver.onChanged((T) mData);
}
可以看到considerNotify()
里有这么一个逻辑:
if (observer.mLastVersion >= mVersion) {
return;
}
mVersion
代表版本号,发送方、订阅方都有这个变量,默认是-1。发送方每发送一个消息,mVersion
都会进行+1操作;而Observer
中的mVersion
每成功接收一次消息,都会将发送方最新的version赋值给自己的mLastVersion
,当Observer
中的mLastVersion
>=发送方mVersion
时,Observer
会拒绝接收消息,防止重复发送消息。
所以,如果当发送方之前的mVersion
不是默认值-1,说明LiveData
发送过消息。如果此时执行LiveData.observe()
,因为Observer
中的mLastVersion
为默认值-1,小于发送方的mVersion
,所以该消息不会被拦截,Observer
一定可以拿到之前发送的消息,即粘性消息。
LiveData.postValue可能会丢失消息
当频繁使用LiveData.postValue发送多个消息时,LiveData.observe()接收消息时可能会发生丢失,为什么会这样呢?来看postValue()
的内部实现
//LiveData.java
//postValue发送数据,可以在子线程中使用
protected void postValue(T value) {
boolean postTask;
synchronized (mDataLock) {
//mPendingData默认值是NOT_SET,第一次发送时postTask是true
postTask = mPendingData == NOT_SET;
//将发送的值赋值给mPendingData
mPendingData = value;
}
//第一次发送时postTask是true,当第一个消息还未处理时,后面再发送消息时postTask会变成false,所以后面的消息都会被拦截,但是发送的值可以更新到第一次发送里里面
if (!postTask) {
return;
}
ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable);
}
//setValue发送数据,只能在主线程中使用
protected void setValue(T value) {
assertMainThread("setValue");
mVersion++;
mData = value;
dispatchingValue(null);
}
private final Runnable mPostValueRunnable = new Runnable() {
@SuppressWarnings("unchecked")
@Override
public void run() {
Object newValue;
synchronized (mDataLock) {
//将mPendingData中的值通过setValue传给Observer,并将自身格式化为NOT_SET
newValue = mPendingData;
mPendingData = NOT_SET;
}
setValue((T) newValue);
}
};
详细的过程写在注释中了,主要的原因就是postValue发送消息时,会判断之前的消息是否已经处理,如果还未处理,会将当前发送的最新值更新到之前的消息中去(之前的消息存在mPendingData中,直接更新之),所以当多次频繁使用postValue发送消息时,Observer收到的为最后一次发送的最新值。个人猜测官方这么实现的目的主要是LiveData
在MVVM
架构中使用,既主要为了更新UI的最新数据即可,但是当用LiveData
实现的消息总线时,可能就会出现丢失消息的隐患了,这是我们不想看到的,那么怎么解决呢?放弃使用postValue
,都通过setValue
去发送消息,如果是在子线程中发送消息,自行构建Handler
发送到主线程中即可,后续贴代码。
解决方案
支持粘性、非粘性消息
因为LiveData
默认即是粘性消息,我们只需要添加非粘性消息支持即可,LiveData
的mVersion
默认是private
的,如果想在其他类中使用,可以通过反射获取,但是效率相对低;还可以通过androidx.lifecycle
包名来避免反射获取LiveData.mVersion
,代码如下:
//package androidx.lifecycle
open class ExternalLiveData<T> : MutableLiveData<T>() {
companion object {
//通过androidx.lifecycle包名来避免反射获取LiveData.START_VERSION
const val START_VERSION = LiveData.START_VERSION
}
override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
if (owner.lifecycle.currentState == Lifecycle.State.DESTROYED) {
// ignore
return
}
try {
val wrapper = ExternalLifecycleBoundObserver(owner, observer)
val existing =
callMethodPutIfAbsent(observer, wrapper) as? LiveData<*>.LifecycleBoundObserver
require(!(existing != null && !existing.isAttachedTo(owner))) {
("Cannot add the same observer" + " with different lifecycles")
}
if (existing != null) return
owner.lifecycle.addObserver(wrapper)
} catch (e: Exception) {
//ignore
}
}
//继承父类并将修饰符改为public,可以对外暴露
public override fun getVersion(): Int {
return super.getVersion()
}
internal inner class ExternalLifecycleBoundObserver(
owner: LifecycleOwner,
observer: Observer<in T>?
) : LifecycleBoundObserver(owner, observer) {
override fun shouldBeActive(): Boolean {
return mOwner.lifecycle.currentState.isAtLeast(observerActiveLevel())
}
}
/**
* @return Lifecycle.State
*/
protected open fun observerActiveLevel(): Lifecycle.State {
return Lifecycle.State.STARTED
}
//反射获取LiveData.mObservers
private val fieldObservers: Any
get() {
val fieldObservers = LiveData::class.java.getDeclaredField("mObservers")
fieldObservers.isAccessible = true
return fieldObservers
}
/**
* 反射调用LiveData的putIfAbsent方法
*/
private fun callMethodPutIfAbsent(observer: Any, wrapper: Any): Any? {
val mObservers = fieldObservers.javaClass
val putIfAbsent =
mObservers.getDeclaredMethod("putIfAbsent", Any::class.java, Any::class.java)
putIfAbsent.isAccessible = true
return putIfAbsent.invoke(mObservers, observer, wrapper)
}
}
这样外面就可以使用mVersion
了,整体思路是通过装饰者模式对Observer
进行控制,如:
/**
* Observer装饰者模式
*/
class ObserverWrapper<T>(
private val observer: Observer<T>,
var preventNextEvent: Boolean = false
) : Observer<T> {
override fun onChanged(t: T) {
if (preventNextEvent) {
preventNextEvent = false
return
}
observer.onChanged(t)
}
}
非粘性消息:
val observerWrapper = ObserverWrapper(observer)
observerWrapper.preventNextEvent = liveData.version > ExternalLiveData.START_VERSION
liveData.observe(owner, observerWrapper)
liveData.version > ExternalLiveData.START_VERSION
说明liveData
里发送过消息,version
值已经不是初始值,如果是后注册的观察者,observerWrapper.preventNextEvent
返回的是true
,即会屏蔽当前消息,观察者不执行;如果是先注册的观察者,则不受影响,这样就是实现了非粘性消息。
粘性消息:
val observerWrapper = ObserverWrapper(observer)
liveData.observe(owner, observerWrapper)
没什么可说的,默认就是粘性的,无需特殊处理。
支持子线程发送消息
判断是否在主线程:
object ThreadUtils {
/**
* 是否是在主线程
*/
fun isMainThread(): Boolean {
return Looper.myLooper() == Looper.getMainLooper()
}
}
发送消息时判断当前所在线程:
private val mainHandler = Handler(Looper.getMainLooper())
override fun post(value: T) {
if (ThreadUtils.isMainThread()) {
postInternal(value)
} else {
mainHandler.post(PostValueTask(value))
}
}
@MainThread
private fun postInternal(value: T) {
liveData.value = value
}
inner class PostValueTask(val newValue: T) : Runnable {
override fun run() {
postInternal(newValue)
}
}
当post
消息时,先判断当前所在线程,主线程的话直接发送,在子线程的话通过MainHandler
将消息发送到主线程再发送,从而支持了在子线程发送消息。
参考
【1】https://tech.meituan.com/2018/07/26/android-livedatabus.html
【2】文中代码主要来自 https://github.com/JeremyLiao/LiveEventBus