前一节(Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow)介绍完了两种热流的构造方法以及它们的特点,那有没有方法可以将冷流转化为热流呢?当然是有的。那为什么需要将冷流转化为热流呢?
假如有这么一个场景:一开始有一个冷流 coldFlow 和它对应的消费者,后来下游又有几个新来的消费者需要使用这个 coldFlow,并且还需要之前已发送过的数据。而冷流的生产者与消费者是一对一的关系,且没有 replay
缓存机制,为新的消费者再创建一个冷流开销较大,这种情况下将冷流转为热流就显得事半功倍了。
1. shareIn 操作符
Flow 中的 shareIn
操作符就可以将冷流转为热流,它的方法声明是:
// code 1 public fun <T> Flow<T>.shareIn( scope: CoroutineScope, started: SharingStarted, replay: Int = 0 ): SharedFlow<T>
首先看返回值,最终确实会转化为一个热流 SharedFlow 实例。方法参数先来看最简单的 replay
参数,就是设置回播到每个新增消费者的数据个数,默认为 0。所以默认情况下,新增的消费者只能收到从它开始收集的时间点之后,生产者发送的数据。
再来看第一个 scope
参数,用于设置一个 CoroutineScope 作用域,注意其生命周期的长度需要比任何消费者都要长,保证被转化成的热流能在所有消费者收集数据进行消费时,都能处于活跃状态。新被转化的热流其实就是一个共享数据流,可以被所有的消费者共享使用。
第二个参数 started
复杂一些,它是用于设置被转化为共享数据流的启动方式,官方提供有 3 种方式,下面一个个说:
SharingStarted.Eagerly
勤快式启动方式。不等第一个消费者出现就会立即启动,需要注意的是,这种方式只会保留启动时数据流发送的前 replay
个数据,再之前的数据会立即丢弃。即不对数据流缓存区以外的数据负责,所以 replay
缓存区大小设置很重要。
SharingStarted.Lazily
懒汉式启动方式。需要等第一个消费者出现才会启动,第一个消费者可以接收到数据流所有发送的数据;但其他后面的消费者只能接收到最近的 replay
个数据。这种方式启动的数据流会一直保持活跃状态,甚至所有的的消费者都退出观察不再接收了,数据流仍然会缓存最近的 replay
个数据。
SharingStarted.WhileSubscribed()
灵活式启动方式。默认情况下就是有消费者来它就立即启动,没消费者接收了它就立即停止。所以在第一个消费者出现数据流就启动,当最后一个消费者退出它就立即停止,但它仍会永久缓存最近的 replay
个数据。此外,这种启动方式还可以根据需求自定义设置参数:
// code 2 public fun WhileSubscribed( stopTimeoutMillis: Long = 0, replayExpirationMillis: Long = Long.MAX_VALUE ): SharingStarted = StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
stopTimeoutMillis:设置最后一个消费者退出后,多长时间后再关闭数据流。默认是 0,即立即关闭。replayExpirationMillis:设置关闭流之后等待多长时间后,再重置清空缓存区 replay cache
的数据。默认是 Long.MAX_VALUE
,即永远保存。
自定义 SharingStarted
其实还可以自定义启动方式,自己实现 SharingStarted
接口即可。如果看了前三种启动方式的源码,不难会发现,其实启动方式都是使用固定的几个 SharingCommand
实现的。SharingCommand
有三种:
// code 3 public enum class SharingCommand { /** * 开始启动,并开始收集上游数据流. * 多次发送这个命令并没有什么用(支持防抖),如果先发送 STOP 再发送 START 则是重启一个上游数据流。 */ START, /** * 停止数据流, 取消上游数据流的收集所在协程。 */ STOP, /** * 停止数据流, 取消上游数据流的收集所在协程。并且将 replayCache 缓冲区的值重置为初始状态。 * 如果是 shareIn 操作符,则会调用 [MutableSharedFlow.resetReplayCache] 方法; * 如果是 stateIn 操作符,则会将缓冲数据重置为最初设置的初始值. */ STOP_AND_RESET_REPLAY_CACHE }
感兴趣的同学可以看看 SharingStarted.WhileSubscribed()
的具体实现类 StartedWhileSubscribed
里面的源码。如果需要自定义启动方式,照着葫芦画瓢即可。
既然有 shareIn
,那自然就少不了 stateIn
了。
2. stateIn 操作符
方法声明:
// code 4 public fun <T> Flow<T>.stateIn( scope: CoroutineScope, started: SharingStarted, initialValue: T ): StateFlow<T>
首先可以看出返回值是一个热流 StateFlow 实例,那么自然而然就需要一个参数给它设置一个初始值,即第三个参数 initialValue
。 前两个参数与 shareIn
一样,这里就不再赘述。
3. shareIn 与 stateIn 使用指北
3.1 SharingStarted.WhileSubscribed() 实际使用
从上面的介绍可知,这种启动方式可以在没有消费者时自动取消上游数据流,从而避免资源的浪费。但在实际使用中,建议使用 SharingStarted.WhileSubscribed(5000)
,即在最后一个消费者停止后再保持数据流 5 秒钟的活跃状态。避免在某些特定情况下(如配置改变——最常见就是横竖屏切换、暗夜模式切换)重启上游的数据流。
3.2 shareIn、stateIn 适用于属性声明而非方法返回值
shareIn
和 stateIn
都会创建一个新的数据流,具体说就是 shareIn
会构建一个 ReadonlySharedFlow 实例;stateIn
则会构建一个 ReadonlyStateFlow 实例。而新创建的数据流会一直保存在内存中,直到传入数据流的作用域被取消或者没有任何引用时才会被 GC 回收。
所以下面代码中,前一部分代码是禁止使用的,正确的使用应该是如后一部分的代码,即在属性中使用。
// code 5 //错误示例:每次调用方法都会构建新的数据流 fun getUser(): Flow<User> = userLocalDataSource.getUser() .shareIn(externalScope, WhileSubscribed()) //正确示例:在属性中使用 shareIn 或 stateIn val user: Flow<User> = userLocalDataSource.getUser().shareIn(externalScope, WhileSubscribed())
3.3 MutableSharedFlow 的 subscriptionCount 参数
这个参数表示的是 MutableSharedFlow 中活跃的消费者数目,即订阅者的个数。可用于监听消费者的数目变更,下面就是一个例子:
// code 6 sharedFlow.subscriptionCount .map { count -> count > 0 } // count > 0 说明有消费者,返回 true;= 0 说明没有消费者了,返回 false .distinctUntilChanged() // only react to true<->false changes .onEach { isActive -> // configure an action if (isActive) { // do something } else { // do something } } .launchIn(scope) // launch it
这个例子可以在有消费者收集数据流时,做一些自己的操作;当所有消费者都停止收集时,再处理另外的一些操作,比如资源回收等。
distinctUntilChanged
操作符比较面生,它就是过滤掉前面接收到的重复值,从而使得后面只会接收到发生了变化的新值,和 StateFlow 特性一样。
onEach
操作符也比较常见,可以在流上新增一些处理操作,再发给下游。
3.4 与操作符的搭配使用
如果在实际使用中,需要得知上游数据流的一些状态,比如开始、完成等,则需要在上游数据流转为热流之前添加一些操作符起到监听的作用。
onStart 操作符监听启动,onCompletion 操作符监听完成
// code 7 private fun shareInOnStartDemo() { val testFlow = flow { println("++++emit before") emit(4) delay(1000) emit(5) delay(1000) emit(6) }.onStart { emit(-1) println("++++ onStart") }.onCompletion { emit(-100) println("++++ onCompletion") }.shareIn( lifecycleScope, SharingStarted.WhileSubscribed(), 8 ) lifecycleScope.launch { testFlow.collect { println("++++ collector receive $it") } } }
从打印的 log 可以看到,确实可以监听状态。当然也可以在相同的位置添加 catch
操作符用于监听异常的发生,感兴趣的同学可以试试看。