合并
conflate
用于跳过中间的值,只处理最新的值。
fun main() { measureTimeMillis { runBlocking { (1..5).asFlow() .conflate() .buffer() .collect { delay(100) println(it) } } }.let{ println("花费时间-${it}ms") } }
1 5 花费时间-325ms
处理最新值
collectLatest & conf
取消缓慢的收集器,并在每次发射新值的时候重新启动它。
fun main() { measureTimeMillis { runBlocking { (1..5).asFlow() .collectLatest { println(it) delay(100) println("重新发射-$it") } } }.let { println("花费时间-${it}ms") } }
1 2 3 4 5 重新发射-5 花费时间-267ms
组合流
zip
组合两个流中的相关值
fun main() { measureTimeMillis { val strs= flowOf("one","two","three") runBlocking { (1..5).asFlow() .zip(strs){ a,b -> "$a -> $b" }.collect { println(it) } } }.let { println("花费时间-${it}ms") } }
1 -> one 2 -> two 3 -> three 花费时间-120ms
Combine
suspend fun main() { val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒 val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串 val startTime = System.currentTimeMillis() // 记录开始的时间 nums.combine(strs) { a, b -> "$a -> $b" } // 使用“zip”组合单个字符串 .collect { value -> // 收集并打印 println("$value at ${System.currentTimeMillis() - startTime} ms from start") } }
1 -> one at 472 ms from start 2 -> one at 682 ms from start 2 -> two at 874 ms from start 3 -> two at 987 ms from start 3 -> three at 1280 ms from start
通道Channel
- 非阻塞的通信基础设施
- 类似于 BlockingQueue+suspend
提供了一种在 Flow 中传递数据的方法
Channel的分类
Channel基础
suspend fun main() { runBlocking { val channel = Channel<Int>() launch { // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送 for (x in 1..5) channel.send(x * x) } // 这里我们打印了 5 次被接收的整数: repeat(5) { println(channel.receive()) } println("Done!") } }
1 4 9 16 Done!
channel的关闭与迭代
suspend fun main() { runBlocking { val channel = Channel<Int>() launch { // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送 for (x in 1..5) { channel.send(x * x) } channel.close() println("通道是否关闭+${channel.isClosedForSend}") } for (i in channel) { println(i) } println("是否接收了所有的数据+${channel.isClosedForReceive}") } }
1 4 9 16 25
consumeEach
用以代替for循环
suspend fun main() { runBlocking { val channel = Channel<Int>() launch { // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送 for (x in 1..5) { channel.send(x * x) } channel.close() } channel.consumeEach(::println) } }
Channel的协程Builder
- Produce 启动一个生产者协程,返回ReceiveChannel
- actor : 启动一个消费者协程,返回 SendChannel (暂时废弃)
- 以上Builder启动的协程结束后悔自动关闭对应的 Channel
Produce
suspend fun main() { val receiveChannel = GlobalScope.produce(capacity = Channel.UNLIMITED) { for (i in 0..3) { send(i) } } GlobalScope.launch { receiveChannel.consumeEach(::println) }.join() }
0 1 2 3
actor
suspend fun main() { val sendChannel = GlobalScope.actor<Int>(capacity = Channel.UNLIMITED) { channel.consumeEach(::println) } GlobalScope.launch { for (i in 1..3){ sendChannel.send(i) } }.join() }
BroadcastChannel
- Channel 的元素只能被一个消费者消费
- BroadcastChannel 的元素可以分发给所有的订阅者
- BroadcastChannel 不支持RENDEZVOUS
suspend fun main() { val broadcastChannel = GlobalScope.broadcast { for (i in 1..3) send(i) } List(3) { GlobalScope.launch { val receiveChannel = broadcastChannel.openSubscription() println("-----$it") receiveChannel.consumeEach(::println) } }.joinAll() }
-----2 -----0 -----1 1 1 1 2 2 3 3 2 3
Select(实验性)
- 是一个IO多路复用的概念
- koltin中用于挂起函数的多路复用
Select表达式可以同时等待多个挂起函数,并选择第一个可用的
在Channel使用
suspend fun main() { runBlocking { val fizz = fizz() val buzz = buzz() repeat(7) { selectFizzBuzz(fizz, buzz) } coroutineContext.cancelChildren() } } suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) { select<Unit> { // <Unit> 意味着该 select 表达式不返回任何结果 fizz.onReceive { value -> // 这是第一个 select 子句 println("fizz -> '$value'") } buzz.onReceive { value -> // 这是第二个 select 子句 println("buzz -> '$value'") } } } @ExperimentalCoroutinesApi fun CoroutineScope.fizz() = produce { while (true) { delay(300) send("Fizz") } } @ExperimentalCoroutinesApi fun CoroutineScope.buzz() = produce { while (true) { delay(500) send("Buzz") } }
fizz -> 'Fizz' buzz -> 'Buzz' fizz -> 'Fizz' fizz -> 'Fizz' buzz -> 'Buzz' fizz -> 'Fizz' buzz -> 'Buzz'
使用receive 挂起函数,我们可以从两个通道中接收其中一个数据,但是select允许我们使用其 onReceive 子句同时从两者接收。
注意:onReceiver 在已经该关闭的通道执行会发生失败并抛出异常,我们可以使用onReceiveOrNull 子句在关闭通道时执行特定操作