Go channel结构剖析《二》

简介: Go channel结构剖析《二》

总是依赖别人的话,就永远长不大。——《哆啦A梦》


1 Channel的创建



创建channel的过程实际上是初始化hchan结构。其中类型信息和缓冲区长度由make语句传入,buf的大小则与元素大小和缓冲区长度共同决定。


创建channel的源码如下:

const (
  maxAlign  = 8
  hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
  debugChan = false
)
func makechan(t *chantype, size int) *hchan {
  elem := t.elem
  // 编译器决定size是否超过类型大小
  if elem.size >= 1<<16 {
    throw("makechan: invalid channel element type")
  }
  //对齐字节 与内存模型有关 内存模型可以看我前面的文章
  if hchanSize%maxAlign != 0 || elem.align > maxAlign {
    throw("makechan: bad alignment")
  }
  //分配的内存必须是在预留内存范围之内
  mem, overflow := math.MulUintptr(elem.size, uintptr(size))
  if overflow || mem > maxAlloc-hchanSize || size < 0 {
    panic(plainError("makechan: size out of range"))
  }
  // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
  // buf points into the same allocation, elemtype is persistent.
  // SudoG's are referenced from their owning thread so they can't be collected.
  // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
  var c *hchan
  switch {
  case mem == 0:
    // 队列 or 元素size is zero.
    c = (*hchan)(mallocgc(hchanSize, nil, true))
  case elem.ptrdata == 0:
    // 元素不包含指针
    // 分配 hchan and buf
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    // 元素包含指针
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }
  c.elemsize = uint16(elem.size) //元素大小
  c.elemtype = elem //元素类型
  c.dataqsiz = uint(size)
  lockInit(&c.lock, lockRankHchan)
  return c
}


2 向channel写数据



向一个channel中写数据简单过程如下:


  1. 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;
  2. 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
  3. 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;


简单流程图如下:

640.png


源码如下:


func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   if c == nil {
      if !block {
         return false
      }
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }
   lock(&c.lock)
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }
   if sg := c.recvq.dequeue(); sg != nil {
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }
   if c.qcount < c.dataqsiz {
      qp := chanbuf(c, c.sendx)
      typedmemmove(c.elemtype, qp, ep)
      c.sendx++
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }
   if !block {
      unlock(&c.lock)
      return false
   }
   // Block on the channel. Some receiver will complete our operation for us.
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   c.sendq.enqueue(mysg)
   goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
   // stack tracer.
   KeepAlive(ep)
   // someone woke us up.
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   if gp.param == nil {
      if c.closed == 0 {
         throw("chansend: spurious wakeup")
      }
      panic(plainError("send on closed channel"))
   }
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   return true
}


  1. 第一个if判断 就是chan是nil的情况 如果是nil就调用gopark永远阻塞 所以throw("unreachable")就不会执行到。
  2. 因为我们block=true,所以目的就是让send阻塞,所以第二个if进不去
  3. 第三个if判断如果队列关闭了即closed=1 就会panic 即想关闭的chan发送数据会导致panic
  4. 第四部分就是如果接受队列中有等待着,那么就把它从队列中弹出,数据直接交给它(通过memmove(dst,src,size)实现),而不需要放入buf中,速度更快。
  5. 如果走到这里代表接受队列没有等待的receiver了,那么就判断如果队列没有满,就把数据存储到buf中,返回成功。(注意变量的变化,比如sendx索引指向下一个位置,qcount+1,以及sendx和队列容量相等,从头开始记。就是循环队列嘛)
  6. 能走到这里就代表buf满了,将要发送的数据和当前goroutine打包成sudog对象放入sendq中,并将当前goroutine的状态设置成waiting状态。
  7. goparkunclok函数 其实就是阻塞并且解锁传入的mutex,阻塞的goroutine没必要在持有锁,同时切出goroutine,并设置状态是waiting。gopark和goready互为逆操作,调用gopark在用户测看来就是向chan发送数据阻塞了。goready作用是唤醒对应的goroutine。
  8. 当唤醒的时候,看唤醒的是不是当前的goroutine,如果不是,直接抛异常,否则换新的就是当前goroutine 那么被唤醒之后params会被赋值sudog对象指针(recv函数中),所以这里判断是否是空,如果是空,就是params=nil,就代表这个chan已经close了,closechan的时候会清空params。那么如果closed==0就代表异常状态,不存在的,所以抛异常。否则就是chan关闭了你还发送数据,那么就panic。这里设计到recv函数 closchan函数对params的处理。
  9. 接下来就是一些资源的释放和环境的清理


3 从channel读数据



从一个channel读数据简单过程如下:

  1. 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,最后把G唤醒,结束读取过程;
  2. 如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;
  3. 如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程;
  4. 将当前goroutine加入recvq,进入睡眠,等待被写goroutine唤醒;


简单流程图如下:

640.png


源码如下


