WaitGroup
前言
- 理解WaitGroup的实现 - 核心是CAS的使用
- Add与Done应该放在哪? - Add放在Goroutine外,Done放在Goroutine中,逻辑复杂时建议用defer保证调用
- WaitGroup适合什么样的场景? - 并发的Goroutine执行的逻辑相同时,否则代码并不简洁,可以采用其它方式
正确与错误用法
package main import ( "fmt" "sync" ) func wg() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) //Add放在Goroutine外 go func(i int) { //TODO defer wg.Done() //建议用defer保证调用 }(i) } wg.Wait() fmt.Println("finished") } // Tip: waitGroup 不要进行copy func errWg1() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(i int, wg sync.WaitGroup) { fmt.Println(i) defer wg.Done() }(i, wg) /* type WaitGroup struct { noCopy noCopy state1 [3]uint32 } sync.WaitGroup是一个结构体类型,当参数传递是值拷贝 */ } wg.Wait() fmt.Println("finished") } // Tip: waitGroup 的 Add 要在goroutine前执行 func errWg2() { var wg sync.WaitGroup for i := 0; i < 10; i++ { go func(i int) { wg.Add(1) fmt.Println(i) defer wg.Done() }(i) } wg.Wait() /* 极端情况,wg.Add放在Goroutine里 可能for循环完了,还没执行到wg.Add(1) 就到了wg.Wait()这一步,此时直接wait掉结束了 */ fmt.Println("finished") } // Tip: waitGroup 的 Add 很大会有影响吗? func errWg3() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(100) go func(i int) { fmt.Println(i) defer wg.Done() wg.Add(-100) /* 使用是没问题的,查阅wg.Done()源码,其对应的是wg.Add(-1),查阅wg.Add源码,发现其内部 state := atomic.AddUint64(statep, uint64(delta)<<32) */ }(i) } fmt.Println("finished") }
源码剖析
package sync import ( "internal/race" "sync/atomic" "unsafe" ) type WaitGroup struct { noCopy noCopy //64位值:高32位为计数器,低32位为waiter计数,另外32bit是用作信号量 //64位原子操作需要64位对齐 //但需要32位对齐编译器并不能保证这一点 //所以我们分配12个字节,然后使用其中对齐的8个字节作为状态,其他4个字节作为存储 //state1:具有复合意义的字段,包含WaitGroup计数值,waiter计数和信号量 state1 [3]uint32 } // state返回指向存储在wg.state1中的state和sema字段的指针。 func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { //如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量 return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2] } else { //如果地址是32bit对齐的,数组后两个元素用来做state,它可以用来做64bit的原子操作,第一个元素32bit用来做信号量 return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0] } } // Add方法主要操作的state1字段中计数值部分 // 当Add方法被调用时,首先会将delta参数值左移32位(计数值在高32位) // 然后内部通过原子操作将这个值加到计数值上 // 需要注意的是,delta的取值范围可正可负,因为调用Done()方法时,内部通过Add(-1)方法实现的 func (wg *WaitGroup) Add(delta int) { // statep表示wait数和计数值 // 低32位表示wait数,高32位表示计数值 statep, semap := wg.state() //下面的代码去掉了源码里的race判断,race判断是竞争检测相关的 //只有当运行程序时带-race参数才起作用,不是WaitGroup的核心功能,去掉后方便看清程序主体 // uint64(delta)<<32 将delta左移32位 // 因为高32位表示计数值,所以将delta左移32,v = v + delta state := atomic.AddUint64(statep, uint64(delta)<<32) //state右移32位得到v的值 v := int32(state >> 32) //state的低32位是w的值 w := uint32(state) //v变成负数时就panic。比如第一次调用Add就传负数,就会出现这种panic if v < 0 { panic("sync: negative WaitGroup counter") } //正常Add在Wait之前调用,w是0才对 if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } //正常Add之后程序return if v > 0 || w == 0 { return } //执行到这,此时计数器V=0;那么等待计数器肯定和整个state的值相等, //不然只有一个情况:有人调了Add(),并且是并发调用的。 if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } //把计数器v和w都置0 //如果计数值v为0并且waiter的数量w不为0,那么state的值就是waiter的数量 //将waiter的数量设置为0,因为计数值v也是0,所以它们俩的组合*statep直接设置为0即可。此时需要并唤醒所有的waiter *statep = 0 //发送w个信号量,也就是有多少goroutine调用了Wait()就发送多少信号量,让他们都解除阻塞 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } } //内部就是调用Add(-1)方法 func (wg *WaitGroup) Done() { wg.Add(-1) } //不断检查state值。如果其中的计数值为零,则说明所有的子goroutine已全部执行完毕,调用者不必等待,直接返回。 //如果计数值大于零,说明此时还有任务没有完成,那么调用者变成等待者,需要加入wait队列,并且阻塞自己。 func (wg *WaitGroup) Wait() { //statep表示wait数和计数值 //低32位表示wait数,高32位表示计数值 statep, semap := wg.state() //循环检查计数器V啥时候等于0 for { //原子操作的方式获取state的值 state := atomic.LoadUint64(statep) //将state右移32位,表示当前计数值 v := int32(state >> 32) //w表示waiter等待值 w := uint32(state) if v == 0 { //如果当前计数值为零,表示当前子goroutine已全部执行完毕,则直接返回 return } //否则使用原子操作将state值加一 //调用Wait()的goroutine的数目加1,Add()发送信号量的时候知道多少goroutine在等待 if atomic.CompareAndSwapUint64(statep, state, state+1) { //获取信号量,如果拿不到就阻塞,如果拿到了就继续执行 runtime_Semacquire(semap) //程序运行到这里说明Add()已经发送了信号量,Wait()解除阻塞了,这个时间state的值是0才正常。非0是个异常情况,这里panic if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } return } } }
核心
//wg.Add state := atomic.AddUint64(statep, uint64(delta)<<32) //wg.Wait state := atomic.LoadUint64(statep) atomic.CompareAndSwapUint64(statep, state, state+1) //CAS一般情况下会比锁的性能更高
后记
CAS存在ABA问题要注意