要说最近圈内大事件,那就非 chatGPT 莫属了!人工智能领域最新的大突破了吧?很可能引发下一场的技术革命,因为大家都懂的原因现在还不能在中国大陆使用,不过国内的度厂正在积极跟进了,预计3月份能面世,且期待一下吧~
上节主要讲述了 Flow 的组成、Flow 常用操作符以及冷流的具体使用。这节自然就要介绍热流了。先来温习下:
冷流(Cold Flow):在数据被消费者订阅后,即调用
collect
方法之后,生产者才开始执行发送数据流的代码,通常是调用emit
方法。即不消费,不生产,多次消费才会多次生产。消费者和生产者是一对一的关系。
上次说的例子不太直观,所以这次换了个更直观的对比例子,先来看第一个:
//code 1 val coldFlow = flow { println("coldFlow begin emitting") emit(40) println("coldFlow 40 is emitted") emit(50) println("coldFlow 50 is emitted") } binding.btn2.setOnClickListener { lifecycleScope.launch { coldFlow.collect { println("coldFlow = $it") } } }
只有当点击按钮时,才会如图打印出信息,即冷流只有调用了 collect
方法收集流后,emit
才会开始执行。
热流(Hot Flow)就不一样了,无论有无消费者,生产者都会生产数据。它不像冷流,Flow 必须在调用末端操作符之后才会去执行;而是可以自己控制是否发送或者生产数据流。并且热流可以有多个订阅者;而冷流只有一个。再来看看热流的例子:
//code 2 val hotFlow = MutableStateFlow(0) lifecycleScope.launch { println("hotFlow begin emitting") hotFlow.emit(40) println("hotFlow 40 is emitted") hotFlow.emit(50) println("hotFlow 50 is emitted") } binding.btn2.setOnClickListener { lifecycleScope.launch { hotFlow.collect { println("hotFlow collects $it") } } }
MutableStateFlow 就是热流中的一种,当没有点击按钮时,便会输出下图中的前三行信息。
当点击两下按钮后,就会依次输出如图第 4,5 行的信息,至于为什么只会接收到 50,这跟 MutableStateFlow 的特性有关,后面再说。
通过这两个例子就可清楚地知道冷热流之间的区别。热流有两种对象,分别是 StateFlow 和 SharedFlow。
1. SharedFlow
先来看看 SharedFlow,它是一个 subscriber 订阅者的角色,当一个 SharedFlow 调用了 collect
方法后,它就不会正常地结束完成;但可以 cancel 掉 collect
所在的协程,这样就可以取消掉订阅了。SharedFlow 在每次 emit
时都会去 check 一下所在协程是否已经取消。绝大多数的终端操作符,例如 Flow.toList()
都不会使得 SharedFlow 结束完成,但 Flow.take()
之类的截断操作符是例外,它们是可以强制完成一个 SharedFlow 的。
SharedFlow 的简单使用样例:
//code 3 class EventBus { private val _events = MutableSharedFlow<Event>() // private mutable shared flow val events = _events.asSharedFlow() // publicly exposed as read-only shared flow suspend fun produceEvent(event: Event) { _events.emit(event) // suspends until all subscribers receive it } }
与 LiveData 相似的使用方式。但 SharedFlow 的功能更为强大,它有 replay cache 和 buffer 机制。
1.1 Replay cache
可以理解为是一个粘性事件的缓存。每个新的订阅者会首先收到 replay cache 中之前发出并接收到的事件,再才会收到新的发射出的值。可以在 MutableSharedFlow 的构造函数中设置 cache 的大小,不能为负数,默认为 0.
//code 4 public fun <T> MutableSharedFlow( replay: Int = 0, extraBufferCapacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND )
replay 重播之前最新的 n 个事件,见字知义。下面是例子:
//code 5 private fun testSharedFlow() { val sharedFlow = MutableSharedFlow<Int>(replay = 2) lifecycleScope.launch { launch { sharedFlow.collect { println("++++ sharedFlow1 collected $it") } } launch { (1..3).forEach{ sharedFlow.emit(it) } } delay(200) launch { sharedFlow.collect { println("++++ sharedFlow2 collected $it") } } } }
结果为:
com.example.myapplication I/System.out: ++++ sharedFlow1 collected 1 com.example.myapplication I/System.out: ++++ sharedFlow1 collected 2 com.example.myapplication I/System.out: ++++ sharedFlow1 collected 3 com.example.myapplication I/System.out: ++++ sharedFlow2 collected 2 com.example.myapplication I/System.out: ++++ sharedFlow2 collected 3
在 emit
发射数据前后分别设置了一个订阅者,后面还延时了 200ms 才进行订阅。第一个订阅者 1、2、3都收到了;而第二个订阅者却只收到了 2 和 3. 这是因为在第二个订阅者开始订阅时,数据已经都发射完了,而 SharedFlow 的重播 replay 为 2,就可将最近发射的两个数据再依次发送一遍,这就可以收到 2 和 3 了。
1.2 extraBufferCapacity
SharedFlow 构造函数的第二个参数 extraBufferCapacity
的作用是,在 replay cache 之外还能额外设置的缓存。常用于当生产者生产数据的速度 > 消费者消费数据的速度时的情况,可以有效提升吞吐量。
所以,若 replay = m,extraBufferCapacity = n
,那么这个 SharedFlow 总共的 BufferSize = m + n. replay 会存储最近发射的数据,如果满了就会往 extraBuffer
中存。接下来看一个例子:
//code 6 private fun coroutineStudy() { val sharedFlow = MutableSharedFlow<Int>(replay = 1, extraBufferCapacity = 1) lifecycleScope.launch { launch { sharedFlow.collect { println("++++ sharedFlow1 collected $it") delay(6000) } } launch { (1..4).forEach{ sharedFlow.emit(it) println("+++emit $it") delay(1000) } } delay(4000) launch { sharedFlow.collect { println("++++ sharedFlow2 collected $it") delay(20000) } } } }
运行结果为:
17:32:09.283 28184-28184 System.out com.wen.testdemo I +++emit 1 17:32:09.284 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 1 17:32:10.285 28184-28184 System.out com.wen.testdemo I +++emit 2 17:32:11.289 28184-28184 System.out com.wen.testdemo I +++emit 3 17:32:13.286 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow2 collected 3 17:32:15.292 28184-28184 System.out com.wen.testdemo I +++emit 4 17:32:15.293 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 2 17:32:21.301 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 3 17:32:27.311 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 4 17:32:33.292 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow2 collected 4
打印结果可能会有点懵,对照着时序图更容易理解(此图来自于参考文献3,感谢 fundroid 大佬的输出~):
1)Emitter 发送 1,因为 Subscriber1 在 Emitter 发送数据前就已开始订阅,所以 Subscriber1 可马上接收;此时 replay
存储 1;
2)Emitter 发送 2,Subscriber1 还在处理中处于挂起态,此时 replay
存储 2;
3)Emitter 发送 3,此时还没有任何消费者能消费,则 replay
存储 3,将 2 放入 extra
中;
4)Emitter 想要发送 4,但发现 SharedFlow 的 Buffer 已满,则按照默认的策略进行挂起等待(默认策略就是 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND);
5)Subscriber2 开始订阅,接收到 replay
中的 3,此时 Subscriber1 还是挂起态,Buffer 中数据没变化,即 replay
存储 3,extra
存储 2;
6)Subscriber1 处理完 1 后,依次处理 Buffer 中 的下一个数据,即消费 extra
中的 2,这时 Buffer 终于有空间了,Emitter 结束挂起,发送 4,replay
存储 4,将 3 放入 extra
中;
7)Subscriber1 消费完 2 后接着再消费 extra
中的 3,此时 Buffer 中就只有 4 了。后面的就不用多说了
比较绕,需要多看几次思考一下。需要注意的是,代码运行结果中下面两行输出到底谁先谁后的问题:
17:32:15.292 28184-28184 System.out com.wen.testdemo I +++emit 4 17:32:15.293 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 2