Jetpack 系列(2)—— 为什么 LiveData 会重放数据,怎么解决?

简介: Jetpack 系列(2)—— 为什么 LiveData 会重放数据,怎么解决?

前言


  • LiveData 是 Jetpack 组件中较常用的组件之一,曾经也是实现 MVVM 模式的标准组件之一,不过目前 Google 更多推荐使用 Kotlin Flow 来代替 LiveData;
  • 虽然 LiveData 不再是 Google 主推的组件,但考虑到 LiveData 依然存在于大量存量代码中,以及 LiveData 伴随着 Android 生态发展过程中衍生的问题和解决方案,我认为 LiveData 依然有存在的意义。虽然我们不再优先使用 LiveData,但不代表学习 LiveData 没有价值。


1. 认识 LiveData


1.1 为什么要使用 LiveData?


LiveData 是基于 Lifecycle 框架实现的生命周期感知型数据容器,能够让数据观察者更加安全地应对宿主(Activity / Fragment 等)生命周期变化,核心概括为 2 点:


  • 1、自动取消订阅: 当宿主生命周期进入消亡(DESTROYED)状态时,LiveData 会自动移除观察者,避免内存泄漏;
  • 2、安全地回调数据: 在宿主生命周期状态低于活跃状态(STAETED)时,LiveData 不会回调数据,避免产生空指针异常或不必要的性能损耗;当宿主生命周期不低于活跃状态(STAETED)时,LiveData 会重新尝试回调数据,确保观察者接收到最新的数据。


1.2 LiveData 的使用方法


  • 1、添加依赖: 在 build.gradle 中添加 LiveData 依赖,需要注意区分过时的方式:


// 过时方式(lifecycle-extensions 不再维护)
implementation "androidx.lifecycle:lifecycle-extensions:2.4.0"
// 目前的方式:
def lifecycle_version = "2.5.0"
// Lifecycle 核心类
implementation "androidx.lifecycle:lifecycle-runtime:$lifecycle_version"
// LiveData
implementation "androidx.lifecycle:lifecycle-livedata-ktx:$lifecycle_version"
// ViewModel
implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:$lifecycle_version"
复制代码
  • 2、模板代码: LiveData 通常会搭配 ViewModel 使用,以下为使用模板,相信大家都很熟悉了:


NameViewModel.kt


class NameViewModel : ViewModel() {
    val currentName: MutableLiveData<String> by lazy {
        MutableLiveData<String>()
    }
}
复制代码

MainActivity.kt


class MainActivity : AppCompatActivity() {
    private val model: NameViewModel by viewModels()
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        // LiveData 观察者
        val nameObserver = Observer<String> { newName ->
            // 更新视图
            nameTextView.text = newName
        }
        // 注册 LiveData 观察者,this 为生命周期宿主
        model.currentName.observe(this, nameObserver)
        // 修改 LiveData 数据
        button.setOnClickListener {
            val anotherName = "John Doe"
            model.currentName.value = anotherName
        }
    }
}
复制代码
  • 3、注册观察者: LiveData 支持两种注册观察者的方式:
  • LiveData#observe(LifecycleOwner, Observer) 带生命周期感知的注册: 更常用的注册方式,这种方式能够获得 LiveData 自动取消订阅和安全地回调数据的特性;
  • LiveData#observeForever(Observer) 永久注册: LiveData 会一直持有观察者的引用,只要数据更新就会回调,因此这种方式必须在合适的时机手动移除观察者。

Observer.java


// 观察者接口
public interface Observer<T> {
    void onChanged(T t);
}
复制代码
  • 4、设置数据: LiveData 设置数据需要利用子类 MutableLiveData 提供的接口:setValue() 为同步设置数据,postValue() 为异步设置数据,内部将 post 到主线程再修改数据。

MutableLiveData.java


public class MutableLiveData<T> extends LiveData<T> {
    // 异步设置数据
    @Override
    public void postValue(T value) {
        super.postValue(value);
    }
    // 同步设置数据
    @Override
    public void setValue(T value) {
        super.setValue(value);
    }
}
复制代码


1.3 LiveData 存在的局限


