Kotlin Coroutines Flow 系列(五) 其他的操作符

简介: Kotlin Coroutines Flow 系列(五) 其他的操作符

八. Flow 其他的操作符



8.1 Transform operators


transform


在使用 transform 操作符时,可以任意多次调用 emit ,这是 transform 跟 map 最大的区别:

fun main() = runBlocking {
    (1..5).asFlow()
        .transform {
            emit(it * 2)
            delay(100)
            emit(it * 4)
        }
        .collect { println(it) }
}


transform 也可以使用  emit 发射任意值:

fun main() = runBlocking {
    (1..5).asFlow()
        .transform {
            emit(it * 2)
            delay(100)
            emit("emit $it")
        }
        .collect { println(it) }
}


8.2 Size-limiting operators


take


take 操作符只取前几个 emit 发射的值。

fun main() = runBlocking {
    (1..5).asFlow()
        .take(2)
        .collect { println(it) }
}


8.3 Terminal flow operators


Kotlin Coroutines Flow 系列(一) Flow 基本使用 一文最后,我整理了 Flow 相关的  Terminal 操作符。本文介绍 reduce 和 fold 两个操作符。


reduce


类似于 Kotlin 集合中的 reduce 函数,能够对集合进行计算操作。


例如,对平方数列求和:

fun main() = runBlocking {
    val sum = (1..5).asFlow()
        .map { it * it }
        .reduce { a, b -> a + b }
    println(sum)
}


例如,计算阶乘:

fun main() = runBlocking {
    val sum = (1..5).asFlow().reduce { a, b -> a * b }
    println(sum)
}


fold


也类似于 Kotlin 集合中的 fold 函数,fold 也需要设置初始值。

fun main() = runBlocking {
    val sum = (1..5).asFlow()
        .map { it * it }
        .fold(0) { a, b -> a + b }
    println(sum)
}


在上述代码中,初始值为0就类似于使用 reduce 函数实现对平方数列求和。


而对于计算阶乘:

fun main() = runBlocking {
    val sum = (1..5).asFlow().fold(1) { a, b -> a * b }
    println(sum)
}


初始值为1就类似于使用 reduce 函数实现计算阶乘。


8.4 Composing flows operators


zip


zip 是可以将2个 flow 进行合并的操作符。

fun main() = runBlocking {
    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three","four","five")
    flowA.zip(flowB) { a, b -> "$a and $b" }
        .collect { println(it) }
}


执行结果:

1 and one
2 and two
3 and three
4 and four
5 and five


zip 操作符会把 flowA 中的一个 item 和 flowB 中对应的一个 item 进行合并。即使 flowB 中的每一个 item 都使用了 delay() 函数,在合并过程中也会等待 delay() 执行完后再进行合并。

fun main() = runBlocking {
    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(100) }
    val time = measureTimeMillis {
        flowA.zip(flowB) { a, b -> "$a and $b" }
            .collect { println(it) }
    }
    println("Cost $time ms")
}


执行结果:

1 and one
2 and two
3 and three
4 and four
5 and five
Cost 561 ms


如果 flowA 中 item 个数大于 flowB 中 item 个数:

fun main() = runBlocking {
    val flowA = (1..6).asFlow()
    val flowB = flowOf("one", "two", "three","four","five")
    flowA.zip(flowB) { a, b -> "$a and $b" }
        .collect { println(it) }
}


执行合并后新的 flow 的 item 个数 = 较小的 flow 的 item 个数。


执行结果:

1 and one
2 and two
3 and three
4 and four
5 and five


combine


combine 虽然也是合并,但是跟 zip 不太一样。


使用 combine 合并时,每次从 flowA 发出新的 item ,会将其与 flowB 的最新的 item 合并。

fun main() = runBlocking {
    val flowA = (1..5).asFlow().onEach { delay(100)  }
    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200)  }
    flowA.combine(flowB) { a, b -> "$a and $b" }
        .collect { println(it) }
}


执行结果:

1 and one
2 and one
3 and one
3 and two
4 and two
5 and two
5 and three
5 and four
5 and five


flattenMerge


其实,flattenMerge 不会组合多个 flow ,而是将它们作为单个流执行。

fun main() = runBlocking {
    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three","four","five")
    flowOf(flowA,flowB)
        .flattenConcat()
        .collect{ println(it) }
}


执行结果:

1
2
3
4
5
one
two
three
four
five


为了能更清楚地看到 flowA、flowB 作为单个流的执行,对他们稍作改动。

