Android 基于Jetpack LiveData实现消息总线

简介: 在Android开发中,跨页面传递数据(尤其是跨多个页面传递数据)是一个很常见的操作,可以通过Handler、接口回调等方式进行传递,但这几种方式都不太优雅,**消息总线**传递数据的方式相比更优雅。

消息总线

在Android开发中,跨页面传递数据(尤其是跨多个页面传递数据)是一个很常见的操作,可以通过Handler、接口回调等方式进行传递,但这几种方式都不太优雅,消息总线传递数据的方式相比更优雅。

消息总线最大的优势就是解耦,避免了类与类之间强耦合,通常消息总线有以下几种实现方式:

EventBus

EventBus整体思想如下:

eventBus.png

EventBus基于发布/订阅模式,发布者和订阅者是一对多的关系,发布者只有一个,订阅者可以有多个,他们之间都是通过EventBus这个调度中心来进行数据处理与传递。其中发布者将数据传递到调度中心,然后调度中心会找到该发布者对应的订阅者,并将数据依次传递到订阅者,从而完成了数据的传递;如果没有订阅者,那么也就不会传递数据了。整个过程发布者和订阅者不需要知道彼此的存在,即数据传递过程是解耦的。

RxBus

RxBus本身是依赖RxJava的强大功能实现的。RxJava中有一个Subject,是一种特殊的存在,它既是Observable,又是Observer,可以将其看做一个桥梁或代理。Subject有以下四种:

  • AsyncSubject: 无论订阅发生在什么时候,Observer只会接收AsyncSubject发送的在onComplete()之前的最后一个数据,且onComplete()是必须要调用的。
  • BehaviorSubject:Observer会先接收BehaviorSubject被订阅之前的最后一个事件,然后接收订阅之后发送的所有事件。
  • PublishSubject: Observer只接收PublishSubject被订阅之后发送的事件。
  • ReplaySubject:无论subscribe订阅是何时开始的,Observer会接收ReplaySubject发送的所有事件。

具体使用方式可以参考:RxJava中关于Subject和Processor的使用

可以通过Subject来实现一个消息总线,因为不是本文的重点介绍,就不再贴代码了,可以自行搜索其实现方式。

LiveDataBus

LiveDataBus是基于LiveData实现的,上篇文章中详细介绍了其用法及优点:

  • 确保界面符合数据状态
    LiveData 遵循观察者模式。当数据发生变化时,LiveData 会通知 Observer 对象,那么Observer回调的方法中就可以进行UI更新,即数据驱动。
  • 不会发生内存泄漏
    观察者会绑定到 Lifecycle 对象,并在其关联的生命周期遭到销毁(如Activity进入ONDESTROY状态)后进行自我清理。
  • 不会因 Activity 停止而导致崩溃
    如果观察者的生命周期处于非活跃状态(如返回栈中的 Activity),则它不会接收任何 LiveData 事件。
  • 不再需要手动处理生命周期
    界面组件只是观察相关数据,不会停止或恢复观察。LiveData 将自动管理所有这些操作,因为它在观察时可以感知相关的生命周期状态变化。
  • 数据始终保持最新状态
    如果生命周期变为非活跃状态,它会在再次变为活跃状态时接收最新的数据。例如,曾经在后台的 Activity 会在返回前台后立即接收最新的数据。
  • 配置更改时自动保存数据
    如果由于配置更改(如设备旋转)而重新创建了 Activity 或 Fragment,它会立即接收最新的可用数据。
  • 共享资源
    使用单例模式扩展 LiveData 对象以封装系统服务,以便在应用中共享它们。LiveData 对象连接到系统服务一次,然后需要相应资源的任何观察者只需观察 LiveData 对象。

原理

  • 消息:发布者发送,订阅者接收。消息可以是基本类型,也可以是自定义类型的消息。
  • 消息通道LiveData 扮演了消息通道的角色,不同的消息通道用不同的名字区分,名字是 String 类型的,可以通过名字获取到一个LiveData 消息通道。
  • 消息总线: 消息总线通过单例实现,不同的消息通道存放在一个 HashMap 中。
  • 订阅:订阅者通过 get() 获取消息通道,然后调用 observe() 订阅这个通道的消息。
  • 发布:发布者通过 get() 获取消息通道,然后调用 setValue()发布消息。