LiveData 是 Android 生态中一个的简单的生命周期感知型容器。简单即是它的优势,也是它的局限,当然这些局限性不应该算 LiveData 的缺点,因为 LiveData 的设计初衷就是一个简单的数据容器,需要具体问题具体分析。对于简单的数据流场景,使用 LiveData 完全没有问题。


  • 1、LiveData 只能在主线程更新数据: 只能在主线程 setValue,即使 postValue 内部也是切换到主线程执行;
  • 2、LiveData 数据重放问题: 注册新的订阅者,会重新收到 LiveData 存储的数据,这在有些情况下不符合预期(具体见第 TODO 节);
  • 3、LiveData 不防抖问题: 重复 setValue 相同的值,订阅者会收到多次 onChanged() 回调(可以使用 distinctUntilChanged() 优化);
  • 4、LiveData 丢失数据问题: 在数据生产速度 > 数据消费速度时,LiveData 无法观察者能够接收到全部数据。比如在子线程大量 postValue 数据但主线程消费跟不上时,中间就会有一部分数据被忽略。


1.4 LiveData 的替代者


  • 1、RxJava: RxJava 是第三方组织 ReactiveX 开发的组件,Rx 是一个包括 Java、Go 等语言在内的多语言数据流框架。功能强大是它的优势,支持大量丰富的操作符,也支持线程切换和背压。然而 Rx 的学习门槛过高,对开发反而是一种新的负担,也会带来误用的风险。
  • 2、Kotlin Flow: Kotlin Flow 是基于 Kotlin 协程基础能力搭建的一套数据流框架,从功能复杂性上看是介于 LiveData 和 RxJava 之间的解决方案。Kotlin Flow 拥有比 LiveData 更丰富的能力,但裁剪了 RxJava 大量复杂的操作符,做得更加精简。并且在 Kotlin 协程的加持下,Kotlin Flow 目前是 Google 主推的数据流框架。


关于 Kotlin Flow 的更多内容,我们在 4、Flow:LiveData 的替代方案 这篇文章讨论过。


2. LiveData 实现原理分析


2.1 注册观察者的执行过程


LiveData 支持使用 observe() 或 observeForever() 两种方式注册观察者,其内部会分别包装为 2 种包装对象:


  • 1、observe(): 将观察者包装为 LifecycleBoundObserver 对象,它是 Lifecycle 框架中 LifecycleEventObserver 的实现类,因此它可以绑定到宿主(参数 owner)的生命周期上,这是实现 LiveData 自动取消订阅和安全地回调数据的关键;
  • 2、observeForever(): 将观察者包装为 AlwaysActiveObserver,不会关联宿主生命周期,当然你也可以理解为全局生命周期。


注意: LiveData 内部会禁止一个观察者同时使用 observe() 和 observeForever() 两种注册方式。但同一个 LiveData 可以接收 observe() 和 observeForever() 两种观察者。


LiveData.java


private SafeIterableMap<Observer<? super T>, ObserverWrapper> mObservers = new SafeIterableMap<>();
// 注册方式 1:带生命周期感知的注册方式
@MainThread
public void observe(LifecycleOwner owner, Observer<? super T> observer) {
    // 1.1 主线程检查
    assertMainThread("observe");
    // 1.2 宿主生命周期状态是 DESTROY,则跳过
    if (owner.getLifecycle().getCurrentState() == DESTROYED) {
        return;
    }
    // 1.3 将 Observer 包装为 LifecycleBoundObserver
    LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
    ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
    // 1.4 禁止将 Observer 绑定到不同的宿主上
    if (existing != null && !existing.isAttachedTo(owner)) {
        throw new IllegalArgumentException("Cannot add the same observer with different lifecycles");
    }
    if (existing != null) {
        return;
    }
    // 1.5 将包装类注册到宿主声明周期上
    owner.getLifecycle().addObserver(wrapper);
}
// 注册方式 2:永久注册的方式
@MainThread
public void observeForever(Observer<? super T> observer) {
    // 2.1 主线程检查
    assertMainThread("observeForever");
    // 2.2 将 Observer 包装为 AlwaysActiveObserver
    AlwaysActiveObserver wrapper = new AlwaysActiveObserver(observer);
    // 2.3 禁止将 Observer 注册到生命周期宿主后又进行永久注册
    ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
    if (existing instanceof LiveData.LifecycleBoundObserver) {
        throw new IllegalArgumentException("Cannot add the same observer with different lifecycles");
    }
    if (existing != null) {
        return;
    }
    // 2.4 分发最新数据
    wrapper.activeStateChanged(true);
}
// 注销观察者
@MainThread
public void removeObserver(@NonNull final Observer<? super T> observer) {
    // 主线程检查
    assertMainThread("removeObserver");
    // 移除
    ObserverWrapper removed = mObservers.remove(observer);
    if (removed == null) {
        return;
    }
    // removed.detachObserver() 方法:
    // LifecycleBoundObserver 最终会调用 Lifecycle#removeObserver()
    // AlwaysActiveObserver 为空实现
    removed.detachObserver();
    removed.activeStateChanged(false);
}
复制代码


