WaitGroup等待goroutine的集合完成。主goroutine调用添加以设置要等待的goroutine的数量。然后,每个goroutine都会运行并在完成后调用Done。同时,可以使用Wait来阻塞,直到所有goroutine完成。
你可以理解为计数器
1// `sync.WaitGroup`一共有三个方法,他们分别是: 2Add(delta int) 3//Add将可能为负数的增量添加到WaitGroup计数器中。如果计数器为零,则释放等待时阻塞的所有goroutine 4Done() 5// 完成将WaitGroup计数器减一。 6 Wait() 7// 等待块,直到WaitGroup计数器为零。
example
1// WaitGroup 2package main 3 4import ( 5 "fmt" 6 "sync" 7) 8 9// 声明WaitGroup 10var wg sync.WaitGroup 11 12func main() { 13 for i := 0; i < 10; i++ { 14 // WaitGroup 计数器 + 1 15 // 其delta为你开启的`groutine`数量 16 wg.Add(1) 17 go exampleOut(i) 18 } 19 // 等待 WaitGroup 计数器为0 20 wg.Wait() 21} 22 23func exampleOut(i int) { 24 // WaitGroup 计数器 - 1 25 wg.Done() 26 fmt.Println("Hello, Gopher, I am [Id]", i) 27}
sync.Mutex
无论是前面的channle
还是sync都是为了干一件事,那就是并发控制,也许你也和我一样有以下几个问题
- 我们为什么需要并发控制,不要可以么?
- 并发控制到底是控制什么?
- 并发控制有哪几种方案,他们分别适用于哪种场景?
- 如何做好并发控制呢?
以上几点就是我们此节需要了解、以及解决的问题
首先解决我们一起探究第一个问题,为什么需要并发控制?
首先有这么一个问题、以及相关的解决措施,绝对不是脱裤子放屁,多此一举。需要并发控制的原因有很多,总结一句话那就是资源竞争
资源竞争
在一个 goroutine 中,如果分配的内存没有被其他 goroutine 访问,只在该 goroutine 中被使用,那么不存在资源竞争的问题。
但如果同一块内存被多个 goroutine 同时访问,就会产生不知道谁先访问也无法预料最后结果的情况。这就是资源竞争,这块内存可以称为共享的资源
还记得在channel中,我讲到 Go语言的并发模型是
CSP(Communicating Sequential Processes)
,提倡通过通信共享内存而不是通过共享内存而实现通信,这点尤为重要需要我们去记住与掌握
首先我们来看一个累加求和的例子,代码如下所示
1package main 2 3import ( 4 "fmt" 5 "sync" 6) 7 8var ( 9 x int64 10 wg sync.WaitGroup 11) 12 13func add() { 14 for i := 0; i < 5000; i++ { 15 x = x + 1 16 } 17 wg.Done() 18} 19func main() { 20 wg.Add(5) 21 go add() 22 go add() 23 go add() 24 go add() 25 go add() 26 wg.Wait() 27 fmt.Println(x) 28}
期待输出值为25000
,sum + 10 加和 5000次,执行五次,我们口算答案是5000
,可输出结果却是3048
,而且每次答案还不一样。好家伙
这是为什么呢?,靓仔疑惑~
其根本的原因就是资源恶意竞争
精囊妙计:
使用 go build、go run、go test 这些 Go 语言工具链提供的命令时,添加 -race 标识可以帮你检查 Go 语言代码是否存在资源竞争。
1// example 2go run -race demo3.go
那么该怎么解决呢?
sync.Mutex
互斥锁,顾名思义,指的是在同一时刻只有一个协程执行某段代码,其他协程都要等待该协程执行完毕后才能继续执行。
在下面的示例中,我声明了一个互斥锁 mutex,然后修改 add 函数,对 sum+=i 这段代码加锁保护。这样这段访问共享资源的代码片段就并发安全了,可以得到正确的结果
sync.Mutex
为我们提供了两个方法,加锁与解锁,修改时先获取锁,修改后释放锁
代码修改如下
1package main 2 3import ( 4 "fmt" 5 "sync" 6) 7 8var ( 9 x int64 10 lock sync.Mutex 11 wg sync.WaitGroup 12) 13 14func add() { 15 for i := 0; i < 1000; i++ { 16 lock.Lock() // 加锁 17 x += 1 18 lock.Unlock() // 解锁 19 } 20 wg.Done() 21} 22func main() { 23 wg.Add(5) 24 go add() 25 go add() 26 go add() 27 go add() 28 go add() 29 wg.Wait() 30 fmt.Println(x) 31}
女少啊~
在以上示例代码中
x += 1
,部分被称之为临界区
在同步的程序设计中,临界区段指的是一个访问共享资源的程序片段,而这些共享资源又有无法同时被多个协程访问的特性。当有协程进入临界区段时,其他协程必须等待,这样就保证了临界区的并发安全。
sync.RWMutex
互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync
包中的RWMutex
类型。
读写锁分为两种:读锁和写锁。当一个goroutine获取读锁之后,其他的goroutine
如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine
获取写锁之后,其他的goroutine
无论是获取读锁还是写锁都会等待。
1var ( 2 x int64 3 wg sync.WaitGroup 4 lock sync.Mutex 5 rwlock sync.RWMutex 6) 7 8func write() { 9 // lock.Lock() // 加互斥锁 10 rwlock.Lock() // 加写锁 11 x = x + 1 12 time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒 13 rwlock.Unlock() // 解写锁 14 // lock.Unlock() // 解互斥锁 15 wg.Done() 16} 17 18func read() { 19 // lock.Lock() // 加互斥锁 20 rwlock.RLock() // 加读锁 21 time.Sleep(1) // 假设读操作耗时1秒 22 rwlock.RUnlock() // 解读锁 23 // lock.Unlock() // 解互斥锁 24 wg.Done() 25} 26 27func main() { 28 start := time.Now() 29 for i := 0; i < 10; i++ { 30 wg.Add(1) 31 go write() 32 } 33 34 for i := 0; i < 1000; i++ { 35 wg.Add(1) 36 go read() 37 } 38 39 wg.Wait() 40 end := time.Now() 41 fmt.Println(end.Sub(start)) 42}
现在我们解决了多个 goroutine 同时读写的资源竞争问题,但是又遇到另外一个问题——性能。因为每次读写共享资源都要加锁,所以性能低下,这该怎么解决呢?
现在我们分析读写这个特殊场景,有以下几种情况:
- 写的时候不能同时读,因为这个时候读取的话可能读到脏数据(不正确的数据);
- 读的时候不能同时写,因为也可能产生不可预料的结果;
- 读的时候可以同时读,因为数据不会改变,所以不管多少个 goroutine 读都是并发安全的。
所以就可以通过读写锁 sync.RWMutex 来优化这段代码,提升性能。
sync.Once
在实际的工作中,你可能会有这样的需求:让代码只执行一次,哪怕是在高并发的情况下,比如创建一个单例。
针对这种情形,Go 语言为我们提供了 sync.Once 来保证代码只执行一次,例如只加载一次配置文件、只关闭一次通道等。
Go语言中的sync
包中提供了一个针对只执行一次场景的解决方案–sync.Once
。
sync.Once
只有一个Do
方法,其签名如下:
1func (o *Once) Do(f func()) {} 2// 如果要执行的函数f需要传递参数就需要搭配闭包来使用。
这是 Go 语言自带的一个示例,虽然启动了 10 个协程来执行 onceBody 函数,但是因为用了 once.Do 方法,所以函数 onceBody 只会被执行一次。也就是说在高并发的情况下,sync.Once 也会保证 onceBody 函数只执行一次。
sync.Once 适用于创建某个对象的单例、只加载一次的资源等只执行一次的场景。
1// example 2func main() { 3 doOnce() 4} 5func doOnce() { 6 var once sync.Once 7 onceBody := func() { 8 fmt.Println("Only once") 9 } 10 //用于等待协程执行完毕 11 done := make(chan bool) 12 //启动10个协程执行once.Do(onceBody) 13 for i := 0; i < 10; i++ { 14 go func() { 15 //把要执行的函数(方法)作为参数传给once.Do方法即可 16 once.Do(onceBody) 17 done <- true 18 }() 19 } 20 for i := 0; i < 10; i++ { 21 <-done 22 } 23}
sync.Map
1var m = make(map[string]int) 2 3func get(key string) int { 4 return m[key] 5} 6 7func set(key string, value int) { 8 m[key] = value 9} 10 11func main() { 12 wg := sync.WaitGroup{} 13 for i := 0; i < 20; i++ { 14 wg.Add(1) 15 go func(n int) { 16 key := strconv.Itoa(n) 17 set(key, n) 18 fmt.Printf("k=:%v,v:=%v\n", key, get(key)) 19 wg.Done() 20 }(i) 21 } 22 wg.Wait() 23}
上面的代码开启少量几个goroutine
的时候可能没什么问题,当并发多了之后执行上面的代码就会报错误。
像这种场景下就需要为map加锁来保证并发的安全性了,Go语言的sync
包中提供了一个开箱即用的并发安全版map–sync.Map
。开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。同时sync.Map
内置了诸如Store
、Load
、LoadOrStore
、Delete
、Range
等操作方法。
一个简单的例子
1var m = sync.Map{} 2 3func main() { 4 wg := sync.WaitGroup{} 5 for i := 0; i < 20; i++ { 6 wg.Add(1) 7 go func(n int) { 8 key := strconv.Itoa(n) 9 m.Store(key, n) 10 value, _ := m.Load(key) 11 fmt.Printf("k=:%v,v:=%v\n", key, value) 12 wg.Done() 13 }(i) 14 } 15 wg.Wait() 16}
原子操作
代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是Go语言提供的方法它在用户态就可以完成,因此性能比加锁操作更好。Go语言中原子操作由内置的标准库sync/atomic
提供。
atomic包
方法 | 解释 |
func LoadInt32(addr *int32) (val int32) func LoadInt64(addr *int64) (val int64) func LoadUint32(addr *uint32) (val uint32) func LoadUint64(addr *uint64) (val uint64) func LoadUintptr(addr *uintptr) (val uintptr) func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) | 读取操作 |
func StoreInt32(addr *int32, val int32) func StoreInt64(addr *int64, val int64) func StoreUint32(addr *uint32, val uint32) func StoreUint64(addr *uint64, val uint64) func StoreUintptr(addr *uintptr, val uintptr) func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) | 写入操作 |
func AddInt32(addr *int32, delta int32) (new int32) func AddInt64(addr *int64, delta int64) (new int64) func AddUint32(addr *uint32, delta uint32) (new uint32) func AddUint64(addr *uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) | 修改操作 |
func SwapInt32(addr *int32, new int32) (old int32) func SwapInt64(addr *int64, new int64) (old int64) func SwapUint32(addr *uint32, new uint32) (old uint32) func SwapUint64(addr *uint64, new uint64) (old uint64) func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) | 交换操作 |
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) | 比较并交换操作 |
示例
我们填写一个示例来比较下互斥锁和原子操作的性能。
1package main 2 3import ( 4 "fmt" 5 "sync" 6 "sync/atomic" 7 "time" 8) 9 10type Counter interface { 11 Inc() 12 Load() int64 13} 14 15// 普通版 16type CommonCounter struct { 17 counter int64 18} 19 20func (c CommonCounter) Inc() { 21 c.counter++ 22} 23 24func (c CommonCounter) Load() int64 { 25 return c.counter 26} 27 28// 互斥锁版 29type MutexCounter struct { 30 counter int64 31 lock sync.Mutex 32} 33 34func (m *MutexCounter) Inc() { 35 m.lock.Lock() 36 defer m.lock.Unlock() 37 m.counter++ 38} 39 40func (m *MutexCounter) Load() int64 { 41 m.lock.Lock() 42 defer m.lock.Unlock() 43 return m.counter 44} 45 46// 原子操作版 47type AtomicCounter struct { 48 counter int64 49} 50 51func (a *AtomicCounter) Inc() { 52 atomic.AddInt64(&a.counter, 1) 53} 54 55func (a *AtomicCounter) Load() int64 { 56 return atomic.LoadInt64(&a.counter) 57} 58 59func test(c Counter) { 60 var wg sync.WaitGroup 61 start := time.Now() 62 for i := 0; i < 1000; i++ { 63 wg.Add(1) 64 go func() { 65 c.Inc() 66 wg.Done() 67 }() 68 } 69 wg.Wait() 70 end := time.Now() 71 fmt.Println(c.Load(), end.Sub(start)) 72} 73 74func main() { 75 c1 := CommonCounter{} // 非并发安全 76 test(c1) 77 c2 := MutexCounter{} // 使用互斥锁实现并发安全 78 test(&c2) 79 c3 := AtomicCounter{} // 并发安全且比互斥锁效率更高 80 test(&c3) 81}
atomic
包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。
sync.Cond
Cond实现了一个条件变量,它是goroutines等待或宣布事件发生的集合点。每个Cond都有一个关联的Locker L(通常是Mutex或RWMutex),在更改条件和调用Wait方法时必须将其保留。第一次使用后,不得复制条件
在 Go 语言中,sync.WaitGroup 用于最终完成的场景,关键点在于一定要等待所有协程都执行完毕。
而 sync.Cond 可以用于发号施令,一声令下所有协程都可以开始执行,关键点在于协程开始的时候是等待的,要等待 sync.Cond 唤醒才能执行。
sync.Cond 从字面意思看是条件变量,它具有阻塞协程和唤醒协程的功能,所以可以在满足一定条件的情况下唤醒协程,但条件变量只是它的一种使用场景。
sync.Cond 有三个方法,它们分别是:
- Wait,Wait原子地解锁c.L并中止调用goroutine的执行。稍后恢复执行后,等待锁定c.L才返回。与其他系统不同,等待不会返回,除非被广播或信号唤醒。
- Signal,信号唤醒一个等待在c的goroutin
- Broadcast,唤醒所有等待c的goroutine
示例
1package main 2 3import ( 4 "fmt" 5 "sync" 6 "time" 7) 8 9//10个人赛跑,1个裁判发号施令 10func race() { 11 cond := sync.NewCond(&sync.Mutex{}) 12 var wg sync.WaitGroup 13 wg.Add(11) 14 for i := 0; i < 10; i++ { 15 go func(num int) { 16 defer wg.Done() 17 fmt.Println(num, "号已经就位") 18 cond.L.Lock() 19 cond.Wait() //等待发令枪响 20 fmt.Println(num, "号开始跑……") 21 cond.L.Unlock() 22 }(i) 23 } 24 //等待所有goroutine都进入wait状态 25 time.Sleep(2 * time.Second) 26 go func() { 27 defer wg.Done() 28 fmt.Println("裁判已经就位,准备发令枪") 29 fmt.Println("比赛开始,大家准备跑") 30 cond.Broadcast() //发令枪响 31 }() 32 //防止函数提前返回退出 33 wg.Wait() 34}
总结
这一节我们巴拉巴拉搞了很多,到底什么情况用哪个。相信你也可能和我一样半懵半醒,那么我们来总结一下。他们的使用场景,啥是啥?
需知:goroutine与线程
- Go语言的并发模型是
CSP(Communicating Sequential Processes)
,提倡通过通信共享内存而不是通过共享内存而实现通信。
1可增长的栈 2OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这么大。所以在Go语言中一次创建十万左右的goroutine也是可以的。 3 4goroutine调度 5GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。 6 7G很好理解,就是个goroutine的,里面除了存放本goroutine信息外 还有与所在P的绑定等信息。 8P管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务。 9M(machine)是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是一一映射的关系, 一个groutine最终是要放到M上执行的; 10P与M一般也是一一对应的。他们关系是: P管理着一组G挂载在M上运行。当一个G长久阻塞在一个M上时,runtime会新建一个M,阻塞G所在的P会把其他的G 挂载在新建的M上。当旧的G阻塞完成或者认为其已经死掉时 回收旧的M。 11 12P的个数是通过runtime.GOMAXPROCS设定(最大256),Go1.5版本之后默认为物理线程数。 在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失。
使用场景
- Channel:关于数据流动、传递等情况的优先使用
channle
, 它是并发安全的,且性能优异, channel底层的实现为互斥锁
- sync.Once:让代码只执行一次,哪怕是在高并发的情况下,比如创建一个单例。
- Sync.WaitGroup:用于最终完成的场景,关键点在于一定要等待所有goroutine都执行完毕。有了它我们再也不用为了等待
goroutine
执行完成而添加time.sleep
了
- Sync.Mutew: 当资源发现竞争时,我们可以使用
Sync.Mutew
,加互斥锁保证并发安全
- Sync.RWMutew:
Sync.Mutew
进阶使用,当读多写少的时候,可以使用读写锁来保证并发安全,同时也提高了并发效率
- sync.Map:高并发的情况下,原始的map并不安全,使用sync.Map可用让我们的map在并发情况下也保证安全
- sync.Cond:sync.Cond 可以用于发号施令,一声令下所有协程都可以开始执行,关键点在于协程开始的时候是等待的,要等待 sync.Cond 唤醒才能执行。
说了这么多,这么多花里胡哨的,注意一点,Sync.Mutew,互斥锁,所有的锁的爸爸,原子操作。互斥锁的叔叔。
感谢您的阅读,如果感觉不错。也可以点赞、收藏、在读、当然推荐给身边的哥们也是不错的选择,同时欢迎关注我。一起从0到1
期待下一章节,铁索连环-context
以及下下章节:并发模式
我会在并发模式
中与你探讨:
channle
缓存区多大比较合适,
Goroutine Work Pool,减少Goroutine
过多重复的创建与销毁
Pipeline 模式:流水线工作模式,对任务中的部分进行剖析
扇出和扇入模式:对流水线工作模式进行优化,实现更高效的扇出和扇入模式
Futures 模式:未来模式,主协程不用等待子协程返回的结果,可以先去做其他事情,等未来需要子协程结果的时候再来取
同时再一次去搞一下,到底什么是可异步、并发的代码,并加以分析与优化
未来已来。Let‘s Go~