kotlin flow操作符详解

简介: kotlin flow操作符详解

flow 说明


依赖导入

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.3"
复制代码


流简单使用

suspend fun flow1() {
    flow<Int> {
        (0..4).forEach {
            emit(it)//生产者发送数据
        }
    }.collect {
        println(it)//消费者处理数据
    }
}
复制代码

本例中 flow { ... } 构建块中的代码可以挂起

流是冷的,所以 collect 被调用后 flow 内的方法体才会被调用


流操作符

本文要讲的操作符: flowwOf,asFlow,map,transform,take,toList,toSet,first,reduce,buffer,collectLast,zip,combine,


流构建器

flowof

可以将 flowOf 内的可变长参数一一发射

flowOf(1, 2, 5, 4).collect {
        println(it)
    }
复制代码


asFlow

flowOf 可以将集合转换成 flow 发射

suspend fun asFlowM(){
    listOf(1,2,9,0,8).asFlow().collect{
        println(it)
    }
}
复制代码


过渡流操作符

过渡操作符可以对流进行转换,也就是获取生产者发射的数据做一定更改,然后转给消费者


map

我们可以再 map 中执行一些过渡操作,比如本例中将生产者发送的数据*9,然后再发射给消费者

suspend fun mapM(){
    (1..9).asFlow().map {
        it*9
    }.collect{
        println(it)
    }
}
复制代码


值得一提的是,我们是可以再 map 中进行异步操作的,比如有以下代码:

flow<Int> {
        var userId = login()
        userId
    }.map {
        var permission = getPermission(it)
        permission
    }.collect{
        println("打印权限 $it")
    }
复制代码

这段代码中我们模拟了这样一个情景,先在 flow 中调用 login 方法异步获取用户 id,然后再 map 中调用 getPermission 异步获取用户权限。然后将用户权限信息返回给消费者。


通常我们的代码中有多个接口需要连续调用的时候就很适合用这种方法,可以十分有效的避免接口调用嵌套

转换操作符 transform

transform 主要强调的是类型的转换

(1..3).asFlow() // 一个请求流
        //transform中的泛型<Int,String> 表示将Int类型转换为String后,继续发射
        .transform<Int, String> { request ->
            emit("transform Int to String $request")
        }
        .collect { response -> println(response) }
复制代码

本例中发射的数据是 Int 类型,但是我们 collect 中需要的数据是 String 类型,所以我们可以再 transform 中进行将 Int 类型转换为 String,然后使用 emit 继续发射

限长操作符 take


take 操作符可以限定我们要消费的数据的数量,见代码

(1..9).asFlow().take(3).collect {
        println(it)
    }
复制代码

本例中,我们生产者发送了 1..9,一共 9 个数字,但是因为使用了 take(3)操作符,所以只有前三个发射的数字才能被消费者消费到


末端流操作符

末端操作符,我理解的就是消费者调用的方法,比如 collect 就是末端操作符


toList

会把数据消费到一个 List 列表中

suspend fun toList():List<Int> {
   return (1..9).asFlow().filter { it % 2 == 0 }.toList()
}
复制代码


toSet

同 toList


frist

获取第一个元素

suspend fun firstM(): Int {
    return (2..9).asFlow().filter { it % 2 == 1 }.first()
}
复制代码


reduce

reduce 的兰布达表达式会提供运算公式负责计算。

在 reduce 的兰布达表达式中,可以对当前要消费的值和之前计算的值进行计算,得到新值返回。所有值消费完成后返回最终值

suspend fun reduceM():Int {
    return (1..9).asFlow().reduce { accumulator, value ->
        println("$accumulator : $value")
        accumulator + value
    }
}
复制代码


流是连续的

流上下文

缓冲

官方demo了解缓冲出现的原因

image.png

通过图中我们发现,生产者生产数据的时候消费者是需要等待的,然后生产者发送完数据后消费者处理数据,期间生产者必须等消费之处理完数据才能继续发射数据

这就是一种相互阻塞了,那么有没有一种办法能够让消费者消费数据的时候生产者能继续生成对象呢,还真有buffer就可以


