概览
关于通道你可以理解为生产者消费者模式,类似BlockQueue,一端生成数据发送数据,另外一边接收数据消费数据
通道基础
构建简单通道
我们构建一个简单的通道
本例中product和consumer两个方法都传入协程作用域和通道对象两个参数,
其中product单开一个协程中每隔100ms向通道中发送一条自增的int类型数据,发四条
consumer中也单开一个协程,协程中开启while循环,如果通道发送端没有关闭数据就一直轮询获取通道中的最新值打印出来
后面章节中的代码都在这个的基础上进行改动
- 代码
fun main() { channel1() } fun channel1() = runBlocking { val channel = Channel<Int>() product(this, channel) consumer(this, channel) } /** * 发送数据 */ fun product(scope: CoroutineScope, channel: Channel<Int>) { var count = 0 scope.launch { while (count < 4) { channel.send(count++) delay(100) } } } /** * 消费数据 */ fun consumer(scope: CoroutineScope, channel: Channel<Int>) { scope.launch { while (!channel.isClosedForSend) { delay(100) println(channel.receive()) } } } 复制代码
- 日志
0 1 2 3 复制代码
使用consume系列方法消费数据
消费者中可以使用consumeEachIndexed和consumeEach两个方法消费数据。
fun consumer(scope: CoroutineScope, channel: Channel<Int>) { scope.launch { channel.consumeEach { println(it) } } } 复制代码
关闭与迭代通道
通道可以调用Channel.close来关闭通道,这样当我们不需要通道的时候可以主动关闭通道。
当我们调用Channel.close关闭通道的时候,如果通道中有消费者没有消费完成的数据,那么消费者会继续消费完剩余数据
- 代码
class Channel2{ fun channel1() = runBlocking { val channel = Channel<Int>() product(this, channel) consumer(this, channel) } /** * 发送数据 */ fun product(scope: CoroutineScope, channel: Channel<Int>) { var count = 0 scope.launch { while (count < 4) { channel.send(count++) delay(100)//延迟100ms } channel.close()//这里如果调用了close程序可以正常完成,如果不调用程序不能正常完成 } } /** * 消费数据 */ fun consumer(scope: CoroutineScope, channel: Channel<Int>) { scope.launch { channel.consumeEach { delay(200)//延迟200ms,因为生产者那里延迟了100ms,所以消费者的的存活时间是远低于生产者的 println(it) } } } } 复制代码
- 日志
不关闭通道日志:
关闭通道日志:
本例代码中我们必须在生产者发送完数据后主动调用close关闭通道,否则通道会一直阻止程序结束
使用 produce 函数构建管道
我们可以通过produce函数便携的构建一个生产者,如下所示,我们将produce函数构建为CoroutineScope的扩展函数,然后就可以直接在runBlocking中直接调用了。
管道可以被取消
- 代码
fun CoroutineScope.produce()=produce<Int> { for (i in 1..4){ send(i) } } fun main()= runBlocking { produce().consumeEach { println(it) } } 复制代码
- 日志
带缓冲的通道
- 代码
class BufferChannel { private val channel = Channel<Int>(4) private val scope = CoroutineScope(Dispatchers.Default) private var count = 0 fun product() { scope.launch { while (count < 10) { channel.send(count++) } } } fun consumer() { scope.launch { channel.consumeEach { println("消费数据:$it") } } } } fun main() = runBlocking { BufferChannel().apply { product() consumer() } println("执行结束") } 复制代码
- 日志
执行结束 消费数据:0 消费数据:1 消费数据:2 消费数据:3 复制代码
带缓冲的通道
无缓冲的通道在发送者和接收者相遇时传输元素(也称“对接”)。如果发送先被调用,则它将被挂起直到接收被调用, 如果接收先被调用,它将被挂起直到发送被调用。
Channel和produce都可以指定一个缓冲,当缓冲没有满的时候生产者和消费者都去缓冲中操作数据,当缓冲满的时候才会进行阻塞
目前这个场景不好模拟,我只能先放上来代码了
- 代码
class BufferChannel { private val channel = Channel<Int>(5) private var count = 0 fun product(scope: CoroutineScope) { scope.launch { while (count < 10) { channel.send(count++) } } } fun consumer(scope: CoroutineScope) { scope.launch { while (!channel.isClosedForSend){ println(channel.receive()) } } } } fun main() = runBlocking { BufferChannel().let { it.product(this) it.consumer(this) } println("执行结束") } 复制代码
- 日志
执行结束 0 1 2 3 4 5 6 7 8 9