LiveDataBus.png

图片来源:LiveData实现消息总线

LiveData实现消息总线的优势

相比于EventBusRxBus,使用LiveData实现消息总线有下面几个优势:

  • EventBus、RxBus、LiveDataBus都需要对事件进行注册、解注册。不同于EventBus、RxBus手动解注册,LiveData可以自动管理生命周期,所以也能实现自动解注册,避免忘记解注册而导致内存泄漏。
  • LiveData实现简单,其为Jetpack中重要的一员,且为官方推出,支持更好
  • LiveData相比于EventBus、RxBus,类更少,包更小。

LiveData实现消息总线存在的隐患

LiveData默认是粘性消息

上一篇介绍LiveData的文章Android Jetpack系列之LiveData 中,我们也看到了LiveData发送的消息为粘性消息,即先发布后订阅也能收到消息,再把订阅observe()的逻辑贴出来:

@MainThread
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
    assertMainThread("observe");
    if (owner.getLifecycle().getCurrentState() == DESTROYED) {
        //如果当前观察者处于DESTROYED状态,直接返回
        return;
    }
    //将LifecycleOwner、Observer包装成LifecycleBoundObserver
    LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
    //ObserverWrapper是LifecycleBoundObserver的父类
    ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
    //如果mObservers中存在该Observer且跟传进来的LifecycleOwner不同,直接抛异常,一个Observer只能对应一个LifecycleOwner
    if (existing != null && !existing.isAttachedTo(owner)) {
        throw new IllegalArgumentException("Cannot add the same observer"
                + " with different lifecycles");
    }
    //如果已经存在Observer且跟传进来的LifecycleOwner是同一个,直接返回
    if (existing != null) {
        return;
    }
    //通过Lifecycle添加观察者
    owner.getLifecycle().addObserver(wrapper);
}

最后执行addObserver()后,内部通过LifecycleRegistry添加Observer,进而会执行到onStateChanged()方法,该方法辗转又调用到dispatchingValue方法(setValue/postValue最终也会调用到该方法),接着会调用到我们最关心的considerNotify()

    void dispatchingValue(@Nullable ObserverWrapper initiator) {
        if (mDispatchingValue) {
            mDispatchInvalidated = true;
            return;
        }
        mDispatchingValue = true;
        do {
            mDispatchInvalidated = false;
            if (initiator != null) {
                //2、通过observe()的方式会调用这里
                considerNotify(initiator);
                initiator = null;
            } else {
                //1、通过setValue/postValue的方式会调用这里,遍历所有观察者并进行分发
                for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator =
                        mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
                    considerNotify(iterator.next().getValue());
                    if (mDispatchInvalidated) {
                        break;
                    }
                }
            }
        } while (mDispatchInvalidated);
        mDispatchingValue = false;
    }

    private void considerNotify(ObserverWrapper observer) {
        if (!observer.mActive) {
            //观察者不在活跃状态 直接返回
            return;
        }
        //如果是observe(),则是在STARTED、RESUMED状态时活跃;如果是ObserveForever(),则认为一直是活跃状态
        if (!observer.shouldBeActive()) {
            observer.activeStateChanged(false);
            return;
        }
        //Observer中的Version必须小于LiveData中的Version,防止重复发送
        if (observer.mLastVersion >= mVersion) {
            return;
        }
        observer.mLastVersion = mVersion;
        //回调Observer的onChange方法并接收数据
        observer.mObserver.onChanged((T) mData);
    }

可以看到considerNotify()里有这么一个逻辑:

if (observer.mLastVersion >= mVersion) {
   return;
   }

mVersion代表版本号,发送方、订阅方都有这个变量,默认是-1。发送方每发送一个消息,mVersion都会进行+1操作;而Observer中的mVersion每成功接收一次消息,都会将发送方最新的version赋值给自己的mLastVersion,当Observer中的mLastVersion>=发送方mVersion时,Observer会拒绝接收消息,防止重复发送消息。