2.2 生命周期感知源码分析


LifecycleBoundObserver 是 LifecycleEventObserver 的实现类,当宿主生命周期变化时,会回调其中的 LifecycleEventObserve#onStateChanged() 方法:

LiveData$ObserverWrapper.java


private abstract class ObserverWrapper {
    final Observer<? super T> mObserver;
    boolean mActive;
    // 观察者持有的版本号
    int mLastVersion = START_VERSION; // -1
    ObserverWrapper(Observer<? super T> observer) {
        mObserver = observer;
    }
    abstract boolean shouldBeActive();
    boolean isAttachedTo(LifecycleOwner owner) {
        return false;
    }
    void detachObserver() {
    }
    void activeStateChanged(boolean newActive) {
        // 同步宿主的生命状态
        if (newActive == mActive) {
            return;
        }
        mActive = newActive;
        changeActiveCounter(mActive ? 1 : -1);
        // STARTED 状态以上才会尝试分发数据
        if (mActive) {
            dispatchingValue(this);
        }
    }
}
复制代码


Livedata$LifecycleBoundObserver.java


// 注册方式:observe()
class LifecycleBoundObserver extends ObserverWrapper implements LifecycleEventObserver {
    @NonNull
    final LifecycleOwner mOwner;
    LifecycleBoundObserver(@NonNull LifecycleOwner owner, Observer<? super T> observer) {
        super(observer);
        mOwner = owner;
    }
    // 宿主的生命周期大于等于可见状态(STARTED),认为活动状态
    @Override
    boolean shouldBeActive() {
        return mOwner.getLifecycle().getCurrentState().isAtLeast(STARTED);
    }
    @Override
    public void onStateChanged(LifecycleOwner source, Lifecycle.Event event) {
        Lifecycle.State currentState = mOwner.getLifecycle().getCurrentState();
        // 宿主生命周期进入 DESTROYED 时,会移除观察者
        if (currentState == DESTROYED) {
            removeObserver(mObserver);
            return;
        }
        Lifecycle.State prevState = null;
        while (prevState != currentState) {
            prevState = currentState;
            // 宿主从非可见状态转为可见状态(STARTED)时,会尝试触发数据分发
            activeStateChanged(shouldBeActive());
            currentState = mOwner.getLifecycle().getCurrentState();
        }
    }
    @Override
    boolean isAttachedTo(LifecycleOwner owner) {
        return mOwner == owner;
    }
    @Override
    void detachObserver() {
        mOwner.getLifecycle().removeObserver(this);
    }
}
复制代码


AlwaysActiveObserver.java


// 注册方式:observeForever()
private class AlwaysActiveObserver extends ObserverWrapper {
    AlwaysActiveObserver(Observer<? super T> observer) {
        super(observer);
    }
    @Override
    boolean shouldBeActive() {
        return true;
    }
}
复制代码


2.3 同步设置数据的执行过程


LiveData 使用 setValue() 方法进行同步设置数据(必须在主线程调用),需要注意的是,设置数据后并不一定会回调 Observer#onChanged() 分发数据,而是需要同时 2 个条件:


  • 条件 1: 观察者绑定的生命周期处于活跃状态;
  • observeForever() 观察者:一直处于活跃状态;
  • observe() 观察者:owner 宿主生命周期处于活跃状态。
  • 条件 2: 观察者的持有的版本号小于 LiveData 的版本号时。

LiveData.java


