lifecycleScope
刚才是在lifecycleScope
收集新闻流的,它是一个和生命周期对象绑定的协程域:
public val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope get() = lifecycle.coroutineScope public val Lifecycle.coroutineScope: LifecycleCoroutineScope get() { while (true) { // 获取现有 lifecycleScope val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl? if (existing != null) { return existing } // 若没有现成的,则构建 val newScope = LifecycleCoroutineScopeImpl( this, SupervisorJob() + Dispatchers.Main.immediate ) // 并通过 cas + 自旋的方式保证存入 mInternalScopeRef if (mInternalScopeRef.compareAndSet(null, newScope)) { // 开始观察生命周期变化 newScope.register() return newScope } } }
lifecycleScope 是一个LifecycleCoroutineScope
实例,并以 Lifecycle 对象的扩展属性存在。之所以能这样做是因为 Lifecycle 开了后门:
public abstract class Lifecycle { // 后门,方便在类的外存取“附加值” AtomicReference<Object> mInternalScopeRef = new AtomicReference<>(); }
这种动态为类新增属性的方法,在 Kotlin 源码中很常见,详解可以点击读源码长知识 | 动态扩展类并绑定生命周期的新方式。
新建 LifecycleCoroutineScope 实例后,会当场调用 register() 方法观察生命周期变化:
internal class LifecycleCoroutineScopeImpl( override val lifecycle: Lifecycle, override val coroutineContext: CoroutineContext ) : LifecycleCoroutineScope(), LifecycleEventObserver { fun register() { launch(Dispatchers.Main.immediate) { // 开始观察生命周期 if (lifecycle.currentState >= Lifecycle.State.INITIALIZED) { lifecycle.addObserver(this@LifecycleCoroutineScopeImpl) } else { coroutineContext.cancel() } } } override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) { // 当生命周期为 DESTROYED 时,取消观察并取消协程中 job 的执行 if (lifecycle.currentState <= Lifecycle.State.DESTROYED) { lifecycle.removeObserver(this) coroutineContext.cancel() } } }
lifecycleScope.launch() 会立刻启动协程,并在生命周期 DESTROYED 时取消协程。
当 Activity 被另一个 Activity 遮挡时并不会 DESTROYED,所以此时若有流数据推过来还是可以更新到界面,并导致 crash。
flowWithLifecycle()
为此官方提供了flowWithLifecycle()
:
public fun <T> Flow<T>.flowWithLifecycle( lifecycle: Lifecycle, minActiveState: Lifecycle.State = Lifecycle.State.STARTED ): Flow<T> = callbackFlow { lifecycle.repeatOnLifecycle(minActiveState) { this@flowWithLifecycle.collect { send(it) } } close() }
flowWithLifecycle() 内部生成了一个中间消费者callbackFlow
,中间消费者会将上游数据转发给下游,不过是有条件的,只有当生命周期满足要求时才会转发。
其中的 repeatOnLifecycle() 是 Lifecycle 的扩展方法:
public suspend fun Lifecycle.repeatOnLifecycle( state: Lifecycle.State, block: suspend CoroutineScope.() -> Unit ) { ... }
repeatOnLifecycle() 会在新的协程执行 block,当且仅当生命周期至少达到 state 状态,若生命周期未达标,则会取消 block 执行,若再次达标,则再次执行。
让 Flow 感知生命周期的写法如下:只有当生命周期满足要求时,才收集上游并转发给下游,否则取消收集:
class NewsActivity : AppCompatActivity() { private val newsViewModel by lazy { ViewModelProvider( this, NewsViewModelFactory(NewsRepo(this)) )[NewsViewModel::class.java] } override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) // 以感知生命周期的方式收集新闻流 lifecycleScope.launch { repeatOnLifecycle(Lifecycle.State.STARTED) { newsViewModel.newsFlow(1, 8).collect { showNews(it) } } } } }
嵌套回调出现了,看上去有点复杂。 还好有扩展方法,可以把这些细节隐藏起来:
// 用感知生命周期的方式收集流 fun <T> Flow<T>.collectIn( lifecycleOwner: LifecycleOwner, minActiveState: Lifecycle.State = Lifecycle.State.STARTED, action: (T) -> Unit ): Job = lifecycleOwner.lifecycleScope.launch { flowWithLifecycle(lifecycleOwner.lifecycle, minActiveState).collect(action) }
然后就可以像这样在界面中收集新闻流:
class NewsActivity : AppCompatActivity() { private val newsViewModel by lazy { ViewModelProvider( this, NewsViewModelFactory(NewsRepo(this)) )[NewsViewModel::class.java] } override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) newsViewModel.newsFlow(1, 8).collectIn(this) { showNews(it) } } }
超简洁,把 LiveData 又比下去了~
这个方法需注意调用顺序,当不满足生命周期时,它只会取消订阅上游的数据,若下游还有另一流在生成数据,则无法感知生命周期。(封装的collectIn()
保证了它是收集数据前的最后一个操作符)
避免重复触发冷流
按照上面的写法,还是有问题。当从新闻界面跳转到另一个界面再返回时,会重新查数据库,重新请求网络。。。
因为 Repository 提供的数据库和网络流都是“冷流”。冷流只有被收集之后才会生产数据,且冷流是没有地方存数据的,当数据从上游经过若干个中间消费者最后传递给订阅者,数据被展示在界面上,但整个数据链路上没有一个地方把数据存了下了。
又因为使用了repeatOnLifecycle(Lifecycle.State.STARTED)
,所以从另一个界面返回时,重新订阅了冷流,那它就毫不留情地开始重新生产数据。
SharedFlow
对于这种场景,解决方案是:让冷流共享,即多个订阅也不会触发冷流重新生产数据,最好能让冷流的数据被缓存,这样就能将最新的数据粘性地传递给新订阅者。
SharedFlow
由此而生:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { val newsFlow(type: Int, count: Int) = flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count)) .flattenMerge() .transformWhile { emit(it.news) !it.abort } .map { NewsModel(it, false) } .flowOn(Dispatchers.IO) .catch { if (it is YourException) emit(NewsModel(emptyList(),false,"network error,show old news")) } // 将流转换为 SharedFlow .shareIn(viewModelScope, SharingStarted.Lazily) }
使用shareIn()
将冷流转换成共享热流:
public fun <T> Flow<T>.shareIn( scope: CoroutineScope, started: SharingStarted,// 启动策略 replay: Int = 0 // 缓存大小,默认不缓存(非粘性) ): SharedFlow<T> {...}
shareIn 是 Flow 的扩展方法:
started
参数是启动策略,它决定了上游流的生命周期,SharingStarted.Lazily
适用于当前的场景,即当共享热流有订阅者时才启动上游流,上游流将一直存活着。
replay
参数决定了缓存的大小,若为1,表示会缓存最新的1个值,当有新订阅者,会将缓存值分发给它,实现粘性效果(同 LiveData)。默认为0不缓存。
可以把 SharedFlow 想象成一个中间消费者,它收集上游流的数据并将其推送到热流中,然后将这些数据缓存并分享给所有的下游订阅者。
StateFlow
StateFlow 是一个特别的 SharedFlow,它是 Kotlin Flow 中更像 LiveData 的存在。因为:
- StateFlow 总是会缓存1个最新的数据,上游流产生新数据后就会覆盖旧值(LiveData 也是)。
- StateFlow 持有一个 value 字段,可通过
stateFlow.value
读取最新值(LiveData 也是)。
- StateFlow 是粘性的,会将缓存的最新值分发给新订阅者(LiveData 也是)。
- StateFlow 必须有一个初始值(LiveData 不是)。
- StateFlow 会过滤重复值,即新值和旧值相同时不更新。(LiveData 不是)。
可以使用stateIn()
重写新闻流:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { val newsFlow(type: Int, count: Int) = flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count)) .flattenMerge() .transformWhile { emit(it.news) !it.abort } .map { NewsModel(it, false) } .flowOn(Dispatchers.IO) .catch { if (it is YourException) emit(NewsModel(emptyList(),false,"network error,show old news")) } // 将流转换为 StateFlow .stateIn(viewModelScope, SharingStarted.Lazily, NewsModel(emptyList(), true)) }
stateIn() 中的第三个参数就是必须有的初始值,当 Repository 的原始数据流未生成数据时,初始值就已经推送给了订阅者,界面可以借此展示 loading。
若使用 shareIn(),则可以这样展示 loading:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { val newsFlow(type: Int, count: Int) = flowOf(newsRepo.localNewsFlow, newsRepo.remoteNewsFlow(type, count)) .flattenMerge() .transformWhile { emit(it.news) !it.abort } .map { NewsModel(it, false) } .flowOn(Dispatchers.IO) .onStart { emit(NewsModel(emptyList(), true)) }// 展示loading .catch { if (it is YourException) emit(NewsModel(emptyList(),false,"network error,show old news")) } // 将流转换为 SharedFlow .shareIn(viewModelScope, SharingStarted.Lazily) }
使用onStart()
,它会在流被收集时立刻发生一个数据。
到底使用 StateFlow 还是 SharedFlow?得看场景:
- 当需在流以外的地方访问流的最新值,则用 StateFlow。
- 当需过滤重复值,则用 StateFlow(在 SharedFlow 上用 distinctUntilChanged() 效果相同)。
- 在需粘性的场景下,则用 StateFlow(将 SharedFlow 的 replay 置为1效果相同)。
我试图找到更多使用 StateFlow 的理由,但就像你看到的那样,大部分理由都不充分。只有第一个场景下,必用 StateFlow 不可。其他都可用 SharedFlow 代替,而且后者提供了更大的灵活性。
MVI 化
上面的代码已经比较接近 MVI 的模样了。
MVI 有三个关键词:响应式编程 + 单向数据流 + 唯一可信数据源。
关于 MVI 的剖析可以点击Android 架构最新进展 | MVI = 响应式编程 + 单向数据流 + 唯一可信数据源 - 掘金
现援引“单向数据流”图片如下:
界面产生的数据叫事件(意图)Intent
,它流向 ViewModel,经加工后转换成 状态State
供界面刷新。
sealed class FeedsIntent { // Feeds 初始化 data class InitIntent(val type: Int, val count: Int) : FeedsIntent() // Feeds 加载更多 data class MorePageIntent(val timestamp: Long, val count: Int) : FeedsIntent() // 删除某个帖子 data class RemoveIntent(val id: Long) : FeedsIntent() }
原本界面发起的事件是通过 ViewModel 的一个方法调用传递的。为了使用响应式编程形成数据流,得把函数调用用“数据”的形式包装起来。
事件产生自界面,所以事件流理所当然在界面组织:
class StateFlowActivity : AppCompatActivity() { private val refreshLayout: RefreshLayout // 在界面层组织事件流 private val intents by lazy { merge( // 加载 Feeds 首页事件 flowOf(FeedsIntent.InitIntent(1, 5)) // 加载更多 Feeds 事件 loadMoreFeedsFlow() ) } private fun loadMoreFeedsFlow(): Flow<FeedsIntent> = callbackFlow { refreshLayout.setOnRefreshListener { trySend(FeedsIntent.MorePageIntent) } awaitClose() } }
上述代码包含了两个事件,分别是加载首页和加载更多,它俩都被组织成流,并使用 merge 进行合流,merge 会将每个 Flow 中的数据合起来并发地转发到一个新的流上。
当流被订阅后,加载首页的事件会立刻产生并无条件的分发给下游,而加载更多事件需等待上拉动作发生时才会生成。
class StateFlowActivity : AppCompatActivity() { private val newsViewModel by lazy { ViewModelProvider( this, NewsViewModelFactory(NewsRepo(this)) )[NewsViewModel::class.java] } private val intents by lazy { merge( flowOf(FeedsIntent.InitIntent(1, 5)) loadMoreFeedsFlow() ) } override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) // 订阅事件流,将事件传递给 ViewModel intents .onEach(newsViewModel::send) // .onEach { newsViewModel.send(it) } 效果一样 .launchIn(lifecycleScope) } }
在 onCreate() 订阅事件流,每产生一个事件都会调用 NewsViewModel.send() 方法将事件传递给 ViewModel。其中::
用于将一个方法变为 lambda,方法就可以作为参数传给另一个方法,以简化代码。
NewsViewModel.send() 方法定义如下:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { // 用于接收界面事件的共享流 private val _feedsIntent = MutableSharedFlow<FeedsIntent>() // 界面事件唯一入口,向流中发送事件 fun send(intent: FeedsIntent) { viewModelScope.launch { _feedsIntent.emit(intent) } } }
现在界面事件已经以数据流Flow<FeedsIntent>
的方式流入了 ViewModel,下一步就是在流上进行数据变换,即流入的是 Intent,流出的是 State。遂定义一个将Flow<FeedsIntent>
转化成Flow<NewsState>
的扩展方法:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { // 将事件转换成状态(NewsState即是上面的NewsModel,换了个名字而已) private fun Flow<FeedsIntent>.toNewsStateFlow(): Flow<NewsState> = merge( // 加载首页事件处理 filterIsInstance<FeedsIntent.InitIntent>() .flatMapConcat { it.toFetchInitPageFlow() }, // 删除帖子事件处理 filterIsInstance<FeedsIntent.RemoveIntent>() .flatMapConcat { ... }, // 加载更多事件处理 filterIsInstance<FeedsIntent.MorePageIntent>() .flatMapConcat { ... } ) }
每一个上游的FeedsIntent
都会在这里被转换成一个Flow<NewsState>
,就形成了Flow<Flow<NewsState>>
这样的结构,然后用 flatMapConcat() 将其展平变成Flow<NewsState>
。
由于有多种事件,遂使用 filterIsInstance() 按事件类型过滤,实现了事件分流,即是用流的方式写 if-else。
其中toFetchInitPageFlow()
描述了如何将加载首页事件转换成Flow<NewsState>
:
// NewsViewModel.kt private fun FeedsIntent.InitIntent.toFetchInitPageFlow() = flowOf( newsRepo.localNewsOneShotFlow, newsRepo.remoteNewsFlow(this.type, this.count) ) .flattenMerge() .transformWhile { emit(it.news) !it.abort } .map { NewsState(it, false) } .onStart { emit(NewsState(emptyList(), true)) } .catch { if (it is SSLHandshakeException) emit( NewsState( emptyList(), false, "network error,show old news" ) ) }
转化的方法即是拉取数据库以及网络(就是把之前定义好的数据库网络合流拿过来)。
是时候把事件流以及它的变换操作合起来了:
class NewsViewModel(private val newsRepo: NewsRepo) : ViewModel() { // 事件流 private val _feedsIntent = MutableSharedFlow<FeedsIntent>() // 状态流 val newsState = _feedsIntent .toNewsStateFlow() // 将事件流转换成状态流 .flowOn(Dispatchers.IO) // 异步地进行变换操作 .shareIn(viewModelScope, SharingStarted.Eagerly) // 将流转换成共享流以供界面订阅 }
最后界面观察状态流:
class StateFlowActivity : AppCompatActivity() { private val newsViewModel by lazy { ViewModelProvider( this, NewsViewModelFactory(NewsRepo(this)) )[NewsViewModel::class.java] } // 组织界面事件 private val intents by lazy { merge( flowOf(FeedsIntent.InitIntent(1, 5)) loadMoreFeedsFlow() ) } override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) // 数据流起点:发送事件 intents .onEach(newsViewModel::send) .launchIn(lifecycleScope) // 数据流终点:消费状态 newsViewModel.newsState .collectIn(this) { showNews(it) } } // 渲染界面 private fun showNews(newsModel: NewsState) { when { newsModel.loading -> { showLoading() } newsModel.errorMessage.isEmpty() -> { dismissLoading() newsAdapter.news = newsModel.news rvNews.adapter = newsAdapter } else -> { dismissLoading() tv.text = newsModel.errorMessage } } } }
(这里的 MVI 是一个半成品,比如该代码结构就无法实现“上拉加载更多”这个需求,后续文章会在此基础上做重构,欢迎持续关注~)
LiveData vs Flow
LiveData 面试题库、解答、源码分析 这里详尽地分析了 LiveData 的原理及使用过程中的坑。
对于承载数据来说,Kotlin Flow 相较于 LiveData 只能说有过之而无不及:
- LiveData 不能方便地支持异步化。
- LiveData 粘性问题的解决方案虽然很多,但用起来都很变扭。
- LiveData 可能发生数据丢失的情况。
- LiveData 的数据变换能力远远不如 Flow。
- LiveData 多数据源的合流能力远远不如 Flow。
除此之外,Flow 还有一点非常吸引人,那就是 简洁,Flow 可以用及其轻松简单的方式实现复杂的效果,代码的复杂度斗降,可读性斗升。更重要的是,这是大势所趋,还在犹豫什么~
参考
Substituting LiveData: StateFlow or SharedFlow? | ProAndroidDev
A safer way to collect flows from Android UIs | by Manuel Vivo | Android Developers | Medium
kotlinx.coroutines/flow.md at …
Migrating from LiveData to Kotlin’s Flow | by Jose Alcérreca | Android Developers | Medium
Exceptions in Kotlin Flows. Kotlin Flow can complete normally or… | by Roman Elizarov | Medium
Flow.transformWhile operator · Issue #2065 · Kotlin/kotlinx.cor…
Merging kotlin flows - Stack Overflow
Model-View-Intent Design Pattern on Android - xizzhu
推荐阅读
Kotlin 协程 | CoroutineContext 为什么要设计成 indexed set?(一)
我是怎么把业务代码越写越复杂的 | MVP - MVVM - Clean Architecture
Android 架构之 MVI 雏形 | 响应式编程 + 单向数据流 + 唯一可信数据源
Android 架构之 MVI 初级体 | Flow 替换 LiveData 重构数据链路
Android 架构之 MVI 完全体 | 重新审视 MVVM 之殇(PartialChange & Reducer)