在go中监听多个channel

简介: 在go中监听多个channel

select 关键字


我们可以使用select关键字来同时监听多个goroutine。


package main
import (
    "fmt"
    "time"
)
func main() {
    c1 := make(chan string)
    c2 := make(chan string)
    go func() {
        time.Sleep(1 * time.Second)
        c1 <- time.Now().String()
    }()
    go func() {
        time.Sleep(2 * time.Second)
        c2 <- time.Now().String()
    }()
    for i := 0; i < 2; i++ {
        select {
        case res1 := <-c1:
            fmt.Println("from c1:", res1)
        case res2 := <-c2:
            fmt.Println("from c2:", res2)
        }
    }
}
from c1: 2022-09-04 14:30:39.4469184 -0400 EDT m=+1.000172801
from c2: 2022-09-04 14:30:40.4472699 -0400 EDT m=+2.000524401


上面的代码显示了select关键字是如何工作的:


  • 我们首先创建两个 channel c1和c2来监听。
  • 然后我们生成两个goroutine,分别向c1和c2发送当前时间。
  • 在for循环中,我们创建了一个select语句,并定义了两个 case:第一个 case 是我们何时可以从c1接收,第二个 case 是我们何时可以从c2接收。

你可以看到select语句和switch语句在设计上非常相似。两者都定义了不同的情况,并在满足某种情况时运行相应的代码。另外,我们可以看到select语句是阻塞的。也就是说,它将等待,直到其中一个 case 被满足。


我们为这个循环迭代了两次,因为只有两个goroutine需要监听。更确切地说,每个goroutine都是一个fire-and-forget goroutine,意味着它们在返回之前只向一个通道发送一次。因此,在这段代码中,任何时候都有两个消息的最大值,而我们只需要选择两次。


如果我们不知道工作何时会结束呢?


有时我们不知道有多少个工作。在这种情况下,把 select 语句放在一个while循环里。


package main
import (
    "fmt"
    "math/rand"
    "time"
)
func main() {
    c1 := make(chan string)
    rand.Seed(time.Now().UnixNano())
    for i := 0; i < rand.Intn(10); i++ {
        go func() {
            time.Sleep(1 * time.Second)
            c1 <- time.Now().String()
        }()
    }
    for {
        select {
        case res1 := <-c1:
            fmt.Println("from c1:", res1)
        }
    }
}


因为我们让一个随机数的goroutines运行,所以我们不知道有多少个作业。值得庆幸的是,底部包裹着select语句的for循环将捕获每一个输出。让我们看看如果我们运行这段代码会发生什么。


from c1: 2022-09-04 14:48:47.5145341 -0400 EDT m=+1.000257801
from c1: 2022-09-04 14:48:47.5146126 -0400 EDT m=+1.000336201
from c1: 2022-09-04 14:48:47.5146364 -0400 EDT m=+1.000359901
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
        /home/jacob/blog/testing/listening-to-multiple-channels-in-go/main.go:22 +0x128
exit status 2


嗯,select 语句按预期收到了三次,但由于死锁,程序出错了。为什么会出现这种情况?


记住,在没有任何条件的情况下,Go中的for循环会永远运行。这意味着 select 语句将永远尝试接收。然而,要运行的作业数量是有限的。即使没有更多的工作,select 语句仍然会尝试接收。


还记得在本系列的第一篇文章中我说过,如果你在发送方没有准备好的时候试图从一个没有缓冲的通道中接收,你的程序就会陷入死锁。这正是我们例子中的情况。


那么我们如何解决这个问题呢?我们可以使用以前文章中涉及的概念的组合:退出通道和WaitGroups。

