如何优雅地关闭Go channel

简介:

几天前,我写了一篇文章来说明golang中channel的使用规范。在reddit和HN,那篇文章收到了很多赞同,但是我也收到了下面几个关于Go channel设计和规范的批评:

● 在不能更改channel状态的情况下,没有简单普遍的方式来检查channel是否已经关闭了
● 关闭已经关闭的channel会导致panic,所以在closer(关闭者)不知道channel是否已经关闭的情况下去关闭channel是很危险的
● 发送值到已经关闭的channel会导致panic,所以如果sender(发送者)在不知道channel是否已经关闭的情况下去向channel发送值是很危险的

那些批评看起来都很有道理(实际上并没有)。是的,没有一个内置函数可以检查一个channel是否已经关闭。如果你能确定不会向channel发送任何值,那么也确实需要一个简单的方法来检查channel是否已经关闭:


1package main
2
3import "fmt"
4
5type T int
6
7func IsClosed(ch <-chan T) bool {
8 select {
9 case <-ch:
10 return true
11 default:
12 }
13
14 return false
15}
16
17func main() {
18 c := make(chan T)
19 fmt.Println(IsClosed(c)) // false
20 close(c)
21 fmt.Println(IsClosed(c)) // true
22}

上面已经提到了,没有一种适用的方式来检查channel是否已经关闭了。但是,就算有一个简单的 closed(chan T) bool函数来检查channel是否已经关闭,它的用处还是很有限的,就像内置的len函数用来检查缓冲channel中元素数量一样。原因就在于,已经检查过的channel的状态有可能在调用了类似的方法返回之后就修改了,因此返回来的值已经不能够反映刚才检查的channel的当前状态了。
尽管在调用closed(ch)返回true的情况下停止向channel发送值是可以的,但是如果调用closed(ch)返回false,那么关闭channel或者继续向channel发送值就不安全了(会panic)。

The Channel Closing Principle