// LiveData 持有的版本号
private int mVersion;
// 异步设置数据 postValue() 最终也是调用到 setValue()
@MainThread
protected void setValue(T value) {
    // 主线程检查
    assertMainThread("setValue");
    // 版本号加一
    mVersion++;
    mData = value;
    // 数据分发
    dispatchingValue(null);
}
// 数据分发
void dispatchingValue(ObserverWrapper initiator) {
    // 这里的标记位和嵌套循环是为了处理在 Observer#onChanged() 中继续调用 setValue(),
    // 而产生的递归设置数据的情况,此时会中断旧数据的分发,转而分发新数据,这是丢失数据的第 2 种情况。
    if (mDispatchingValue) {
        mDispatchInvalidated = true;
        return;
    }
    mDispatchingValue = true;
    do {
        mDispatchInvalidated = false;
        if (initiator != null) {
            // onStateChanged() 走这个分支,只需要处理单个观察者
            considerNotify(initiator);
            initiator = null;
        } else {
            // setValue() 走这个分支,需要遍历所有观察者
            for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator = mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
                considerNotify(iterator.next().getValue());
                if (mDispatchInvalidated) {
                    break;
                }
            }
        }
    } while (mDispatchInvalidated);
    mDispatchingValue = false;
}
// 尝试触发回调,只有观察者持有的版本号小于 LiveData 持有版本号,才会分发回调
private void considerNotify(ObserverWrapper observer) {
    // STARTED 状态以上才会尝试分发数据
    if (!observer.mActive) {
        return;
    }
    if (!observer.shouldBeActive()) {
        observer.activeStateChanged(false);
        return;
    }
    // 版本对比
    if (observer.mLastVersion >= mVersion) {
        return;
    }
    observer.mLastVersion = mVersion;
    // 分发回调
    observer.mObserver.onChanged((T) mData);
}
复制代码


总结一下回调 Observer#onChanged() 的情况:


  • 1、注册观察者时,观察者绑定的生命处于活跃状态,并且 LiveData 存在已设置的旧数据;
  • 2、调用 setValue() / postValue() 设置数据时,观察者绑定的生命周期处于活跃状态;
  • 3、观察者绑定的生命周期由非活跃状态转为活跃状态,并且 LiveData 存在未分发到该观察者的数据(即观察者持有的版本号小于 LiveData 持有的版本号);


提示: observeForever() 虽然没有直接绑定生命周期宿主,但可以理解为绑定的生命周期是全局的,因此在移除观察者之前都是活跃状态。


2.4 异步设置数据的执行过程


LiveData 使用 postValue() 方法进行异步设置数据(允许在子线程调用),内部会通过一个临时变量 mPendingData 存储数据,再通过 Handler 将切换到主线程并调用 setValue(临时变量)。因此,当在子线程连续 postValue() 时,可能会出现中间的部分数据不会被观察者接收到。


LiveData.java


final Object mDataLock = new Object();
static final Object NOT_SET = new Object();
// 临时变量
volatile Object mPendingData = NOT_SET;
private final Runnable mPostValueRunnable = new Runnable() {
    @SuppressWarnings("unchecked")
    @Override
    public void run() {
        Object newValue;
        synchronized (mDataLock) {
            newValue = mPendingData;
            // 重置临时变量
            mPendingData = NOT_SET;
        }
        // 真正修改数据的地方,也是统一到 setValue() 设置数据
        setValue((T) newValue);
    }
};
protected void postValue(T value) {
    boolean postTask;
    synchronized (mDataLock) {
        // 临时变量被重置时,才会发送修改的 Message,这是出现背压的第 1 种情况
        postTask = mPendingData == NOT_SET;
        mPendingData = value;
    }
    if (!postTask) {
        return;
    }
    ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable);
}
复制代码


