重复别人所说的话,只需要教育;而要挑战别人所说的话,则需要头脑。——玛丽·佩蒂博恩·普尔
之前有两篇文章讲过Golang的并发控制,相信大家已经熟记于心,掌握其紧随,但是我们一直没有说Go的goroutine是否可以无限制的开辟,以及如何限定其数量,那么这篇文章我们就来聊聊Go是如何控制数量的。
1 不控制数量会引发问题
我们都知道Goroutine具备如下两个特点
- 体积轻量-几KB
- 优质的GMP调度-用户态调度
那么goroutine是否可以无限开辟呢,如果做一个服务器或者一些高业务的场景,能否随意的开辟goroutine并且放养不管呢?让他们自生自灭,毕竟有强大的GC和优质的调度算法支撑?
可以先看下下面这个问题。
package main import ( "fmt" "math" "runtime" ) func main() { //模拟用户需求业务的数量 task_cnt := math.MaxInt64 for i := 0; i < task_cnt; i++ { go func(i int) { //... do some busi... fmt.Println("go func ", i, " goroutine count = ", runtime.NumGoroutine()) }(i) } }
结果如下图所示:
从结果图可以看出,并发太大,单个socket所能处理的并发是有限的
所以,我们迅速的开辟goroutine(不控制并发的 goroutine 数量 )会在短时间内占据操作系统的资源(CPU、内存、文件描述符等)。
- CPU 使用率浮动上涨
- Memory 占用不断上涨。
- 主进程崩溃(被杀掉了)
这些资源实际上是所有用户态程序共享的资源,所以大批的goroutine最终引发的灾难不仅仅是自身,还会关联其他运行的程序。
所以在编写逻辑业务的时候,限制goroutine是我们必须要重视的问题。
2 一些简单方法控制goroutines数量
2.1 用有buffer的channel来限制
package main import ( "fmt" "math" "runtime" ) func busi(ch chan bool, i int) { fmt.Println("go func ", i, " goroutine count = ", runtime.NumGoroutine()) <-ch } func main() { //模拟用户需求业务的数量 task_cnt := math.MaxInt64 //task_cnt := 10 ch := make(chan bool, 3) for i := 0; i < task_cnt; i++ { ch <- true go busi(ch, i) } }
结果:
... go func 352277 goroutine count = 4 go func 352278 goroutine count = 4 go func 352279 goroutine count = 4 go func 352280 goroutine count = 4 go func 352281 goroutine count = 4 go func 352282 goroutine count = 4 ...
从结果看,程序并没有出现崩溃,而是按部就班的顺序执行,并且go的数量控制在了3,(4的原因是因为还有一个main goroutine)那么从数字上看,是不是在跑的goroutines有几十万个呢?
这里我们用了,buffer为3的channel, 在写的过程中,实际上是限制了速度。限制的是
for i := 0; i < go_cnt; i++ { //循环速度 ch <- true go busi(ch, i) }
for循环的速度,因为这个速度决定了go的创建速度,而go的结束速度取决于 busi()函数的执行速度。这样实际上,我们就能够保证了,同一时间内运行的goroutine的数量与buffer的数量一致。从而达到了限定效果。
但是这段代码有一个小问题,就是如果我们把go_cnt的数量变的小一些,会出现打出的结果不正确。
package main import ( "fmt" //"math" "runtime" ) func busi(ch chan bool, i int) { fmt.Println("go func ", i, " goroutine count = ", runtime.NumGoroutine()) <-ch } func main() { //模拟用户需求业务的数量 //task_cnt := math.MaxInt64 task_cnt := 10 ch := make(chan bool, 3) for i := 0; i < task_cnt; i++ { ch <- true go busi(ch, i) } }
结果:
go func 2 goroutine count = 4 go func 3 goroutine count = 4 go func 4 goroutine count = 4 go func 5 goroutine count = 4 go func 6 goroutine count = 4 go func 1 goroutine count = 4 go func 8 goroutine count = 4
是因为main将全部的go开辟完之后,就立刻退出进程了。所以想全部go都执行,需要在main的最后进行阻塞操作。
2.2 使用sync同步机制
import ( "fmt" "math" "sync" "runtime" ) var wg = sync.WaitGroup{} func busi(i int) { fmt.Println("go func ", i, " goroutine count = ", runtime.NumGoroutine()) wg.Done() } func main() { //模拟用户需求业务的数量 task_cnt := math.MaxInt64 for i := 0; i < task_cnt; i++ { wg.Add(1) go busi(i) } wg.Wait() }
很明显,单纯的使用sync依然达不到控制goroutine的数量,所以最终结果依然是崩溃。
... go func 7562 goroutine count = 7582 go func 24819 goroutine count = 17985 go func 7685 goroutine count = 7582 go func 24701 goroutine count = 17984 go func 7567 goroutine count = 7582 go func 24711 goroutine count = 17975 //操作系统停止响应
2.3 channel与sync同步组合方式
package main import ( "fmt" "math" "sync" "runtime" ) var wg = sync.WaitGroup{} func busi(ch chan bool, i int) { fmt.Println("go func ", i, " goroutine count = ", runtime.NumGoroutine()) <-ch wg.Done() } func main() { //模拟用户需求go业务的数量 task_cnt := math.MaxInt64 ch := make(chan bool, 3) for i := 0; i < task_cnt; i++ { wg.Add(1) ch <- true go busi(ch, i) } wg.Wait() }
结果:
//... go func 228851 goroutine count = 4 go func 228852 goroutine count = 4 go func 228853 goroutine count = 4 go func 228854 goroutine count = 4 go func 228855 goroutine count = 4 //...
这样我们程序就不会再造成资源爆炸而崩溃。而且运行go的数量控制住了在buffer为3的这个范围内。
2.4 利用无缓冲channel与任务发送/执行分离方式
package main import ( "fmt" "math" "sync" "runtime" ) var wg = sync.WaitGroup{} func busi(ch chan int) { for t := range ch { fmt.Println("go task = ", t, ", goroutine count = ", runtime.NumGoroutine()) wg.Done() } } func sendTask(task int, ch chan int) { wg.Add(1) ch <- task } func main() { ch := make(chan int) //无buffer channel goCnt := 3 //启动goroutine的数量 for i := 0; i < goCnt; i++ { //启动go go busi(ch) } taskCnt := math.MaxInt64 //模拟用户需求业务的数量 for t := 0; t < taskCnt; t++ { //发送任务 sendTask(t, ch) } wg.Wait() }
结果:
//... go task = 130069 , goroutine count = 4 go task = 130070 , goroutine count = 4 go task = 130071 , goroutine count = 4 go task = 130072 , goroutine count = 4 ...
执行流程大致如下,这里实际上是将任务的发送和执行做了业务上的分离。使得消息出去,输入SendTask的频率可设置、执行Goroutine的数量也可设置。也就是既控制输入(生产),又控制输出(消费)。使得可控更加灵活。这也是很多Go框架的Worker工作池的最初设计思想理念。
3 关注公众号
微信公众号:堆栈future