Kotlin Coroutines Flow 系列(一) Flow 基本使用

简介: Kotlin Coroutines Flow 系列(一) Flow 基本使用

一. Kotlin Flow 介绍



Flow 库是在 Kotlin Coroutines 1.3.2 发布之后新增的库。


官方文档给予了一句话简单的介绍:


Flow —  cold asynchronous stream with flow builder and comprehensive operator set (filter, map, etc);


Flow 从文档的介绍来看,它有点类似 RxJava 的 Observable。因为 Observable 也有 Cold 、Hot 之分


二. Flow 基本使用



Flow 能够返回多个异步计算的值,例如下面的 flow builder :

flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect{
            println(it)
        }


其中 Flow 接口,只有一个 collect 函数

public interface Flow<out T> {
    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}


如果熟悉 RxJava 的话,则可以理解为 collect() 对应subscribe(),而 emit() 对应onNext()


2.1 创建 flow


除了刚刚展示的 flow builder 可以用于创建 flow,还有其他的几种方式:

flowOf()

flowOf(1,2,3,4,5)
        .onEach {
            delay(100)
        }
        .collect{
            println(it)
        }


asFlow()

listOf(1, 2, 3, 4, 5).asFlow()
        .onEach {
            delay(100)
        }.collect {
            println(it)
        }


channelFlow()

channelFlow {
        for (i in 1..5) {
            delay(100)
            send(i)
        }
    }.collect{
        println(it)
    }


最后的 channelFlow builder 跟 flow builder 是有一定差异的。


flow 是 Cold Stream。在没有切换线程的情况下,生产者和消费者是同步非阻塞的。


channel 是 Hot Stream。而 channelFlow 实现了生产者和消费者异步非阻塞模型。


下面的代码,展示了使用 flow builder 的情况,大致花费1秒:

fun main() = runBlocking {
    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect{
            delay(100)
            println(it)
        }
    }
    print("cost $time")
}


image.png

flow.png


使用 channelFlow builder 的情况,大致花费700毫秒:

fun main() = runBlocking {
    val time = measureTimeMillis{
        channelFlow {
            for (i in 1..5) {
                delay(100)
                send(i)
            }
        }.collect{
            delay(100)
            println(it)
        }
    }
    print("cost $time")
}


image.png

channelFlow.png


当然,flow 如果切换线程的话,花费的时间也是大致700毫秒,跟使用 channelFlow builder 效果差不多。

fun main() = runBlocking {
    val time = measureTimeMillis{
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.IO)
            .collect {
                delay(100)
                println(it)
            }
    }
    print("cost $time")
}


2.2 切换线程


相比于 RxJava 需要使用 observeOn、subscribeOn 来切换线程,flow 会更加简单。只需使用 flowOn,下面的例子中,展示了 flow builder 和 map 操作符都会受到 flowOn 的影响。

flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO)
        .collect {
            println(it)
        }


而 collect() 指定哪个线程,则需要看整个 flow 处于哪个 CoroutineScope 下。

例如,下面的代码 collect() 则是在 main 线程:

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO)
        .collect {
            println("${Thread.currentThread().name}: $it")
        }
}


执行结果:

main: 1
main: 4
main: 9
main: 16
main: 25


值得注意的地方,不要使用 withContext() 来切换 flow 的线程。


2.3 flow 取消


如果 flow 是在一个挂起函数内被挂起了,那么 flow 是可以被取消的,否则不能取消。

fun main() = runBlocking {
    withTimeoutOrNull(2500) {
        flow {
            for (i in 1..5) {
                delay(1000)
                emit(i)
            }
        }.collect {
            println(it)
        }
    }
    println("Done")
}


执行结果:

1
2
Done


2.4 Terminal flow operators


Flow 的 API 有点类似于 Java Stream 的 API。它也同样拥有 Intermediate Operations、Terminal Operations。


Flow 的 Terminal 运算符可以是 suspend 函数,如 collect、single、reduce、toList 等;也可以是 launchIn 运算符,用于在指定 CoroutineScope 内使用 flow。

@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}


整理一下 Flow 的 Terminal 运算符


  • collect
  • single/first
  • toList/toSet/toCollection
  • count
  • fold/reduce
  • launchIn/produceIn/broadcastIn
