一. 按照正常思路来一个架构设计
1. 基础知识必备
- sync.WaitGroup
- chan
- go func
2. 原始消费架构模式设计
package main import ( "fmt" "math/rand" "sync" "time" ) func main() { const concurrencyProcesses = 10 // 限制最大并发处理数量 const jobCount = 100 // 有多少任务 var wg sync.WaitGroup wg.Add(jobCount) found := make(chan int) limitCh := make(chan struct{}, concurrencyProcesses) // 用chan去控制并发数量 for i := 0; i < jobCount; i++ { //处理100个job limitCh <- struct{}{} // 控制并发数量 go func(val int) { defer func() { wg.Done() <-limitCh }() waitTime := rand.Int31n(1000) fmt.Println("job:", val, "wait time:", waitTime, "millisecond") time.Sleep(time.Duration(waitTime) * time.Millisecond) found <- val }(i) } go func() { // 等待goroutine结束并且关闭结果chan wg.Wait() close(found) }() var results []int for p := range found { // 从结果chan获取数据 fmt.Println("Finished job:", p) results = append(results, p) } fmt.Println("result:", results) }
大家猜猜结果是怎么样的?
3. 公布结果
为什么deadlock?
4. 究其原因
这就是Limit Concurrency问题,即限制并发问题
在读取第一个job的时候,现将空的struct放进limitCh中,这个时候limitCh中就只剩下9个可以继续处理,接着重复这个步骤继续放入job,直到10个job装满limitCh。但是当第11个job需要处理的时候,程序就直接停止在limitCh <- struct{}{}
处,因为前10个goroutine不知道什么时候才能处理结束,这样第11个job它后面的代码就完全没机会执行,造成整个系统deadlock
。
但是如果你的job数量小于等于10,是完全看不出来任何问题的,系统可以正常运行,只有job数量大于并发数量的时候才会出这个BUG。
二. 按照BUG再设计一版
相信大家想到了一种解决方式,既然程序是卡在limitCh <- struct{}{}
,那么就直接将这段处理逻辑丢到go func中处理就好了。
... for i := 0; i < jobCount; i++ { go func() { //丢到go func中去处理 limitCh <- struct{}{} }() go func(val int) { ...
结果:
发现100个job同时处理直到结束,并没有达到限制并发处理的要求,虽然满足最终把这些job处理完成,但是我们要的可是Limit Concurrency
三. 最佳实践,也是大家拿来就可以用的限制并发框架模式
既然要限制并发数量,那么就建立特定数量的worker,每个worker读取chan就可以了,所以
第一步就是先建立queue的通道,将所有job都放入queue中,但是以go func方式去处理,避免阻塞main程序。
第二步就是建立特定的worker数量来消化全部的job。
package main import ( "fmt" "math/rand" "sync" "time" ) func main() { const concurrencyProcesses = 10 // 限制最大并发处理数量 const jobCount = 100 // 有多少任务 var wg sync.WaitGroup wg.Add(jobCount) found := make(chan int) queue := make(chan int) go func(queue chan<- int) { // 生产端负责生产任务到queue for i := 0; i < jobCount; i++ { queue <- i } close(queue) }(queue) for i := 0; i < concurrencyProcesses; i++ { // 当前只有10个并发 他们做的事情就是处理任务 并且将结果set in found go func(queue <-chan int, found chan<- int) { for val := range queue { defer wg.Done() waitTime := rand.Int31n(1000) fmt.Println("job:", val, "wait time:", waitTime, "millisecond") time.Sleep(time.Duration(waitTime) * time.Millisecond) found <- val } }(queue, found) // 传递进去 防止data race } go func() { // 等待goroutine结束并且关闭结果chan wg.Wait() close(found) }() var results []int for p := range found { // 从结果chan获取数据 fmt.Println("Finished job:", p) results = append(results, p) } fmt.Println("result:", results) }
可以看到这里的for循环是以concurrencyProcesses为并发数量去处理job了。10个goroutine通过内层for循环不断读取chan中的job,直到queue中没有job为止,这样生产端生产完job之后会关闭queue,那么goroutine最后收到关闭消息后退出for循环,结束goroutine。
可以看下结果:
小结
其实还有很多常见的其他限制并发处理模式,但是我喜欢用上面这种处理模式。这里有个细节需要大家注意哈,就是控制并发的时候要处理好数据竞争(data race
)问题,如果处理不好,可能最后结果不一定是对的。
- END -