Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow(上)

简介: Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow(上)

要说最近圈内大事件,那就非 chatGPT 莫属了!人工智能领域最新的大突破了吧?很可能引发下一场的技术革命,因为大家都懂的原因现在还不能在中国大陆使用,不过国内的度厂正在积极跟进了,预计3月份能面世,且期待一下吧~

上节主要讲述了 Flow 的组成、Flow 常用操作符以及冷流的具体使用。这节自然就要介绍热流了。先来温习下:

冷流(Cold Flow):在数据被消费者订阅后,即调用 collect 方法之后,生产者才开始执行发送数据流的代码,通常是调用 emit 方法。即不消费,不生产,多次消费才会多次生产。消费者和生产者是一对一的关系。

上次说的例子不太直观,所以这次换了个更直观的对比例子,先来看第一个:

//code 1
val coldFlow = flow {
    println("coldFlow begin emitting")
    emit(40)
    println("coldFlow 40 is emitted")
    emit(50)
    println("coldFlow 50 is emitted")
}
binding.btn2.setOnClickListener {
    lifecycleScope.launch {
        coldFlow.collect {
            println("coldFlow = $it")
        }
    }
}

只有当点击按钮时,才会如图打印出信息,即冷流只有调用了 collect 方法收集流后,emit 才会开始执行。

image.png

热流(Hot Flow)就不一样了,无论有无消费者,生产者都会生产数据。它不像冷流,Flow 必须在调用末端操作符之后才会去执行;而是可以自己控制是否发送或者生产数据流。并且热流可以有多个订阅者;而冷流只有一个。再来看看热流的例子:

//code 2
val hotFlow = MutableStateFlow(0)
lifecycleScope.launch {
    println("hotFlow begin emitting")
    hotFlow.emit(40)
    println("hotFlow 40 is emitted")
    hotFlow.emit(50)
    println("hotFlow 50 is emitted")
}
binding.btn2.setOnClickListener {
    lifecycleScope.launch {
        hotFlow.collect {
            println("hotFlow collects $it")
        }
    }
}

MutableStateFlow 就是热流中的一种,当没有点击按钮时,便会输出下图中的前三行信息。

image.png

当点击两下按钮后,就会依次输出如图第 4,5 行的信息,至于为什么只会接收到 50,这跟 MutableStateFlow 的特性有关,后面再说。

通过这两个例子就可清楚地知道冷热流之间的区别。热流有两种对象,分别是 StateFlow 和 SharedFlow。


1. SharedFlow


先来看看 SharedFlow,它是一个 subscriber 订阅者的角色,当一个 SharedFlow 调用了 collect 方法后,它就不会正常地结束完成;但可以 cancel 掉 collect 所在的协程,这样就可以取消掉订阅了。SharedFlow 在每次 emit 时都会去 check 一下所在协程是否已经取消。绝大多数的终端操作符,例如 Flow.toList() 都不会使得 SharedFlow 结束完成,但 Flow.take() 之类的截断操作符是例外,它们是可以强制完成一个 SharedFlow 的。

SharedFlow 的简单使用样例:

//code 3
class EventBus {
    private val _events = MutableSharedFlow<Event>() // private mutable shared flow
    val events = _events.asSharedFlow() // publicly exposed as read-only shared flow
    suspend fun produceEvent(event: Event) {
        _events.emit(event) // suspends until all subscribers receive it
    }
}

与 LiveData 相似的使用方式。但 SharedFlow 的功能更为强大,它有 replay cache 和 buffer 机制。


1.1 Replay cache


可以理解为是一个粘性事件的缓存。每个新的订阅者会首先收到 replay cache 中之前发出并接收到的事件,再才会收到新的发射出的值。可以在 MutableSharedFlow 的构造函数中设置 cache 的大小,不能为负数,默认为 0.

//code 4
public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)

replay 重播之前最新的 n 个事件,见字知义。下面是例子:

//code 5
private fun testSharedFlow() {
    val sharedFlow = MutableSharedFlow<Int>(replay = 2)
    lifecycleScope.launch {
        launch {
            sharedFlow.collect {
                println("++++ sharedFlow1 collected $it")
            }
        }
        launch {
            (1..3).forEach{
                sharedFlow.emit(it)
            }
        }
        delay(200)
        launch {
            sharedFlow.collect {
                println("++++ sharedFlow2 collected $it")
            }
        }
    }
}

结果为:

com.example.myapplication I/System.out: ++++ sharedFlow1 collected 1
com.example.myapplication I/System.out: ++++ sharedFlow1 collected 2
com.example.myapplication I/System.out: ++++ sharedFlow1 collected 3
com.example.myapplication I/System.out: ++++ sharedFlow2 collected 2
com.example.myapplication I/System.out: ++++ sharedFlow2 collected 3

