Kotlin | 协程使用手册(不间断更新)(中)

简介: Kotlin协程作为Kotlin核心的一个组件,上手成本并不高,下面的demo都是我参照官网的例子过了一遍。

异步流

挂起函数可以异步的返回单个值,而如何返回多个计算好的值,这正是 Flow(流)的使用之处

使用 list 表示多个值

fun foo(): List<Int> = listOf(1, 2, 3)
fun main() {
    foo().forEach { value -> println(value) } 
}
1
2
3

我们可以看到,相应的值是瞬间一起返回的,如果我们需要他们单个返回呢?

使用 Sequence 表示多个值

使用 Sequence 可以做到同步的返回数据,但是其同时阻塞了线程

fun main() {
    foo().forEach(::println)
}
fun foo():Sequence<Int> = sequence {
    println(System.currentTimeMillis())
    for (i in 1..3){
        Thread.sleep(300)
        //yield  产生一个值,并挂起等待下一次调用
        yield(i)
    }
    println(System.currentTimeMillis())
}
1578822422344
1  //->间隔300ms
2   //->间隔300ms
3   //->间隔300ms
1578822423255

挂起函数

使用 suspend 标志的即为挂起函数。对于编译器来说,suspend 只是起到一个标志作用。

在我们上面的代码中,suspend 我们经常见。

Flow

使用list返回结果,意味着我们会一次返回所有值,而使用Sequence虽然可以做到同步返回,但如果有耗时操作,又会阻塞我们的线程。

flow 正是解决上面存在的弊端的存在。

fun main() {
    runBlocking {
        logThread()
        launch {
            logThread()
            for (i in 1..3) {
                println("lauch块+$i")
                delay(100)
            }
        }
        logThread()
        foo().collect { value ->
            println(value)
        }
    }
}
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}
fun logThread() = println("当前所在线程----${Thread.currentThread().name}")
当前所在线程----main @coroutine#1
当前所在线程----main @coroutine#1
当前所在线程----main @coroutine#2  //lauch{} 
lauch块+1
1
lauch块+2
2
lauch块+3
3

flow{} 中的代码可以挂起

使用 emit() 函数发射值

使用 collect 函数 收集 值。(可以认为是启动)

取消Flow

取消一个 Flow ,其实就是取消协程,我们无法直接取消Flow,但可以通过取消Flow 所在的协程达到目的。

观察上面的demo,我们如果给foo() 方法中打印所在线程,就会发现,它所在的线程与 runBlocking 为同一个,即 foo() 使用的是 runBlocking 上下文。

我们改动代码如下:

fun main(){
  ...
  withTimeoutOrNull(200){
            foo().collect { value ->
                println(value)
            }
        }
  ...
}
...
当前所在线程----main @coroutine#1
当前所在线程----main @coroutine#2
lauch块+1
1
lauch块+2
lauch块+3

为什么 lauch依然运行呢?

我们在前面已经说过了,launch{}是独立运行一个协程,与父协程无关,所以此时launch{}不受取消影响

Flow构建器

flowOf

用于定义一个发射固定值集的流

flowOf("123","123").collect{
    value ->
    println(value)
}

asFlow

用于将各种集合与序列转为Flow

(1..3).asFlow().collect{value-> println(value)}

过渡性流操作符

map

使用map实现数据转换

runBlocking {
        (1..3).asFlow()
            .map {
                delay(1000)
                "f-$it"
            }
            .collect { value -> println(value) }
    }

转换操作符

transform

使用transform ,我们可以在执行异步请求之前发射一个字符串并跟踪这个响应

    runBlocking {
        (1..3).asFlow()
            .transform {
                request ->
                emit("test-$request")
                //耗时操作
                delay(500)
                emit(request)
            }
            .collect { value -> println(value) }
    }
test-1
1
test-2
2
test-3
3

限长操作符