fun main() = runBlocking {
    val flowA = (1..5).asFlow().onEach { delay(100) }
    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) }
    flowOf(flowA,flowB)
        .flattenMerge(2)
        .collect{ println(it) }
}


执行结果:

1
one
2
3
two
4
5
three
four
five


8.5 Flattening flows operators


flatMapConcat、flatMapMerge 类似于 RxJava 的 concatMap、flatMap 操作符。


flatMapConcat


flatMapConcat 由 map、flattenConcat 操作符实现。

@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()


在调用 flatMapConcat 后,collect 函数在收集新值之前会等待 flatMapConcat 内部的 flow 完成。

fun currTime() = System.currentTimeMillis()
var start: Long = 0
fun main() = runBlocking {
    (1..5).asFlow()
        .onStart { start = currTime() }
        .onEach { delay(100) }
        .flatMapConcat {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("$it at ${System.currentTimeMillis() - start} ms from start")
        }
}


执行结果:

1: First at 114 ms from start
1: Second at 619 ms from start
2: First at 719 ms from start
2: Second at 1224 ms from start
3: First at 1330 ms from start
3: Second at 1830 ms from start
4: First at 1932 ms from start
4: Second at 2433 ms from start
5: First at 2538 ms from start
5: Second at 3041 ms from start


flatMapMerge


flatMapMerge 由 map、flattenMerge 操作符实现。

@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)


flatMapMerge 是顺序调用内部代码块,并且并行地执行 collect 函数。

fun currTime() = System.currentTimeMillis()
var start: Long = 0
fun main() = runBlocking {
    (1..5).asFlow()
        .onStart { start = currTime() }
        .onEach { delay(100) }
        .flatMapMerge {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("$it at ${System.currentTimeMillis() - start} ms from start")
        }
}


执行结果:

1: First at 116 ms from start
2: First at 216 ms from start
3: First at 319 ms from start
4: First at 422 ms from start
5: First at 525 ms from start
1: Second at 618 ms from start
2: Second at 719 ms from start
3: Second at 822 ms from start
4: Second at 924 ms from start
5: Second at 1030 ms from start


flatMapMerge 操作符有一个参数 concurrency ,它默认使用DEFAULT_CONCURRENCY,如果想更直观地了解 flatMapMerge 的并行,可以对这个参数进行修改。例如改成2,就会发现不一样的执行结果。


flatMapLatest


当发射了新值之后,上个 flow 就会被取消。

fun currTime() = System.currentTimeMillis()
var start: Long = 0
fun main() = runBlocking {
    (1..5).asFlow()
        .onStart { start = currTime() }
        .onEach { delay(100) }
        .flatMapLatest {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("$it at ${System.currentTimeMillis() - start} ms from start")
        }
}


执行结果:

1: First at 114 ms from start
2: First at 220 ms from start
3: First at 321 ms from start
4: First at 422 ms from start
5: First at 524 ms from start
5: Second at 1024 ms from start


九. Flow VS Reactive Streams



天生的多平台支持


由于 Kotlin 语言自身对多平台的支持,使得 Flow 也可以在多平台上使用。


互操作性


Flow 仍然属于响应式范畴。开发者通过 kotlinx-coroutines-reactive 模块中 Flow.asPublisher() 和 Publisher.asFlow() ,可以方便地将 Flow 跟 Reactive Streams 进行互操作。