emit 发射数据前后分别设置了一个订阅者,后面还延时了 200ms 才进行订阅。第一个订阅者 1、2、3都收到了;而第二个订阅者却只收到了 2 和 3. 这是因为在第二个订阅者开始订阅时,数据已经都发射完了,而 SharedFlow 的重播 replay 为 2,就可将最近发射的两个数据再依次发送一遍,这就可以收到 2 和 3 了。


1.2 extraBufferCapacity


SharedFlow 构造函数的第二个参数 extraBufferCapacity 的作用是,在 replay cache 之外还能额外设置的缓存。常用于当生产者生产数据的速度 > 消费者消费数据的速度时的情况,可以有效提升吞吐量。

所以,若 replay = m,extraBufferCapacity = n,那么这个 SharedFlow 总共的 BufferSize = m + n.  replay 会存储最近发射的数据,如果满了就会往 extraBuffer 中存。接下来看一个例子:

//code 6
private fun coroutineStudy() {
    val sharedFlow = MutableSharedFlow<Int>(replay = 1, extraBufferCapacity = 1)
    lifecycleScope.launch {
        launch {
            sharedFlow.collect {
                println("++++ sharedFlow1 collected $it")
                delay(6000)
            }
        }
        launch {
            (1..4).forEach{
                sharedFlow.emit(it)
                println("+++emit $it")
                delay(1000)
            }
        }
        delay(4000)
        launch {
            sharedFlow.collect {
                println("++++ sharedFlow2 collected $it")
                delay(20000)
            }
        }
    }
}

运行结果为:

17:32:09.283 28184-28184 System.out com.wen.testdemo I  +++emit 1
17:32:09.284 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow1 collected 1
17:32:10.285 28184-28184 System.out com.wen.testdemo I  +++emit 2
17:32:11.289 28184-28184 System.out com.wen.testdemo I  +++emit 3
17:32:13.286 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow2 collected 3
17:32:15.292 28184-28184 System.out com.wen.testdemo I  +++emit 4
17:32:15.293 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow1 collected 2
17:32:21.301 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow1 collected 3
17:32:27.311 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow1 collected 4
17:32:33.292 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow2 collected 4

打印结果可能会有点懵,对照着时序图更容易理解(此图来自于参考文献3,感谢 fundroid 大佬的输出~):

image.png

1)Emitter 发送 1,因为 Subscriber1 在 Emitter 发送数据前就已开始订阅,所以 Subscriber1 可马上接收;此时 replay 存储 1;

2)Emitter 发送 2,Subscriber1 还在处理中处于挂起态,此时 replay 存储 2;

3)Emitter 发送 3,此时还没有任何消费者能消费,则 replay 存储 3,将 2 放入 extra 中;

4)Emitter 想要发送 4,但发现 SharedFlow 的 Buffer 已满,则按照默认的策略进行挂起等待(默认策略就是 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND);

5)Subscriber2 开始订阅,接收到 replay 中的 3,此时 Subscriber1 还是挂起态,Buffer 中数据没变化,即 replay 存储 3,extra 存储 2;

6)Subscriber1 处理完 1 后,依次处理 Buffer 中 的下一个数据,即消费 extra 中的 2,这时 Buffer 终于有空间了,Emitter 结束挂起,发送 4,replay 存储 4,将 3 放入 extra 中;

7)Subscriber1 消费完 2 后接着再消费 extra 中的 3,此时 Buffer 中就只有 4 了。后面的就不用多说了

比较绕,需要多看几次思考一下。需要注意的是,代码运行结果中下面两行输出到底谁先谁后的问题:

17:32:15.292 28184-28184 System.out com.wen.testdemo I  +++emit 4
17:32:15.293 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow1 collected 2


