golang channel的创建、接受和发送原理讲透

简介: golang channel的创建、接受和发送原理讲透

640.png

golang channel 原理探究



在Go中最常见的就是通信顺序进程(Communicating sequential processes,CSP)的并发模型,通过共享通信,来实现共享内存,这里就提到了channel.


Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Go 语言中的 Goroutine 会通过 Channel 传递数据。

640.png


Goroutine通过使用channel传递数据,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。


Channel 收发操作均遵循了先入先出(FIFO)的设计,具体规则如下:

  • 先从 Channel 读取数据的 Goroutine 会先接收到数据;
  • 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;


Channel 通常会有以下三种类型:

  • 同步 Channel — 不需要缓冲区,发送方会直接将数据交给(Handoff)接收方;
  • 异步 Channel — 基于环形缓存的传统生产者消费者模型;
  • chan struct{} 类型的异步 Channel 的 struct{} 类型不占用内存空间,不需要实现缓冲区和直接发送(Handoff)的语义;


Channel 在运行时使用 runtime.hchan 结构体表示:


type hchan struct {
 qcount   uint           // 当前队列里还剩余元素个数
 dataqsiz uint           // 环形队列长度,即缓冲区的大小,即make(chan T,N) 中的N
 buf      unsafe.Pointer // 环形队列指针
 elemsize uint16         // 每个元素的大小
 closed   uint32         // 标识当前通道是否处于关闭状态,创建通道后,该字段设置0,即打开通道;通道调用close将其设置为1,通道关闭
 elemtype *_type         // 元素类型,用于数据传递过程中的赋值
 sendx    uint           // 环形缓冲区的状态字段,它只是缓冲区的当前索引-支持数组,它可以从中发送数据
 recvx    uint          // 环形缓冲区的状态字段,它只是缓冲区当前索引-支持数组,它可以从中接受数据
 recvq    waitq         // 等待读消息的goroutine队列
 sendq    waitq         // 等待写消息的goroutine队列
 // lock protects all fields in hchan, as well as several
 // fields in sudogs blocked on this channel.
 //
 // Do not change another G's status while holding this lock
 // (in particular, do not ready a G), as this can deadlock
 // with stack shrinking.
 lock mutex           // 互斥锁,为每个读写操作锁定通道,因为发送和接受必须是互斥操作
}
type waitq struct {
 first *sudog
 last  *sudog
}


其中hchan结构体中有五个字段是构建底层的循环队列:


* qcount — Channel 中的元素个数;
* dataqsiz — Channel 中的循环队列的长度;
* buf — Channel 的缓冲区数据指针;
* sendx — Channel 的发送操作处理到的位置;
* recvx — Channel 的接收操作处理到的位置;


通常, elemsize 和 elemtype 分别表示当前 Channel 能够收发的元素类型和大小.


sendq 和 recvq 存储了当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,这些等待队列使用双向链表runtime.waitq表示,链表中所有的元素都是runtime.sudog结构.


waitq 表示一个在等待列表中的 Goroutine,该结构体中存储了阻塞的相关信息以及两个分别指向前后runtime.sudog的指针。


channel 在Go中是通过make关键字创建,编译器会将make(chan int,10).


创建管道:

runtime.makechan 和 runtime.makechan64 会根据传入的参数类型和缓冲区大小创建一个新的 Channel 结构,其中后者用于处理缓冲区大小大于 2 的 32 次方的情况.


这里我们来详细看下makechan 函数:


func makechan(t *chantype, size int) *hchan {
 elem := t.elem
 // compiler checks this but be safe.
 if elem.size >= 1<<16 {
  throw("makechan: invalid channel element type")
 }
 if hchanSize%maxAlign != 0 || elem.align > maxAlign {
  throw("makechan: bad alignment")
 }
 mem, overflow := math.MulUintptr(elem.size, uintptr(size))
 if overflow || mem > maxAlloc-hchanSize || size < 0 {
  panic(plainError("makechan: size out of range"))
 }
 // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
 // buf points into the same allocation, elemtype is persistent.
 // SudoG's are referenced from their owning thread so they can't be collected.
 // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
 var c *hchan
 switch {
 case mem == 0:
  // Queue or element size is zero.
  c = (*hchan)(mallocgc(hchanSize, nil, true))
  // Race detector uses this location for synchronization.
  c.buf = c.raceaddr()
 case elem.ptrdata == 0:
  // Elements do not contain pointers.
  // Allocate hchan and buf in one call.
  c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
  c.buf = add(unsafe.Pointer(c), hchanSize)
 default:
  // Elements contain pointers.
  c = new(hchan)
  c.buf = mallocgc(mem, elem, true)
 }
 c.elemsize = uint16(elem.size)
 c.elemtype = elem
 c.dataqsiz = uint(size)
 lockInit(&c.lock, lockRankHchan)
 if debugChan {
  print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
 }
 return c
}


