Go channel结构剖析《二》

简介: Go channel结构剖析《二》

总是依赖别人的话,就永远长不大。——《哆啦A梦》


1 Channel的创建



创建channel的过程实际上是初始化hchan结构。其中类型信息和缓冲区长度由make语句传入,buf的大小则与元素大小和缓冲区长度共同决定。


创建channel的源码如下:

const (
  maxAlign  = 8
  hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
  debugChan = false
)
func makechan(t *chantype, size int) *hchan {
  elem := t.elem
  // 编译器决定size是否超过类型大小
  if elem.size >= 1<<16 {
    throw("makechan: invalid channel element type")
  }
  //对齐字节 与内存模型有关 内存模型可以看我前面的文章
  if hchanSize%maxAlign != 0 || elem.align > maxAlign {
    throw("makechan: bad alignment")
  }
  //分配的内存必须是在预留内存范围之内
  mem, overflow := math.MulUintptr(elem.size, uintptr(size))
  if overflow || mem > maxAlloc-hchanSize || size < 0 {
    panic(plainError("makechan: size out of range"))
  }
  // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
  // buf points into the same allocation, elemtype is persistent.
  // SudoG's are referenced from their owning thread so they can't be collected.
  // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
  var c *hchan
  switch {
  case mem == 0:
    // 队列 or 元素size is zero.
    c = (*hchan)(mallocgc(hchanSize, nil, true))
  case elem.ptrdata == 0:
    // 元素不包含指针
    // 分配 hchan and buf
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    // 元素包含指针
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }
  c.elemsize = uint16(elem.size) //元素大小
  c.elemtype = elem //元素类型
  c.dataqsiz = uint(size)
  lockInit(&c.lock, lockRankHchan)
  return c
}


2 向channel写数据



向一个channel中写数据简单过程如下:


  1. 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;
  2. 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
  3. 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;


简单流程图如下:

640.png


源码如下:


func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   if c == nil {
      if !block {
         return false
      }
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }
   lock(&c.lock)
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }
   if sg := c.recvq.dequeue(); sg != nil {
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }
   if c.qcount < c.dataqsiz {
      qp := chanbuf(c, c.sendx)
      typedmemmove(c.elemtype, qp, ep)
      c.sendx++
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }
   if !block {
      unlock(&c.lock)
      return false
   }
   // Block on the channel. Some receiver will complete our operation for us.
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   c.sendq.enqueue(mysg)
   goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
   // stack tracer.
   KeepAlive(ep)
   // someone woke us up.
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   if gp.param == nil {
      if c.closed == 0 {
         throw("chansend: spurious wakeup")
      }
      panic(plainError("send on closed channel"))
   }
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   return true
}


  1. 第一个if判断 就是chan是nil的情况 如果是nil就调用gopark永远阻塞 所以throw("unreachable")就不会执行到。
  2. 因为我们block=true,所以目的就是让send阻塞,所以第二个if进不去
  3. 第三个if判断如果队列关闭了即closed=1 就会panic 即想关闭的chan发送数据会导致panic
  4. 第四部分就是如果接受队列中有等待着,那么就把它从队列中弹出,数据直接交给它(通过memmove(dst,src,size)实现),而不需要放入buf中,速度更快。
  5. 如果走到这里代表接受队列没有等待的receiver了,那么就判断如果队列没有满,就把数据存储到buf中,返回成功。(注意变量的变化,比如sendx索引指向下一个位置,qcount+1,以及sendx和队列容量相等,从头开始记。就是循环队列嘛)
  6. 能走到这里就代表buf满了,将要发送的数据和当前goroutine打包成sudog对象放入sendq中,并将当前goroutine的状态设置成waiting状态。
  7. goparkunclok函数 其实就是阻塞并且解锁传入的mutex,阻塞的goroutine没必要在持有锁,同时切出goroutine,并设置状态是waiting。gopark和goready互为逆操作,调用gopark在用户测看来就是向chan发送数据阻塞了。goready作用是唤醒对应的goroutine。
  8. 当唤醒的时候,看唤醒的是不是当前的goroutine,如果不是,直接抛异常,否则换新的就是当前goroutine 那么被唤醒之后params会被赋值sudog对象指针(recv函数中),所以这里判断是否是空,如果是空,就是params=nil,就代表这个chan已经close了,closechan的时候会清空params。那么如果closed==0就代表异常状态,不存在的,所以抛异常。否则就是chan关闭了你还发送数据,那么就panic。这里设计到recv函数 closchan函数对params的处理。
  9. 接下来就是一些资源的释放和环境的清理