所以,如果当发送方之前的mVersion不是默认值-1,说明LiveData发送过消息。如果此时执行LiveData.observe(),因为Observer中的mLastVersion为默认值-1,小于发送方的mVersion,所以该消息不会被拦截,Observer一定可以拿到之前发送的消息,即粘性消息。

LiveData.postValue可能会丢失消息

当频繁使用LiveData.postValue发送多个消息时,LiveData.observe()接收消息时可能会发生丢失,为什么会这样呢?来看postValue()的内部实现

//LiveData.java

//postValue发送数据,可以在子线程中使用
protected void postValue(T value) {
    boolean postTask;
    synchronized (mDataLock) {
       //mPendingData默认值是NOT_SET,第一次发送时postTask是true
       postTask = mPendingData == NOT_SET;
       //将发送的值赋值给mPendingData
       mPendingData = value;
    }
    //第一次发送时postTask是true,当第一个消息还未处理时,后面再发送消息时postTask会变成false,所以后面的消息都会被拦截,但是发送的值可以更新到第一次发送里里面
    if (!postTask) {
       return;
    }
    ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable);
 }
    
//setValue发送数据,只能在主线程中使用
protected void setValue(T value) {
    assertMainThread("setValue");
    mVersion++;
    mData = value;
    dispatchingValue(null); 
 }

private final Runnable mPostValueRunnable = new Runnable() {
        @SuppressWarnings("unchecked")
        @Override
        public void run() {
            Object newValue;
            synchronized (mDataLock) {
                //将mPendingData中的值通过setValue传给Observer,并将自身格式化为NOT_SET
                newValue = mPendingData;
                mPendingData = NOT_SET;
            }
            setValue((T) newValue);
     }
 };

详细的过程写在注释中了,主要的原因就是postValue发送消息时,会判断之前的消息是否已经处理,如果还未处理,会将当前发送的最新值更新到之前的消息中去(之前的消息存在mPendingData中,直接更新之),所以当多次频繁使用postValue发送消息时,Observer收到的为最后一次发送的最新值。个人猜测官方这么实现的目的主要是LiveDataMVVM架构中使用,既主要为了更新UI的最新数据即可,但是当用LiveData实现的消息总线时,可能就会出现丢失消息的隐患了,这是我们不想看到的,那么怎么解决呢?放弃使用postValue,都通过setValue去发送消息,如果是在子线程中发送消息,自行构建Handler发送到主线程中即可,后续贴代码。

解决方案

支持粘性、非粘性消息

因为LiveData默认即是粘性消息,我们只需要添加非粘性消息支持即可,LiveDatamVersion默认是private的,如果想在其他类中使用,可以通过反射获取,但是效率相对低;还可以通过androidx.lifecycle包名来避免反射获取LiveData.mVersion,代码如下:

//package androidx.lifecycle
open class ExternalLiveData<T> : MutableLiveData<T>() {

    companion object {
        //通过androidx.lifecycle包名来避免反射获取LiveData.START_VERSION
        const val START_VERSION = LiveData.START_VERSION
    }