在 流 触及相应限制的时候会将它的执行取消。协程中的取消操作总是通过抛出异常来执行,这样所有的资源管理函数(try{},finally{}块 会在取消的情况下正常运行

take

获取指定个数的发射个数,到达上限将停止发射

  runBlocking {
        (1..3).asFlow()
            .take(1)
            .collect { value -> println(value) }
    }
1  //结果只有一个

末端流操作符

toList

//toList
    runBlocking {
        (1..3).asFlow()
            .toList().let(::println)
    }
[1, 2, 3]

toSet

//toSet
    runBlocking {
        (1..3).asFlow()
            .toSet().let(::println)
    }
[1, 2, 3]

first

将流规约为单个值。即只发送第一个数据

 runBlocking {
        (1..3).asFlow()
            .first().let(::println)
    }
1

single

将流规约为单个值。即只发送第一个数据,不同的是,如果发送数据大于1个,将抛出 IllegalStateException

 //single
    runBlocking {
        flowOf(1,2).single().let(::println)
        //flowOf(1).single().let(::println)
    }
1 
Exception in thread "main" java.lang.IllegalStateException: Expected only one element

reduce

对流进行累加,数据累加

runBlocking {
        (1..3).asFlow()
            .reduce { a, b -> a + b }.let(::println)
    }
6

fold

对流进行累加,数据累加。不同于 reduce 的是,fold 可以赋初值

runBlocking {
        (1..3).asFlow()
            .fold(10) {acc, i -> acc + i }.let(::println)
    }
16

流是连续的

在kotlin中,流是按照顺序执行的。从上游到下游的每个过渡操作符都会处理每个发射出的值然后再交给末端操作符。

简单理解就是,从上到下顺序执行,只有满足上游条件才会执行下面操作符。

 (1..5).asFlow()
        .filter {
            println("filter=$it")
            it % 2 == 0
        }
        .map {
            println("map=$it")
            "map-$it"
        }
        .collect {
            println(
                "collect-$it"
            )
        }
filter=1
filter=2
map=2
collect-map-2
filter=3
filter=4
map=4
collect-map-4
filter=5

Flow中的错误示例

在协程中,通常使用 withContext 切换上下文 (简单理解切换线程,不过也并不准确,因为协程的上下文包含很多数据,如value等,我们通常只是用来切换线程) 但是 flow{} 构建器中的代码必须遵循上下文保存属性(即不允许更改上下文),并且不允许从其他上下文中发射数据 (不允许从其他launch{}发射)。

但可以通过 flowOn更改

错误案例1: 更改上下文

fun main() {
    runBlocking {
        flow {
            withContext(Dispatchers.Default) {
                for (i in 1..3) {
                    delay(100)
                    emit(i)
                }
            }
        }.collect().let(::println)
    }
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
    Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@1f88ae32, BlockingEventLoop@32db9536],
    but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@77a8c47e, DefaultDispatcher].
    Please refer to 'flow' documentation or use 'flowOn' instead
...

错误案例2:从别的上下文发射数据

