前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。
在 go 的标准库中,提供了 sync.Cond
这个并发原语,让我们可以实现多个 goroutine
等待某一条件满足之后再继续执行。
它需要配合 sync.Mutex
一起使用,因为 Cond
的 Wait
方法需要在 Mutex
的保护下才能正常工作。
对于条件变量,可能大多数人只是知道它的存在,但是用到它的估计寥寥无几,因为很多并发场景的处理都能使用 chan
来实现,
而且 chan
的使用也更加简单。
但是在某些场景下,Cond
可能是最好的选择,本文就来探讨一下 Cond
的使用场景,基本用法,以及它的实现原理。
sync.Cond 是什么?
sync.Cond
表示的是条件变量,它是一种同步机制,用来协调多个 goroutine
之间的同步,当共享资源的状态发生变化的时候,
可以通过条件变量来通知所有等待的 goroutine
去重新获取共享资源。
适用场景
在实际使用中,我们可能会有多个 goroutine
在执行的过程中,由于某一条件不满足而阻塞的情况。
这个时候,我们就可以使用条件变量来实现 goroutine
之间的同步。比如,我们有一个 goroutine
用来获取数据,
但是可能会比较耗时,这个时候,我们就可以使用条件变量来实现 goroutine
之间的同步,
当数据准备好之后,就可以通过条件变量来通知所有等待的 goroutine
去重新获取共享资源。
sync.Cond
条件变量用来协调想要访问共享资源的那些goroutine
,当共享资源的状态发生变化的时候,它可以用来通知所有等待的
goroutine
去重新获取共享资源。
sync.Cond 的基本用法
sync.Cond
的基本用法非常简单,我们只需要通过 sync.NewCond
方法来创建一个 Cond
实例,
然后通过 Wait
方法来等待条件满足,通过 Signal
或者 Broadcast
方法来通知所有等待的 goroutine
去重新获取共享资源。
NewCond 创建实例
sync.NewCond
方法用来创建一个 Cond
实例,它的参数是一个 Locker
接口,我们可以传入一个 Mutex
或者 RWMutex
实例。
这个条件变量的 Locker
接口就是用来保护共享资源的。
Wait 等待条件满足
Wait
方法用来等待条件满足,它会先释放 Cond
的锁(Cond.L
),然后阻塞当前 goroutine
(实际调用的是 goparkunlock
),直到被 Signal
或者 Broadcast
唤醒。
它做了如下几件事情:
- 释放
Cond
的锁(Cond.L
),然后阻塞当前goroutine
。(所以,使用之前需要先锁定) - 被
Signal
或者Broadcast
唤醒之后,会重新获取Cond
的锁(Cond.L
)。 - 之后,就返回到
goroutine
阻塞的地方继续执行。
Signal 通知一个等待的 goroutine
Signal
方法用来通知一个等待的 goroutine
,它会唤醒一个等待的 goroutine
,然后继续执行当前 goroutine
。
如果没有等待的 goroutine
,则不会有任何操作。
Broadcast 通知所有等待的 goroutine
Broadcast
方法用来通知所有等待的 goroutine
,它会唤醒所有等待的 goroutine
,然后继续执行当前 goroutine
。
如果没有等待的 goroutine
,则不会有任何操作。
sync.Cond 使用实例
下面我们通过一个实例来看一下 sync.Cond
的使用方法。
package cond import ( "fmt" "sync" "testing" "time" ) var done bool var data string func write(c *sync.Cond) { fmt.Println("writing.") // 让 reader 先获取锁,模拟条件不满足然后 wait 的情况 time.Sleep(time.Millisecond * 10) c.L.Lock() // 模拟耗时的写操作 time.Sleep(time.Millisecond * 50) data = "hello world" done = true fmt.Println("writing done.") c.L.Unlock() c.Broadcast() } func read(c *sync.Cond) { fmt.Println("reading") c.L.Lock() for !done { fmt.Println("reader wait.") c.Wait() } fmt.Println("read done.") fmt.Println("data:", data) defer c.L.Unlock() } func TestCond(t *testing.T) { var c = sync.NewCond(&sync.Mutex{}) go read(c) // 读操作 go read(c) // 读操作 go write(c) // 写操作 time.Sleep(time.Millisecond * 100) // 等待操作完成 }
输出:
reading reader wait. // 还没获取完数据,需要等待 writing. reading reader wait. writing done. // 获取完数据了,通知所有等待的 reader read done. // 读取到数据了 data: hello world // 输出读取到的数据 read done. data: hello world
这个例子可以粗略地用下图来表示:
说明:
read1
和reader2
表示两个goroutine
,它们都会调用read
函数。- 在
done
为false
的时候,reader1
和reader2
都会调用c.Wait()
函数,然后阻塞等待。 write
表示一个goroutine
,它会调用write
函数。- 在
write
函数中,获取完数据之后,会将done
设置为true
,然后调用c.Broadcast()
函数,通知所有等待的reader
去重新获取共享资源。 reader1
和reader2
在解除阻塞状态后,都会重新获取共享资源,然后输出读取到的数据。
在这个例子中,done
的功能是标记,用来表示共享资源是否已经获取完毕,如果没有获取完毕,那么 reader
就会阻塞等待。
为什么要用 sync.Cond?
在文章开头,我们说了,很多并发编程的问题都可以通过 channel
来解决。
同样的,在上面提到的 sync.Cond
的使用场景,使用 channel
也是可以实现的,
我们只要 close(ch)
来关闭 channel
就可以实现通知多个等待的协程了。
那么为什么还要用 sync.Cond
呢?
主要原因是,sync.Cond
可以重复地进行 Wait()
和 Signal()
、Broadcast()
操作,
但是,如果想通过关闭 chan
来实现这个功能的话,那就只能通知一次了。
因为 channel
只能关闭一次,关闭一个已经关闭的 channel
会导致程序 panic。
使用 channel
的另外一种方式是,记录 reader
的数量,然后通过往 channel
中发送多次数据来实现通知多个 reader
。
但是这样一来代码就会复杂很多,从另一个角度说,出错的概率大了很多。
close channel 广播实例
下面的例子模拟了使用 close(chan)
来实现 sync.Cond
中那种广播功能,但是只能通知一次。
package close_chan import ( "fmt" "testing" "time" ) var data string func read(c <-chan struct{}) { fmt.Println("reading.") // 从 chan 接收数据,如果 chan 中没有数据,会阻塞。 // 如果能接收到数据,或者 chan 被关闭,会解除阻塞状态。 <-c fmt.Println("data:", data) } func write(c chan struct{}) { fmt.Println("writing.") // 模拟耗时的写操作 time.Sleep(time.Millisecond * 10) data = "hello world" fmt.Println("write done.") // 关闭 chan 的时候,会通知所有的 reader // 所有等待从 chan 接收数据的 goroutine 都会被唤醒 close(c) } func TestCloseChan(t *testing.T) { ch := make(chan struct{}) go read(ch) go read(ch) go write(ch) // 不能关闭已经关闭的 chan time.Sleep(time.Millisecond * 20) // panic: close of closed channel // 下面这行代码会导致 panic //go write(ch) time.Sleep(time.Millisecond * 100) }
输出:
writing. reading. // 会阻塞直到写完 reading. // 会阻塞直到写完 write done. // 写完之后,才能读 data: hello world data: hello world
上面例子的 write
不能多次调用,否则会导致 panic。
sync.Cond 基本原理
go 的 sync.Cond
中维护了一个链表,这个链表记录了所有阻塞的 goroutine
,也就是由于调用了 Wait
而阻塞的 goroutine
。
而 Signal
和 Broadcast
方法就是用来唤醒这个链表中的 goroutine
的。
Signal
方法只会唤醒链表中的第一个 goroutine
,而 Broadcast
方法会唤醒链表中的所有 goroutine
。
下图是 Signal
方法的效果,可以看到,Signal
方法只会唤醒链表中的第一个 goroutine
:
说明:
notifyList
是sync.Cond
中维护的一个链表,这个链表记录了所有阻塞的goroutine
。head
是链表的头节点,tail
是链表的尾节点。Signal
方法只会唤醒链表中的第一个goroutine
。
而 Broadcast
方法会唤醒 notifyList
中的所有 goroutine
。
sync.Cond 的设计与实现
最后,我们来看一下 sync.Cond
的设计与实现。
sync.Cond 模型
sync.Cond
的模型如下所示:
type Cond struct { noCopy noCopy // L is held while observing or changing the condition L Locker // L 在观察或改变条件时被持有 notify notifyList checker copyChecker }
属性说明:
noCopy
是一个空结构体,用来检查sync.Cond
是否被复制。(在编译前通过go vet
命令来检查)L
是一个Locker
接口,用来保护条件变量。notify
是一个notifyList
类型,用来记录所有阻塞的goroutine
。checker
是一个copyChecker
类型,用来检查sync.Cond
是否被复制。(如果在运行时被复制,会导致panic
)
notifyList 结构体
notifyList
是 sync.Cond
中维护的一个链表,这个链表记录了所有因为共享资源还没准备好而阻塞的 goroutine
。它的定义如下所示:
type notifyList struct { wait atomic.Uint32 notify uint32 // 阻塞的 waiter 名单。 lock mutex // 锁 head *sudog // 阻塞的 goroutine 链表(链表头) tail *sudog // 阻塞的 goroutine 链表(链表尾) }
属性说明:
wait
是下一个waiter
的编号。它在锁外自动递增。notify
是下一个要通知的waiter
的编号。它可以在锁外读取,但只能在持有锁的情况下写入。lock
是一个mutex
类型,用来保护notifyList
。head
是一个sudog
类型,用来记录阻塞的goroutine
链表的头节点。tail
是一个sudog
类型,用来记录阻塞的goroutine
链表的尾节点。
notifyList
的方法说明:
notifyList
中包含了几个操作阻塞的goroutine
链表的方法。
notifyListAdd
方法将waiter
的编号加 1。notifyListWait
方法将当前的goroutine
加入到notifyList
中。(也就是将当前协程挂起)notifyListNotifyOne
方法将notifyList
中的第一个goroutine
唤醒。notifyListNotifyAll
方法将notifyList
中的所有goroutine
唤醒。notifyListCheck
方法检查 notifyList 的大小是否正确。
sync.Cond 的方法
notifyList
就不细说了,本文重点讲解一下 sync.Cond
的实现。
Wait 方法
Wait
方法用在当条件不满足的时候,将当前运行的协程挂起。
func (c *Cond) Wait() { // 检查是否被复制 c.checker.check() // 更新 notifyList 中需要等待的 waiter 的数量 // 返回当前需要插入 notifyList 的编号 t := runtime_notifyListAdd(&c.notify) // 解锁 c.L.Unlock() // 挂起当前 g,直到被唤醒 runtime_notifyListWait(&c.notify, t) // 唤醒之后,重新加锁。 // 因为阻塞之前解锁了。 c.L.Lock() }
对于 Wait
方法,我们需要注意的是,使用之前,我们需要先调用 L.Lock()
方法加锁,然后再调用 Wait
方法,否则会报错。
文档里面的例子:
c.L.Lock() for !condition() { c.Wait() } // ...使用条件... // 这里是我们在条件满足之后,需要执行的代码。 c.L.Unlock()
好了,问题来了,调用 Wait
方法之前为什么要先加锁呢?
这是因为在我们使用共享资源的时候,可能一些代码是互斥的,所以我们需要加锁。
这样我们就可以保证在我们使用共享资源的时候,不会被其他协程修改。
但是如果因为条件不满足,我们需要等待的话,我们不可能在持有锁的情况下等待,
因为在修改条件的时候,可能也需要加锁,这样就会造成死锁。
另外一个问题是,为什么要使用 for
来检查条件是否满足,而不是使用 if
呢?
这是因为在我们调用 Wait
方法之后,可能会有其他协程唤醒我们,但是条件并没有满足,
这个时候依然是需要继续 Wait
的。
Signal 方法
Signal
方法用在当条件满足的时候,将 notifyList
中的第一个 goroutine
唤醒。
func (c *Cond) Signal() { // 检查 sync.Cond 是否被复制了 c.checker.check() // 唤醒 notifyList 中的第一个 goroutine runtime_notifyListNotifyOne(&c.notify) }
Broadcast 方法
Broadcast
方法用在当条件满足的时候,将 notifyList
中的所有 goroutine
唤醒。
func (c *Cond) Broadcast() { // 检查 sync.Cond 是否被复制了 c.checker.check() // 唤醒 notifyList 中的所有 goroutine runtime_notifyListNotifyAll(&c.notify) }
copyChecker 结构体
copyChecker
结构体用来检查 sync.Cond
是否被复制。它实际上只是一个 uintptr
类型的值。
type copyChecker uintptr // check 方法检查 copyChecker 是否被复制了。 func (c *copyChecker) check() { if uintptr(*c) != uintptr(unsafe.Pointer(c)) && !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) && uintptr(*c) != uintptr(unsafe.Pointer(c)) { panic("sync.Cond is copied") } }
copyChecker
的值只有两种可能:
0
,表示还没有调用过Wait
,Signal
或Broadcast
方法。uintptr(unsafe.Pointer(©Checker))
,表示已经调用过Wait
,Signal
或Broadcast
方法。在这几个方法里面会调用check
方法,所以copyChecker
的值会被修改。
所以如果 copyChecker
的值不是 0
,也不是 uintptr(unsafe.Pointer(©Checker))
(也就是最初的 copyChecker
的内存地址),则表示 copyChecker
被复制了。
需要注意的是,这个方法在调用
CompareAndSwapUintptr
还会检查一下,这是因为有可能会并发调用CompareAndSwapUintptr
,如果另外一个协程调用了
CompareAndSwapUintptr
并且成功了,那么当前协程的这个CompareAndSwapUintptr
调用会返回false
,这个时候就需要检查是否是因为另外一个协程调用了
CompareAndSwapUintptr
而导致的,如果是的话,就不会panic
。
为什么 sync.Cond 不能被复制?
从上一小节中我们可以看到,sync.Cond
其实是不允许被复制的,但是如果是在调用 Wait
, Signal
或 Broadcast
方法之前复制,那倒是没关系。
这是因为 sync.Cond
中维护了一个阻塞的 goroutine
列表。如果 sync.Cond
被复制了,那么这个列表就会被复制,这样就会导致两个 sync.Cond
都包含了这个列表;但是我们唤醒的时候,只会有其中一个 sync.Cond
被唤醒,另外一个 sync.Cond
就会一直阻塞。
所以 go 直接从语言层面限制了这种情况,不允许 sync.Cond
被复制。
总结
sync.Cond
是一个条件变量,它可以用来协调多个goroutine
之间的同步,当条件满足的时候,去通知那些因为条件不满足被阻塞的goroutine
继续执行。sync.Cond
的接口比较简单,只有Wait
,Signal
和Broadcast
三个方法。
Wait
方法用来阻塞当前goroutine
,直到条件满足。调用Wait
方法之前,需要先调用L.Lock
方法加锁。Signal
方法用来唤醒notifyList
中的第一个goroutine
。Broadcast
方法用来唤醒notifyList
中的所有goroutine
。
sync.Cond
的实现也比较简单,它的核心就是notifyList
,它是一个链表,用来保存所有因为条件不满足而被阻塞的goroutine
。- 用关闭
channel
的方式也可以实现类似的广播功能,但是有个问题是channel
不能被重复关闭,所以这种方式无法被多次使用。也就是说使用这种方式无法多次广播。 - 使用
channel
发送通知的方式也是可以的,但是这样实现起来就复杂很多了,就更容易出错了。 sync.Cond
中使用copyChecker
来检查sync.Cond
是否被复制,如果被复制了,就会panic
。需要注意的是,这里的复制是指调用了Wait
,Signal
或Broadcast
方法之后,sync.Cond
被复制了。在调用这几个方法之前进行复制是没有影响的。