别在树下徘徊,别在雨中沉思,别在黑暗中落泪——梅里美《卡门》
1. 前言
不好意思,今天是中国传统节日七夕
,我专门在家带娃,我媳妇去修理牙齿去了,日子特殊,需要做点有意义的事情,我就尝试着一边带娃一边敲文章,结果从早晨到下午才完稿,中间过程省略13个字。
2. Fan in Fan out模式
先看图:
Fan out模式:
多个goroutine从同一个通道读取数据,直到该通道关闭。OUT是一种张开的模式,所以又被称为扇出,可以用来分发任务。多个goroutine有自己的chan,这样将结果暂存下来。
Fan in模式:
1个goroutine从多个通道读取数据,直到这些通道关闭。IN是一种收敛的模式,所以又被称为扇入,用来收集处理的结果。这样我们可以从最后这一个chan直接处理数据了。
3. 实战
package main import ( "fmt" "runtime" "sync" "time" ) func main() { done := make(chan interface{}) defer close(done) start := time.Now() //生成随机数切片 genSlice := func() []int { sl := make([]int, 0) for i := 0; i< 1000000; i++ { sl = append(sl, i) } return sl } // 把上面生成的slice转换为chan序列 gen := func(done <- chan interface{}, nums ...int) <- chan int { intStream := make(chan int) go func() { defer close(intStream) for _, n := range nums { select { case <-done: return case intStream <- n: } } }() return intStream } // 对读取的数字进行相加操作 sq := func (done <-chan interface{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n + n: case <-done: return } } }() return out } // 获取cpu核数 多少核数运行多少goroutine 即Fan out模式 nCpu := runtime.NumCPU() finders := make([]<-chan int, nCpu) in := gen(done, genSlice()...) for i:=0; i<nCpu; i++ { finders[i] = sq(done, in) } fmt.Println("cpu numbers is ", nCpu) //Fan in模式 fanIn := func(done chan interface{}, chans ...<-chan int) <- chan int{ var wg = sync.WaitGroup{} multiPlexedStream := make(chan int) multiplex := func(c <- chan int) { defer wg.Done() for i:=range c { select { case <-done: return case multiPlexedStream <- i: } } } wg.Add(len(chans)) for _, c := range chans { go multiplex(c) } go func() { wg.Wait() close(multiPlexedStream) }() return multiPlexedStream } // for i:=range fanIn(done, finders...) { fmt.Println(i) } fmt.Println(time.Since(start)) }
我们看到就是nCpu的goroutine去调用sq执行操作,返回各自的chan,最后通过range操作将多个chan扇入到一个chan中。
4. 长话短说
我通过性能分析工具试了下运行上面代码需要多久?答案是5s,但是我用普通模式耗时是3s,普通模式如下:
for i := range sq(done, gen(done, genSlice()...)) { fmt.Println(i) } fmt.Println(time.Since(start))
普通模式没有扇出,没有扇入,运行时间很快,为什么呢?我们当前程序的瓶颈在FAN-IN,sq函数很快就完成,multiplex函数它把1000000个数据写入到1个通道的时候出现了瓶颈,适当使用带缓冲通道可以提高程序性能,例如multiPlexedStream := make(chan int, 10000)
。
5. 优势
FAN模式能提高Golang并发的性能(利用CPU核数),如果想以后运用自如,用到自己的项目中去,还是要多写去尝试一下,因为有时候Fan模式不一定是最优的。
6. 参考
google官方:https://blog.golang.org/pipelines
sf:https://segmentfault.com/a/1190000017182416
7. 关注公众号
微信公众号:堆栈future