8.1 基础知识
这个是通过通道,来控制goroutine协程结束的示例:
func coordinateWithChan() { sign := make(chan struct{}, 2) num := int32(0) fmt.Printf("The number: %d [with chan struct{}]\n", num) max := int32(10) go addNum(&num, 1, max, func() { sign <- struct{}{} }) go addNum(&num, 2, max, func() { sign <- struct{}{} }) <-sign <-sign}
上一节我们学习过,sign通道读取数据时,如果命中“有缓冲channel + 缓冲为空”的情况,会阻塞,只有两个go协程全部执行完毕,往sign塞数据后,程序才会退出,但是这种方式非常繁琐。在这种应用场景下,我们可以选用另外一个同步工具sync.WaitGroup(以下简称WaitGroup类型),它比通道更加适合实现这种一对多的 goroutine 协作流程。WaitGroup类型是开箱即用的,也是并发安全的,它拥有三个指针方法:Add、Done和Wait,你可以想象该类型中有一个计数器,它的默认值是0,我们可以通过调用该类型值的Add方法来增加,或者减少这个计数器的值,代码升级如下:
func coordinateWithWaitGroup() { var wg sync.WaitGroup wg.Add(2) // 计数器加2 num := int32(0) fmt.Printf("The number: %d [with sync.WaitGroup]\n", num) max := int32(10) go addNum(&num, 3, max, wg.Done) // 计数器减1 go addNum(&num, 4, max, wg.Done) // 计数器减1 wg.Wait() // 会阻塞,直到计数器值为0,然后就会被唤醒}
Add会增加计数器的值,Done会减少计数器的值,Wait会一直阻塞,直到计数器的值重新回归为0,然后才会被唤醒,继续往后面执行。
8.2 常见的坑
如果使用不当,容易抛出Panic,我就把相关知识点列出来:
- 坑1(计数器为负数):sync.WaitGroup类型值中计数器的值如果小于0,会直接抛出Panic。
- 坑2(同时调用Add和Wait):如果我们对它的Add方法的首次调用,与对它的Wait方法的调用是同时发起的,比如,在同时启用的两个 goroutine 中,分别调用这两个方法,那么就有可能会让这里的Add方法抛出一个 panic。
- 坑3(跨越计数周期):如果在一个此类值的Wait方法被执行期间,跨越了两个计数周期,那么就会引发一个 panic。
对于坑1,当调用Add方法,传入一个负数的时候可能会出现,所以我们使用WaitGroup时,需要保证计数一直大于0。对于坑2,需要说明一点,虽然WaitGroup值本身并不需要初始化,但是尽早地增加其计数器的值,还是非常有必要的。对于坑3,我们需要先了解WaitGroup的计数周期:
计数周期:WaitGroup中计数器值由0变为了某个正整数,而后又经过一系列的变化,最终由某个正整数又变回了0。也就是说,只要计数器的值始于0又归为0,就可以被视为一个计数周期。在一个此类值的生命周期中,它可以经历任意多个计数周期。但是,只有在它走完当前的计数周期之后,才能够开始下一个计数周期。那坑3什么情况会出现呢?场景如下:当前的goroutine因调用Wait方法被阻塞的时候,另一个goroutine调用了该值的Done方法,并使其计数器的值变为了0,这会唤醒当前的goroutine,并使它试图继续执行Wait方法中其余的代码。但在这时,又有一个goroutine调用了它的Add方法,并让其计数器的值又从0变为了某个正整数。此时,这里的Wait方法就会立即抛出一个panic。根据坑2和坑3,总结如下:不要把增加其计数器值的操作和调用其Wait方法的代码,放在不同的 goroutine 中执行。换句话说,要杜绝对同一个WaitGroup值的两种操作的并发执行,标准方式应该为“先统一Add,再并发Done,最后Wait”。
8.3 并发实例:Push
对于上一章的并发示例,当时提了一个问题:每消费一条Channel数据,需要记录Push发送成功,但是一条Channel数据包含2-3个Push内容(IOS/Android/PC),程序记录Push成功前,如何保证这2-3个Push都发送完毕了呢?根据“先统一Add,再并发Done,最后Wait”原则,看下面代码:
var ( wg sync.WaitGroup succs []*NotifyMessage fails []*NotifyMessage)for _, message := range t.PushMessages { wg.Add(1) // 计数加1 go func(message mipush.PushMessage) { defer func() { wg.Done() // 计数减1 }() // 发送IOS/Android/PC等渠道的Push // 代码省略... }(message)}wg.Wait() // 阻塞,直到计数器值为0,然后就会被唤醒// 数据统计SendNotify(t.ID, t.TotalPage, t.TaskPage, t.AppType, t.AppLocal, fails, succs)
8.4 总结
WaitGroup是开箱即用和并发安全的,可以通过它很方便地实现一对多goroutine协作流程,即:一个分发子任务的goroutine,和多个执行子任务的goroutine,共同来完成一个较大的任务。在使用WaitGroup值的时候,我们一定要注意,千万不要让其中的计数器的值小于0,否则就会引发 panic。另外,我们最好用“先统一Add,再并发Done,最后Wait”这种标准方式,来使用WaitGroup值, 尤其不要在调用Wait方法的同时,并发地通过调用Add方法去增加其计数器的值,因为这也有可能引发 panic。