成事不说,遂事不谏,既往不咎。——<<论语>>
1 Channel结构
在runtime.hchan中:
type hchan struct { qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16 closed uint32 elemtype *_type sendx uint recvx uint recvq waitq sendq waitq lock mutex }
循环队列是如何构建的?有五个字段构建:qcount,dataqsize,buf,sendx,recvx
- qcount Channel中的元素个数
- dataqsiz Channel中的循环队列的长度
- buf Channel的缓冲区数据指针
- sendx Channel的发送操作处理到的位置
- recvx Channel的接收操作处理到的位置
- elemsize Channel能够收发的元素类型
- elemtype Channel能够收发的元素大小
- recvq Channel等待读消息的goroutine队列
- sendq Channel等待写消息的goroutine队列
- lock Channel互斥锁,chan不允许并发读写
下图展示了一个可缓存6个元素的channel示意图:
- dataqsiz指示了队列长度为6,即可缓存6个元素;
- buf指向队列的内存,队列中还剩余两个元素;
- qcount表示队列中还有两个元素;
- sendx指示后续写入的数据存储的位置,取值[0, 6);
- recvx指示从该位置读取数据, 取值[0, 6);
2 等待队列recvq或者sendq
等待队列使用双向链表 runtime.waitq 表示,链表中所有的元素都是 runtime.sudog 结构:
type waitq struct { first *sudog //指向双向链表头节点 last *sudog //指向双向链表未节点 }
sudog结构:
// sudog represents a g in a wait list, such as for sending/receiving // on a channel. // // sudog is necessary because the g ↔ synchronization object relation // is many-to-many. A g can be on many wait lists, so there may be // many sudogs for one g; and many gs may be waiting on the same // synchronization object, so there may be many sudogs for one object. // // sudogs are allocated from a special pool. Use acquireSudog and // releaseSudog to allocate and free them. type sudog struct { // The following fields are protected by the hchan.lock of the // channel this sudog is blocking on. shrinkstack depends on // this for sudogs involved in channel ops. g *g next *sudog //下一个元素 prev *sudog //前一个元素 用于实现双向链表 elem unsafe.Pointer // data element (may point to stack) // The following fields are never accessed concurrently. // For channels, waitlink is only accessed by g. // For semaphores, all fields (including the ones above) // are only accessed when holding a semaRoot lock. acquiretime int64 releasetime int64 ticket uint32 // isSelect indicates g is participating in a select, so // g.selectDone must be CAS'd to win the wake-up race. isSelect bool // success indicates whether communication over channel c // succeeded. It is true if the goroutine was awoken because a // value was delivered over channel c, and false if awoken // because c was closed. success bool parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel }
runtime.sudog表示一个在等待列表中的 Goroutine, 该结构中存储了两个分别指向前后runtime.sudog的指针以构成链表。
从channel读数据,如果channel缓冲区为空或者没有缓冲区,当前goroutine会被阻塞。向channel写数据,如果channel缓冲区已满或者没有缓冲区,当前goroutine会被阻塞。
阻塞等待队列入出源码:
//入双向队列 func (q *waitq) enqueue(sgp *sudog) { sgp.next = nil x := q.last if x == nil { sgp.prev = nil q.first = sgp q.last = sgp return } sgp.prev = x x.next = sgp q.last = sgp } //出双向队列 func (q *waitq) dequeue() *sudog { for { sgp := q.first if sgp == nil { return nil } y := sgp.next if y == nil { q.first = nil q.last = nil } else { y.prev = nil q.first = y sgp.next = nil // mark as removed (see dequeueSudog) } // if a goroutine was put on this queue because of a // select, there is a small window between the goroutine // being woken up by a different case and it grabbing the // channel locks. Once it has the lock // it removes itself from the queue, so we won't see it after that. // We use a flag in the G struct to tell us when someone // else has won the race to signal this goroutine but the goroutine // hasn't removed itself from the queue yet. if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) { continue } return sgp } }
被阻塞的goroutine将会挂在channel的等待队列中:
- 因读阻塞的goroutine会被向channel写入数据的goroutine唤醒;
- 因写阻塞的goroutine会被从channel读数据的goroutine唤醒;
下图展示了一个没有缓冲区的channel,有几个goroutine阻塞等待读数据:
3 类型信息
一个channel只能传递一种类型的值,类型信息存储在hchan数据结构中。
- elemtype代表类型,用于数据传递过程中的赋值;
- elemsize代表类型大小,用于在buf中定位元素位置。
注意,一般情况下recvq和sendq至少有一个为空。只有一个例外,那就是同一个goroutine使用select语句向channel一边写数据,一边读数据。
4 锁
一个channel同时仅允许被一个goroutine读写,为简单起见,本章后续部分说明读写过程时不再涉及加锁和解锁。
5 关注公众号
微信公众号:堆栈future