kotlin通道讲解

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: kotlin通道讲解

概览


关于通道你可以理解为生产者消费者模式,类似BlockQueue,一端生成数据发送数据,另外一边接收数据消费数据


通道基础


构建简单通道

我们构建一个简单的通道

本例中product和consumer两个方法都传入协程作用域和通道对象两个参数,

其中product单开一个协程中每隔100ms向通道中发送一条自增的int类型数据,发四条

consumer中也单开一个协程,协程中开启while循环,如果通道发送端没有关闭数据就一直轮询获取通道中的最新值打印出来

后面章节中的代码都在这个的基础上进行改动

  1. 代码
fun main() {
    channel1()
}
fun channel1() = runBlocking {
    val channel = Channel<Int>()
    product(this, channel)
    consumer(this, channel)
}
/**
 * 发送数据
 */
fun product(scope: CoroutineScope, channel: Channel<Int>) {
    var count = 0
    scope.launch {
        while (count < 4) {
            channel.send(count++)
            delay(100)
        }
    }
}
/**
 * 消费数据
 */
fun consumer(scope: CoroutineScope, channel: Channel<Int>) {
    scope.launch {
        while (!channel.isClosedForSend) {
            delay(100)
            println(channel.receive())
        }
    }
}
复制代码
  1. 日志
0
1
2
3
复制代码


使用consume系列方法消费数据

消费者中可以使用consumeEachIndexed和consumeEach两个方法消费数据。

fun consumer(scope: CoroutineScope, channel: Channel<Int>) {
        scope.launch {
            channel.consumeEach {
                println(it)
            }
        }
    }
复制代码


关闭与迭代通道

通道可以调用Channel.close来关闭通道,这样当我们不需要通道的时候可以主动关闭通道。

当我们调用Channel.close关闭通道的时候,如果通道中有消费者没有消费完成的数据,那么消费者会继续消费完剩余数据

  1. 代码
class Channel2{
    fun channel1() = runBlocking {
        val channel = Channel<Int>()
        product(this, channel)
        consumer(this, channel)
    }
    /**
     * 发送数据
     */
    fun product(scope: CoroutineScope, channel: Channel<Int>) {
        var count = 0
        scope.launch {
            while (count < 4) {
                channel.send(count++)
                delay(100)//延迟100ms
            }
            channel.close()//这里如果调用了close程序可以正常完成,如果不调用程序不能正常完成
        }
    }
    /**
     * 消费数据
     */
    fun consumer(scope: CoroutineScope, channel: Channel<Int>) {
        scope.launch {
            channel.consumeEach {
                delay(200)//延迟200ms,因为生产者那里延迟了100ms,所以消费者的的存活时间是远低于生产者的
                println(it)
            }
        }
    }
}
复制代码


  1. 日志

不关闭通道日志:

image.png

关闭通道日志:

image.png

本例代码中我们必须在生产者发送完数据后主动调用close关闭通道,否则通道会一直阻止程序结束


使用 produce 函数构建管道


我们可以通过produce函数便携的构建一个生产者,如下所示,我们将produce函数构建为CoroutineScope的扩展函数,然后就可以直接在runBlocking中直接调用了。

管道可以被取消

  1. 代码
fun CoroutineScope.produce()=produce<Int> {
    for (i in 1..4){
        send(i)
    }
}
fun main()= runBlocking {
    produce().consumeEach {
        println(it)
    }
}
复制代码
  1. 日志

image.png


带缓冲的通道


  1. 代码
class BufferChannel {
    private val channel = Channel<Int>(4)
    private val scope = CoroutineScope(Dispatchers.Default)
    private var count = 0
    fun product() {
        scope.launch {
            while (count < 10) {
                channel.send(count++)
            }
        }
    }
    fun consumer() {
        scope.launch {
            channel.consumeEach {
                println("消费数据:$it")
            }
        }
    }
}
fun main() = runBlocking {
    BufferChannel().apply {
        product()
        consumer()
    }
    println("执行结束")
}
复制代码
  1. 日志
执行结束
消费数据:0
消费数据:1
消费数据:2
消费数据:3
复制代码


带缓冲的通道


无缓冲的通道在发送者和接收者相遇时传输元素(也称“对接”)。如果发送先被调用,则它将被挂起直到接收被调用, 如果接收先被调用,它将被挂起直到发送被调用。

Channel和produce都可以指定一个缓冲,当缓冲没有满的时候生产者和消费者都去缓冲中操作数据,当缓冲满的时候才会进行阻塞

目前这个场景不好模拟,我只能先放上来代码了

  1. 代码
class BufferChannel {
    private val channel = Channel<Int>(5)
    private var count = 0
    fun product(scope: CoroutineScope) {
        scope.launch {
            while (count < 10) {
                channel.send(count++)
            }
        }
    }
    fun consumer(scope: CoroutineScope) {
        scope.launch {
           while (!channel.isClosedForSend){
               println(channel.receive())
           }
        }
    }
}
fun main() = runBlocking {
    BufferChannel().let {
        it.product(this)
        it.consumer(this)
    }
    println("执行结束")
}
复制代码
  1. 日志
执行结束
0
1
2
3
4
5
6
7
8
9



相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
2月前
|
存储 前端开发 编译器
深入理解Kotlin中的数据类及其应用
【8月更文挑战第31天】
15 0
|
4月前
|
消息中间件 缓存 API
Kotlin中的StateFlow和SharedFlow有什么区别?
- `StateFlow`持有一个最新的状态值,适合UI状态管理,新订阅者立即收到当前值。 - `SharedFlow`是通用热流,用于事件总线,不保留最新状态但可配置重播。 - `StateFlow`继承自`SharedFlow`,更专注于状态管理,而`SharedFlow`提供事件处理灵活性。 - 示例中展示了如何`emit`新值和`collect`变化。 - 选择`StateFlow`用于单最新状态共享,`SharedFlow`则适用于需要事件历史或定制策略的场景。 关注公众号“AntDream”了解更多内容!
60 1
|
4月前
|
Kotlin
Kotlin函数
Kotlin函数
188 0
|
JSON 数据格式 Kotlin
Kotlin | Flow数据流的几种使用场景
Flow数据流的几种使用场景及其注意事项
387 0
|
编译器 Android开发 Kotlin
Android基础--kotlin(八)Kotlin 扩展数据类与密封类
Android基础--kotlin(八)Kotlin 扩展数据类与密封类
|
安全 JavaScript 前端开发
Kotlin 之类和接口
Kotlin 之类和接口
Kotlin 之类和接口
|
存储 缓存 算法
Kotlin | 扩展函数(终于知道为什么 with 用 this,let 用 it)
Kotlin | 扩展函数(终于知道为什么 with 用 this,let 用 it)
261 0
Kotlin | 扩展函数(终于知道为什么 with 用 this,let 用 it)
|
安全 Java 编译器
Kotlin学习日志(四)函数
Kotlin学习日志(四)函数
132 0
Kotlin学习日志(四)函数
|
网络协议 开发工具 Android开发
基于 Kotlin 特性开发的有限状态机
基于 Kotlin 特性开发的有限状态机
312 0
基于 Kotlin 特性开发的有限状态机
|
JavaScript 前端开发 Java
浅谈Kotlin中的函数
本文简单谈下Kotlin中的函数,包括表达式函数体,命名参数,默认参数,顶层函数,扩展函数,局部函数,Lambda表达式,成员引用,with/apply函数等。从例子入手,从一般写法到使用特性进行简化,再到原理解析。
1135 0