异步流
挂起函数可以异步的返回单个值,而如何返回多个计算好的值,这正是 Flow(流)的使用之处
使用 list 表示多个值
fun foo(): List<Int> = listOf(1, 2, 3) fun main() { foo().forEach { value -> println(value) } }
1 2 3
我们可以看到,相应的值是瞬间一起返回的,如果我们需要他们单个返回呢?
使用 Sequence 表示多个值
使用 Sequence 可以做到同步的返回数据,但是其同时阻塞了线程
fun main() { foo().forEach(::println) } fun foo():Sequence<Int> = sequence { println(System.currentTimeMillis()) for (i in 1..3){ Thread.sleep(300) //yield 产生一个值,并挂起等待下一次调用 yield(i) } println(System.currentTimeMillis()) }
1578822422344 1 //->间隔300ms 2 //->间隔300ms 3 //->间隔300ms 1578822423255
挂起函数
使用 suspend 标志的即为挂起函数。对于编译器来说,suspend 只是起到一个标志作用。
在我们上面的代码中,suspend 我们经常见。
Flow
使用list返回结果,意味着我们会一次返回所有值,而使用Sequence虽然可以做到同步返回,但如果有耗时操作,又会阻塞我们的线程。
flow 正是解决上面存在的弊端的存在。
fun main() { runBlocking { logThread() launch { logThread() for (i in 1..3) { println("lauch块+$i") delay(100) } } logThread() foo().collect { value -> println(value) } } } fun foo(): Flow<Int> = flow { for (i in 1..3) { delay(100) emit(i) } } fun logThread() = println("当前所在线程----${Thread.currentThread().name}")
当前所在线程----main @coroutine#1 当前所在线程----main @coroutine#1 当前所在线程----main @coroutine#2 //lauch{} lauch块+1 1 lauch块+2 2 lauch块+3 3
flow{} 中的代码可以挂起
使用 emit() 函数发射值
使用 collect 函数 收集 值。(可以认为是启动)
取消Flow
取消一个 Flow ,其实就是取消协程,我们无法直接取消Flow,但可以通过取消Flow 所在的协程达到目的。
观察上面的demo,我们如果给foo() 方法中打印所在线程,就会发现,它所在的线程与 runBlocking 为同一个,即 foo() 使用的是 runBlocking 上下文。
我们改动代码如下:
fun main(){ ... withTimeoutOrNull(200){ foo().collect { value -> println(value) } } ... } ...
当前所在线程----main @coroutine#1 当前所在线程----main @coroutine#2 lauch块+1 1 lauch块+2 lauch块+3
为什么 lauch依然运行呢?
我们在前面已经说过了,launch{}是独立运行一个协程,与父协程无关,所以此时launch{}不受取消影响
Flow构建器
flowOf
用于定义一个发射固定值集的流
flowOf("123","123").collect{ value -> println(value) }
asFlow
用于将各种集合与序列转为Flow
(1..3).asFlow().collect{value-> println(value)}
过渡性流操作符
map
使用map实现数据转换
runBlocking { (1..3).asFlow() .map { delay(1000) "f-$it" } .collect { value -> println(value) } }
转换操作符
transform
使用transform ,我们可以在执行异步请求之前发射一个字符串并跟踪这个响应
runBlocking { (1..3).asFlow() .transform { request -> emit("test-$request") //耗时操作 delay(500) emit(request) } .collect { value -> println(value) } }
test-1 1 test-2 2 test-3 3
限长操作符
在 流 触及相应限制的时候会将它的执行取消。协程中的取消操作总是通过抛出异常来执行,这样所有的资源管理函数(try{},finally{}块 会在取消的情况下正常运行
take
获取指定个数的发射个数,到达上限将停止发射
runBlocking { (1..3).asFlow() .take(1) .collect { value -> println(value) } }
1 //结果只有一个
末端流操作符
toList
//toList runBlocking { (1..3).asFlow() .toList().let(::println) }
[1, 2, 3]
toSet
//toSet runBlocking { (1..3).asFlow() .toSet().let(::println) }
[1, 2, 3]
first
将流规约为单个值。即只发送第一个数据
runBlocking { (1..3).asFlow() .first().let(::println) }
1
single
将流规约为单个值。即只发送第一个数据,不同的是,如果发送数据大于1个,将抛出 IllegalStateException
//single runBlocking { flowOf(1,2).single().let(::println) //flowOf(1).single().let(::println) }
1 Exception in thread "main" java.lang.IllegalStateException: Expected only one element
reduce
对流进行累加,数据累加
runBlocking { (1..3).asFlow() .reduce { a, b -> a + b }.let(::println) }
6
fold
对流进行累加,数据累加。不同于 reduce 的是,fold 可以赋初值
runBlocking { (1..3).asFlow() .fold(10) {acc, i -> acc + i }.let(::println) }
16
流是连续的
在kotlin中,流是按照顺序执行的。从上游到下游的每个过渡操作符都会处理每个发射出的值然后再交给末端操作符。
简单理解就是,从上到下顺序执行,只有满足上游条件才会执行下面操作符。
(1..5).asFlow() .filter { println("filter=$it") it % 2 == 0 } .map { println("map=$it") "map-$it" } .collect { println( "collect-$it" ) }
filter=1 filter=2 map=2 collect-map-2 filter=3 filter=4 map=4 collect-map-4 filter=5
Flow中的错误示例
在协程中,通常使用 withContext 切换上下文 (简单理解切换线程,不过也并不准确,因为协程的上下文包含很多数据,如value等,我们通常只是用来切换线程) 但是 flow{} 构建器中的代码必须遵循上下文保存属性(即不允许更改上下文),并且不允许从其他上下文中发射数据 (不允许从其他launch{}发射)。
但可以通过 flowOn更改
错误案例1: 更改上下文
fun main() { runBlocking { flow { withContext(Dispatchers.Default) { for (i in 1..3) { delay(100) emit(i) } } }.collect().let(::println) } }
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@1f88ae32, BlockingEventLoop@32db9536], but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@77a8c47e, DefaultDispatcher]. Please refer to 'flow' documentation or use 'flowOn' instead ...
错误案例2:从别的上下文发射数据
fun main() { runBlocking { flow { launch { emit(123) emit(123) emit(123) } delay(100) }.collect { println(it) } } }
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: Emission from another coroutine is detected. Child of "coroutine#2":StandaloneCoroutine{Active}@379619aa, expected child of "coroutine#1":BlockingCoroutine{Active}@cac736f. FlowCollector is not thread-safe and concurrent emissions are prohibited. To mitigate this restriction please use 'channelFlow' builder instead of 'flow' at ...
flowOn
用于更改流发射的上下文。
fun main() { runBlocking { logThread() flow { for (i in 1..3) { logThread() emit(i) delay(10) } }.flowOn(Dispatchers.IO).collect { println(it) } } }
当前所在线程----main @coroutine#1 当前所在线程----DefaultDispatcher-worker-1 @coroutine#2 1 当前所在线程----DefaultDispatcher-worker-1 @coroutine#2 2 当前所在线程----DefaultDispatcher-worker-1 @coroutine#2 3
这里我们收集在主线程中,发射数据在IO线程。也意味着我们收集与发射此时处于两个协程之中。
Buffer
流的发射与收集通常是按顺序执行,通过上面我们发现,将流 的不同部分运行在不同的协程中将对于时间有大幅度减少。但现在如果我们不使用 flowOn,此时发射一个流(emit)和收集流(collect)的耗时将累加起来。
比如发射一个流需要100ms,收集需要200ms,则发送3个流并收集总需要至少900ms+
fun main() { runBlocking { val start=System.currentTimeMillis() flow { for (i in 1..3) { emit(i) delay(100) } }.collect { delay(300) println(it) } println("花费时间-${System.currentTimeMillis()-start}ms") } }
1 2 3 花费时间-1230ms
我们可以在流上使用 buffer 操作符来并发运行 数据发射及收集,而不是按顺序执行
更改代码如下:
fun main() { runBlocking { val start = System.currentTimeMillis() flow { for (i in 1..3) { logThread() emit(i) delay(100) } }.buffer().collect { logThread() delay(300) println(it) } println("花费时间-${System.currentTimeMillis() - start}ms") } }
当前所在线程----main @coroutine#2 当前所在线程----main @coroutine#1 当前所在线程----main @coroutine#2 当前所在线程----main @coroutine#2 1 当前所在线程----main @coroutine#1 2 当前所在线程----main @coroutine#1 3 花费时间-961ms
我们发现,实则buffer也是内部切换了线程,也就是说buffer和 flowOn使用了相同的缓存机制,只不过 buffer 没有显改变上下文。