package main
import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)
func main() {
    c1 := make(chan string)
    exit := make(chan struct{})
    rand.Seed(time.Now().UnixNano())
    var wg sync.WaitGroup
    go func() {
        numJob := rand.Intn(10)
        fmt.Println("number of jobs:", numJob)
        for i := 0; i < numJob; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                time.Sleep(1 * time.Second)
                c1 <- time.Now().String()
            }()
        }
        wg.Wait()
        close(exit)
    }()
    for {
        select {
        case res1 := <-c1:
            fmt.Println("from c1:", res1)
        case <-exit:
            return
        }
    }
}
3
from c1: 2022-09-04 15:09:08.6936976 -0400 EDT m=+1.000287801
from c1: 2022-09-04 15:09:08.6937788 -0400 EDT m=+1.000369101
from c1: 2022-09-04 15:09:08.6937949 -0400 EDT m=+1.000385101
  • 我们创建一个 exit channel 和一个WaitGroup。
  • 每次运行的作业数都是随机的。对于numJobs的次数,我们启动goroutines。为了等待作业的完成,我们把它们加到wg中。当一个工作完成后,我们从wg中减去一个。
  • 一旦所有工作完成,我们就关闭exit channel
  • 我们将上述部分包裹在一个goroutine中,因为我们希望所有的部分都是无阻塞的。如果我们不把它包在一个goroutine中,wg.Wait()就会等待,直到作业完成。这将阻塞代码,并且不会让底部的for-select语句运行。
  • 此外,由于c1是一个未缓冲的通道,等待所有的goroutine将消息发送到c1,将导致许多消息被发送到c1而没有for-select语句接收。这就导致了一个死锁,因为当发送者准备好时,接收者还没有准备好。


如何使select成为非阻塞式


select语句默认是阻塞的。我们如何使其成为非阻塞的呢?这很简单--我们只需添加一个 default case。


package main
import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)
func main() {
    ashleyMsg := make(chan string)
    brianMsg := make(chan string)
    exit := make(chan struct{})
    rand.Seed(time.Now().UnixNano())
    var wg sync.WaitGroup
    go func() {
        numJob := rand.Intn(10)
        fmt.Println("number of jobs:", numJob)
        for i := 0; i < numJob; i++ {
            wg.Add(2)
            go func() {
                defer wg.Done()
                time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
                ashleyMsg <- "hi"
            }()
            go func() {
                defer wg.Done()
                time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
                brianMsg <- "what's up"
            }()
        }
        wg.Wait()
        close(exit)
    }()
    for {
        select {
        case res1 := <-ashleyMsg:
            fmt.Println("ashley:", res1)
        case res2 := <-brianMsg:
            fmt.Println("brian:", res2)
        case <-exit:
            fmt.Println("chat ended")
            return
        default:
            fmt.Println("...")
            time.Sleep(time.Millisecond)
        }
    }
}
...
number of jobs: 4
brian: what's up
...
ashley: hi
...
...
brian: what's up
ashley: hi
ashley: hi
brian: what's up
...
...
ashley: hi
...
brian: what's up
...
chat ended


除了蹩脚的对话之外,我们可以看到默认情况下的工作方式。与其等待聊天记录的到来,我们可以在没有频道可以接收的时候做一些事情。在这个例子中,我们只是打印出省略号,但你可以做任何你想做的事情。

相关文章
|
4月前
|
存储 安全 Java
Go 基础数据结构的底层原理(slice,channel,map)
Go 基础数据结构的底层原理(slice,channel,map)
83 0
|
3月前
|
Go
go之channel关闭与广播
go之channel关闭与广播
|
1月前
|
消息中间件 Kafka Go
从Go channel中批量读取数据
从Go channel中批量读取数据
|
1月前
|
缓存 并行计算 Go
go 语言中 channel 的简单介绍
go 语言中 channel 的简单介绍
|
3月前
|
存储 Go
Go 语言当中 CHANNEL 缓冲
Go 语言当中 CHANNEL 缓冲
|
3月前
|
Go
go之channel任意任务完成、全部任务完成退出
go之channel任意任务完成、全部任务完成退出
|
4月前
|
负载均衡 Go 调度
使用Go语言构建高性能的Web服务器:协程与Channel的深度解析
在追求高性能Web服务的今天,Go语言以其强大的并发性能和简洁的语法赢得了开发者的青睐。本文将深入探讨Go语言在构建高性能Web服务器方面的应用,特别是协程(goroutine)和通道(channel)这两个核心概念。我们将通过示例代码,展示如何利用协程处理并发请求,并通过通道实现协程间的通信和同步,从而构建出高效、稳定的Web服务器。
|
4月前
|
设计模式 缓存 安全
一篇文章带你吃透Go语言的Atomic和Channel--实战方法
一篇文章带你吃透Go语言的Atomic和Channel--实战方法
70 0
|
4月前
|
存储 Go
|
4月前
|
安全 Go 调度
Go语言的并发编程:goroutine和channel详解
Go语言的并发编程:goroutine和channel详解
172 2