buffer

buffer可以缓存生产者数据,不会被消费者阻塞

suspend fun bufferM() {
    val startMillis = System.currentTimeMillis()
    flow<Int> {
        (1..3).forEach {
            delay(300)
            emit(it)
        }
    }.buffer(4)
        .collect {
            delay(400)
            println(it)
            println("时间已经过了${System.currentTimeMillis() - startMillis}")
        }
}
复制代码

代码执行打印日志:

1
时间已经过了745
2
时间已经过了1148
3
时间已经过了1552
复制代码

如果我们没有用buffer,那么总时长应该2100ms

使用了buffer总时长是:1552=300+400*3

所以使用buffer的时候生产者可以并发发射数据,不会被消费者阻塞


组合多个流

conflate

当生产者发射数据速度大于消费者的时候,消费者只能拿到生产者最新发射的数据

suspend fun conflate(){
    flow<Int> {
        (1..9).forEach {
            delay(100)
            emit(it)
        }
    }.conflate().collect {
        delay(300)
        println(it)
    }
}
复制代码

比如上面这段代码,因为有conflate的存在,输出如下:

1
3
6
9
复制代码

如果没有conflate存在输出如下:

1
2
3
4
5
6
7
8
9
复制代码

两者对比,明显能发现使用conflate的例子替我们忽略了很多无法即时处理的数据


collectLast

这个操作符的意思:如果生产者数据以及发射过来了,消费者还没有把上一个数据处理完,那么直接停止处理上一条数据,直接处理最新的数据

suspend fun collectLastM(){
    flow<Int> {
        (1..9).forEach {
            delay(100)
            emit(it)
        }
    }.collectLatest {
        delay(800)
        println(it)
    }
}
复制代码

比如本例的输出如下:

image.png


zip

zip操作符可以把两个流合并为一个流,然后再zip方法中将两个流发射的数据进行处理组合后继续发射给消费者, 如果两个流长度不一致,按比较短的流来处理:

  1. 两个流长度一致,都是3
suspend fun zipM(){
    val flow1 = (1..3).asFlow()
    val flow2 = flowOf("李白","杜甫","安安安安卓")
    flow1.zip(flow2){a,b->
        "$a : $b"
    }.collect {
        println(it)
    }
}
复制代码


输出:

1 : 李白
2 : 杜甫
3 : 安安安安卓
复制代码


上面的代码我们进行一下改变,将flow1的长度改为5

val flow1 = (1..5).asFlow()
复制代码


查看输出结果:

1 : 李白
2 : 杜甫
3 : 安安安安卓
复制代码

所以验证一下我们开头的结论,两个长度不同的流zip合并,消费者输出的数据长度是较短的流的长度


combine

上一节zip的缺点我们清楚了,就是两个流长度不等的时候,较长的流后面部分无法输出

那么combine就是用来解决zip这个缺点的(也很难说是缺点,只是应用场景不同罢了,你姑且可以认为是缺点)

suspend fun combineM(){
    val flowA = (1..5).asFlow()
    val flowB = flowOf("李白","杜甫","安安安安卓")
    flowA.combine(flowB){a,b->
        "$a : $b"
    }.collect {
        println(it)
    }
}
复制代码


输出日志:

1 : 李白
2 : 李白
2 : 杜甫
3 : 杜甫
3 : 安安安安卓
4 : 安安安安卓
5 : 安安安安卓
复制代码


我们的两个流,数字流长度为5,字符串流为3。

实现的效果简单逻辑分析:

flow发射1,flow2发射 ”李白“ ,打印:1 : 李白
flow发射2,flow2未发射数据  ,打印:2 : 李白
flow未发射,flow2发射 ”杜甫“ ,2 : 杜甫
flow发射3,flow2未发射 ,打印:3 : 杜甫
flow未发射,flow2发射 ”安安安安卓“ ,打印:3 : 安安安安卓
flow发射4,flow2发射完成  ,打印:4 : 安安安安卓
flow发射5,flow2发射完成  ,打印:5 : 安安安安卓
复制代码