总结一下 LiveData 可能丢失数据的场景,此时观察者可能不会接收到所有的数据:


  • 情况 1(背压问题): 使用 postValue() 异步设置数据,并且观察者的消费速度小于数据生产速度;
  • 情况 2: 在观察者处理回调(Observer#obChanged())的过程中重新设置新数据,此时会中断旧数据的分发,部分观察者将无法接收到旧数据;
  • 情况 3: 观察者绑定的生命周期处于非活跃状态时,连续使用 setValue() / postValue() 设置数据时,观察将无法接收到中间的数据。


注意: 丢失数据不一定是需要解决的问题,需要视场景分析。


2.5 LiveData 数据重放原因分析


LiveData 的数据重放问题也叫作数据倒灌、粘性事件,核心源码在 LiveData#considerNotify(Observer) 中:


  • 首先,LiveData 和观察者各自会持有一个版本号 version,每次 LiveData#setValue 或 postValue 后,LiveData 持有的版本号会自增 1。在 LiveData#considerNotify(Observer) 尝试分发数据时,会判断观察者持有版本号是否小于 LiveData 的版本号(Observer#mLastVersion >= LiveData#mVersion 是否成立),如果成立则说明这个观察者还没有消费最新的数据版本。
  • 而观察者的持有的初始版本号是 -1,因此当注册新观察者并且正好宿主的生命周期是大于等于可见状态(STARTED)时,就会尝试分发数据,这就是数据重放。

为什么 Google 要把 LiveData 设计为粘性呢?LiveData 重放问题需要区分场景来看 —— 状态适合重放,而事件不适合重放:

  • 当 LiveData 作为一个状态使用时,在注册新观察者时重放已有状态是合理的;
  • 当 LiveData 作为一个事件使用时,在注册新观察者时重放已经分发过的事件就是不合理的。


3. LiveData 数据重放问题的解决方案


这里我们总结一下业界提出处理 LiveData 数据重放问题的方案:


3.1 Event 事件包装器


实现一个事件包装器,内部使用一个标志位标记事件是否已经被消费过。这样的话,当观察者收到重放的数据时,由于其中的标记位已经显示被消费,因此会抛弃该事件。

不过,虽然这个方法能够解决数据倒灌问题,但是会有副作用:对于多个观察者的情况,只允许第一个观察者消费,而后续的观察者无法消费实现,这一般是不能满足需求的。


open class Event<out T>(private val content: T)
复制代码


3.2 SingleLiveData 事件包装器变型方案


SingeLiveData 是 Google 官方的方案,在 LiveData 内部通过一个原子标志位来标记事件是否已经被消费过。这个方法本质上和 Event 实现包装器是一样的,因此也存在完全相同的副作用。

SingleLiveEvent.java


public class SingleLiveEvent<T> extends MutableLiveData<T> {
    private static final String TAG = "SingleLiveEvent";
    // 消费标记位
    private final AtomicBoolean mPending = new AtomicBoolean(false);
    @MainThread
    public void observe(LifecycleOwner owner, final Observer<T> observer) {
        if (hasActiveObservers()) {
            Log.w(TAG, "Multiple observers registered but only one will be notified of changes.");
        }
        // Observe the internal MutableLiveData
        super.observe(owner, new Observer<T>() {
            @Override
            public void onChanged(@Nullable T t) {
                if (mPending.compareAndSet(true, false)) {
                    observer.onChanged(t);
                }
            }
        });
    }
    @MainThread
    public void setValue(@Nullable T t) {
        mPending.set(true);
        super.setValue(t);
    }
    /**
     * Used for cases where T is Void, to make calls cleaner.
     */
    @MainThread
    public void call() {
        setValue(null);
    }
}
复制代码


3.3 反射修改观察者版本号


业界分享出来的一个方案,不确定思路原创源。实现方法是在注册新观察者时,通过反射的手段将观察者持有的版本号(Observer#mLastVersion)同步为 LiveData 的版本号。缺点是使用反射,但确实能够解决多观察者问题。


private void hook(@NonNull Observer<T> observer) throws Exception {
    //get wrapper's version
    Class<LiveData> classLiveData = LiveData.class;
    Field fieldObservers = classLiveData.getDeclaredField("mObservers");
    fieldObservers.setAccessible(true);
    Object objectObservers = fieldObservers.get(this);
    Class<?> classObservers = objectObservers.getClass();
    Method methodGet = classObservers.getDeclaredMethod("get", Object.class);
    methodGet.setAccessible(true);
    Object objectWrapperEntry = methodGet.invoke(objectObservers, observer);
    Object objectWrapper = null;
    if (objectWrapperEntry instanceof Map.Entry) {
        objectWrapper = ((Map.Entry) objectWrapperEntry).getValue();
    }
    if (objectWrapper == null) {
        throw new NullPointerException("Wrapper can not be bull!");
    }
    Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass();
    Field fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion");
    fieldLastVersion.setAccessible(true);
    //get livedata's version
    Field fieldVersion = classLiveData.getDeclaredField("mVersion");
    fieldVersion.setAccessible(true);
    Object objectVersion = fieldVersion.get(this);
    //set wrapper's version
    fieldLastVersion.set(objectWrapper, objectVersion);
}
复制代码


3.4 UnPeekLiveData 反射方案优化


UnPeekLiveData 是 KunMinX 提出并开源的方案,主要思路是将 LiveData 源码中的 Observer#mLastVersion 和 LiveData#mVersion 在子类中重新实现一遍。在 UnPeekLiveData 中会有一个原子整型来标记数据版本,并且每个 Observer 在注册时会拿到当前 LiveData 的最新数据版本,而在 Observer#onChanged 中会对比两个版本号来决定是否分发。这个过程中没有使用反射,也不会存在不支持多观察者的问题。

ProtectedUnPeekLiveData.java


public class ProtectedUnPeekLiveData<T> extends LiveData<T> {
    private final static int START_VERSION = -1;
    private final AtomicInteger mCurrentVersion = new AtomicInteger(START_VERSION);
    protected boolean isAllowNullValue;
    @Override
    public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
        super.observe(owner, createObserverWrapper(observer, mCurrentVersion.get()));
    }
    @Override
    public void observeForever(@NonNull Observer<? super T> observer) {
        super.observeForever(createObserverWrapper(observer, mCurrentVersion.get()));
    }
    public void observeSticky(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
        super.observe(owner, createObserverWrapper(observer, START_VERSION));
    }
    public void observeStickyForever(@NonNull Observer<? super T> observer) {
        super.observeForever(createObserverWrapper(observer, START_VERSION));
    }
    @Override
    protected void setValue(T value) {
        mCurrentVersion.getAndIncrement();
        super.setValue(value);
    }
    class ObserverWrapper implements Observer<T> {
        private final Observer<? super T> mObserver;
        private int mVersion = START_VERSION;
        public ObserverWrapper(@NonNull Observer<? super T> observer, int version) {
            this.mObserver = observer;
            this.mVersion = version;
        }
        @Override
        public void onChanged(T t) {
            if (mCurrentVersion.get() > mVersion && (t != null || isAllowNullValue)) {
                mObserver.onChanged(t);
            }
        }
        @SuppressWarnings("unchecked")
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            ObserverWrapper that = (ObserverWrapper) o;
            return Objects.equals(mObserver, that.mObserver);
        }
        @Override
        public int hashCode() {
            return Objects.hash(mObserver);
        }
    }
    @Override
    public void removeObserver(@NonNull Observer<? super T> observer) {
        if (observer.getClass().isAssignableFrom(ObserverWrapper.class)) {
            super.removeObserver(observer);
        } else {
            super.removeObserver(createObserverWrapper(observer, START_VERSION));
        }
    }
    private ObserverWrapper createObserverWrapper(@NonNull Observer<? super T> observer, int version) {
        return new ObserverWrapper(observer, version);
    }
    public void clear() {
        super.setValue(null);
    }
}
复制代码

