总是依赖别人的话,就永远长不大。——《哆啦A梦》
1 Channel的创建
创建channel的过程实际上是初始化hchan结构。其中类型信息和缓冲区长度由make语句传入,buf的大小则与元素大小和缓冲区长度共同决定。
创建channel的源码如下:
const ( maxAlign = 8 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) debugChan = false ) func makechan(t *chantype, size int) *hchan { elem := t.elem // 编译器决定size是否超过类型大小 if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } //对齐字节 与内存模型有关 内存模型可以看我前面的文章 if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } //分配的内存必须是在预留内存范围之内 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { case mem == 0: // 队列 or 元素size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) case elem.ptrdata == 0: // 元素不包含指针 // 分配 hchan and buf c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // 元素包含指针 c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) //元素大小 c.elemtype = elem //元素类型 c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) return c }
2 向channel写数据
向一个channel中写数据简单过程如下:
- 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;
- 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
- 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;
简单流程图如下:
源码如下:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } // Block on the channel. Some receiver will complete our operation for us. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3) // stack tracer. KeepAlive(ep) // someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil if gp.param == nil { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) return true }
- 第一个if判断 就是chan是nil的情况 如果是nil就调用gopark永远阻塞 所以throw("unreachable")就不会执行到。
- 因为我们block=true,所以目的就是让send阻塞,所以第二个if进不去
- 第三个if判断如果队列关闭了即closed=1 就会panic 即想关闭的chan发送数据会导致panic
- 第四部分就是如果接受队列中有等待着,那么就把它从队列中弹出,数据直接交给它(通过memmove(dst,src,size)实现),而不需要放入buf中,速度更快。
- 如果走到这里代表接受队列没有等待的receiver了,那么就判断如果队列没有满,就把数据存储到buf中,返回成功。(注意变量的变化,比如sendx索引指向下一个位置,qcount+1,以及sendx和队列容量相等,从头开始记。就是循环队列嘛)
- 能走到这里就代表buf满了,将要发送的数据和当前goroutine打包成sudog对象放入sendq中,并将当前goroutine的状态设置成waiting状态。
- goparkunclok函数 其实就是阻塞并且解锁传入的mutex,阻塞的goroutine没必要在持有锁,同时切出goroutine,并设置状态是waiting。gopark和goready互为逆操作,调用gopark在用户测看来就是向chan发送数据阻塞了。goready作用是唤醒对应的goroutine。
- 当唤醒的时候,看唤醒的是不是当前的goroutine,如果不是,直接抛异常,否则换新的就是当前goroutine 那么被唤醒之后params会被赋值sudog对象指针(recv函数中),所以这里判断是否是空,如果是空,就是params=nil,就代表这个chan已经close了,closechan的时候会清空params。那么如果closed==0就代表异常状态,不存在的,所以抛异常。否则就是chan关闭了你还发送数据,那么就panic。这里设计到recv函数 closchan函数对params的处理。
- 接下来就是一些资源的释放和环境的清理
3 从channel读数据
从一个channel读数据简单过程如下:
- 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,最后把G唤醒,结束读取过程;
- 如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;
- 如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程;
- 将当前goroutine加入recvq,进入睡眠,等待被写goroutine唤醒;
简单流程图如下:
源码如下
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } lock(&c.lock) if c.closed != 0 && c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } // no sender available: block on this channel. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3) // someone woke us up if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil mysg.c = nil releaseSudog(mysg) return true, !closed }
- 第一个if判断就是chan是不是nil 是nil 那么代表从chan接受数据会被永远阻塞
- 第二个if判断就是如果队列关闭了且队列没有数据了,那么把接受到的这个数据ep内存清0 然后直接返回
- 第三个if判断就是 如果sendq有等待发送的sender,如果是unbufferd的chan直接将sender的数据复制给receiver,否则就从队首读取一个值,然后把sender的值加入到队列尾部。
- 第四个if判断就是没有等待的sender,且buf中有数据,那么就取出一个元素给receiver
- 第五个if进不去
- 第六部分是buf中没有元素,那么当前receiver就会阻塞,直到它从sender中接受了数据或者是chan被close,才返回。
- 第六部分如果被执行,那么receiver被唤醒了,唤醒之后就会判断chan关闭了吗 如果关闭params==nil成立 否则就是被send函数调用唤醒,params有数据。所以清理完资源之后,如果最终closed是false就代表接受到send发送的数据 是true代表chan关闭了。
4 关闭Channel
关闭channel时会把recvq中的G全部唤醒,本该写入G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic。
除此之外,panic出现的常见场景还有:
- 关闭值为nil的channel
- 关闭已经被关闭的channel
- 向已经关闭的channel写数据
源码如下:
func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, funcPC(closechan)) racerelease(c.raceaddr()) } c.closed = 1 var glist gList // release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) // Ready all Gs now that we've dropped the channel lock. for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }
- 如果chan是nil,你close会panic
- chan已经closed了,你再次close就会panic
- 否则就是chan没有close,且chan不为nil
- 就把等待队列中sender和receiver从队列中全部移除并且唤醒 清除的时候将params设置为nil。
- 为啥还要唤醒呢,因为他们还在阻塞,你只是清理了数据而已,close chan 唤醒他们 让他们继续工作 否则永远阻塞了。
5 关注公众号
微信公众号:堆栈future