展平流

下面三个流,暂时不写,因为我没有想到应用场景

flatMapConcat
flatMapMerge
flagMapLatest

流异常

使用try/catch包裹流

我们是可以使用try/catch来收集流异常的,但是不建议用这种方法

使用flow的catch操作符处理流

使用flow 的catch操作符处理异常更优雅

不过catch也有缺点,它只能捕获生产者的异常不能捕获消费者的异常

suspend fun trycatch() {
    flow<Int> {
        (1..3).forEach {
            if (it == 2) {//故意抛出一个异常
                throw NullPointerException("强行空指针,嘿嘿嘿嘿")
            }
            emit(it)
        }
    }.catch {e->
        e.printStackTrace()
        emit(-1)//异常的情况下发射一个-1
    }.collect{
        println(it)
    }
}
复制代码


消费者的异常如何处理

上一节我们学校了catch生产者的异常,那么消费者产生的异常该如何处理呢。


尝试在消费者中抛出异常,查看是否可以被捕获
flow<Int> {
        for (i in 1..3) {
            emit(i)
        }
    }.catch {
        emit(-1)
    }.collect {
        if(it==2){//在消费者中抛出数据
            throw IllegalArgumentException("数据不合法")
        }
        println(it)
    }
复制代码


输出:

1
Exception in thread "main" java.lang.IllegalArgumentException: 数据不合法
  at HahaKt$consumerCatch$$inlined$collect$1.emit(Collect.kt:138)
复制代码


将异常代码放在onEach中catch异常
suspend fun consumerCatch() {
    flow<Int> {
        for (i in 1..3) {
            emit(i)
        }
    }.onEach {
        if (it == 2) {//与上面的不同,在消费之前先用onEach处理一下
            throw IllegalArgumentException("数据不合法")
        }
    }.catch {
        emit(-1)
    }.collect {
        println(it)
    }
}
复制代码


输出:

1
-1
复制代码


流完成onCompletion

使用onCompletion可以再流完成的时候再发送一个值

flowOf(1, 23, 5, 3, 4).onCompletion {
        println("流操作完成")
        emit(12344)//这里不返回值也没关系
    }.collect {
        println(it)
    }
复制代码


输出:

1
23
5
3
4
刘操作完成
12344



