浅谈Golang通道channel

简介: 浅谈Golang通道channel

前言


  • channel用于Goroutine间通信时的注意点 - 合理设置channel的size大小 / 正确地关闭channel
  • 合理地运用channel的发送与接收 - 运用函数传入参数的定义,限制 <- chan 和 chan <-
  • channel的底层实现 - 环形队列+发送、接收的waiter通知,结合goroutine的调度思考
  • 理解并运用channel的阻塞逻辑 - 理解channel的每一对 收与发 之间的逻辑,巧妙地使用
  • 思考channel嵌套后的实现逻辑 - 理解用 chan chan 是怎么实现 两层通知 的?


代码

package main
import (
  "errors"
  "fmt"
  "os"
  "time"
)
func ch() {
  var ch = make(chan int)
  go func(ch chan int) {
    // Tip: 由于channel没有设置长度,所以是阻塞的,逐个发送
    ch <- 1
    ch <- 2
    ch <- 3
    fmt.Println("send finished")
  }(ch)
  for {
    select {
    case i := <-ch:
      fmt.Println("receive", i)
    case <-time.After(time.Second):
      fmt.Println("time out")
      os.Exit(1)
    }
  }
}
func chLimit() {
  var ch = make(chan int)
  // Tip: channel参数设置为 chan<- 和 <-chan,可以有效地防止误用发送和接收,例如这里的chan<-只能用于发送
  go func(ch chan<- int) {
    ch <- 1
    ch <- 2
    ch <- 3
    fmt.Println("send finished")
  }(ch)
  for {
    select {
    case i := <-ch:
      fmt.Println("receive", i)
    case <-time.After(time.Second):
      fmt.Println("time out")
      os.Exit(1)
    }
  }
}
func chClose() {
  var ch = make(chan int)
  go func(ch chan<- int) {
    ch <- 1
    ch <- 2
    ch <- 3
    close(ch)
    fmt.Println("send finished")
  }(ch)
  for {
    select {
    case i, ok := <-ch:
      if ok {
        fmt.Println("receive", i)
      } else {
        fmt.Println("channel close")
        os.Exit(0)
      }
    case <-time.After(time.Second):
      fmt.Println("time out")
      os.Exit(1)
    }
  }
}
func chCloseErr() {
  var ch = make(chan int)
  go func(ch chan<- int) {
    ch <- 1
    ch <- 2
    ch <- 3
    close(ch)
    fmt.Println("send finished")
  }(ch)
  for {
    select {
    // Tip: 如果这里不判断,那么i就会一直得到chan类型的默认值,如int为0,永远不会停止
    case i := <-ch:
      fmt.Println("receive", i)
    case <-time.After(time.Second):
      fmt.Println("time out")
      os.Exit(1)
    }
  }
}
func chTask() {
  var doneCh = make(chan struct{})
  var errCh = make(chan error)
  go func(doneCh chan<- struct{}, errCh chan<- error) {
    if time.Now().Unix()%2 == 0 {
      doneCh <- struct{}{}
    } else {
      errCh <- errors.New("unix time is an odd")
    }
  }(doneCh, errCh)
  select {
  // Tip: 这是一个常见的Goroutine处理模式,在这里监听channel结果和错误
  case <-doneCh:
    fmt.Println("done")
  case err := <-errCh:
    fmt.Println("get an error:", err)
  case <-time.After(time.Second):
    fmt.Println("time out")
  }
}
func chBuffer() {
  var ch = make(chan int, 3)
  go func(ch chan int) {
    // Tip: 由于设置了长度,相当于一个消息队列,这里并不会阻塞
    ch <- 1
    ch <- 2
    ch <- 3
    fmt.Println("send finished")
  }(ch)
  for {
    select {
    case i := <-ch:
      fmt.Println("receive", i)
    case <-time.After(time.Second):
      fmt.Println("time out")
      os.Exit(1)
    }
  }
}
func chBufferRange() {
  var ch = make(chan int, 3)
  go func(ch chan int) {
    // Tip: 由于设置了长度,相当于一个消息队列,这里并不会阻塞
    ch <- 1
    ch <- 2
    ch <- 3
    close(ch)
    fmt.Println("send finished")
  }(ch)
  for i := range ch {//由于使用了range,close(ch)之后会自动退出
    fmt.Println("receive", i)
  }
}


底层实现

runtime->chan.go

type hchan struct {
  qcount   uint           // total data in the queue
  dataqsiz uint           // size of the circular queue
  buf      unsafe.Pointer // points to an array of dataqsiz elements
  elemsize uint16
  closed   uint32
  elemtype *_type // element type
  sendx    uint   // send index
  recvx    uint   // receive index
  recvq    waitq  // list of recv waiters
  sendq    waitq  // list of send waiters
  // lock protects all fields in hchan, as well as several
  // fields in sudogs blocked on this channel.
  //
  // Do not change another G's status while holding this lock
  // (in particular, do not ready a G), as this can deadlock
  // with stack shrinking.
  lock mutex
}