3 从channel读数据



从一个channel读数据简单过程如下:

  1. 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,最后把G唤醒,结束读取过程;
  2. 如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;
  3. 如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程;
  4. 将当前goroutine加入recvq,进入睡眠,等待被写goroutine唤醒;


简单流程图如下:

640.png


源码如下


func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   if c == nil {
      if !block {
         return
      }
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }
   lock(&c.lock)
   if c.closed != 0 && c.qcount == 0 {
      unlock(&c.lock)
      if ep != nil {
         typedmemclr(c.elemtype, ep)
      }
      return true, false
   }
   if sg := c.sendq.dequeue(); sg != nil {
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true, true
   }
   if c.qcount > 0 {
      // Receive directly from queue
      qp := chanbuf(c, c.recvx)
      if raceenabled {
         raceacquire(qp)
         racerelease(qp)
      }
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return true, true
   }
   if !block {
      unlock(&c.lock)
      return false, false
   }
   // no sender available: block on this channel.
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.param = nil
   c.recvq.enqueue(mysg)
   goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
   // someone woke us up
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   closed := gp.param == nil
   gp.param = nil
   mysg.c = nil
   releaseSudog(mysg)
   return true, !closed
}


  1. 第一个if判断就是chan是不是nil 是nil 那么代表从chan接受数据会被永远阻塞
  2. 第二个if判断就是如果队列关闭了且队列没有数据了,那么把接受到的这个数据ep内存清0 然后直接返回
  3. 第三个if判断就是 如果sendq有等待发送的sender,如果是unbufferd的chan直接将sender的数据复制给receiver,否则就从队首读取一个值,然后把sender的值加入到队列尾部。
  4. 第四个if判断就是没有等待的sender,且buf中有数据,那么就取出一个元素给receiver
  5. 第五个if进不去
  6. 第六部分是buf中没有元素,那么当前receiver就会阻塞,直到它从sender中接受了数据或者是chan被close,才返回。
  7. 第六部分如果被执行,那么receiver被唤醒了,唤醒之后就会判断chan关闭了吗 如果关闭params==nil成立 否则就是被send函数调用唤醒,params有数据。所以清理完资源之后,如果最终closed是false就代表接受到send发送的数据 是true代表chan关闭了。


4 关闭Channel



关闭channel时会把recvq中的G全部唤醒,本该写入G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic。


除此之外,panic出现的常见场景还有:

  1. 关闭值为nil的channel
  2. 关闭已经被关闭的channel
  3. 向已经关闭的channel写数据


源码如下:


func closechan(c *hchan) {
   if c == nil {
      panic(plainError("close of nil channel"))
   }
   lock(&c.lock)
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("close of closed channel"))
   }
   if raceenabled {
      callerpc := getcallerpc()
      racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
      racerelease(c.raceaddr())
   }
   c.closed = 1
   var glist gList
   // release all readers
   for {
      sg := c.recvq.dequeue()
      if sg == nil {
         break
      }
      if sg.elem != nil {
         typedmemclr(c.elemtype, sg.elem)
         sg.elem = nil
      }
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = nil
      if raceenabled {
         raceacquireg(gp, c.raceaddr())
      }
      glist.push(gp)
   }
   // release all writers (they will panic)
   for {
      sg := c.sendq.dequeue()
      if sg == nil {
         break
      }
      sg.elem = nil
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = nil
      if raceenabled {
         raceacquireg(gp, c.raceaddr())
      }
      glist.push(gp)
   }
   unlock(&c.lock)
   // Ready all Gs now that we've dropped the channel lock.
   for !glist.empty() {
      gp := glist.pop()
      gp.schedlink = 0
      goready(gp, 3)
   }
}
  1. 如果chan是nil,你close会panic
  2. chan已经closed了,你再次close就会panic
  3. 否则就是chan没有close,且chan不为nil
  4. 就把等待队列中sender和receiver从队列中全部移除并且唤醒 清除的时候将params设置为nil。
  5. 为啥还要唤醒呢,因为他们还在阻塞,你只是清理了数据而已,close chan 唤醒他们 让他们继续工作 否则永远阻塞了。


