kotlin通道讲解

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: kotlin通道讲解

概览


关于通道你可以理解为生产者消费者模式,类似BlockQueue,一端生成数据发送数据,另外一边接收数据消费数据


通道基础


构建简单通道

我们构建一个简单的通道

本例中product和consumer两个方法都传入协程作用域和通道对象两个参数,

其中product单开一个协程中每隔100ms向通道中发送一条自增的int类型数据,发四条

consumer中也单开一个协程,协程中开启while循环,如果通道发送端没有关闭数据就一直轮询获取通道中的最新值打印出来

后面章节中的代码都在这个的基础上进行改动

  1. 代码
fun main() {
    channel1()
}
fun channel1() = runBlocking {
    val channel = Channel<Int>()
    product(this, channel)
    consumer(this, channel)
}
/**
 * 发送数据
 */
fun product(scope: CoroutineScope, channel: Channel<Int>) {
    var count = 0
    scope.launch {
        while (count < 4) {
            channel.send(count++)
            delay(100)
        }
    }
}
/**
 * 消费数据
 */
fun consumer(scope: CoroutineScope, channel: Channel<Int>) {
    scope.launch {
        while (!channel.isClosedForSend) {
            delay(100)
            println(channel.receive())
        }
    }
}
复制代码
  1. 日志
0
1
2
3
复制代码


使用consume系列方法消费数据

消费者中可以使用consumeEachIndexed和consumeEach两个方法消费数据。

fun consumer(scope: CoroutineScope, channel: Channel<Int>) {
        scope.launch {
            channel.consumeEach {
                println(it)
            }
        }
    }
复制代码


关闭与迭代通道

通道可以调用Channel.close来关闭通道,这样当我们不需要通道的时候可以主动关闭通道。

当我们调用Channel.close关闭通道的时候,如果通道中有消费者没有消费完成的数据,那么消费者会继续消费完剩余数据

  1. 代码
class Channel2{
    fun channel1() = runBlocking {
        val channel = Channel<Int>()
        product(this, channel)
        consumer(this, channel)
    }
    /**
     * 发送数据
     */
    fun product(scope: CoroutineScope, channel: Channel<Int>) {
        var count = 0
        scope.launch {
            while (count < 4) {
                channel.send(count++)
                delay(100)//延迟100ms
            }
            channel.close()//这里如果调用了close程序可以正常完成,如果不调用程序不能正常完成
        }
    }
    /**
     * 消费数据
     */
    fun consumer(scope: CoroutineScope, channel: Channel<Int>) {
        scope.launch {
            channel.consumeEach {
                delay(200)//延迟200ms,因为生产者那里延迟了100ms,所以消费者的的存活时间是远低于生产者的
                println(it)
            }
        }
    }
}
复制代码


  1. 日志

不关闭通道日志:

image.png

关闭通道日志:

image.png

本例代码中我们必须在生产者发送完数据后主动调用close关闭通道,否则通道会一直阻止程序结束


使用 produce 函数构建管道


我们可以通过produce函数便携的构建一个生产者,如下所示,我们将produce函数构建为CoroutineScope的扩展函数,然后就可以直接在runBlocking中直接调用了。

管道可以被取消

  1. 代码
fun CoroutineScope.produce()=produce<Int> {
    for (i in 1..4){
        send(i)
    }
}
fun main()= runBlocking {
    produce().consumeEach {
        println(it)
    }
}
复制代码
  1. 日志

image.png


带缓冲的通道


  1. 代码
class BufferChannel {
    private val channel = Channel<Int>(4)
    private val scope = CoroutineScope(Dispatchers.Default)
    private var count = 0
    fun product() {
        scope.launch {
            while (count < 10) {
                channel.send(count++)
            }
        }
    }
    fun consumer() {
        scope.launch {
            channel.consumeEach {
                println("消费数据:$it")
            }
        }
    }
}
fun main() = runBlocking {
    BufferChannel().apply {
        product()
        consumer()
    }
    println("执行结束")
}
复制代码
  1. 日志
