// A canceler is a context type that can be canceled directly. The // implementations are *cancelCtx and *timerCtx. type canceler interface { cancel(removeFromParent bool, err error) Done() <-chan struct{} }
具体实现为,沿着回溯链找到第一个实现了 Done()
方法的实例,
- 如果为
canceler
的实例,则其必有 children 字段,并且实现了 cancel 方法(canceler),将该 context 放进 children 数组即可。此后,父 cancelCtx 在 cancel 时会递归遍历所有 children,逐一 cancel。 - 如果为非
canceler
的第三方 Context 实例,则我们不知其内部实现,因此只能为每个新加的子 Context 启动一个守护 goroutine,当 父 Context 取消时,取消该 Context。
需要注意的是,由于 Context 可能会被多个 goroutine 并行访问,因此在更改类字段时,需要再一次检查父节点是否已经被取消,若父 Context 被取消,则立即取消子 Context 并退出。
func propagateCancel(parent Context, child canceler) { 2 done := parent.Done() 3 if done == nil { 4 return // 父节点不可取消 5 } 6 7 select { 8 case <-done: 9 // 父节点已经取消 10 child.cancel(false, parent.Err()) 11 return 12 default: 13 } 14 15 if p, ok := parentCancelCtx(parent); ok { // 找到一个 cancelCtx 实例 16 p.mu.Lock() 17 if p.err != nil { 18 // 父节点已经被取消 19 child.cancel(false, p.err) 20 } else { 21 if p.children == nil { 22 p.children = make(map[canceler]struct{}) // 惰式创建 23 } 24 p.children[child] = struct{}{} 25 } 26 p.mu.Unlock() 27 } else { // 找到一个非 cancelCtx 实例 28 atomic.AddInt32(&goroutines, +1) 29 go func() { 30 select { 31 case <-parent.Done(): 32 child.cancel(false, parent.Err()) 33 case <-child.Done(): 34 } 35 }() 36 } 37}
下面用一张图来解释下回溯链和树组织, C0 是 emptyCtx
,通常由 context.Background()
得来,作为 Context 树的根节点。C1~C4 依次通过嵌入的方式从各自父节点派生而来。图中的虚线是由嵌入(embedded)而构成的回溯链,实线是由 cancelCtx
children 数组而保存的父子关系。
parentCancelCtx(C2)
和 parentCancelCtx(C4)
都为 C1,则 C1 的 children 数组中保存的为 C2 和 C4。构建了这两层关系后,就可以沿着回溯链向上查询 Value 值,包括找到第一个祖先 cancelCtx
;也可以沿着 children 关系往下进行级联取消。
当然,图中所有 Context 都是针对 go 包中的系统 Context,没有画出有第三方 Context 的情况。而实际代码由于增加了对第三方 Context 的处理逻辑,稍微难懂一些。区分系统 Context 实现和用户自定义 Context 的关键点在于是否实现了 canceler
接口。
第三方 Context 实现了此接口就可以进行树形组织,并且在上游 cancelCtx
取消时,递归调用 children 的 cancel 进行级联取消。否则只能通过为每个第三方 Context 启动一个 goroutine 来监听上游取消事件,以对第三方 Context 进行取消了。
级联取消
下面是级联取消中的关键函数 cancelCtx.cancel
的实现。在本 cancelCtx
取消时,需要级联取消以该 cancelCtx
为根节点的 Context 树中的所有 Context,并将根 cancelCtx
从其从父节点中摘除,以让 GC 回收该 cancelCtx
子树所有节点的资源。
cancelCtx.cancel
是非导出函数,不能在 context 包外调用,因此持有 Context 的内层过程不能自己取消自己,须由返回的 CancelFunc
(简单的包裹了cancelCtx.cancel
)来取消,其句柄一般为外层过程所持有。
func (c *cancelCtx) cancel(removeFromParent bool, err error) { 2 if err == nil { // 需要给定取消的理由,Canceled or DeadlineExceeded 3 panic("context: internal error: missing cancel error") 4 } 5 6 c.mu.Lock() 7 if c.err != nil { 8 c.mu.Unlock() 9 return // 已经被其他 goroutine 取消 10 } 11 12 // 记下错误,并关闭 done 13 c.err = err 14 if c.done == nil { 15 c.done = closedchan 16 } else { 17 close(c.done) 18 } 19 20 // 级联取消 21 for child := range c.children { 22 // NOTE: 持有父 Context 的同时获取了子 Context 的锁 23 child.cancel(false, err) 24 } 25 c.children = nil 26 c.mu.Unlock() 27 28 // 子树根需要摘除,子树中其他节点则不再需要 29 if removeFromParent { 30 removeChild(c.Context, c) 31 } 32}
计时 timerCtx
timerCtx
在嵌入 cancelCtx
的基础上增加了一个计时器 timer,根据用户设置的时限,到点取消。
type timerCtx struct { 2 cancelCtx 3 timer *time.Timer // Under cancelCtx.mu 4 5 deadline time.Time 6} 7 8func (c *timerCtx) Deadline() (deadline time.Time, ok bool) { 9 return c.deadline, true 10} 11 12func (c *timerCtx) cancel(removeFromParent bool, err error) { 13 // 级联取消子树中所有 Context 14 c.cancelCtx.cancel(false, err) 15 16 if removeFromParent { 17 // 单独调用以摘除此节点,因为是摘除 c,而非 c.cancelCtx 18 removeChild(c.cancelCtx.Context, c) 19 } 20 21 // 关闭计时器 22 c.mu.Lock() 23 if c.timer != nil { 24 c.timer.Stop() 25 c.timer = nil 26 } 27 c.mu.Unlock() 28}
设置超时取消是在 context.WithDeadline()
中完成的。如果祖先节点时限早于本节点,只需返回一个 cancelCtx
即可,因为祖先节点到点后在级联取消时会将其取消。
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) { 2 if cur, ok := parent.Deadline(); ok && cur.Before(d) { 3 // 祖先节点的时限更早 4 return WithCancel(parent) 5 } 6 7 c := &timerCtx{ 8 cancelCtx: newCancelCtx(parent), // 使用一个新的 cancelCtx 实现部分 cancel 功能 9 deadline: d, 10 } 11 propagateCancel(parent, c) // 构建 Context 取消树,注意传入的是 c 而非 c.cancelCtx 12 dur := time.Until(d) // 测试时限是否设的太近以至于已经结束了 13 if dur <= 0 { 14 c.cancel(true, DeadlineExceeded) 15 return c, func() { c.cancel(false, Canceled) } 16 } 17 18 // 设置超时取消 19 c.mu.Lock() 20 defer c.mu.Unlock() 21 if c.err == nil { 22 c.timer = time.AfterFunc(dur, func() { 23 c.cancel(true, DeadlineExceeded) 24 }) 25 } 26 return c, func() { c.cancel(true, Canceled) } 27}
Context 使用
使用了 Context 的子过程须保证在 Context 被关闭时及时退出并释放资源。也就是说,使用 Context 需要遵循上述原则才能保证级联取消时释放资源的效果。因此,Context 本质上是一种树形分发信号的机制,可以用 Context 树追踪过程调用树,当外层过程取消时,使用 Context 级联通知所有被调用过程。
以下是一个典型子过程的检查 Context 以确定是否需要退出的代码片段:
for ; ; time.Sleep(time.Second) { 2 select { 3 case <-context.Done(): 4 return 5 default: 6 } 7 8 // 一些耗时操作 9}
可以看出,Context 接口本身并没有 Cancel 方法,这和 Done()
返回的 channel 是只读的是一个道理:Context 关闭信号的发送方和接收方通常不在一个函数中。比如,当父 goroutine 启动了一些子 goroutine 来干活时,只能是父 goroutine 来关闭 done channel,子 goroutine 来检测 channel 的关闭信号。即不能在子 goroutine 中 取消父 goroutine 中传递过来的 Context。
Context 注意
Context 有一些使用实践需要遵循:
- Context 通常作为函数中第一个参数
- 不要在 struct 中存储 Context,每个函数都要显式的传递 Context。不过实践中可以根据 struct 的生命周期来灵活组合。
- 不要使用 nil Context,尽管语法上允许。不知道使用什么值合适时,可以使用
context.TODO()
。 - Context value 是为了在请求生命周期中共享数据,而非作为函数中传递额外参数的方法。因为这是一种隐式的语义,极易造成 bug;要想传额外参数,还是要在函数中显式声明。
- Context 是 immutable 的,因此是线程安全的,可以在多个 goroutine 中传递并使用同一个 Context。
注
[1] 文中的过程,指的是计算密集型或者 IO 密集型的耗时函数,或者 goroutine。
[2] Context 的 Done Channel,指的是 context.Done()
返回的 channel。它是 Context 内的关键数据结构,作为沟通不同过程的的渠道。需要结束时,父过程向该 channel 发送信号,子过程读取该 channel 信号后做扫尾工作并且退出。
参考
- go doc context:https://golang.org/pkg/context/
- code review conmments: https://github.com/golang/go/wiki/CodeReviewComments#contexts
- go blog context:https://blog.golang.org/context
- go context 源码:https://golang.org/src/context/context.go
- go 语言设计与实现:https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-context/
不妨一读