Goroutine(协程)
在java/c++中我们要实现并发编程的时候,我们通常需要自己维护一个线程池,并且需要自己去包装一个又一个的任务,同时需要自己去调度线程执行任务并维护上下文切换,这一切通常会耗费程序员大量的心智。
为此Go语言提供了 goroutine 这样的机制,goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。Go语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。
使用goroutine
`单个 goroutine` func main() { //hello() go hello() // 有时 hello() 并不执行,因为程序会为main函数创建一个默认的goroutine, // 当main里的语句执行完goroutine也就结束了,没有 go hello() 执行的时间 fmt.Println("main hello") // 为了确保go hello() 的goroutine能够执行可以延缓程序结束时间 time.Sleep(time.Second) } func hello() { fmt.Println("hello") }
单个 goroutine 可以通过时间延后来使这个 goroutine 被完全执行,但是当 goroutine 多到上百上千或更多时在使用 time.Sleep() 显然就没办法确定给多少时间来让 goroutine 被完全执行了,给多了影响程序效率,给少了有的 goroutine 又不会执行影响程序结果,这时候我们就要用到另一个东西那就是 sync.WaitGroup。
WaitGroup对象内部有个计时器, 最初从0 开始, 他有3个方法 Add() , Done(), Wait()用来控制计数器的数量。 Add(n) 把计数器设置成n, Done() 每次把计数器-1, wait() 会阻塞代码的运行, 直到计数器的值减为0。将 goroutine 所剩数量与WaitGroup结合可以解决上述问题
var wg sync.WaitGroup func main() { for i := 0; i < 10; i++ { // 每添加一个 goroutine wg + 1 wg.Add(1) go sayNum(i) } // 等待 wg = 0 在执行后面的代码 wg.Wait() fmt.Println("end") } func sayNum(i int) { // 当一个 goroutine 结束就 - 1 defer wg.Done() fmt.Println(i) }
当我们运行这个代码就会发现每次的输出都不同,这是因为这10个 goroutine 的执行是并发的,而调度却是随机的
goroutine与线程
goroutine的栈是可增长的
OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这个大。所以在Go语言中一次创建十万左右的goroutine也是可以的。
goroutine调度
GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。
- 1.G很好理解,就是个goroutine的,里面除了存放本goroutine信息外 还有与所在P的绑定等信息。
- 2.P管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务。
- 3.M(machine)是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是一一映射的关系, 一个groutine最终是要放到M上执行的;
P与M一般也是一一对应的。他们关系是: P管理着一组G挂载在M上运行。当一个G长久阻塞在一个M上时,runtime会新建一个M,阻塞G所在的P会把其他的G 挂载在新建的M上。当旧的G阻塞完成或者认为其已经死掉时 回收旧的M。
P的个数是通过runtime.GOMAXPROCS设定(最大256),Go1.5版本之后默认为物理线程数。 在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失。
单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的,goroutine则是由Go运行时(runtime)自己的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。 其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PKmOgZxa-1637983441094)(image-20211124133622706.png)]
Goroutine池使用实例
作用:Goroutine池可以有效控制goroutine数量,防止goroutine数量暴涨
需求:
- 计算一个数字的各个位数之和,例如数字123,结果为1+2+3=6
- 随机生成数字进行计算,一直生成,一直计算
package main import ( "fmt" "math/rand" ) type Job struct { // id Id int // 需要计算的随机数 RandNum int } type Result struct { // 这里必须传对象实例 job *Job // 求和 sum int } func main() { // 需要2个管道 // 1.job管道 jobChan := make(chan *Job, 128) // 2.结果管道 resultChan := make(chan *Result, 128) // 3.创建工作池 createPool(64, jobChan, resultChan) // 4.开个打印的协程 go func(resultChan chan *Result) { // 遍历结果管道打印 for result := range resultChan { fmt.Printf("job id:%v randnum:%v result:%d\n", result.job.Id, result.job.RandNum, result.sum) } }(resultChan) var id int // 循环创建job,输入到管道 for { id++ // 生成随机数 r_num := rand.Int() job := &Job{ Id: id, RandNum: r_num, } jobChan <- job } } // 创建工作池 // 参数1:开几个协程 func createPool(num int, jobChan chan *Job, resultChan chan *Result) { // 根据开协程个数,去跑运行 for i := 0; i < num; i++ { go func(jobChan chan *Job, resultChan chan *Result) { // 执行运算 // 遍历job管道所有数据,进行相加 for job := range jobChan { // 随机数接过来 r_num := job.RandNum // 随机数每一位相加 // 定义返回值 var sum int for r_num != 0 { tmp := r_num % 10 sum += tmp r_num /= 10 } // 想要的结果是Result r := &Result{ job: job, sum: sum, } //运算结果扔到管道 resultChan <- r } }(jobChan, resultChan) } }
部分结果(控制台打印):
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x1cOXqLo-1637983441095)(image-20211124211848787.png)]
runtime包
runtime.Gosched()
退出当前的 goroutine ,为其他 goroutine 腾出执行空间,最后再执行被退出的 goroutine
中文文档给了我们一个特别有意思的比喻:
(大概意思就是本来计划的好好的周末出去烧烤,但是你妈让你去相亲,两种情况第一就是你相亲速度非常快,见面就黄不耽误你继续烧烤,第二种情况就是你相亲速度特别慢,见面就是你侬我侬的,耽误了烧烤,但是还馋就是耽误了烧烤你还得去烧烤)
package main import ( "fmt" "runtime" ) func main() { // 让所有协程在一个核上执行 runtime.GOMAXPROCS(1) go func(s string) { for i := 0; i < 2; i++ { fmt.Println(s,i) } }("协程运行中:") // 主协程 for i := 0; i < 2; i++ { fmt.Println("hello") // 停一下,再次分配任务 runtime.Gosched() fmt.Println("world",i) } }
runtime.Goexit()
退出当前的 goroutine ,以后也不会执行
(一边烧烤一边相亲,突然发现相亲对象太丑影响烧烤,果断让她滚蛋,然后也就没有然后了)
package main import ( "fmt" "runtime" ) func main() { go func() { defer fmt.Println("A.defer") func() { defer fmt.Println("B.defer") // 结束协程 runtime.Goexit() defer fmt.Println("C.defer") fmt.Println("B") }() fmt.Println("A") }() for { } }
runtime.GOMAXPROCS
Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。例如在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)。
Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。
运行下面两个代码实例有助于理解核的数量对协程执行的影响
func a() { for i := 1; i < 10; i++ { fmt.Println("A:", i) } } func b() { for i := 1; i < 10; i++ { fmt.Println("B:", i) } } func main() { // 设定为单核执行 runtime.GOMAXPROCS(1) go a() go b() time.Sleep(time.Second) }
func a() { for i := 1; i < 10; i++ { fmt.Println("A:", i) } } func b() { for i := 1; i < 10; i++ { fmt.Println("B:", i) } } func main() { // 设定为双核执行 runtime.GOMAXPROCS(2) go a() go b() time.Sleep(time.Second) }
操作系统线程和goroutine的关系
- 1.一个操作系统线程对应用户态多个goroutine。
- 2.go程序可以同时使用多个操作系统线程。
- 3.goroutine和OS线程是多对多的关系,即m:n。
channel
我们设置函数的意义就是为了在特定的输入下获取到特定的输出,如果只是让函数一味的并发而不进行值的传递,那么这个并发就是没有意义的。
如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
channel是个特殊的类型,通道类似传送带或是队列,遵循先进先出(First In First Out)的原则。
通道的声明和初始化
`每个通道都是特定类型的,在声明时需要指明通道里传输的元素类型` var 变量 chan 元素类型 `例` var ch chan int println(ch) // 0x0 空值为 nil
声明的通道后需要使用make函数初始化之后才能使用。初始化后的通道空值为一个十六进制的地址
`在不进行初始化的情况下使用通道会报 deadlock` `其中缓存大小是可选项` make(chan 元素类型, [缓冲大小]) `例` var ch1,ch2 chan int ch1 = make(chan int) // 无缓存的通道ch1 ch2 = make(chan int,20) // 缓存大小为20的通道ch2
也可以直接定义通道
ch3 := make(chan int)