    override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
        if (owner.lifecycle.currentState == Lifecycle.State.DESTROYED) {
            // ignore
            return
        }
        try {
            val wrapper = ExternalLifecycleBoundObserver(owner, observer)
            val existing =
                callMethodPutIfAbsent(observer, wrapper) as? LiveData<*>.LifecycleBoundObserver
            require(!(existing != null && !existing.isAttachedTo(owner))) {
                ("Cannot add the same observer" + " with different lifecycles")
            }
            if (existing != null) return
            owner.lifecycle.addObserver(wrapper)
        } catch (e: Exception) {
            //ignore
        }
    }

    //继承父类并将修饰符改为public,可以对外暴露
    public override fun getVersion(): Int {
        return super.getVersion()
    }

    internal inner class ExternalLifecycleBoundObserver(
        owner: LifecycleOwner,
        observer: Observer<in T>?
    ) : LifecycleBoundObserver(owner, observer) {
        override fun shouldBeActive(): Boolean {
            return mOwner.lifecycle.currentState.isAtLeast(observerActiveLevel())
        }
    }

    /**
     * @return Lifecycle.State
     */
    protected open fun observerActiveLevel(): Lifecycle.State {
        return Lifecycle.State.STARTED
    }

    //反射获取LiveData.mObservers
    private val fieldObservers: Any
        get() {
            val fieldObservers = LiveData::class.java.getDeclaredField("mObservers")
            fieldObservers.isAccessible = true
            return fieldObservers
        }

    /**
     * 反射调用LiveData的putIfAbsent方法
     */
    private fun callMethodPutIfAbsent(observer: Any, wrapper: Any): Any? {
        val mObservers = fieldObservers.javaClass
        val putIfAbsent =
            mObservers.getDeclaredMethod("putIfAbsent", Any::class.java, Any::class.java)
        putIfAbsent.isAccessible = true
        return putIfAbsent.invoke(mObservers, observer, wrapper)
    }
}

这样外面就可以使用mVersion了,整体思路是通过装饰者模式对Observer进行控制,如:

/**
 * Observer装饰者模式
 */
class ObserverWrapper<T>(
        private val observer: Observer<T>,
        var preventNextEvent: Boolean = false
) : Observer<T> {
    override fun onChanged(t: T) {
        if (preventNextEvent) {
            preventNextEvent = false
            return
        }
        observer.onChanged(t)
    }
}

非粘性消息

val observerWrapper = ObserverWrapper(observer)
observerWrapper.preventNextEvent = liveData.version > ExternalLiveData.START_VERSION
liveData.observe(owner, observerWrapper)

liveData.version > ExternalLiveData.START_VERSION 说明liveData里发送过消息,version值已经不是初始值,如果是后注册的观察者,observerWrapper.preventNextEvent返回的是true,即会屏蔽当前消息,观察者不执行;如果是先注册的观察者,则不受影响,这样就是实现了非粘性消息。

粘性消息

val observerWrapper = ObserverWrapper(observer)
liveData.observe(owner, observerWrapper)

没什么可说的,默认就是粘性的,无需特殊处理。

支持子线程发送消息

判断是否在主线程:

object ThreadUtils {

    /**
     * 是否是在主线程
     */
    fun isMainThread(): Boolean {
        return Looper.myLooper() == Looper.getMainLooper()
    }
}

发送消息时判断当前所在线程:

private val mainHandler = Handler(Looper.getMainLooper())

override fun post(value: T) {
    if (ThreadUtils.isMainThread()) {
        postInternal(value)
    } else {
        mainHandler.post(PostValueTask(value))
    }
}

@MainThread
private fun postInternal(value: T) {
    liveData.value = value
}

inner class PostValueTask(val newValue: T) : Runnable {
    override fun run() {
        postInternal(newValue)
    }
}

post消息时,先判断当前所在线程,主线程的话直接发送,在子线程的话通过MainHandler将消息发送到主线程再发送,从而支持了在子线程发送消息。

参考

【1】https://tech.meituan.com/2018/07/26/android-livedatabus.html

【2】文中代码主要来自 https://github.com/JeremyLiao/LiveEventBus