目录
相关文章
|
2月前
|
移动开发 安全 Android开发
构建高效Android应用:Kotlin协程的实践与优化策略
【5月更文挑战第30天】 在移动开发领域,性能优化始终是关键议题之一。特别是对于Android开发者来说,如何在保证应用流畅性的同时,提升代码的执行效率,已成为不断探索的主题。近年来,Kotlin语言凭借其简洁、安全和实用的特性,在Android开发中得到了广泛的应用。其中,Kotlin协程作为一种新的并发处理机制,为编写异步、非阻塞性的代码提供了强大工具。本文将深入探讨Kotlin协程在Android开发中的应用实践,以及如何通过协程优化应用性能,帮助开发者构建更高效的Android应用。
|
14天前
|
消息中间件 缓存 API
Kotlin中的StateFlow和SharedFlow有什么区别?
- `StateFlow`持有一个最新的状态值,适合UI状态管理,新订阅者立即收到当前值。 - `SharedFlow`是通用热流,用于事件总线,不保留最新状态但可配置重播。 - `StateFlow`继承自`SharedFlow`,更专注于状态管理,而`SharedFlow`提供事件处理灵活性。 - 示例中展示了如何`emit`新值和`collect`变化。 - 选择`StateFlow`用于单最新状态共享,`SharedFlow`则适用于需要事件历史或定制策略的场景。 关注公众号“AntDream”了解更多内容!
20 1
|
2天前
|
安全 Kotlin
Kotlin中的安全导航操作符?.、空合并运算符?:以及let函数的实践与理解
Kotlin中的安全导航操作符?.、空合并运算符?:以及let函数的实践与理解
3 0
|
2月前
|
API 调度 Android开发
打造高效Android应用:探究Kotlin协程的优势与实践
【5月更文挑战第27天】在移动开发领域,性能优化和响应速度是衡量应用质量的关键因素。随着Kotlin语言的普及,协程作为其核心特性之一,为Android开发者提供了一种全新的并发处理方式。本文深入探讨了Kotlin协程在Android应用开发中的优势,并通过实例演示如何在实际项目中有效利用协程提升应用性能和用户体验。
|
2月前
|
移动开发 Android开发 开发者
构建高效Android应用:探究Kotlin协程的优势与实践
【5月更文挑战第21天】在移动开发领域,性能优化和流畅的用户体验是至关重要的。随着Kotlin语言在Android平台的广泛采纳,其并发处理的强大工具—协程(Coroutines),已成为提升应用响应性和效率的关键因素。本文将深入分析Kotlin协程的核心原理,探讨其在Android开发中的优势,并通过实例演示如何有效利用协程来优化应用性能,打造更加流畅的用户体验。
30 4
|
2月前
|
物联网 区块链 Android开发
构建高效Android应用:Kotlin与Jetpack的实践之路未来技术的融合潮流:区块链、物联网与虚拟现实的交汇点
【5月更文挑战第30天】 在移动开发领域,效率和性能始终是开发者追求的核心。随着技术的不断进步,Kotlin语言以其简洁性和现代化特性成为Android开发的新宠。与此同时,Jetpack组件为应用开发提供了一套经过实践检验的库、工具和指南,旨在简化复杂任务并帮助提高应用质量。本文将深入探索如何通过Kotlin结合Jetpack组件来构建一个既高效又稳定的Android应用,并分享在此过程中的最佳实践和常见陷阱。
|
2月前
|
运维 监控 Android开发
构建高效自动化运维系统的策略与实践构建高效Android应用:Kotlin协程的实践指南
【5月更文挑战第29天】随着信息技术的迅猛发展,企业IT基础设施变得日益复杂,传统的手动运维模式已难以满足高效率、高稳定性的要求。本文将深入探讨如何通过自动化工具和策略来构建一个高效的自动化运维系统。文中不仅分析了自动化运维的必要性,还详细介绍了实现过程中的关键步骤,包括监控、配置管理、故障响应等,并结合实际案例分析其效果,以期为读者提供一套行之有效的自动化运维解决方案。
|
2月前
|
移动开发 数据库 Android开发
构建高效Android应用:探究Kotlin协程的优势与实践
【5月更文挑战第29天】 随着移动开发技术的不断进步,开发者寻求更高效、更简洁的方式来编写代码。在Android平台上,Kotlin语言凭借其现代化的特性和对协程的原生支持,成为提高开发效率的关键。本文将深入分析Kotlin协程的核心优势,并通过实例展示如何在Android应用开发中有效地利用协程来处理异步任务,优化性能,以及提升用户体验。通过对比传统线程和回调机制,我们将揭示协程如何简化异步编程模型,并减少内存消耗,使应用更加健壮和可维护。
|
2月前
|
移动开发 安全 编译器
构建高效Android应用:探究Kotlin协程的优势与实践
【5月更文挑战第27天】 在移动开发领域,性能优化和流畅的用户体验始终是开发者追求的目标。随着Android对Kotlin的支持日益增强,Kotlin协程作为一种新的并发处理方式,为Android应用的性能提升提供了新的可能性。本文将深入探讨Kotlin协程的核心优势,并通过具体实例展示如何在Android应用中有效利用协程来提升响应速度、减少内存消耗,并简化异步代码。
|
2月前
|
存储 缓存 算法
深入理解操作系统内存管理:分页系统的优势与挑战构建高效Android应用:探究Kotlin协程的优势与实践
【5月更文挑战第27天】 在现代计算机系统中,内存管理是操作系统的核心功能之一。分页系统作为一种内存管理技术,通过将物理内存划分为固定大小的单元——页面,为每个运行的程序提供独立的虚拟地址空间。这种机制不仅提高了内存的使用效率,还为多任务环境提供了必要的隔离性。然而,分页系统的实现也带来了一系列的挑战,包括页面置换算法的选择、内存抖动问题以及TLB(Translation Lookaside Buffer)的管理等。本文旨在探讨分页系统的原理、优势及其面临的挑战,并通过分析现有解决方案,提出可能的改进措施。