在使用Go channel的时候,一个适用的原则是不要从接收端关闭channel,也不要关闭有多个并发发送者的channel。换句话说,如果sender(发送者)只是唯一的sender或者是channel最后一个活跃的sender,那么你应该在sender的goroutine关闭channel,从而通知receiver(s)(接收者们)已经没有值可以读了。维持这条原则将保证永远不会发生向一个已经关闭的channel发送值或者关闭一个已经关闭的channel。
(下面,我们将会称上面的原则为channel closing principle

打破channel closing principle的解决方案

如果你因为某种原因从接收端(receiver side)关闭channel或者在多个发送者中的一个关闭channel,那么你应该使用列在Golang panic/recover Use Cases的函数来安全地发送值到channel中(假设channel的元素类型是T)


1func SafeSend(ch chan T, value T) (closed bool) {
2 defer func() {
3 if recover() != nil {
4 // the return result can be altered
5 // in a defer function call
6 closed = true
7 }
8 }()
9
10 ch <- value // panic if ch is closed
11 return false // <=> closed = false; return
12}

如果channel ch没有被关闭的话,那么这个函数的性能将和ch <- value接近。对于channel关闭的时候,SafeSend函数只会在每个sender goroutine中调用一次,因此程序不会有太大的性能损失。
同样的想法也可以用在从多个goroutine关闭channel中:


1func SafeClose(ch chan T) (justClosed bool) {
2 defer func() {
3 if recover() != nil {
4 justClosed = false
5 }
6 }()
7
8 // assume ch != nil here.
9 close(ch) // panic if ch is closed
10 return true
11}

很多人喜欢用sync.Once来关闭channel:


1type MyChannel struct {
2 C chan T
3 once sync.Once
4}
5
6func NewMyChannel() *MyChannel {
7 return &MyChannel{C: make(chan T)}
8}
9
10func (mc *MyChannel) SafeClose() {
11 mc.once.Do(func(){
12 close(mc.C)
13 })
14}

当然了,我们也可以用sync.Mutex来避免多次关闭channel:


1type MyChannel struct {
2 C chan T
3 closed bool
4 mutex sync.Mutex
5}
6
7func NewMyChannel() *MyChannel {
8 return &MyChannel{C: make(chan T)}
9}
10
11func (mc *MyChannel) SafeClose() {
12 mc.mutex.Lock()
13 if !mc.closed {
14 close(mc.C)
15 mc.closed = true
16 }
17 mc.mutex.Unlock()
18}
19
20func (mc *MyChannel) IsClosed() bool {
21 mc.mutex.Lock()
22 defer mc.mutex.Unlock()
23 return mc.closed
24}

我们应该要理解为什么Go不支持内置SafeSendSafeClose函数,原因就在于并不推荐从接收端或者多个并发发送端关闭channel。Golang甚至禁止关闭只接收(receive-only)的channel。

保持channel closing principle的优雅方案

上面的SaveSend函数有一个缺点是,在select语句的case关键字后不能作为发送操作被调用(译者注:类似于 case SafeSend(ch, t):)。另外一个缺点是,很多人,包括我自己都觉得上面通过使用panic/recoversync包的方案不够优雅。针对各种场景,下面介绍不用使用panic/recoversync包,纯粹是利用channel的解决方案。
(在下面的例子总,sync.WaitGroup只是用来让例子完整的。它的使用在实践中不一定一直都有用)

●  M个receivers,一个sender,sender通过关闭data channel说“不再发送”
这是最简单的场景了,就只是当sender不想再发送的时候让sender关闭data 来关闭channel:

1package main
2
3import (
4 "time"
5 "math/rand"
6 "sync"
7 "log"
8)
9
10func main() {
11 rand.Seed(time.Now().UnixNano())
12 log.SetFlags(0)
13
14 // ...
15 const MaxRandomNumber = 100000
16 const NumReceivers = 100
17
18 wgReceivers := sync.WaitGroup{}
19 wgReceivers.Add(NumReceivers)
20
21 // ...
22 dataCh := make(chan int, 100)
23
24 // the sender
25 go func() {
26 for {
27 if value := rand.Intn(MaxRandomNumber); value == 0 {
28 // the only sender can close the channel safely.
29 close(dataCh)
30 return
31 } else {
32 dataCh <- value
33 }
34 }
35 }()
36
37 // receivers
38 for i := 0; i < NumReceivers; i++ {
39 go func() {
40 defer wgReceivers.Done()
41
42 // receive values until dataCh is closed and
43 // the value buffer queue of dataCh is empty.
44 for value := range dataCh {
45 log.Println(value)
46 }
47 }()
48 }
49
50 wgReceivers.Wait()
51}
●  一个receiver,N个sender,receiver通过关闭一个额外的signal channel说“请停止发送”
这种场景比上一个要复杂一点。我们不能让receiver关闭data channel,因为这么做将会打破 channel closing principle 。但是我们可以让receiver关闭一个额外的signal channel来通知sender停止发送值:

1package main
2
3import (
4 "time"
5 "math/rand"
6 "sync"
7 "log"
8)
9
10func main() {
11 rand.Seed(time.Now().UnixNano())
12 log.SetFlags(0)
13
14 // ...
15 const MaxRandomNumber = 100000
16 const NumSenders = 1000
17
18 wgReceivers := sync.WaitGroup{}
19 wgReceivers.Add(1)
20
21 // ...
22 dataCh := make(chan int, 100)
23 stopCh := make(chan struct{})
24 // stopCh is an additional signal channel.
25 // Its sender is the receiver of channel dataCh.
26 // Its reveivers are the senders of channel dataCh.
27
28 // senders
29 for i := 0; i < NumSenders; i++ {
30 go func() {
31 for {
32 value := rand.Intn(MaxRandomNumber)
33
34 select {
35 case <- stopCh:
36 return
37 case dataCh <- value:
38 }
39 }
40 }()
41 }
42
43 // the receiver
44 go func() {
45 defer wgReceivers.Done()
46
47 for value := range dataCh {
48 if value == MaxRandomNumber-1 {
49 // the receiver of the dataCh channel is
50 // also the sender of the stopCh cahnnel.
51 // It is safe to close the stop channel here.
52 close(stopCh)
53 return
54 }
55
56 log.Println(value)
57 }
58 }()
59
60 // ...
61 wgReceivers.Wait()
62}

正如注释说的,对于额外的signal channel来说,它的sender是data channel的receiver。这个额外的signal channel被它唯一的sender关闭,遵守了channel closing principle

●  M个receiver,N个sender,它们当中任意一个通过通知一个moderator(仲裁者)关闭额外的signal channel来说“让我们结束游戏吧”
这是最复杂的场景了。我们不能让任意的receivers和senders关闭data channel,也不能让任何一个receivers通过关闭一个额外的signal channel来通知所有的senders和receivers退出游戏。这么做的话会打破 channel closing principle 。但是,我们可以引入一个moderator来关闭一个额外的signal channel。这个例子的一个技巧是怎么通知moderator去关闭额外的signal channel:

1package main
2
3import (
4 "time"
5 "math/rand"
6 "sync"
7 "log"
8 "strconv"
9)
10
11func main() {
12 rand.Seed(time.Now().UnixNano())
13 log.SetFlags(0)
14
15 // ...
16 const MaxRandomNumber = 100000
17 const NumReceivers = 10
18 const NumSenders = 1000
19
20 wgReceivers := sync.WaitGroup{}
21 wgReceivers.Add(NumReceivers)
22
23 // ...
24 dataCh := make(chan int, 100)
25 stopCh := make(chan struct{})
26 // stopCh is an additional signal channel.
27 // Its sender is the moderator goroutine shown below.
28 // Its reveivers are all senders and receivers of dataCh.
29 toStop := make(chan string, 1)
30 // the channel toStop is used to notify the moderator
31 // to close the additional signal channel (stopCh).
32 // Its senders are any senders and receivers of dataCh.
33 // Its reveiver is the moderator goroutine shown below.
34
35 var stoppedBy string
36
37 // moderator
38 go func() {
39 stoppedBy = <- toStop // part of the trick used to notify the moderator
40 // to close the additional signal channel.
41 close(stopCh)
42 }()
43
44 // senders
45 for i := 0; i < NumSenders; i++ {
46 go func(id string) {
47 for {
48 value := rand.Intn(MaxRandomNumber)
49 if value == 0 {
50 // here, a trick is used to notify the moderator
51 // to close the additional signal channel.
52 select {
53 case toStop <- "sender#" + id:
54 default:
55 }
56 return
57 }
58
59 // the first select here is to try to exit the
60 // goroutine as early as possible.
61 select {
62 case <- stopCh:
63 return
64 default:
65 }
66
67 select {
68 case <- stopCh:
69 return
70 case dataCh <- value:
71 }
72 }
73 }(strconv.Itoa(i))
74 }
75
76 // receivers
77 for i := 0; i < NumReceivers; i++ {
78 go func(id string) {
79 defer wgReceivers.Done()
80
81 for {
82 // same as senders, the first select here is to
83 // try to exit the goroutine as early as possible.
84 select {
85 case <- stopCh:
86 return
87 default:
88 }
89
90 select {
91 case <- stopCh:
92 return
93 case value := <-dataCh:
94 if value == MaxRandomNumber-1 {
95 // the same trick is used to notify the moderator
96 // to close the additional signal channel.
97 select {
98 case toStop <- "receiver#" + id:
99 default:
100 }
101 return
102 }
103
104 log.Println(value)
105 }
106 }
107 }(strconv.Itoa(i))
108 }
109
110 // ...
111 wgReceivers.Wait()
112 log.Println("stopped by", stoppedBy)
113}

在这个例子中,仍然遵守着channel closing principle
请注意channel toStop的缓冲大小是1.这是为了避免当mederator goroutine 准备好之前第一个通知就已经发送了,导致丢失。

●  更多的场景?
很多的场景变体是基于上面三种的。举个例子,一个基于最复杂情况的变体可能要求receivers读取buffer channel中剩下所有的值。这应该很容易处理,所有这篇文章也就不提了。
尽管上面三种场景不能覆盖所有Go channel的使用场景,但它们是最基础的,实践中的大多数场景都可以分类到那三种中。

结论

这里没有一种场景要求你去打破channel closing principle。如果你遇到了这种场景,请思考一下你的设计并重写你的代码。
用Go编程就像在创作艺术。


原文发布时间为:2018-09-9

本文作者:天唯

本文来自云栖社区合作伙伴“Golang语言社区”,了解相关信息可以关注“Golang语言社区”。

相关文章
|
6月前
|
存储 安全 Java
Go 基础数据结构的底层原理(slice,channel,map)
Go 基础数据结构的底层原理(slice,channel,map)
96 0
|
10天前
|
Go 调度 开发者
探索Go语言中的并发模式:goroutine与channel
在本文中,我们将深入探讨Go语言中的核心并发特性——goroutine和channel。不同于传统的并发模型,Go语言的并发机制以其简洁性和高效性著称。本文将通过实际代码示例,展示如何利用goroutine实现轻量级的并发执行,以及如何通过channel安全地在goroutine之间传递数据。摘要部分将概述这些概念,并提示读者本文将提供哪些具体的技术洞见。
|
1月前
|
安全 Go 调度
探索Go语言的并发之美:goroutine与channel
在这个快节奏的技术时代,Go语言以其简洁的语法和强大的并发能力脱颖而出。本文将带你深入Go语言的并发机制,探索goroutine的轻量级特性和channel的同步通信能力,让你在高并发场景下也能游刃有余。
|
1月前
|
存储 安全 Go
探索Go语言的并发模型:Goroutine与Channel
在Go语言的多核处理器时代,传统并发模型已无法满足高效、低延迟的需求。本文深入探讨Go语言的并发处理机制,包括Goroutine的轻量级线程模型和Channel的通信机制,揭示它们如何共同构建出高效、简洁的并发程序。
|
5月前
|
Go
go之channel关闭与广播
go之channel关闭与广播
|
27天前
|
存储 Go 调度
深入理解Go语言的并发模型:goroutine与channel
在这个快速变化的技术世界中,Go语言以其简洁的并发模型脱颖而出。本文将带你穿越Go语言的并发世界,探索goroutine的轻量级特性和channel的同步机制。摘要部分,我们将用一段对话来揭示Go并发模型的魔力,而不是传统的介绍性文字。
|
1月前
|
安全 Go 调度
探索Go语言的并发模型:Goroutine与Channel的魔力
本文深入探讨了Go语言的并发模型,不仅解释了Goroutine的概念和特性,还详细讲解了Channel的用法和它们在并发编程中的重要性。通过实际代码示例,揭示了Go语言如何通过轻量级线程和通信机制来实现高效的并发处理。
|
1月前
|
安全 Go 调度
探索Go语言的并发之美:goroutine与channel的实践指南
在本文中,我们将深入探讨Go语言的并发机制,特别是goroutine和channel的使用。通过实际的代码示例,我们将展示如何利用这些工具来构建高效、可扩展的并发程序。我们将讨论goroutine的轻量级特性,channel的同步通信能力,以及它们如何共同简化并发编程的复杂性。
|
1月前
|
安全 Go 数据处理
掌握Go语言并发:从goroutine到channel
在Go语言的世界中,goroutine和channel是构建高效并发程序的基石。本文将带你一探Go语言并发机制的奥秘,从基础的goroutine创建到channel的同步通信,让你在并发编程的道路上更进一步。
|
3月前
|
消息中间件 Kafka Go
从Go channel中批量读取数据
从Go channel中批量读取数据