执行结束
消费数据:0
消费数据:1
消费数据:2
消费数据:3
复制代码


带缓冲的通道


无缓冲的通道在发送者和接收者相遇时传输元素(也称“对接”)。如果发送先被调用,则它将被挂起直到接收被调用, 如果接收先被调用,它将被挂起直到发送被调用。

Channel和produce都可以指定一个缓冲,当缓冲没有满的时候生产者和消费者都去缓冲中操作数据,当缓冲满的时候才会进行阻塞

目前这个场景不好模拟,我只能先放上来代码了

  1. 代码
class BufferChannel {
    private val channel = Channel<Int>(5)
    private var count = 0
    fun product(scope: CoroutineScope) {
        scope.launch {
            while (count < 10) {
                channel.send(count++)
            }
        }
    }
    fun consumer(scope: CoroutineScope) {
        scope.launch {
           while (!channel.isClosedForSend){
               println(channel.receive())
           }
        }
    }
}
fun main() = runBlocking {
    BufferChannel().let {
        it.product(this)
        it.consumer(this)
    }
    println("执行结束")
}
复制代码
  1. 日志
执行结束
0
1
2
3
4
5
6
7
8
9



相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
5月前
|
缓存 数据处理 Android开发
Android经典实战之Kotlin常用的 Flow 操作符
本文介绍 Kotlin 中 `Flow` 的多种实用操作符,包括转换、过滤、聚合等,通过简洁易懂的例子展示了每个操作符的功能,如 `map`、`filter` 和 `fold` 等,帮助开发者更好地理解和运用 `Flow` 来处理异步数据流。
166 4
|
5月前
|
存储 前端开发 编译器
深入理解Kotlin中的数据类及其应用
【8月更文挑战第31天】
70 0
|
6月前
|
安全 Android开发 Kotlin
Android经典面试题之Kotlin中常见作用域函数
**Kotlin作用域函数概览**: `let`, `run`, `with`, `apply`, `also`. `let`安全调用并返回结果; `run`在上下文中执行代码并返回结果; `with`执行代码块,返回结果; `apply`配置对象后返回自身; `also`附加操作后返回自身
64 8
|
6月前
|
Android开发 Kotlin
Android经典面试题之Kotlin中Lambda表达式有哪些用法
Kotlin的Lambda表达式是匿名函数的简洁形式,常用于集合操作和高阶函数。基本语法是`{参数 -&gt; 表达式}`。例如,`{a, b -&gt; a + b}`是一个加法lambda。它们可在`map`、`filter`等函数中使用,也可作为参数传递。单参数时可使用`it`关键字,如`list.map { it * 2 }`。类型推断简化了类型声明。
36 0
|
7月前
|
Java Kotlin
Kotlin 中的 with 函数简介
Kotlin 中的 with 函数简介
247 0
|
7月前
|
Kotlin
Kotlin函数
Kotlin函数
197 0
|
安全 JavaScript 前端开发
Kotlin 之类和接口
Kotlin 之类和接口
Kotlin 之类和接口
|
Java API 开发者
Kotlin中的序列
Kotlin里面的集合式api和Java类似,但也有区别,Kotlin里面加入了可变和不可变的特性,例如可变集合MutableList,不可变的则是List,这部分的功能主要是和语言的特性相关联的,从Kotlin对于变量的定义中就可以看出来。
90 0
|
存储 缓存 算法
Kotlin | 扩展函数(终于知道为什么 with 用 this,let 用 it)
Kotlin | 扩展函数(终于知道为什么 with 用 this,let 用 it)
316 0
Kotlin | 扩展函数(终于知道为什么 with 用 this,let 用 it)
|
JavaScript 前端开发 Java
浅谈Kotlin中的函数
本文简单谈下Kotlin中的函数,包括表达式函数体,命名参数,默认参数,顶层函数,扩展函数,局部函数,Lambda表达式,成员引用,with/apply函数等。从例子入手,从一般写法到使用特性进行简化,再到原理解析。
1148 0