Channel 中根据收发元素的类型和缓冲区的大小初始化 runtime.hchan 结构体和缓冲区:

640.png


arena区域就是我们所谓的堆区,Go动态分配的内存都是在这个区域,它把内存分割成8KB大小的页,一些页组合起来称为mspan。


bitmap区域标识arena区域哪些地址保存了对象,并且用4bit标志位表示对象是否包含指针、GC标记信息。bitmap中一个byte大小的内存对应arena区域中4个指针大小(指针大小为 8B )的内存,所以bitmap区域的大小是512GB/(4*8B)=16GB。


640.png

此外我们还可以看到bitmap的高地址部分指向arena区域的低地址部分,这里bitmap的地址是由高地址向低地址增长的。


spans区域存放mspan(是一些arena分割的页组合起来的内存管理基本单元,后文会再讲)的指针,每个指针对应一页,所以spans区域的大小就是512GB/8KB*8B=512MB。


除以8KB是计算arena区域的页数,而最后乘以8是计算spans区域所有指针的大小。创建mspan的时候,按页填充对应的spans区域,在回收object时,根据地址很容易就能找到它所属的mspan。


发送数据:

当我们想要向 Channel 发送数据时,就需要使用 ch <- i 语句. runtime.chansend1 调用了 runtime.chansend 并传入 Channel 和需要发送的数据。


runtime.chansend 是向 Channel 中发送数据时最终会调用的函数,这个函数负责了发送数据的全部逻辑,如果我们在调用时将 block 参数设置成 true,那么就表示当前发送操作是一个阻塞操作:


func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ...
    if !block && c.closed == 0 && full(c) {
        return false
    }
    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    ...
}

在发送数据的逻辑执行之前会先为当前 Channel 加锁,防止发生竞争条件。如果 Channel 已经关闭,那么向该 Channel 发送数据时就会报"send on closed channel" 错误并中止程序。

因为 runtime.chansend 函数的实现比较复杂,所以我们这里将该函数的执行过程分成以下的

三个部分:

  • 当存在等待的接收者时,通过 runtime.send 直接将数据发送给阻塞的接收者.
  • 当缓冲区存在空余空间时,将发送的数据写入 Channel 的缓冲区.
  • 当不存在缓冲区或者缓冲区已满时,等待其他 Goroutine 从 Channel 接收数据.


因此:


当我们使用 ch <- i 表达式向 Channel 发送数据时遇到的几种情况:

  • 如果当前 Channel 的 recvq 上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前的 Goroutine 并将其设置成下一个运行的 Goroutine;
  • 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们就会直接将数据直接存储到当前缓冲区 sendx 所在的位置上;
  • 如果不满足上面的两种情况,就会创建一个 runtime.sudog 结构并将其加入 Channel 的 sendq 队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;


发送数据的过程中可能包含几个会触发 Goroutine 调度的时机:

  1. 发送数据时发现 Channel 上存在等待接收数据的 Goroutine,立刻设置处理器的 runnext 属性,但是并不会立刻触发调度.
  2. 发送数据时并没有找到接收方并且缓冲区已经满了,这时就会将自己加入 Channel 的 sendq 队列并调用 runtime.goparkunlock 触发 Goroutine 的调度让出处理器的使用权.


接收数据:


接着我们看看接受数据,Go中可以使用两种不同的方式去接收 Channel 中的数据:


* i <- ch
* i, ok <- ch

虽然不同的接收方式会被转换成 runtime.chanrecv1 和 runtime.chanrecv2 两种不同函数的调用,但是这两个函数最终还是会调用 runtime.chanrecv。


当我们从一个空 Channel 接收数据时会直接调用 runtime.gopark 直接让出处理器的使用权。