5 关注公众号



微信公众号:堆栈future

相关文章
|
15天前
|
程序员 Go
go语言中的控制结构
【11月更文挑战第3天】
90 58
|
13天前
|
Go 调度 开发者
探索Go语言中的并发模式:goroutine与channel
在本文中,我们将深入探讨Go语言中的核心并发特性——goroutine和channel。不同于传统的并发模型,Go语言的并发机制以其简洁性和高效性著称。本文将通过实际代码示例,展示如何利用goroutine实现轻量级的并发执行,以及如何通过channel安全地在goroutine之间传递数据。摘要部分将概述这些概念,并提示读者本文将提供哪些具体的技术洞见。
|
1月前
|
安全 Go 调度
探索Go语言的并发之美:goroutine与channel
在这个快节奏的技术时代,Go语言以其简洁的语法和强大的并发能力脱颖而出。本文将带你深入Go语言的并发机制,探索goroutine的轻量级特性和channel的同步通信能力,让你在高并发场景下也能游刃有余。
|
1月前
|
存储 安全 Go
探索Go语言的并发模型:Goroutine与Channel
在Go语言的多核处理器时代,传统并发模型已无法满足高效、低延迟的需求。本文深入探讨Go语言的并发处理机制,包括Goroutine的轻量级线程模型和Channel的通信机制,揭示它们如何共同构建出高效、简洁的并发程序。
|
30天前
|
存储 Go 调度
深入理解Go语言的并发模型:goroutine与channel
在这个快速变化的技术世界中,Go语言以其简洁的并发模型脱颖而出。本文将带你穿越Go语言的并发世界,探索goroutine的轻量级特性和channel的同步机制。摘要部分,我们将用一段对话来揭示Go并发模型的魔力,而不是传统的介绍性文字。
|
1月前
|
安全 Go 调度
探索Go语言的并发模型:Goroutine与Channel的魔力
本文深入探讨了Go语言的并发模型,不仅解释了Goroutine的概念和特性,还详细讲解了Channel的用法和它们在并发编程中的重要性。通过实际代码示例,揭示了Go语言如何通过轻量级线程和通信机制来实现高效的并发处理。
|
1月前
|
安全 Go 调度
探索Go语言的并发之美:goroutine与channel的实践指南
在本文中,我们将深入探讨Go语言的并发机制,特别是goroutine和channel的使用。通过实际的代码示例,我们将展示如何利用这些工具来构建高效、可扩展的并发程序。我们将讨论goroutine的轻量级特性,channel的同步通信能力,以及它们如何共同简化并发编程的复杂性。
|
1月前
|
安全 Go 数据处理
掌握Go语言并发:从goroutine到channel
在Go语言的世界中,goroutine和channel是构建高效并发程序的基石。本文将带你一探Go语言并发机制的奥秘,从基础的goroutine创建到channel的同步通信,让你在并发编程的道路上更进一步。
|
3月前
|
消息中间件 Kafka Go
从Go channel中批量读取数据
从Go channel中批量读取数据
|
3月前
|
消息中间件 缓存 API
go-zero微服务实战系列(三、API定义和表结构设计)
go-zero微服务实战系列(三、API定义和表结构设计)