相关文章
|
2月前
|
安全 Java Android开发
安卓开发中的新趋势:Kotlin与Jetpack的完美结合
【6月更文挑战第20天】在不断进化的移动应用开发领域,Android平台以其开放性和灵活性赢得了全球开发者的青睐。然而,随着技术的迭代,传统Java语言在Android开发中逐渐显露出局限性。Kotlin,一种现代的静态类型编程语言,以其简洁、安全和高效的特性成为了Android开发中的新宠。同时,Jetpack作为一套支持库、工具和指南,旨在帮助开发者更快地打造优秀的Android应用。本文将探讨Kotlin与Jetpack如何共同推动Android开发进入一个新的时代,以及这对开发者意味着什么。
|
22天前
|
存储 数据库 Android开发
🔥Android Jetpack全解析!拥抱Google官方库,让你的开发之旅更加顺畅无阻!🚀
【7月更文挑战第28天】在Android开发中追求高效稳定的路径?Android Jetpack作为Google官方库集合,是你的理想选择。它包含多个独立又协同工作的库,覆盖UI到安全性等多个领域,旨在减少样板代码,提高开发效率与应用质量。Jetpack核心组件如LiveData、ViewModel、Room等简化了数据绑定、状态保存及数据库操作。引入Jetpack只需在`build.gradle`中添加依赖。例如,使用Room进行数据库操作变得异常简单,从定义实体到实现CRUD操作,一切尽在掌握之中。拥抱Jetpack,提升开发效率,构建高质量应用!
40 4
|
1月前
|
存储 前端开发 测试技术
Android Kotlin中使用 LiveData、ViewModel快速实现MVVM模式
使用Kotlin实现MVVM模式是Android开发的现代实践。该模式分离UI和业务逻辑,借助LiveData、ViewModel和DataBinding增强代码可维护性。步骤包括创建Model层处理数据,ViewModel层作为数据桥梁,以及View层展示UI。添加相关依赖后,Model类存储数据,ViewModel类通过LiveData管理变化,而View层使用DataBinding实时更新UI。这种架构提升代码可测试性和模块化。
99 2
|
2月前
|
安全 JavaScript 前端开发
kotlin开发安卓app,JetPack Compose框架,给webview新增一个按钮,点击刷新网页
在Kotlin中开发Android应用,使用Jetpack Compose框架时,可以通过添加一个按钮到TopAppBar来实现WebView页面的刷新功能。按钮位于右上角,点击后调用`webViewState?.reload()`来刷新网页内容。以下是代码摘要:
|
2月前
|
JavaScript Java Android开发
kotlin安卓在Jetpack Compose 框架下跨组件通讯EventBus
**EventBus** 是一个Android事件总线库,简化组件间通信。要使用它,首先在Gradle中添加依赖`implementation &#39;org.greenrobot:eventbus:3.3.1&#39;`。然后,可选地定义事件类如`MessageEvent`。在活动或Fragment的`onCreate`中注册订阅者,在`onDestroy`中反注册。通过`@Subscribe`注解方法处理事件,如`onMessageEvent`。发送事件使用`EventBus.getDefault().post()`。
171 2
|
2月前
|
JavaScript 前端开发 Android开发
kotlin安卓在Jetpack Compose 框架下使用webview , 网页中的JavaScript代码如何与native交互
在Jetpack Compose中使用Kotlin创建Webview组件,设置JavaScript交互:`@Composable`函数`ComposableWebView`加载网页并启用JavaScript。通过`addJavascriptInterface`添加`WebAppInterface`类,允许JavaScript调用Android方法如播放音频。当页面加载完成时,执行`onWebViewReady`回调。
|
2月前
|
监控 Android开发 数据安全/隐私保护
安卓kotlin JetPack Compose 实现摄像头监控画面变化并录制视频
在这个示例中,开发者正在使用Kotlin和Jetpack Compose构建一个Android应用程序,该程序 能够通过手机后置主摄像头录制视频、检测画面差异、实时预览并将视频上传至FTP服务器的Android应用
139 1
|
1月前
|
XML 存储 API
Jetpack初尝试 NavController,LiveData,DataBing,ViewModel,Paging
Jetpack初尝试 NavController,LiveData,DataBing,ViewModel,Paging
|
2月前
|
Android开发
Jetpack Compose: Hello Android
Jetpack Compose: Hello Android
20 0
|
2月前
|
安全 网络安全 API
kotlin安卓开发JetPack Compose 如何使用webview 打开网页时给webview注入cookie
在Jetpack Compose中使用WebView需借助AndroidView。要注入Cookie,首先在`build.gradle`添加WebView依赖,如`androidx.webkit:webkit:1.4.0`。接着创建自定义`ComposableWebView`,通过`CookieManager`设置接受第三方Cookie并注入Cookie字符串。最后在Compose界面使用这个自定义组件加载URL。注意Android 9及以上版本可能需要在网络安全配置中允许第三方Cookie。
278 0