让sync.WaitGroup支持并发数量限制
重新定义WaitGroup
重新定义WaitGroup,目的就是为了支持并发数量限制,跟以往不确定并发相比,重新构造的WaitGroup可以限制并发数量和查看pending数量。
代码
// WaitGroup 实现一个简单的goroutine池 type WaitGroup struct { size int pool chan byte waitCount int64 waitGroup sync.WaitGroup // 底层还是利用sync.WaitGroup去做并发控制 } // NewWaitGroup 创建一个带有size的并发池 当size为<=0时,直接走sync.WaitGroup逻辑 func NewWaitGroup(size int) *WaitGroup { wg := &WaitGroup{ size: size, } if size > 0 { wg.pool = make(chan byte, size) } return wg } // BlockAdd 将“1”推入并发池中,如果池已满,则阻塞,这样就打到了限制并发的目的 func (wg *WaitGroup) BlockAdd() { if wg.size > 0 { wg.pool <- 1 } wg.waitGroup.Add(1) } // Done 代表一个并发结束 func (wg *WaitGroup) Done() { if wg.size > 0 { <-wg.pool } wg.waitGroup.Done() } // Wait 等待所有并发goroutine结束 func (wg *WaitGroup) Wait() { wg.waitGroup.Wait() } // PendingCount 返回所有pending状态的goroutine数量 func (wg *WaitGroup) PendingCount() int64 { return int64(len(wg.pool)) } func main() { urls := []string{ "https://www.a.com/", "https://www.b.com", "https://www.c.com", "https://www.d.com/", "https://www.e.com", "https://www.f.com", } wg := NewWaitGroup(2) for _, url := range urls { wg.BlockAdd() go func(url string) { defer wg.Done() res, err := http.Get(url) if err != nil { fmt.Printf("%s: result: %v\n", url, err) return } defer res.Body.Close() }(url) } wg.Wait() fmt.Println("Finished") }
结果
小结
其实原则上我们用sync.WaitGroup
可以满足并发控制,但是有些业务场景需要限制并发数量,而语言本身提供的又无法满足,所以这里就简单封装,实现了WaitGroup的并发限制,当然,这只是一小步,如果大家有兴趣,可以在其之上加上context,这样就可以满足超时控制和取消了。