Go语言核心手册-7.通道

简介: 本章基本都是干货,上面总结的比较全面,这里就不再重复了,如果你能回答我提的这些问题,你应该就掌握了本章的内容:• 发送和接收时,分别有哪些情况会导致channel阻塞呢?• 对于发送和关闭channel,有哪些情况会导致panic呢?• 当channel关闭后,继续读取里面的数据,能读取到么?如何保证数据读取完毕呢?• 对于生产者和消费者模型,如何才能优雅关闭channel,避免写channel导致的panic呢?• for-range读取channel数据,对于channel关闭和未关闭的情况,是如何处理的呢?会存在阻塞情况么?• 使用select时,有哪些注意事项呢?你知道se

Don’t communicate by sharing memory, share memory by communicating。相信学过Go的同学都知道这句名言,可以说channel就是后边这句话的具体实现。channel是一个类型安全的循环队列,能够控制groutine在它上面读写消息的行为,比如阻塞某个groutine ,或者唤醒某个 groutine。


7.1 基本特征



一个通道相当于一个先进先出(FIFO)的队列,各个元素值都是严格地按照发送的顺序排列的,先被发送通道的元素值一定会先被接收,一个左尖括号紧接着一个减号形象地代表了元素值的传输方向。下面是创建几种不同的通道:

ch1 := make(chan int)      // 无缓冲通道ch2 := make(chan int, 3)   // 有缓冲通道ch3 := make(chan<- int, 1) // 单向通道:只能发送不能接收ch4 := make(<-chan int, 1) // 单向通道:只能接收不能发送


下面举一个简单的示例:

func main() {    done := make(chan struct{})    c := make(chan string)    go func() {        s := <-c     // 接收消息        println(s)        close(done)  // 关闭通道,作为结束通知    }()    c <- "lvmenglou" // 发送消息    <-done           // 阻塞,知道有数据或者通道关闭}//最后输出:lvmenglou

通道发送和接收操作基本特性:

  • 元素复制:进入通道的并不是在接收操作符右边的那个元素值,而是它的副本(发送操作包括“复制元素值”和“放入通道”2步,接收操作包括“复制通道内的元素值”、“放置副本到接收方”和“删掉原值”3步);
  • 不可分割:一个数据进入通道时,不会存在还没有复制完毕,就被接收的情况。


7.2 底层原理


7.2.1 数据结构

channel的数据结构如下:


type hchan struct {     qcount uint // 当前队列中剩余元素个数     dataqsiz uint // 环形队列长度,即可以存放的元素个数     buf unsafe.Pointer // 环形队列指针     elemsize uint16 // 每个元素的大小     closed uint32 // 标识关闭状态     elemtype *_type // 元素类型     sendx uint // 队列下标,指示元素写入时存放到队列中的位置     recvx uint // 队列下标,指示元素从队列的该位置读出     recvq waitq // 等待读消息的goroutine队列     sendq waitq // 等待写消息的goroutine队列     lock mutex // 互斥锁,chan不允许并发读写 }

chan内部实现了一个环形队列作为其缓冲区,队列的长度是创建chan时指定的,下图展示了一个可缓存6个元素的channel示意图:

L$I{3FPGM0UN%T)4Z7_`KSQ.png


下图展示了一个没有缓冲区的channel,有几个goroutine阻塞等待读数据:

K%8]IL3GPL3A[9[{H[{J9XL.png


7.2.2 发送

向一个channel中写数据简单过程如下:

  • 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;
  • 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
  • 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒。

image.gif

7.2.3 接收

从一个channel读数据简单过程如下:

  • 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,最后把G唤醒,结束 读取过程;
  • 如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;
  • 如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程;
  • 将当前goroutine加入recvq,进入睡眠,等待被写goroutine唤醒。


7.2.4 关闭

关闭channel时会把recvq中的G全部唤醒,本该写入G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic。


7.3 核心知识


7.3.1 发送

阻塞情况:

  • nil阻塞:向nil通道发送数据会被阻塞。
ch := make(chan int, 2)ch = nilch <- 4  // all goroutines are asleep - deadlock!
  • 无缓冲channel + 读未ready:向无缓冲 channel写数据,如果读协程没有准备好,会阻塞。
  • 有缓冲channel + 缓冲已满:向有缓冲 channel写数据,如果缓冲已满,会阻塞。

重要知识点:

  • panic:closed的channel,写数据会panic。
ch := make(chan int, 2)ch <- 4close(ch)ch <- 3 // panic: send on closed channel


  • 资源回收:channel使用完后,需要close掉,否则资源不会回收(包括channel资源,以及channel里面存储的数据资源)。
  • 数据交换:就算是有缓冲的 channel ,也不是每次发送、接收都要经过缓存,如果发送的时候,刚好有等待接收的协程,那么会直接交换数据。【这块我存在质疑,待定!!!】


7.3.2 接收

阻塞情况:

  • nil阻塞:从nil通道接收数据会被阻塞。
  • 无缓冲channel + 写未ready:从无缓冲channel读数据,如果写协程没有准备好,会阻塞。
  • 有缓冲channel + 缓冲为空:从有缓冲 channel读数据,如果缓冲为空,会阻塞。

重要知识点:

  • 关闭channel数据接收:从已关闭channel接收数据,如果通道有数据,会返回已缓冲数据;如果没有数据,会读到通道传输数据类型的零值,比如指针类型,读到nil。(可以通过x, ok:=<-c中的ok,判断数据是否读取完毕)
c := make(chan int, 3)c <- 11c <- 12close(c)for i := 0; i < cap(c)+1; i++ {    x, ok := <-c    println(i, ":", ok, x)}// 输出// 0: true 11// 1: true 12// 2: false 0// 3: false 0


7.3.3 关闭

重要知识点:

  • close panic:重复关闭,或关闭nil通道会引发panic。
ch := make(chan int, 2)ch <- 4close(ch)close(ch) // panic: close of closed channel
  • 多线程通道关闭原则:由于close的channel,写数据是会panic,所以在多线程写入和读取时,需要遵循“谁写入,谁负责关闭”原则。(后面会通过完整的示例讲解该知识)


7.3.4 for-range读取

我们常常会用for-range来读取channel的数据

ch := make(chan int, 1)go func(ch chan int) {    for i := 0; i < 10; i++ {        ch <- i    }    close(ch)}(ch)for val := range ch {    fmt.Println(val)}


重要知识点:

  • 如果channel已经关闭,它还是会继续执行,直到所有值被取完,然后退出执行;
  • 如果channel没有关闭,但是channel没有可读取的数据,它则会阻塞在range这句位置,直到被唤醒;
  • 如果channel是nil,读取会被阻塞,也就是会一直阻塞在range位置。


7.3.5 select

select是跟channel关系最亲密的语句,它是被专门设计出来处理通道的,因为每个 case 后面跟的都是通道表达式,可以是读,也可以是写。下面看一个简单的示例:

// 准备好几个通道。intChannels := [3]chan int{  make(chan int, 1),  make(chan int, 1),  make(chan int, 1),}// 随机选择一个通道,并向它发送元素值。index := rand.Intn(3)fmt.Printf("The index: %d\n", index)intChannels[index] <- index// 哪一个通道中有可取的元素值,哪个对应的分支就会被执行。select {case <-intChannels[0]:  fmt.Println("The first candidate case is selected.")case <-intChannels[1]:  fmt.Println("The second candidate case is selected.")case elem := <-intChannels[2]:  fmt.Printf("The third candidate case is selected, the element is %d.\n", elem)default:  fmt.Println("No candidate case is selected!")}

我们用一个包含了三个候选分支的select语句,分别尝试从上述三个通道中接收元素值,哪一个通道中有值,哪一个对应的候选分支就会被执行。后面还有一个默认分支,不过在这里它是不可能被选中的。在使用select语句的时候,我们需要注意下面几个事情:

  • 有default情况:select只要有默认语句,就不会被阻塞,换句话说,如果没有default,然后case又都不能读或者写,则会被阻塞。
  • 无default情况:如果没有加入默认分支,那么一旦所有的case表达式都没有满足求值条件,那么select语句就会被阻塞。直到至少有一个case表达式满足条件为止。
  • multi-valued assignment:select不能够像for-range一样发现channel被关闭而终止执行,我们可能会因为通道关闭了,而直接从通道接收到一个其元素类型的零值。所以,在很多时候,我们需要通过接收表达式的第二个结果值来判断通道是否已经关闭。一旦发现某个通道关闭了,我们就应该及时地屏蔽掉对应的分支或者采取其他措施。这对于程序逻辑和程序性能都是有好处的。
  • select + for:select语句只能对其中的每一个case表达式各求值一次。所以,如果我们想连续或定时地操作其中的通道的话,就往往需要通过在for语句中嵌入select语句的方式实现。但这时要注意,简单地在select语句的分支中使用break语句,只能结束当前的select语句的执行,而并不会对外层的for语句产生作用。这种错误的用法可能会让这个for语句无休止地运行下去。
intChan := make(chan int, 1)// 一秒后关闭通道。time.AfterFunc(time.Second, func() {  close(intChan)})select {case _, ok := <-intChan:  if !ok {    fmt.Println("The candidate case is closed.")    break  }  fmt.Println("The candidate case is selected.")}
  • 随机选择case:如果同时有多个case足了条件,会使用伪随机选择一个case来执行。
  • 先全部扫描,再选择:每次select语句的执行,是会扫码完所有的case后才确定如何执行,而不是说遇到合适的case就直接执行了。
  • nil阻塞:nil的channel,不管读写都会被阻塞。


上面的知识需要牢记,面试常考,下面是讲解select执行的流程:

  1. 对于每一个case表达式,都至少会包含一个代表发送操作的发送表达式或者一个代表接收操作的接收表达式,同时也可能会包含其他的表达式。比如,如果case表达式是包含了接收表达式的短变量声明时,那么在赋值符号左边的就可以是一个或两个表达式,不过此处的表达式的结果必须是可以被赋值的。当这样的case表达式被求值时,它包含的多个表达式总会以从左到右的顺序被求值。
  2. select语句包含的候选分支中的case表达式都会在该语句执行开始时先被求值,并且求值的顺序是依从代码编写的顺序从上到下的。结合上一条规则,在select语句开始执行时,排在最上边的候选分支中最左边的表达式会最先被求值,然后是它右边的表达式。仅当最上边的候选分支中的所有表达式都被求值完毕后,从上边数第二个候选分支中的表达式才会被求值,顺序同样是从左到右,然后是第三个候选分支、第四个候选分支,以此类推。
  3. 对于每一个case表达式,如果其中的发送表达式或者接收表达式在被求值时,相应的操作正处于阻塞状态,那么对该case表达式的求值就是不成功的。在这种情况下,我们可以说,这个case表达式所在的候选分支是不满足选择条件的。
  4. 仅当select语句中的所有case表达式都被求值完毕后,它才会开始选择候选分支。这时候,它只会挑选满足选择条件的候选分支执行。如果所有的候选分支都不满足选择条件,那么默认分支就会被执行。如果这时没有默认分支,那么select语句就会立即进入阻塞状态,直到至少有一个候选分支满足选择条件为止。一旦有一个候选分支满足选择条件,select语句(或者说它所在的goroutine)就会被唤醒,这个候选分支就会被执行。
  5. 如果select语句发现同时有多个候选分支满足选择条件,那么它就会用一种伪随机的算法在这些分支中选择一个并执行。注意,即使select语句是在被唤醒时发现的这种情况,也会这样做。
  6. 一条select语句中只能够有一个默认分支。并且,默认分支只在无候选分支可选时才会被执行,这与它的编写位置无关。
  7. select语句的每次执行,包括case表达式求值和分支选择,都是独立的。不过,至于它的执行是否是并发安全的,就要看其中的case表达式以及分支中,是否包含并发不安全的代码了。


上面写的有些多,简单总结一下:执行select时,会从左到右,从上到下,对每个case表达式求值,当所有case求值完毕后,会挑选满足的case执行,如果有多条都满足,就随机选择一条;如果都没有满足,就执行default;如果连default都没有,就阻塞住,等有满足条件的case出现时,再执行。


7.4 并发实例:海外商城Push

关于channel,零碎的知识点非常多,我还是想通过一个完整的示例,将这些知识点全部串起来,下面就以海外商城Push为例,将上面知识应用到实际场景中。


7.4.1 示例介绍

海外商城需要对W个业务方发送Push,针对每个业务方,为了提高Push的并发能力,采用N个协程从EMQ中读取数据(EMQ中都一个消息队列,里面缓存了大量的Push数据),数据读取后进行处理,然后将处理后的数据写到channel中。同时,服务有M个协程从channel中取出数据并消费,然后通过小米Push SDK,给用户发送Push。整体发送链路如下:

image.gif

在看后面的内容前,我先抛出几个问题:

  • 生成者往关闭的Channel写数据,会Panic,那么Channel该如何关闭呢?
  • 当Channel关闭后(比如服务重启),需要继续消费Channel里面的Push,该如何操作呢?
  • 每消费一条Channel数据,需要记录Push发送成功,但是一条Channel数据包含2-3个Push内容(IOS/Android/PC),程序记录Push成功前,如何保证这2-3个Push都发送完毕了呢?


7.4.2 初始化

初始化channel数组,数组里面是每个业务方appTypes的channel,channel的缓存区大小为30,并启动10个消费者协程:

var (   messageChan    map[string]chan *WorkMessage  // channel   stopMasterChan chan bool                     // 消费者结束通知   appTypes = map[int32]string{1: "shop", 2: "bbs", 3: "sharesave"})func initPushChannel() {   maxSize = 30 // channel缓存区大小   workNum = 10 // goroutine个数   stopMasterChan = make(chan bool)   messageChan = make(map[string]chan *WorkMessage)   for _, name := range appTypes {      workChan := make(chan *WorkMessage, maxSize)      messageChan[name] = workChan      for i := 0; i < workNum; i++ {         go startMaster(name, workChan) // 启动消费者协程      }   }}func startMaster(name string, workChan chan *WorkMessage) {   for {      if exit := dostartMaster(name, workChan); exit {         return      }   }}

初始化EMQ的Client,并启动10个生产者协程:

var (   clientFactory  client.ClientFactory // EMQ Client   stopChan       chan bool            // 生产者结束通知)func initEmq() {   // 初始化EMQ的Client和单次读取数据条数,该处代码省略...   maxConsumerNum := 10   stopChan = make(chan bool)   for i := 0; i < maxConsumerNum; i++ {      go receiveMsg(i) // 启动生产者协程   }}func receiveMsg(queueID int) {   for {      if exit := doReceiveMsg(queueID); exit {         logz.Info("stop receive msg ...", logz.F("queueID", queueID))         return      }   }}


主方法调用:

func InitWorker() {   // 初始化push SDK,逻辑省略...   initPushChannel() // 初始化Channel,启动消费者   initEmq()         // 启动生产者}


7.4.3 发送

func doReceiveMsg(queueID int) bool {   defer func() {      if err := recover(); err != nil {         println("[panic] recover from error.")      }   }()   ticker := time.NewTicker(time.Second)   for {      select {      case <-ticker.C:         // 1. 从EMQ获取数据List,逻辑省略...         // 2. 遍历List,获取业务类型,逻辑省略...         // 3. 根据业务类型,获取对应的channel         name := "sharesave"  // 示例数据         pushChannel, _ := messageChan[name]         // 4. 构造Push数据,然后放入channel         pushData := &WorkMessage{AppLocal: "id", AppType: 1} // 示例数据         pushChannel <- pushData      case <-stopChan:         println("stop to send data to channel.")         return true      }   }}

这部分代码我做了大量简化,这里主要做了2件事情:

  1. 通过select + 定时器,每隔1S就会从EMQ中获取数据,然后将构造后的数据放入对应业务的channel;
  2. 当收到stopChan事件时,会通知所有的生产者协程,退出goroutine,这里其实就是协程退出的方式之一。


7.4.4 接收

func dostartMaster(name string, workChan chan *WorkMessage) bool {   defer func() {      if err := recover(); err != nil {         println("[panic] recover from error.")      }   }()   for {      select {      case t := <-workChan:         if t != nil {             for _, message := range t.PushMessages {                // 接受channel数据t,将数据推给Push SDK                // 逻辑省略...             }         }      case <-stopMasterChan:         println("stop to get data from channel.")         return true      }   }}

这部分代码同样做了大量简化,这里主要做了2件事情:

  1. 通过select,如果channel里面有数据,直接读取,然后给用户发送Push;
  2. 当收到stopMasterChan事件时,会通知所有的生产者协程,退出goroutine。


7.4.5 关闭


// 通知生产者协程关闭,协程不再写channelfunc stopRecvMsgFromQueue() {   close(stopChan)}// 通知消费者协程关闭,协程不再读channel,并关闭channel,消费完channel中剩余消息func stopPushChannel() {   close(stopMasterChan)   time.Sleep(time.Second)   for _, c := range messageChan {      close(c)      for msg := range c {         if msg != nil {            for _, message := range msg.PushMessages {               // 接受channel数据t,将数据推给Push SDK               // 逻辑省略...            }         }      }   }}// 主方法调用func StopWorker() {   stopRecvMsgFromQueue()   time.Sleep(time.Second * 2)   stopPushChannel()}

比如服务重启,需要关闭协程时,主要做以下事情:

  1. 执行close(stopChan),先通知生产者协程,不再往channel里面写数据;
  2. 执行close(stopMasterChan),通知消费者协程,不再从channel里面读取数据;
  3. 关闭数组messageChan的每个channel;
  4. 继续读取channel中剩余的数据,因为使用的是for-range方式,所以当channel里面所有的数据读取完毕后,for-range会自动退出。

这里有两个地方sleep了一下,分别有以下作用:

  1. 调用stopPushChannel()前sleep:关闭生成者后,消费者继续消费剩余的数据;
  2. 调用close(c)前sleep:避免协程未完全关闭,导致往关闭的channel写数据,导致panic。


7.5 总结

本章基本都是干货,上面总结的比较全面,这里就不再重复了,如果你能回答我提的这些问题,你应该就掌握了本章的内容:

  • 发送和接收时,分别有哪些情况会导致channel阻塞呢?
  • 对于发送和关闭channel,有哪些情况会导致panic呢?
  • 当channel关闭后,继续读取里面的数据,能读取到么?如何保证数据读取完毕呢?
  • 对于生产者和消费者模型,如何才能优雅关闭channel,避免写channel导致的panic呢?
  • for-range读取channel数据,对于channel关闭和未关闭的情况,是如何处理的呢?会存在阻塞情况么?
  • 使用select时,有哪些注意事项呢?你知道select执行的流程么?

最后就是Push的并发示例,强烈建议大家能掌握,掌握了这个示例,后续你应该也能很容易通过channel实现数据共享,并结合goroutine写出你自己的高并发程序。

相关文章
|
5天前
|
存储 JSON 监控
Viper,一个Go语言配置管理神器!
Viper 是一个功能强大的 Go 语言配置管理库,支持从多种来源读取配置,包括文件、环境变量、远程配置中心等。本文详细介绍了 Viper 的核心特性和使用方法,包括从本地 YAML 文件和 Consul 远程配置中心读取配置的示例。Viper 的多来源配置、动态配置和轻松集成特性使其成为管理复杂应用配置的理想选择。
23 2
|
3天前
|
Go 索引
go语言中的循环语句
【11月更文挑战第4天】
12 2
|
3天前
|
Go C++
go语言中的条件语句
【11月更文挑战第4天】
14 2
|
6天前
|
监控 Go API
Go语言在微服务架构中的应用实践
在微服务架构的浪潮中,Go语言以其简洁、高效和并发处理能力脱颖而出,成为构建微服务的理想选择。本文将探讨Go语言在微服务架构中的应用实践,包括Go语言的特性如何适应微服务架构的需求,以及在实际开发中如何利用Go语言的特性来提高服务的性能和可维护性。我们将通过一个具体的案例分析,展示Go语言在微服务开发中的优势,并讨论在实际应用中可能遇到的挑战和解决方案。
|
3天前
|
Go
go语言中的 跳转语句
【11月更文挑战第4天】
11 4
|
3天前
|
JSON 安全 Go
Go语言中使用JWT鉴权、Token刷新完整示例,拿去直接用!
本文介绍了如何在 Go 语言中使用 Gin 框架实现 JWT 用户认证和安全保护。JWT(JSON Web Token)是一种轻量、高效的认证与授权解决方案,特别适合微服务架构。文章详细讲解了 JWT 的基本概念、结构以及如何在 Gin 中生成、解析和刷新 JWT。通过示例代码,展示了如何在实际项目中应用 JWT,确保用户身份验证和数据安全。完整代码可在 GitHub 仓库中查看。
14 1
|
5天前
|
Go 调度 开发者
探索Go语言中的并发模式:goroutine与channel
在本文中,我们将深入探讨Go语言中的核心并发特性——goroutine和channel。不同于传统的并发模型,Go语言的并发机制以其简洁性和高效性著称。本文将通过实际代码示例,展示如何利用goroutine实现轻量级的并发执行,以及如何通过channel安全地在goroutine之间传递数据。摘要部分将概述这些概念,并提示读者本文将提供哪些具体的技术洞见。
|
Go Unix
Go语言有缓冲和无缓冲通道实现样例
感觉可以,但不好用。 应该有封装程序更高的包包吧。 package main import ( "math/rand" "fmt" "time" "sync" ) const ( numberGoroutines = 4 taskLoad = 10 ) var ( wg sync.
1154 0
|
9天前
|
JavaScript Java Go
探索Go语言在微服务架构中的优势
在微服务架构的浪潮中,Go语言以其简洁、高效和并发处理能力脱颖而出。本文将深入探讨Go语言在构建微服务时的性能优势,包括其在内存管理、网络编程、并发模型以及工具链支持方面的特点。通过对比其他流行语言,我们将揭示Go语言如何成为微服务架构中的一股清流。
|
8天前
|
Ubuntu 编译器 Linux
go语言中SQLite3驱动安装
【11月更文挑战第2天】
31 7