相关文章
|
8月前
|
传感器 Android开发 开发者
构建高效Android应用:Kotlin的协程与Flow
【4月更文挑战第26天】随着移动应用开发的不断进步,开发者寻求更简洁高效的编码方式以应对复杂多变的业务需求。在众多技术方案中,Kotlin语言凭借其简洁性和强大的功能库逐渐成为Android开发的主流选择。特别是Kotlin的协程和Flow这两个特性,它们为处理异步任务和数据流提供了强大而灵活的工具。本文将深入探讨如何通过Kotlin协程和Flow来优化Android应用性能,实现更加流畅的用户体验,并展示在实际开发中的应用实例。
|
5月前
|
缓存 数据处理 Android开发
Android经典实战之Kotlin常用的 Flow 操作符
本文介绍 Kotlin 中 `Flow` 的多种实用操作符,包括转换、过滤、聚合等,通过简洁易懂的例子展示了每个操作符的功能,如 `map`、`filter` 和 `fold` 等,帮助开发者更好地理解和运用 `Flow` 来处理异步数据流。
172 4
|
4月前
|
数据处理 开发者 Kotlin
利用Kotlin Flow简化数据流管理
随着移动端应用的复杂化,数据流管理成为一大挑战。Kotlin Flow作为一种基于协程的响应式编程框架,可简化数据流处理并支持背压机制,有效避免应用崩溃。本文通过解答四个常见问题,详细介绍Kotlin Flow的基本概念、创建方法及复杂数据流处理技巧,帮助开发者轻松上手,提升应用性能。
75 16
|
4月前
|
存储 API 数据库
Kotlin协程与Flow的魅力——打造高效数据管道的不二法门!
在现代Android开发中,Kotlin协程与Flow框架助力高效管理异步操作和数据流。协程采用轻量级线程管理,使异步代码保持同步风格,适合I/O密集型任务。Flow则用于处理数据流,支持按需生成数据和自动处理背压。结合两者,可构建复杂数据管道,简化操作流程,提高代码可读性和可维护性。本文通过示例代码详细介绍其应用方法。
74 2
|
4月前
|
数据处理 Kotlin
掌握这项Kotlin技能,让你的数据流管理不再头疼!Flow的秘密你解锁了吗?
【9月更文挑战第12天】随着移动应用发展,数据流管理日益复杂。Kotlin Flow作为一种基于协程的异步数据流处理框架应运而生,它可解耦数据的生产和消费过程,简化数据流管理,并支持背压机制以防应用崩溃。本文通过四个问题解析Kotlin Flow的基础概念、创建方式、复杂数据流处理及背压实现方法,助您轻松掌握这一高效工具,在实际开发中更从容地应对各种数据流挑战,提升应用性能。
64 8
|
4月前
|
数据处理 API 数据库
揭秘Kotlin Flow:迈向响应式编程的黄金钥匙
【9月更文挑战第11天】在现代软件开发中,异步编程与数据处理对于构建高性能应用至关重要。Kotlin Flow作为协程库的一部分,提供了简洁高效的API来处理数据流。本文将通过实例引导你从零开始学习Kotlin Flow,掌握构建响应式应用的方法。Flow是一种冷流,仅在订阅时才开始执行,支持map、filter等操作符,简化数据处理。
53 7
|
4月前
|
移动开发 定位技术 Android开发
「揭秘高效App的秘密武器」:Kotlin Flow携手ViewModel,打造极致响应式UI体验,你不可不知的技术革新!
【9月更文挑战第12天】随着移动开发领域对响应式编程的需求增加,管理应用程序状态变得至关重要。Jetpack Compose 和 Kotlin Flow 的组合提供了一种优雅的方式处理 UI 状态变化,简化了状态管理。本文探讨如何利用 Kotlin Flow 增强 ViewModel 功能,构建简洁强大的响应式 UI。
67 3
|
4月前
|
存储 数据处理 Kotlin
Kotlin Flow背后的神秘力量:背压、缓冲与合并策略的终极揭秘!
【9月更文挑战第13天】Kotlin Flow 是 Kotlin 协程库中处理异步数据流的强大工具,本文通过对比传统方法,深入探讨 Flow 的背压、缓冲及合并策略。背压通过 `buffer` 函数控制生产者和消费者的速率,避免过载;缓冲则允许数据暂存,使消费者按需消费;合并策略如 `merge`、`combine` 和 `zip` 则帮助处理多数据源的整合。通过这些功能,Flow 能更高效地应对复杂数据处理场景。
141 2
|
4月前
|
数据库 Kotlin
Kotlin中的冷流和热流以及如何让Flow停下来
本文介绍了Kotlin中`Flow`的概念及其类型,包括冷流(Cold Flow)、热流`SharedFlow`及具有最新值的`StateFlow`。文中详细描述了每种类型的特性与使用场景,并提供了停止`Flow`的方法,如取消协程、使用操作符过滤及异常处理。通过示例代码展示了如何运用这些概念。
143 2
|
4月前
|
API 数据处理 数据库
掌握 Kotlin Flow 的艺术:让无限数据流处理变得优雅且高效 —— 实战教程揭秘如何在数据洪流中保持代码的健壮与灵活
Kotlin Flow 是一个强大的协程 API,专为处理异步数据流设计。它适合处理网络请求数据、监听数据库变化等场景。本文通过示例代码展示如何使用 Kotlin Flow 管理无限流,如实时数据流。首先定义了一个生成无限整数的流 `infiniteNumbers()`,然后结合多种操作符(如 `buffer`、`onEach`、`scan`、`filter`、`takeWhile` 和 `collectLatest`),实现对无限流的优雅处理,例如计算随机数的平均值并在超过阈值时停止接收新数据。这展示了 Flow 在资源管理和逻辑清晰性方面的优势。
84 0