func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   if c == nil {
      if !block {
         return
      }
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }
   lock(&c.lock)
   if c.closed != 0 && c.qcount == 0 {
      unlock(&c.lock)
      if ep != nil {
         typedmemclr(c.elemtype, ep)
      }
      return true, false
   }
   if sg := c.sendq.dequeue(); sg != nil {
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true, true
   }
   if c.qcount > 0 {
      // Receive directly from queue
      qp := chanbuf(c, c.recvx)
      if raceenabled {
         raceacquire(qp)
         racerelease(qp)
      }
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return true, true
   }
   if !block {
      unlock(&c.lock)
      return false, false
   }
   // no sender available: block on this channel.
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.param = nil
   c.recvq.enqueue(mysg)
   goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
   // someone woke us up
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   closed := gp.param == nil
   gp.param = nil
   mysg.c = nil
   releaseSudog(mysg)
   return true, !closed
}


  1. 第一个if判断就是chan是不是nil 是nil 那么代表从chan接受数据会被永远阻塞
  2. 第二个if判断就是如果队列关闭了且队列没有数据了,那么把接受到的这个数据ep内存清0 然后直接返回
  3. 第三个if判断就是 如果sendq有等待发送的sender,如果是unbufferd的chan直接将sender的数据复制给receiver,否则就从队首读取一个值,然后把sender的值加入到队列尾部。
  4. 第四个if判断就是没有等待的sender,且buf中有数据,那么就取出一个元素给receiver
  5. 第五个if进不去
  6. 第六部分是buf中没有元素,那么当前receiver就会阻塞,直到它从sender中接受了数据或者是chan被close,才返回。
  7. 第六部分如果被执行,那么receiver被唤醒了,唤醒之后就会判断chan关闭了吗 如果关闭params==nil成立 否则就是被send函数调用唤醒,params有数据。所以清理完资源之后,如果最终closed是false就代表接受到send发送的数据 是true代表chan关闭了。


4 关闭Channel



关闭channel时会把recvq中的G全部唤醒,本该写入G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic。


除此之外,panic出现的常见场景还有:

  1. 关闭值为nil的channel
  2. 关闭已经被关闭的channel
  3. 向已经关闭的channel写数据


源码如下:


func closechan(c *hchan) {
   if c == nil {
      panic(plainError("close of nil channel"))
   }
   lock(&c.lock)
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("close of closed channel"))
   }
   if raceenabled {
      callerpc := getcallerpc()
      racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
      racerelease(c.raceaddr())
   }
   c.closed = 1
   var glist gList
   // release all readers
   for {
      sg := c.recvq.dequeue()
      if sg == nil {
         break
      }
      if sg.elem != nil {
         typedmemclr(c.elemtype, sg.elem)
         sg.elem = nil
      }
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = nil
      if raceenabled {
         raceacquireg(gp, c.raceaddr())
      }
      glist.push(gp)
   }
   // release all writers (they will panic)
   for {
      sg := c.sendq.dequeue()
      if sg == nil {
         break
      }
      sg.elem = nil
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = nil
      if raceenabled {
         raceacquireg(gp, c.raceaddr())
      }
      glist.push(gp)
   }
   unlock(&c.lock)
   // Ready all Gs now that we've dropped the channel lock.
   for !glist.empty() {
      gp := glist.pop()
      gp.schedlink = 0
      goready(gp, 3)
   }
}
  1. 如果chan是nil,你close会panic
  2. chan已经closed了,你再次close就会panic
  3. 否则就是chan没有close,且chan不为nil
  4. 就把等待队列中sender和receiver从队列中全部移除并且唤醒 清除的时候将params设置为nil。
  5. 为啥还要唤醒呢,因为他们还在阻塞,你只是清理了数据而已,close chan 唤醒他们 让他们继续工作 否则永远阻塞了。


5 关注公众号



微信公众号:堆栈future

相关文章
|
7天前
|
Go
go之channel关闭与广播
go之channel关闭与广播
8 0
|
13天前
|
存储 Go
Go 语言当中 CHANNEL 缓冲
Go 语言当中 CHANNEL 缓冲
|
27天前
|
Unix Shell 编译器
Go 中空结构有什么用法
在 Go 语言中,空结构体 struct{} 是一个非常特殊的类型,它不包含任何字段并且不占用任何内存空间。虽然听起来似乎没什么用,但空结构体在 Go 编程中实际上有着广泛的应用。本文将详细探讨空结构体的几种典型用法,并解释为何它们在特定场景下非常有用。
|
7天前
|
Go
go之channel任意任务完成、全部任务完成退出
go之channel任意任务完成、全部任务完成退出
8 0
|
2月前
|
编译器 Go
Go 语言结构
Go 语言结构
18 0
|
2月前
|
负载均衡 Go 调度
使用Go语言构建高性能的Web服务器:协程与Channel的深度解析
在追求高性能Web服务的今天,Go语言以其强大的并发性能和简洁的语法赢得了开发者的青睐。本文将深入探讨Go语言在构建高性能Web服务器方面的应用,特别是协程(goroutine)和通道(channel)这两个核心概念。我们将通过示例代码,展示如何利用协程处理并发请求,并通过通道实现协程间的通信和同步,从而构建出高效、稳定的Web服务器。
|
2月前
|
自然语言处理 数据挖掘 程序员
《Go 简易速速上手小册》第2章:控制结构与函数(2024 最新版)(下)
《Go 简易速速上手小册》第2章:控制结构与函数(2024 最新版)(上)
30 1
|
2月前
|
数据采集 搜索推荐 Go
《Go 简易速速上手小册》第2章:控制结构与函数(2024 最新版)(上)
《Go 简易速速上手小册》第2章:控制结构与函数(2024 最新版)
33 1
|
2月前
|
设计模式 缓存 安全
一篇文章带你吃透Go语言的Atomic和Channel--实战方法
一篇文章带你吃透Go语言的Atomic和Channel--实战方法
48 0
|
2月前
|
Go 开发者 索引
Go语言中的循环控制结构解析
【2月更文挑战第3天】本文将详细探讨Go语言中的循环控制结构,包括`for`循环、`range`循环以及`无限循环`的使用方法和适用场景。通过掌握这些循环结构,Go语言开发者能够更加高效地进行迭代操作、处理集合数据以及实现复杂的控制逻辑。