相关文章
|
4月前
|
缓存 数据处理 Android开发
Android经典实战之Kotlin常用的 Flow 操作符
本文介绍 Kotlin 中 `Flow` 的多种实用操作符,包括转换、过滤、聚合等,通过简洁易懂的例子展示了每个操作符的功能,如 `map`、`filter` 和 `fold` 等,帮助开发者更好地理解和运用 `Flow` 来处理异步数据流。
158 4
|
3月前
|
数据处理 开发者 Kotlin
利用Kotlin Flow简化数据流管理
随着移动端应用的复杂化,数据流管理成为一大挑战。Kotlin Flow作为一种基于协程的响应式编程框架,可简化数据流处理并支持背压机制,有效避免应用崩溃。本文通过解答四个常见问题,详细介绍Kotlin Flow的基本概念、创建方法及复杂数据流处理技巧,帮助开发者轻松上手,提升应用性能。
72 16
|
3月前
|
存储 API 数据库
Kotlin协程与Flow的魅力——打造高效数据管道的不二法门!
在现代Android开发中,Kotlin协程与Flow框架助力高效管理异步操作和数据流。协程采用轻量级线程管理,使异步代码保持同步风格,适合I/O密集型任务。Flow则用于处理数据流,支持按需生成数据和自动处理背压。结合两者,可构建复杂数据管道,简化操作流程,提高代码可读性和可维护性。本文通过示例代码详细介绍其应用方法。
65 2
|
3月前
|
数据处理 Kotlin
掌握这项Kotlin技能,让你的数据流管理不再头疼!Flow的秘密你解锁了吗?
【9月更文挑战第12天】随着移动应用发展,数据流管理日益复杂。Kotlin Flow作为一种基于协程的异步数据流处理框架应运而生,它可解耦数据的生产和消费过程,简化数据流管理,并支持背压机制以防应用崩溃。本文通过四个问题解析Kotlin Flow的基础概念、创建方式、复杂数据流处理及背压实现方法,助您轻松掌握这一高效工具,在实际开发中更从容地应对各种数据流挑战,提升应用性能。
56 8
|
3月前
|
数据处理 API 数据库
揭秘Kotlin Flow:迈向响应式编程的黄金钥匙
【9月更文挑战第11天】在现代软件开发中,异步编程与数据处理对于构建高性能应用至关重要。Kotlin Flow作为协程库的一部分,提供了简洁高效的API来处理数据流。本文将通过实例引导你从零开始学习Kotlin Flow,掌握构建响应式应用的方法。Flow是一种冷流,仅在订阅时才开始执行,支持map、filter等操作符,简化数据处理。
49 7
|
3月前
|
存储 数据处理 Kotlin
Kotlin Flow背后的神秘力量:背压、缓冲与合并策略的终极揭秘!
【9月更文挑战第13天】Kotlin Flow 是 Kotlin 协程库中处理异步数据流的强大工具,本文通过对比传统方法,深入探讨 Flow 的背压、缓冲及合并策略。背压通过 `buffer` 函数控制生产者和消费者的速率,避免过载;缓冲则允许数据暂存,使消费者按需消费;合并策略如 `merge`、`combine` 和 `zip` 则帮助处理多数据源的整合。通过这些功能,Flow 能更高效地应对复杂数据处理场景。
126 2
|
3月前
|
移动开发 定位技术 Android开发
「揭秘高效App的秘密武器」:Kotlin Flow携手ViewModel,打造极致响应式UI体验,你不可不知的技术革新!
【9月更文挑战第12天】随着移动开发领域对响应式编程的需求增加,管理应用程序状态变得至关重要。Jetpack Compose 和 Kotlin Flow 的组合提供了一种优雅的方式处理 UI 状态变化,简化了状态管理。本文探讨如何利用 Kotlin Flow 增强 ViewModel 功能,构建简洁强大的响应式 UI。
57 3
|
3月前
|
数据库 Kotlin
Kotlin中的冷流和热流以及如何让Flow停下来
本文介绍了Kotlin中`Flow`的概念及其类型,包括冷流(Cold Flow)、热流`SharedFlow`及具有最新值的`StateFlow`。文中详细描述了每种类型的特性与使用场景,并提供了停止`Flow`的方法,如取消协程、使用操作符过滤及异常处理。通过示例代码展示了如何运用这些概念。
132 2
|
3月前
|
API 数据处理 数据库
掌握 Kotlin Flow 的艺术:让无限数据流处理变得优雅且高效 —— 实战教程揭秘如何在数据洪流中保持代码的健壮与灵活
Kotlin Flow 是一个强大的协程 API,专为处理异步数据流设计。它适合处理网络请求数据、监听数据库变化等场景。本文通过示例代码展示如何使用 Kotlin Flow 管理无限流,如实时数据流。首先定义了一个生成无限整数的流 `infiniteNumbers()`,然后结合多种操作符(如 `buffer`、`onEach`、`scan`、`filter`、`takeWhile` 和 `collectLatest`),实现对无限流的优雅处理,例如计算随机数的平均值并在超过阈值时停止接收新数据。这展示了 Flow 在资源管理和逻辑清晰性方面的优势。
76 0
|
4月前
|
移动开发 Java Android开发
探索安卓应用开发的新趋势:Kotlin与Coroutines
【8月更文挑战第31天】本文旨在为读者揭示安卓开发中的最新潮流,特别是Kotlin语言及其协程特性如何重塑移动应用的构建方式。我们将通过实际代码示例深入理解Kotlin简洁语法背后的强大功能,并探讨如何利用Coroutines优雅地处理异步任务,从而提升应用性能和用户体验。无论你是安卓开发的新手还是资深开发者,这篇文章都将为你带来新的启示和灵感。