读书是在别人思想的帮助下,建立起自己的思想。——鲁巴金
我发现很多同学对Golang的并发控制了解不到位,说的东一块,西一块,很零碎,没有系统的去梳理这些知识,那么今天无问南北就给大家带来Golang是如何高效做到高并发控制的。
1 Golang常见的并发控制
- Channel
- WaitGroup
- Context
三种方案各有优劣,比如Channel优点是实现简单,清晰易懂,WaitGroup优点是子协程个数动态可调整,Context优点是对子协程派生出来的孙子协程的控制。缺点是相对而言的,要结合实例应用场景进行选择。
2 分别介绍
2.1 Channel
channel一般用于协程之间的通信,channel也可以用于并发控制。比如主协程启动N个子协程,主协程等待所有子协程退出后再继续后续流程,这种场景下channel也可轻易实现。
下面程序展示一个使用channel控制子协程的例子:
package main import ( "time" "fmt" ) func Process(ch chan int) { //Do some work... time.Sleep(time.Second) //管道中写入一个元素表示当前协程已结束 ch <- 1 } func main() { //创建一个10个元素的切片,元素类型为channel channels := make([]chan int, 10) for i:= 0; i < 10; i++ { //切片中放入一个channel channels[i] = make(chan int) //启动协程,传一个管道用于通信 go Process(channels[i]) } //遍历切片,等待子协程结束 for i, ch := range channels { <-ch fmt.Println("Routine ", i, " quit!") } }
上面程序通过创建N个channel来管理N个协程,每个协程都有一个channel用于跟父协程通信,父协程创建完所有协程中等待所有协程结束。
这个例子中,父协程仅仅是等待子协程结束,其实父协程也可以向管道中写入数据通知子协程结束,这时子协程需要定期的探测管道中是否有消息出现。
使用channel来控制子协程的优点是实现简单,缺点是当需要大量创建协程时就需要有相同数量的channel,而且对于子协程继续派生出来的协程不方便控制。
后面继续介绍的WaitGroup、Context看起来比channel优雅一些,在各种开源组件中使用频率比channel高得多。
2.2 WaitGroup
WaitGroup是Golang应用开发过程中经常使用的并发控制技术。
WaitGroup,可理解为Wait-Goroutine-Group,即等待一组goroutine结束。比如某个goroutine需要等待其他几个goroutine全部完成,那么使用WaitGroup可以轻松实现。
下面程序展示了一个goroutine等待另外两个goroutine结束的例子:
package main import ( "fmt" "time" "sync" ) func main() { var wg sync.WaitGroup //设置计数器,数值即为goroutine的个数 wg.Add(2) go func() { //Do some work time.Sleep(1*time.Second) fmt.Println("Goroutine 1 finished!") //goroutine执行结束后将计数器减1 wg.Done() }() go func() { //Do some work time.Sleep(2*time.Second) fmt.Println("Goroutine 2 finished!") //goroutine执行结束后将计数器减1 wg.Done() }() //主goroutine阻塞等待计数器变为0 wg.Wait() fmt.Printf("All Goroutine finished!") }
简单的说,上面程序中wg内部维护了一个计数器:
- 启动goroutine前将计数器通过Add(2)将计数器设置为待启动的goroutine个数。
- 启动goroutine后,使用Wait()方法阻塞自己,等待计数器变为0。
- 每个goroutine执行结束通过Done()方法将计数器减1。
- 计数器变为0后,阻塞的goroutine被唤醒。
其实WaitGroup也可以实现一组goroutine等待另一组goroutine,这有点像玩杂技,很容出错,如果不了解其实现原理更是如此。实际上,WaitGroup的实现源码非常简单。
源码阅读
1. 信号量
信号量是Unix系统提供的一种保护共享资源的机制,用于防止多个线程同时访问某个资源。
可简单理解为信号量为一个数值:
- 当信号量>0时,表示资源可用,获取信号量时系统自动将信号量减1;
- 当信号量==0时,表示资源暂不可用,获取信号量时,当前线程会进入睡眠,当信号量为正时被唤醒;
由于WaitGroup实现中也使用了信号量,在此做个简单介绍。
2. 数据结构
源码包中src/sync/waitgroup.go:WaitGroup定义了其数据结构:
type WaitGroup struct { state1 [3]uint32 }
state1是个长度为3的数组,其中包含了state和一个信号量,而state实际上是两个计数器:
- counter:当前还未执行结束的goroutine计数器
- waiter count: 等待goroutine-group结束的goroutine数量,即有多少个等候者
- semaphore: 信号量
考虑到字节是否对齐,三者出现的位置不同,为简单起见,依照字节已对齐情况下,三者在内存中的位置如下所示:
WaitGroup对外提供三个接口:
- Add(delta int): 将delta值加到counter中
- Wait():waiter递增1,并阻塞等待信号量semaphore
- Done():counter递减1,按照waiter数值释放相应次数信号量
下面分别介绍这三个函数的实现细节。
2.1 Add(delta int)
Add()做了两件事,一是把delta值累加到counter中,因为delta可以为负值,也就是说counter有可能变成0或负值,所以第二件事就是当counter值变为0时,跟据waiter数值释放等量的信号量,把等待的goroutine全部唤醒,如果counter变为负值,则panic.
Add()伪代码如下
func (wg *WaitGroup) Add(delta int) { //获取state和semaphore地址指针 statep, semap := wg.state() //把delta左移32位累加到state,即累加到counter中 state := atomic.AddUint64(statep, uint64(delta)<<32) //获取counter值 v := int32(state >> 32) //获取waiter值 w := uint32(state) //经过累加后counter值变为负值,panic if v < 0 { panic("sync: negative WaitGroup counter") } //经过累加后,此时,counter >= 0 //如果counter为正,说明不需要释放信号量,直接退出 //如果waiter为零,说明没有等待者,也不需要释放信号量,直接退出 if v > 0 || w == 0 { return } //此时,counter一定等于0,而waiter一定大于0(内部维护waiter,不会出现小于0的情况), //先把counter置为0,再释放waiter个数的信号量 *statep = 0 for ; w != 0; w-- { //释放信号量,执行一次释放一个,唤醒一个等待者 runtime_Semrelease(semap, false) }
2.2 Wait()
Wait()方法也做了两件事,一是累加waiter, 二是阻塞等待信号量
func (wg *WaitGroup) Wait() { //获取state和semaphore地址指针 statep, semap := wg.state() for { //获取state值 state := atomic.LoadUint64(statep) //获取counter值 v := int32(state >> 32) //获取waiter值 w := uint32(state) //如果counter值为0,说明所有goroutine都退出了,不需要待待,直接返回 if v == 0 { return } // 使用CAS(比较交换算法)累加waiter,累加可能会失败,失败后通过for loop下次重试 if atomic.CompareAndSwapUint64(statep, state, state+1) { //累加成功后,等待信号量唤醒自己 runtime_Semacquire(semap) return } } }
这里用到了CAS算法保证有多个goroutine同时执行Wait()时也能正确累加waiter。
2.3 Done()
Done()只做一件事,即把counter减1,我们知道Add()可以接受负值,所以Done实际上只是调用了Add(-1)。
源码如下:
func (wg *WaitGroup) Done() { wg.Add(-1) }
Done()的执行逻辑就转到了Add(),实际上也正是最后一个完成的goroutine把等待者唤醒的。
Add()操作必须早于Wait(), 否则会panicAdd()设置的值必须与实际等待的goroutine个数一致,否则会panic
由于篇幅限制,Context并发控制下次分享,因为Context涉及到的东西非常多,而且也是常用的并发控制模型,因此需要好好聊聊,所以敬请期待吧。
6 关注公众号
微信公众号:堆栈future