hchan这个结构体是重点,其内部实现是环形队列,用环形队列控制消息的收发,那么有什么好处呢?第一点很容易想出来,复用内存

recvq    waitq  // list of recv waiters
sendq    waitq  // list of send waiters

观察这两个字段,有两个waitq,一个发送,一个接收,把发送和接收channel的goroutine都注册到这个channel(hchan)里面,当channel里面发送数据的时候,所有接收方收到通知,阻塞的goroutine得到消息就可以运行了,这样成千上万的goroutine便得到了调度。


如果不这样做会有什么后果呢?一个channel,1000个goroutine去等待接收他的消息,就要一直去判断这个channel能不能接收消息,这将损耗大量cpu资源。

两个有趣的例子

package main
import (
  "fmt"
  "time"
)
// 示例1
type Ball struct {
  hits int
}
func passBall() {
  table := make(chan *Ball)
  go player("ping", table)
  go player("pong", table)
  // Tip: 核心逻辑:往channel里放入数据,作为启动信号;从channel读出数据,作为关闭信号
  table <- new(Ball)
  time.Sleep(time.Second)
  <-table
  //如果在一个大的系统里,table没有关闭,还有两个goroutine在阻塞等待table,就会造成资源的泄露,goroutine没有得到有效的释放
}
func player(name string, table chan *Ball) {
  for {
    // Tip: 刚进goroutine时,先阻塞在这里
    ball := <-table
    ball.hits++
    fmt.Println(name, ball.hits)
    time.Sleep(100 * time.Millisecond)
    // Tip: 运行到这里时,另一个goroutine在收数据,所以能准确送达
    table <- ball
  }
}
// 示例2
func passBallWithClose() {
  // Tip 虽然可以通过GC自动回收channel资源,但我们仍应该注意这点
  table := make(chan *Ball)
  go playerWithClose("ping", table)
  go playerWithClose("pong", table)
  table <- new(Ball)
  time.Sleep(time.Second)
  <-table
  close(table)
}
func playerWithClose(name string, table chan *Ball) {
  for {
    ball, ok := <-table
    if !ok {
      break
    }
    ball.hits++
    fmt.Println(name, ball.hits)
    time.Sleep(100 * time.Millisecond)
    table <- ball
  }
}
// 示例3
type sub struct {
  // Tip 把chan error看作一个整体,作为关闭的通道
  closing chan chan error
  updates chan string
}
func (s *sub) Close() error {
  // Tip 核心逻辑:两层通知,第一层作为准备关闭的通知,第二层作为关闭结果的返回
  errc := make(chan error)
  // Tip 第一步:要关闭时,先传一个chan error过去,通知要关闭了
  s.closing <- errc
  // Tip 第三步:从chan error中读取错误,阻塞等待
  return <-errc
}
func (s *sub) loop() {
  var err error
  for {
    select {
    case errc := <-s.closing:
      // Tip 第二步:收到关闭后,进行处理,处理后把error传回去
      errc <- err
      close(s.updates)
      return
    }
  }
}


目录
相关文章
|
安全 Go
Golang 语言使用 channel 并发编程
Golang 语言使用 channel 并发编程
57 0
|
安全 Go 索引
Golang 语言中的 channel 实现原理
Golang 语言中的 channel 实现原理
66 0
|
消息中间件 缓存 Go
Golang 语言中 Channel 的使用方式
Golang 语言中 Channel 的使用方式
57 0
|
3月前
|
Go
Golang语言之管道channel快速入门篇
这篇文章是关于Go语言中管道(channel)的快速入门教程,涵盖了管道的基本使用、有缓冲和无缓冲管道的区别、管道的关闭、遍历、协程和管道的协同工作、单向通道的使用以及select多路复用的详细案例和解释。
140 4
Golang语言之管道channel快速入门篇
|
4月前
|
存储 消息中间件 缓存
|
5月前
|
自然语言处理 Go 数据处理
云计算自旋锁问题之引入Golang插件系统后iLogtail的输入输出通道和处理能力如何解决
云计算自旋锁问题之引入Golang插件系统后iLogtail的输入输出通道和处理能力如何解决
39 1
|
7月前
|
程序员 Go 调度
第十六章 Golang中goroutine和channel
第十六章 Golang中goroutine和channel
54 3
|
7月前
|
存储 算法 Java
Golang底层原理剖析之多路select、channel数据结构和阻塞与非阻塞
Golang底层原理剖析之多路select、channel数据结构和阻塞与非阻塞
92 0
|
存储 安全 Go
Golang通道(Channel)原理解析
Golang通道(Channel)原理解析
|
3月前
|
Go
Golang语言文件操作快速入门篇
这篇文章是关于Go语言文件操作快速入门的教程,涵盖了文件的读取、写入、复制操作以及使用标准库中的ioutil、bufio、os等包进行文件操作的详细案例。
71 4
Golang语言文件操作快速入门篇