golang channel 原理探究
在Go中最常见的就是通信顺序进程(Communicating sequential processes,CSP)的并发模型,通过共享通信,来实现共享内存,这里就提到了channel.
Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Go 语言中的 Goroutine 会通过 Channel 传递数据。
Goroutine通过使用channel传递数据,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。
Channel 收发操作均遵循了先入先出(FIFO)的设计,具体规则如下:
- 先从 Channel 读取数据的 Goroutine 会先接收到数据;
- 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;
Channel 通常会有以下三种类型:
- 同步 Channel — 不需要缓冲区,发送方会直接将数据交给(Handoff)接收方;
- 异步 Channel — 基于环形缓存的传统生产者消费者模型;
- chan struct{} 类型的异步 Channel 的 struct{} 类型不占用内存空间,不需要实现缓冲区和直接发送(Handoff)的语义;
Channel 在运行时使用 runtime.hchan 结构体表示:
type hchan struct { qcount uint // 当前队列里还剩余元素个数 dataqsiz uint // 环形队列长度,即缓冲区的大小,即make(chan T,N) 中的N buf unsafe.Pointer // 环形队列指针 elemsize uint16 // 每个元素的大小 closed uint32 // 标识当前通道是否处于关闭状态,创建通道后,该字段设置0,即打开通道;通道调用close将其设置为1,通道关闭 elemtype *_type // 元素类型,用于数据传递过程中的赋值 sendx uint // 环形缓冲区的状态字段,它只是缓冲区的当前索引-支持数组,它可以从中发送数据 recvx uint // 环形缓冲区的状态字段,它只是缓冲区当前索引-支持数组,它可以从中接受数据 recvq waitq // 等待读消息的goroutine队列 sendq waitq // 等待写消息的goroutine队列 // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex // 互斥锁,为每个读写操作锁定通道,因为发送和接受必须是互斥操作 } type waitq struct { first *sudog last *sudog }
其中hchan结构体中有五个字段是构建底层的循环队列:
* qcount — Channel 中的元素个数; * dataqsiz — Channel 中的循环队列的长度; * buf — Channel 的缓冲区数据指针; * sendx — Channel 的发送操作处理到的位置; * recvx — Channel 的接收操作处理到的位置;
通常, elemsize 和 elemtype 分别表示当前 Channel 能够收发的元素类型和大小.
sendq 和 recvq 存储了当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,这些等待队列使用双向链表runtime.waitq表示,链表中所有的元素都是runtime.sudog结构.
waitq 表示一个在等待列表中的 Goroutine,该结构体中存储了阻塞的相关信息以及两个分别指向前后runtime.sudog的指针。
channel 在Go中是通过make关键字创建,编译器会将make(chan int,10).
创建管道:
runtime.makechan 和 runtime.makechan64 会根据传入的参数类型和缓冲区大小创建一个新的 Channel 结构,其中后者用于处理缓冲区大小大于 2 的 32 次方的情况.
这里我们来详细看下makechan 函数:
func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { case mem == 0: // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } return c }
Channel 中根据收发元素的类型和缓冲区的大小初始化 runtime.hchan 结构体和缓冲区:
arena区域就是我们所谓的堆区,Go动态分配的内存都是在这个区域,它把内存分割成8KB大小的页,一些页组合起来称为mspan。
bitmap区域标识arena区域哪些地址保存了对象,并且用4bit标志位表示对象是否包含指针、GC标记信息。bitmap中一个byte大小的内存对应arena区域中4个指针大小(指针大小为 8B )的内存,所以bitmap区域的大小是512GB/(4*8B)=16GB。
此外我们还可以看到bitmap的高地址部分指向arena区域的低地址部分,这里bitmap的地址是由高地址向低地址增长的。
spans区域存放mspan(是一些arena分割的页组合起来的内存管理基本单元,后文会再讲)的指针,每个指针对应一页,所以spans区域的大小就是512GB/8KB*8B=512MB。
除以8KB是计算arena区域的页数,而最后乘以8是计算spans区域所有指针的大小。创建mspan的时候,按页填充对应的spans区域,在回收object时,根据地址很容易就能找到它所属的mspan。
发送数据:
当我们想要向 Channel 发送数据时,就需要使用 ch <- i 语句. runtime.chansend1 调用了 runtime.chansend 并传入 Channel 和需要发送的数据。
runtime.chansend 是向 Channel 中发送数据时最终会调用的函数,这个函数负责了发送数据的全部逻辑,如果我们在调用时将 block 参数设置成 true,那么就表示当前发送操作是一个阻塞操作:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... if !block && c.closed == 0 && full(c) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } ... }
在发送数据的逻辑执行之前会先为当前 Channel 加锁,防止发生竞争条件。如果 Channel 已经关闭,那么向该 Channel 发送数据时就会报"send on closed channel" 错误并中止程序。
因为 runtime.chansend 函数的实现比较复杂,所以我们这里将该函数的执行过程分成以下的
三个部分:
- 当存在等待的接收者时,通过 runtime.send 直接将数据发送给阻塞的接收者.
- 当缓冲区存在空余空间时,将发送的数据写入 Channel 的缓冲区.
- 当不存在缓冲区或者缓冲区已满时,等待其他 Goroutine 从 Channel 接收数据.
因此:
当我们使用 ch <- i 表达式向 Channel 发送数据时遇到的几种情况:
- 如果当前 Channel 的 recvq 上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前的 Goroutine 并将其设置成下一个运行的 Goroutine;
- 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们就会直接将数据直接存储到当前缓冲区 sendx 所在的位置上;
- 如果不满足上面的两种情况,就会创建一个 runtime.sudog 结构并将其加入 Channel 的 sendq 队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;
发送数据的过程中可能包含几个会触发 Goroutine 调度的时机:
- 发送数据时发现 Channel 上存在等待接收数据的 Goroutine,立刻设置处理器的 runnext 属性,但是并不会立刻触发调度.
- 发送数据时并没有找到接收方并且缓冲区已经满了,这时就会将自己加入 Channel 的 sendq 队列并调用 runtime.goparkunlock 触发 Goroutine 的调度让出处理器的使用权.
接收数据:
接着我们看看接受数据,Go中可以使用两种不同的方式去接收 Channel 中的数据:
* i <- ch * i, ok <- ch
虽然不同的接收方式会被转换成 runtime.chanrecv1 和 runtime.chanrecv2 两种不同函数的调用,但是这两个函数最终还是会调用 runtime.chanrecv。
当我们从一个空 Channel 接收数据时会直接调用 runtime.gopark 直接让出处理器的使用权。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } lock(&c.lock) if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } ... }
如果当前 Channel 已经被关闭并且缓冲区中不存在任何的数据,那么就会清除 ep 指针中的数据并立刻返回。
除了上述两种特殊情况,使用 runtime.chanrecv 从 Channel 接收数据时还包含以下三种不同情况:
- 当存在等待的发送者时,通过 runtime.recv 直接从阻塞的发送者或者缓冲区中获取数据.
- 当缓冲区存在数据时,从 Channel 的缓冲区中接收数据.
- 当缓冲区中不存在数据时,等待其他 Goroutine 向 Channel 发送数据.
因此接受数据的时候,Channel 中接收数据时可能会发生的五种情况:
- 如果 Channel 为空,那么就会直接调用 runtime.gopark 挂起当前 Goroutine;
- 如果 Channel 已经关闭并且缓冲区没有任何数据,runtime.chanrecv 函数会直接返回;
- 如果 Channel 的 sendq 队列中存在挂起的 Goroutine,就会将 recvx 索引所在的数据拷贝到接收变量所在的内存空间上并将 sendq 队列中 Goroutine 的数据拷贝到缓冲区;
- 如果 Channel 的缓冲区中包含数据就会直接读取 recvx 索引对应的数据;
- 在默认情况下会挂起当前的 Goroutine,将 runtime.sudog 结构加入 recvq 队列并陷入休眠等待调度器的唤醒;
从 Channel 接收数据时,会触发 Goroutine 调度的两个时机:
- 当 Channel 为空时;
- 当缓冲区中不存在数据并且也不存在数据的发送者时;
最后就是关闭管道:
- 编译器会将用于关闭管道的 close 关键字调用 runtime.closechan 的函数关闭。
- 当 Channel 是一个空指针或者已经被关闭时,Go 语言运行时都会直接 panic 并抛出异常,处理完了这些异常的情况之后就可以开始执行关闭 Channel 的逻辑.