fun main() {
    runBlocking {
        flow {
            launch {
                emit(123)
                emit(123)
                emit(123)
            }
            delay(100)
        }.collect {
            println(it)
        }
    }
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
    Emission from another coroutine is detected.
    Child of "coroutine#2":StandaloneCoroutine{Active}@379619aa, expected child of "coroutine#1":BlockingCoroutine{Active}@cac736f.
    FlowCollector is not thread-safe and concurrent emissions are prohibited.
    To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
  at 
  ...

flowOn

用于更改流发射的上下文。

fun main() {
    runBlocking {
        logThread()
        flow {
            for (i in 1..3) {
                logThread()
                emit(i)
                delay(10)
            }
        }.flowOn(Dispatchers.IO).collect {
            println(it)
        }
    }
}
当前所在线程----main @coroutine#1
当前所在线程----DefaultDispatcher-worker-1 @coroutine#2
1
当前所在线程----DefaultDispatcher-worker-1 @coroutine#2
2
当前所在线程----DefaultDispatcher-worker-1 @coroutine#2
3

这里我们收集在主线程中,发射数据在IO线程。也意味着我们收集与发射此时处于两个协程之中。

Buffer

流的发射与收集通常是按顺序执行,通过上面我们发现,将流 的不同部分运行在不同的协程中将对于时间有大幅度减少。但现在如果我们不使用 flowOn,此时发射一个流(emit)和收集流(collect)的耗时将累加起来。


比如发射一个流需要100ms,收集需要200ms,则发送3个流并收集总需要至少900ms+

fun main() {
    runBlocking {
        val start=System.currentTimeMillis()
        flow {
            for (i in 1..3) {
                emit(i)
                delay(100)
            }
        }.collect {
            delay(300)
            println(it)
        }
        println("花费时间-${System.currentTimeMillis()-start}ms")
    }
}
1
2
3
花费时间-1230ms

我们可以在流上使用 buffer 操作符来并发运行 数据发射及收集,而不是按顺序执行

更改代码如下:

fun main() {
    runBlocking {
        val start = System.currentTimeMillis()
        flow {
            for (i in 1..3) {
                logThread()
                emit(i)
                delay(100)
            }
        }.buffer().collect {
            logThread()
            delay(300)
            println(it)
        }
        println("花费时间-${System.currentTimeMillis() - start}ms")
    }
}
当前所在线程----main @coroutine#2
当前所在线程----main @coroutine#1
当前所在线程----main @coroutine#2
当前所在线程----main @coroutine#2
1
当前所在线程----main @coroutine#1
2
当前所在线程----main @coroutine#1
3
花费时间-961ms

我们发现,实则buffer也是内部切换了线程,也就是说buffer和 flowOn使用了相同的缓存机制,只不过 buffer 没有显改变上下文。

目录
相关文章
|
17天前
|
存储 Java 调度
Android面试题之Kotlin 协程的挂起、执行和恢复过程
了解Kotlin协程的挂起、执行和恢复机制。挂起时,状态和上下文(局部变量、调用栈、调度器等)被保存;挂起点通过`Continuation`对象处理,释放线程控制权。当恢复条件满足,调度器重新分配线程,调用`resumeWith`恢复执行。关注公众号“AntDream”获取更多并发知识。
28 2
|
2月前
|
移动开发 Android开发 开发者
构建高效Android应用:Kotlin与协程的完美融合
【5月更文挑战第25天】 在移动开发的世界中,性能和响应性是衡量应用质量的关键指标。随着Kotlin的流行和协程的引入,Android开发者现在有了更强大的工具来提升应用的性能和用户体验。本文深入探讨了Kotlin语言如何与协程相结合,为Android应用开发带来异步处理能力的同时,保持代码的简洁性和可读性。我们将通过实际案例分析,展示如何在Android项目中实现协程,以及它们如何帮助开发者更有效地管理后台任务和用户界面的流畅交互。
|
2月前
|
移动开发 数据库 Android开发
构建高效Android应用:探究Kotlin的协程优势
【5月更文挑战第22天】随着移动开发技术的不断进步,Android平台的性能优化已经成为开发者关注的焦点。在众多提升应用性能的手段中,Kotlin语言提供的协程概念因其轻量级线程管理和异步编程能力而受到广泛关注。本文将深入探讨Kotlin协程在Android开发中的应用,以及它如何帮助开发者构建出更高效、响应更快的应用,同时保持代码的简洁性和可读性。
|
2月前
|
移动开发 Android开发 开发者
构建高效安卓应用:Kotlin 协程的实践指南
【5月更文挑战第18天】 随着移动开发技术的不断进步,安卓平台亟需一种高效的异步编程解决方案来应对日益复杂的应用需求。Kotlin 协程作为一种新兴的轻量级线程管理机制,以其简洁的语法和强大的功能,成为解决这一问题的关键。本文将深入探讨Kotlin协程在安卓开发中的实际应用,从基本概念到高级技巧,为开发者提供一份全面的实践指南,旨在帮助读者构建更加高效、稳定的安卓应用。
|
2月前
|
移动开发 数据处理 Android开发
构建高效Android应用:Kotlin的协程与Flow的使用
【5月更文挑战第23天】 在移动开发领域,性能优化和异步编程一直是核心议题。随着Kotlin语言在Android开发中的普及,其提供的协程(coroutines)和流式编程(Flow)功能为开发者带来了革命性的工具,以更简洁、高效的方式处理异步任务和数据流。本文将深入探讨Kotlin协程和Flow在Android应用中的实际应用,以及它们如何帮助开发者编写更加响应迅速且不阻塞用户界面的应用程序。我们将通过具体案例分析这两种技术的优势,并展示如何在现有项目中实现这些功能。
|
2月前
|
安全 调度 Python
探索Python中的并发编程:协程与多线程的比较
本文将深入探讨Python中的并发编程技术,重点比较协程与多线程的特点和应用场景。通过对协程和多线程的原理解析,以及在实际项目中的应用案例分析,读者将能够更好地理解两种并发编程模型的异同,并在实践中选择合适的方案来提升Python程序的性能和效率。
|
7天前
|
数据挖掘 程序员 调度
Python并发编程之协程与异步IO
传统的多线程和多进程模型在处理大规模并发时存在一些性能瓶颈和资源消耗问题。本文将重点介绍Python中基于协程和异步IO的并发编程方法,探讨其工作原理和实际应用,帮助开发者更好地理解并利用Python的并发编程能力。
|
8天前
|
开发者 Python
探索 Python 中的协程:从基本概念到实际应用
在现代编程中,异步处理变得越来越重要,Python 通过其内置的协程提供了强大的工具来简化这一过程。本文将深入探讨 Python 中的协程,从基本概念出发,逐步展示其实际应用,并通过具体代码示例帮助你掌握这种技术。
|
5天前
|
安全 Unix API
完整了解如何在python中处理协程和流
【6月更文挑战第25天】本文介绍异步库asyncio的概念和用法,异步编程在Python中是通过事件循环和协程实现并发,随着版本更新,API有所变化。
25 1
|
13天前
|
数据挖掘 调度 开发者
Python并发编程的艺术:掌握线程、进程与协程的同步技巧
并发编程在Python中涵盖线程、进程和协程,用于优化IO操作和响应速度。`threading`模块支持线程,`multiprocessing`处理进程,而`asyncio`则用于协程。线程通过Lock和Condition Objects同步,进程使用Queue和Pipe通信。协程利用异步事件循环避免上下文切换。了解并发模型及同步技术是提升Python应用性能的关键。
37 5