func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    if c == nil {
  if !block {
   return
  }
  gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
  throw("unreachable")
 }
    lock(&c.lock)
 if c.closed != 0 && c.qcount == 0 {
  if raceenabled {
   raceacquire(c.raceaddr())
  }
  unlock(&c.lock)
  if ep != nil {
   typedmemclr(c.elemtype, ep)
  }
  return true, false
 }
    ...
}

如果当前 Channel 已经被关闭并且缓冲区中不存在任何的数据,那么就会清除 ep 指针中的数据并立刻返回。


除了上述两种特殊情况,使用 runtime.chanrecv 从 Channel 接收数据时还包含以下三种不同情况:


  • 当存在等待的发送者时,通过 runtime.recv 直接从阻塞的发送者或者缓冲区中获取数据.
  • 当缓冲区存在数据时,从 Channel 的缓冲区中接收数据.
  • 当缓冲区中不存在数据时,等待其他 Goroutine 向 Channel 发送数据.


因此接受数据的时候,Channel 中接收数据时可能会发生的五种情况:

  1. 如果 Channel 为空,那么就会直接调用 runtime.gopark 挂起当前 Goroutine;
  2. 如果 Channel 已经关闭并且缓冲区没有任何数据,runtime.chanrecv 函数会直接返回;
  3. 如果 Channel 的 sendq 队列中存在挂起的 Goroutine,就会将 recvx 索引所在的数据拷贝到接收变量所在的内存空间上并将 sendq 队列中 Goroutine 的数据拷贝到缓冲区;
  4. 如果 Channel 的缓冲区中包含数据就会直接读取 recvx 索引对应的数据;
  5. 在默认情况下会挂起当前的 Goroutine,将 runtime.sudog 结构加入 recvq 队列并陷入休眠等待调度器的唤醒;


从 Channel 接收数据时,会触发 Goroutine 调度的两个时机:

  1. 当 Channel 为空时;
  2. 当缓冲区中不存在数据并且也不存在数据的发送者时;


最后就是关闭管道:

  1. 编译器会将用于关闭管道的 close 关键字调用 runtime.closechan 的函数关闭。
  2. 当 Channel 是一个空指针或者已经被关闭时,Go 语言运行时都会直接 panic 并抛出异常,处理完了这些异常的情况之后就可以开始执行关闭 Channel 的逻辑.
相关文章
|
4月前
|
Java Go
Golang底层原理剖析之垃圾回收GC(二)
Golang底层原理剖析之垃圾回收GC(二)
48 0
|
4月前
|
存储 SQL 安全
Golang底层原理剖析之上下文Context
Golang底层原理剖析之上下文Context
64 0
|
4月前
|
编译器 Go
Golang底层原理剖析之method
Golang底层原理剖析之method
22 2
|
4月前
|
存储 Go
Golang底层原理剖析之map
Golang底层原理剖析之map
35 1
|
4月前
|
存储 编译器 Go
Golang底层原理剖析之闭包
Golang底层原理剖析之闭包
52 0
|
4月前
|
编译器 Go
Golang底层原理剖析之函数调用栈-传参和返回值
Golang底层原理剖析之函数调用栈-传参和返回值
22 0
|
4月前
|
存储 编译器 Go
Golang底层原理剖析之内存对齐
Golang底层原理剖析之内存对齐
26 0
|
4月前
|
Go 调度
浅谈Golang通道channel
浅谈Golang通道channel
33 0
|
4月前
|
存储 编译器 Go
Golang底层原理剖析之函数调用栈-栈帧布局与函数跳转
Golang底层原理剖析之函数调用栈-栈帧布局与函数跳转
39 0
|
9天前
|
负载均衡 监控 Go
Golang深入浅出之-Go语言中的服务网格(Service Mesh)原理与应用
【5月更文挑战第5天】服务网格是处理服务间通信的基础设施层,常由数据平面(代理,如Envoy)和控制平面(管理配置)组成。本文讨论了服务发现、负载均衡和追踪等常见问题及其解决方案,并展示了使用Go语言实现Envoy sidecar配置的例子,强调Go语言在构建服务网格中的优势。服务网格能提升微服务的管理和可观测性,正确应对问题能构建更健壮的分布式系统。
29 1