UnPeekLiveData.java

public class UnPeekLiveData<T> extends ProtectedUnPeekLiveData<T> {
    @Override
    public void setValue(T value) {
        super.setValue(value);
    }
    @Override
    public void postValue(T value) {
        super.postValue(value);
    }
    public static class Builder<T> {
        private boolean isAllowNullValue;
        public Builder<T> setAllowNullValue(boolean allowNullValue) {
            this.isAllowNullValue = allowNullValue;
            return this;
        }
        public UnPeekLiveData<T> create() {
            UnPeekLiveData<T> liveData = new UnPeekLiveData<>();
            liveData.isAllowNullValue = this.isAllowNullValue;
            return liveData;
        }
    }
}
复制代码


3.5 Kotlin Flow


Google 对 Flow 的定位是 Kotlin 环境下对 LiveData 的替代品,使用 SharedFlow 可以控制重放数量,可以设置为 0 表示禁止重放。


4. 基于 LiveData 的事件总线 LiveDataBus


如果我们把事件理解为一种数据,LiveData 可以推数据自然也可以推事件,于是有人将 LiveData 封装为 “广播”,从而实现 “事件发送者” 和 “事件观察者” 的代码解耦,例如美团版本的 LiveDataBus。相较于 EventBus,LiveDataBus 实现更强的生命周期安全;相较于接口,LiveData 的约束力更弱。


4.1 LiveDataBus 什么场景适合?


无论是 EventBus 还是 LiveDataBus,它们本质上都是 “多对多的广播”,它们仅适合作为全局的事件通信,而页面内的事件通信应该继续采用 ViewModel + LiveData 等方案。这是因为事件总线缺乏 MVVM 模式建立的唯一可信源约束,事件发出后很难定位是哪个消息源推送出来的。


4.2 LiveDataBus 的实现


LiveDataBus 代码不多,核心在于使用哈希表保存事件名到 LiveData 的映射关系:

LiveDataBus.java