相关文章
|
2月前
|
缓存 数据处理 Android开发
Android经典实战之Kotlin常用的 Flow 操作符
本文介绍 Kotlin 中 `Flow` 的多种实用操作符,包括转换、过滤、聚合等,通过简洁易懂的例子展示了每个操作符的功能,如 `map`、`filter` 和 `fold` 等,帮助开发者更好地理解和运用 `Flow` 来处理异步数据流。
80 4
|
23天前
|
数据处理 开发者 Kotlin
利用Kotlin Flow简化数据流管理
随着移动端应用的复杂化,数据流管理成为一大挑战。Kotlin Flow作为一种基于协程的响应式编程框架,可简化数据流处理并支持背压机制,有效避免应用崩溃。本文通过解答四个常见问题,详细介绍Kotlin Flow的基本概念、创建方法及复杂数据流处理技巧,帮助开发者轻松上手,提升应用性能。
54 16
|
16天前
|
存储 API 数据库
Kotlin协程与Flow的魅力——打造高效数据管道的不二法门!
在现代Android开发中,Kotlin协程与Flow框架助力高效管理异步操作和数据流。协程采用轻量级线程管理,使异步代码保持同步风格,适合I/O密集型任务。Flow则用于处理数据流,支持按需生成数据和自动处理背压。结合两者,可构建复杂数据管道,简化操作流程,提高代码可读性和可维护性。本文通过示例代码详细介绍其应用方法。
27 2
|
23天前
|
数据处理 Kotlin
掌握这项Kotlin技能,让你的数据流管理不再头疼!Flow的秘密你解锁了吗?
【9月更文挑战第12天】随着移动应用发展,数据流管理日益复杂。Kotlin Flow作为一种基于协程的异步数据流处理框架应运而生,它可解耦数据的生产和消费过程,简化数据流管理,并支持背压机制以防应用崩溃。本文通过四个问题解析Kotlin Flow的基础概念、创建方式、复杂数据流处理及背压实现方法,助您轻松掌握这一高效工具,在实际开发中更从容地应对各种数据流挑战,提升应用性能。
39 8
|
24天前
|
数据处理 API 数据库
揭秘Kotlin Flow:迈向响应式编程的黄金钥匙
【9月更文挑战第11天】在现代软件开发中,异步编程与数据处理对于构建高性能应用至关重要。Kotlin Flow作为协程库的一部分,提供了简洁高效的API来处理数据流。本文将通过实例引导你从零开始学习Kotlin Flow,掌握构建响应式应用的方法。Flow是一种冷流,仅在订阅时才开始执行,支持map、filter等操作符,简化数据处理。
30 7
|
22天前
|
存储 数据处理 Kotlin
Kotlin Flow背后的神秘力量:背压、缓冲与合并策略的终极揭秘!
【9月更文挑战第13天】Kotlin Flow 是 Kotlin 协程库中处理异步数据流的强大工具,本文通过对比传统方法,深入探讨 Flow 的背压、缓冲及合并策略。背压通过 `buffer` 函数控制生产者和消费者的速率,避免过载;缓冲则允许数据暂存,使消费者按需消费;合并策略如 `merge`、`combine` 和 `zip` 则帮助处理多数据源的整合。通过这些功能,Flow 能更高效地应对复杂数据处理场景。
40 2
|
23天前
|
移动开发 定位技术 Android开发
「揭秘高效App的秘密武器」:Kotlin Flow携手ViewModel,打造极致响应式UI体验,你不可不知的技术革新!
【9月更文挑战第12天】随着移动开发领域对响应式编程的需求增加,管理应用程序状态变得至关重要。Jetpack Compose 和 Kotlin Flow 的组合提供了一种优雅的方式处理 UI 状态变化,简化了状态管理。本文探讨如何利用 Kotlin Flow 增强 ViewModel 功能,构建简洁强大的响应式 UI。
30 3
|
23天前
|
数据库 Kotlin
Kotlin中的冷流和热流以及如何让Flow停下来
本文介绍了Kotlin中`Flow`的概念及其类型,包括冷流(Cold Flow)、热流`SharedFlow`及具有最新值的`StateFlow`。文中详细描述了每种类型的特性与使用场景,并提供了停止`Flow`的方法,如取消协程、使用操作符过滤及异常处理。通过示例代码展示了如何运用这些概念。
56 2
|
16天前
|
API 数据处理 数据库
掌握 Kotlin Flow 的艺术:让无限数据流处理变得优雅且高效 —— 实战教程揭秘如何在数据洪流中保持代码的健壮与灵活
Kotlin Flow 是一个强大的协程 API,专为处理异步数据流设计。它适合处理网络请求数据、监听数据库变化等场景。本文通过示例代码展示如何使用 Kotlin Flow 管理无限流,如实时数据流。首先定义了一个生成无限整数的流 `infiniteNumbers()`,然后结合多种操作符(如 `buffer`、`onEach`、`scan`、`filter`、`takeWhile` 和 `collectLatest`),实现对无限流的优雅处理,例如计算随机数的平均值并在超过阈值时停止接收新数据。这展示了 Flow 在资源管理和逻辑清晰性方面的优势。
30 0
|
2月前
|
缓存 API Android开发
Android经典实战之Kotlin Flow中的3个数据相关的操作符:debounce、buffer和conflate
本文介绍了Kotlin中`Flow`的`debounce`、`buffer`及`conflate`三个操作符。`debounce`过滤快速连续数据,仅保留指定时间内的最后一个;`buffer`引入缓存减轻背压;`conflate`仅保留最新数据。通过示例展示了如何在搜索输入和数据流处理中应用这些操作符以提高程序效率和用户体验。
40 6