如果了解了GMP模型之后,自然了解go的并发特点,协程之间都可能是多线程并发执行的,通过开协程就可以实现并发:
package main import ( "fmt" "strconv" "time" ) func main() { go test("1") go test("2") go test("3") test("main") time.Sleep(time.Second*10) } func test(name string) { for i:=0;i<10;i++ { time.Sleep(1) fmt.Println(name+": "+strconv.Itoa(i)) } }
输出:
要注意的是,GMP模型下,协程一定是并发的,但不一定是并行的
看代码可以看到,我额外加了一个sleep,那是因为main协程如果结束运行了,子协程也会直接结束,sleep等待子协程执行一会儿,这样才能打印出数据
这个实现方案显然不太好,我们可以通过waitGroup实现协程等待
WaitGroup
package main import ( "fmt" "strconv" "sync" ) var wg sync.WaitGroup func main() { for i:=1;i<=3;i++ { wg.Add(1) go test(strconv.Itoa(i)) } wg.Add(1) test("main") wg.Wait() } func test(name string) { defer wg.Done() for i:=0;i<10;i++ { time.Sleep(1) fmt.Println(name+": "+strconv.Itoa(i)) } }
回到之前的代码,可看到我在for循环中增加了一个sleep,sleep的意义是让出时间片,从而去执行其他的协程进行并发 (GMP模型,如果没有让出时间片,同时所有协程都在同一个线程下时,协程之间将顺序执行,例如协程1运行完才会运行协程2)
主要实现了一个协程切换调度的功能
我们也可通过runtime包去做协程调度
runtime
package main import ( "fmt" "runtime" "strconv" "sync" ) var wg sync.WaitGroup func main() { //runtime.Gosched() //当前协程让出 //runtime.Goexit() //直接退出当前协程 //runtime.GOMAXPROCS(1) //限制P队列数量,如果为1,则无法并行 //runtime.NumGoroutine() //返回正在执行和排队的协程数 for i:=1;i<=3;i++ { wg.Add(1) go test(strconv.Itoa(i)) } wg.Add(1) test("main") wg.Wait() } func test(name string) { defer wg.Done() for i:=0;i<10;i++ { runtime.Gosched() fmt.Printf("当前协程数:%d \\n",runtime.NumGoroutine()) fmt.Println(name+": "+strconv.Itoa(i)) } }
并发问题
多开协程自然会有并发问题,我们可以通过waitGroup去控制主协程在子协程执行完之后进行操作,可以通过runtime包进行做协程并发切换,但这2个都没有涉及到变量共享问题,如何实现go的变量协程安全呢?
首先我们要理解一句话:
goroutine 奉行通过通信来共享内存,而不是共享内存来通信。
channel
通过channel,进行安全的传输变量
package main import ( "fmt" "runtime" "strconv" "sync" ) var wg sync.WaitGroup func main() { runtime.GOMAXPROCS(8) var chann = make(chan int) go func() { //模拟100条数据需要处理 for i:=0;i<100;i++ { chann<-i } close(chann) }() //开3个协程处理 for j := 0; j < 3; j++ { wg.Add(1) go queueHandle(strconv.Itoa(j),chann) } wg.Wait() } func queueHandle(name string,chann chan int) { defer wg.Done() for i := range chann { fmt.Println("协程"+name+"处理数据:",i) } }
可看到,3个协程通过channel,安全的获取到了需要处理的通道数据:
协程变量安全
package main import ( "fmt" "runtime" "sync" "time" ) var a int = 0 var wg sync.WaitGroup func main() { runtime.GOMAXPROCS(8) for i := 0; i < 10000; i++ { go add() } time.Sleep(time.Second * 1) wg.Wait() fmt.Println("i:", a) } func add() { defer wg.Done() wg.Add(1) a += 1 }
开启足够多的协程之后,协程变量出现了协程污染,导致最后a的值小于10000:
sync包
上面的waitGroup,其实就是sync包的一种类型,sync中还存在着其他的类型
sync.Mutex互斥锁
package main import ( "fmt" "runtime" "sync" "time" ) var a int = 0 var wg sync.WaitGroup var lock sync.Mutex func main() { //var lock sync.Mutex //lock.Lock() //加锁,加锁后其他协程调用将阻塞直到解锁 //lock.Unlock() //解锁 runtime.GOMAXPROCS(8) for i := 0; i < 10000; i++ { go add() } time.Sleep(time.Second * 1) wg.Wait() fmt.Println("i:", a) } func add() { defer wg.Done() wg.Add(1) lock.Lock() defer lock.Unlock() a += 1 }
sync.RWMutex 读写锁
func (rw *RWMutex) Lock() func (rw *RWMutex) Unlock() func (rw *RWMutex) RLock() func (rw *RWMutex) RUnlock()
rwmutex基于 mutex实现,多个协程可以重复获取读锁,如果获取写锁,其他协程读锁也将阻塞,这个读写锁太简单了,不说了
sync.Once 只执行一次
当在高并发情况下时,我们可能需要保证一个函数只执行一次,例如单例模式,加载配置文件,等等,我们可以通过sync.once实现
func (o *Once) Do(f func()) {}
单例模式实现
package main import ( "fmt" "sync" ) type testStruct struct {} var singleton *testStruct var once = sync.Once{} func GetInstance()*testStruct { once.Do(func() { singleton = &testStruct{} fmt.Println("执行实例化") }) return singleton }
sync.once内部存在一个mutex锁和一个bool值,如果bool为false,则通过mutex加锁执行一次,然后bool为true直接忽略执行
协程安全类型
代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型我们还可以使用原子操作来保证并发安全
协程安全的变量类型有sync.map,atomic包等
太简单了,不讲了