public final class LiveDataBus {
    // 事件名 - LiveData 哈希表
    private final Map<String, BusMutableLiveData<Object>> bus;
    private LiveDataBus() {
        bus = new HashMap<>();
    }
    // 全局单例模式
    private static class SingletonHolder {
        private static final LiveDataBus DEFAULT_BUS = new LiveDataBus();
    }
    public static LiveDataBus get() {
        return SingletonHolder.DEFAULT_BUS;
    }
    // 根据事件名映射 LiveData
    public <T> MutableLiveData<T> with(String key, Class<T> type) {
        if (!bus.containsKey(key)) {
            // 构造新的 LiveData 对象
            bus.put(key, new BusMutableLiveData<>());
        }
        return (MutableLiveData<T>) bus.get(key);
    }
    // 根据事件名映射 LiveData
    public MutableLiveData<Object> with(String key) {
        return with(key, Object.class);
    }
    private static class ObserverWrapper<T> implements Observer<T> {
        private Observer<T> observer;
        public ObserverWrapper(Observer<T> observer) {
            this.observer = observer;
        }
        @Override
        public void onChanged(@Nullable T t) {
            if (observer != null) {
                if (isCallOnObserve()) {
                    return;
                }
                observer.onChanged(t);
            }
        }
        private boolean isCallOnObserve() {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            if (stackTrace != null && stackTrace.length > 0) {
                for (StackTraceElement element : stackTrace) {
                    if ("android.arch.lifecycle.LiveData".equals(element.getClassName()) &&
                            "observeForever".equals(element.getMethodName())) {
                        return true;
                    }
                }
            }
            return false;
        }
    }
    private static class BusMutableLiveData<T> extends MutableLiveData<T> {
        private Map<Observer, Observer> observerMap = new HashMap<>();
        @Override
        public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
            super.observe(owner, observer);
            try {
                hook(observer);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        @Override
        public void observeForever(@NonNull Observer<T> observer) {
            if (!observerMap.containsKey(observer)) {
                observerMap.put(observer, new ObserverWrapper(observer));
            }
            super.observeForever(observerMap.get(observer));
        }
        @Override
        public void removeObserver(@NonNull Observer<T> observer) {
            Observer realObserver = null;
            if (observerMap.containsKey(observer)) {
                realObserver = observerMap.remove(observer);
            } else {
                realObserver = observer;
            }
            super.removeObserver(realObserver);
        }
        // 也可以使用其他方案
        private void hook(@NonNull Observer<T> observer) throws Exception {
            //get wrapper's version
            Class<LiveData> classLiveData = LiveData.class;
            Field fieldObservers = classLiveData.getDeclaredField("mObservers");
            fieldObservers.setAccessible(true);
            Object objectObservers = fieldObservers.get(this);
            Class<?> classObservers = objectObservers.getClass();
            Method methodGet = classObservers.getDeclaredMethod("get", Object.class);
            methodGet.setAccessible(true);
            Object objectWrapperEntry = methodGet.invoke(objectObservers, observer);
            Object objectWrapper = null;
            if (objectWrapperEntry instanceof Map.Entry) {
                objectWrapper = ((Map.Entry) objectWrapperEntry).getValue();
            }
            if (objectWrapper == null) {
                throw new NullPointerException("Wrapper can not be bull!");
            }
            Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass();
            Field fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion");
            fieldLastVersion.setAccessible(true);
            //get livedata's version
            Field fieldVersion = classLiveData.getDeclaredField("mVersion");
            fieldVersion.setAccessible(true);
            Object objectVersion = fieldVersion.get(this);
            //set wrapper's version
            fieldLastVersion.set(objectWrapper, objectVersion);
        }
    }
}
复制代码


使用 LiveDataBus:


LiveDataBus.get().with("key_test").setValue("");
LiveDataBus.get()
    .with("key_test", String.class)
    .observe(this, new Observer<String>() {
        @Override
        public void onChanged(@Nullable String s) {
        }
    });
复制代码


4.3 如何加强 LiveDataBus 事件约束


无论是 EventBus 还是 LiveDataBus 都没有对事件定义进行约束,不同开发者 / 不同组件可能会定义相同的事件字符串而导致冲突。


为了优化这个问题,可以使用美团 ModularEventBus 方案:用接口定义事件来实现强约束,在动态代理中取 接口名_方法名 作为事件名,再完成后续 LiveDataBus 的交互。

LiveDataBus.java


class LiveDataBus {
    fun <E> of(clz: Class<E>): E {
        if(!clz.isInterface){
            throw IllegalArgumentException("API declarations must be interfaces.")
        }
        if(0 < clz.interfaces.size){
            throw IllegalArgumentException("API interfaces must not extend other interfaces.")
        }
        return Proxy.newProxyInstance(clz.classLoader, arrayOf(clz), InvocationHandler { _, method, _->
            // 取“接口名_方法名”作为事件名,再转交给 LiveDataBus
            return@InvocationHandler get().with(
                "${clz.canonicalName}_${method.name}",
                (method.genericReturnType as ParameterizedType).actualTypeArguments[0].javaClass)
        }) as E
    }
}
复制代码


另外,事件接口可以交给 APT 注解处理器生成:通过 DemoEvent 定义事件名常量,用 APT 将事件名转换为事件接口的方法:

DemoEvents.java


//可以指定module,若不指定,则使用包名作为module名
@ModuleEvents()
public class DemoEvents {
    //不指定消息类型,那么消息的类型默认为Object
    public static final String EVENT1 = "event1";
    //指定消息类型为自定义Bean
    @EventType(TestEventBean.class)
    public static final String EVENT2 = "event2";
    //指定消息类型为java原生类型
    @EventType(String.class)
    public static final String EVENT3 = "event3";
}
复制代码


EventsDefineOfDemoEvents.java


package com.sankuai.erp.modularevent.generated.com.meituan.jeremy.module_b_export;
public interface EventsDefineOfDemoEvents extends com.sankuai.erp.modularevent.base.IEventsDefine {
    com.sankuai.erp.modularevent.Observable<java.lang.Object> EVENT1();
    com.sankuai.erp.modularevent.Observable<com.meituan.jeremy.module_b_export.TestEventBean> EVENT2();
    com.sankuai.erp.modularevent.Observable<java.lang.String> EVENT3();
}
复制代码


使用:


LiveDataBus
    .get()
    .of(EventsDefineOfDemoEvents::class.java)
    .EVENT1()
    .post(true)
LiveDataBus
    .get()
    .of(EventsDefineOfDemoEvents::class.java)
    .EVENT1()
    .observe(this, Observer {
        Log.i(LOG, it.toString())
    })
复制代码


image.png


image.png



—— 图片引用自美团技术博客


5. 总结


到这里,Jetpack 中的 LiveData 组件就讲完了,由于美团的 modular-event 并没有开源,下篇文章我们直接来做一次学习落地。关注我,带你了解更多。

目录
相关文章
|
8月前
|
前端开发 Java API
Jetpack MVVM 七宗罪之五: 在 Repository 中使用 LiveData
Jetpack MVVM 七宗罪之五: 在 Repository 中使用 LiveData
130 0
|
8月前
|
前端开发 JavaScript Android开发
Jetpack MVVM 七宗罪之四: 使用 LiveData/StateFlow 发送 Events
Jetpack MVVM 七宗罪之四: 使用 LiveData/StateFlow 发送 Events
201 0
|
6月前
|
XML 存储 API
Jetpack初尝试 NavController,LiveData,DataBing,ViewModel,Paging
Jetpack初尝试 NavController,LiveData,DataBing,ViewModel,Paging
|
Android开发
Android JetPack组件之LiveData的使用详解
Android JetPack组件之LiveData的使用详解
165 0
|
前端开发 数据处理 调度
Android 基于Jetpack LiveData实现消息总线
在Android开发中,跨页面传递数据(尤其是跨多个页面传递数据)是一个很常见的操作,可以通过Handler、接口回调等方式进行传递,但这几种方式都不太优雅,**消息总线**传递数据的方式相比更优雅。
358 0
|
存储 Android开发 数据格式
Android Jetpack系列之LiveData
**LiveData是一种可观察的数据存储类**。LiveData 具有生命周期感知能力,遵循其他应用组件(如 Activity、Fragment 或 Service)的生命周期。这种感知能力可确保 LiveData 仅更新处于活跃生命周期状态的Observer,非活跃状态下的Observer不会受到通知。
154 0
|
Android开发
安卓Jetpack狠活——Lifecycles与LiveData(二)
书接上回,我们看过了LiveData的使用,自然也就明白了这玩意虽然好,但不能处处到位,因为需要你自己去post后才能得到,那如何不用在子线程一直苦苦等待就能给人一种在实时更新的感觉呢?那自然要用到我们的狠活——Lifecycle。
144 0
|
Android开发
安卓Jetpack狠活——Lifecycles与LiveData(一)
今天在工作时,测试突然提了一个Bug给我,要求我将APP中某活动页面的UI界面要根据用户在由此页面跳转的下个页面操作,在返回时要实时更新。
119 0
|
设计模式 缓存 自然语言处理
Jetpack 系列(4)—— 有小伙伴说看不懂 LiveData、Flow、Channel,跟我走
Jetpack 系列(4)—— 有小伙伴说看不懂 LiveData、Flow、Channel,跟我走
911 0
Jetpack 系列(4)—— 有小伙伴说看不懂 LiveData、Flow、Channel,跟我走