golang的Fan模式在项目中实战,我后悔了

简介: golang的Fan模式在项目中实战,我后悔了

别在树下徘徊,别在雨中沉思,别在黑暗中落泪——梅里美《卡门》


1. 前言



不好意思,今天是中国传统节日七夕,我专门在家带娃,我媳妇去修理牙齿去了,日子特殊,需要做点有意义的事情,我就尝试着一边带娃一边敲文章,结果从早晨到下午才完稿,中间过程省略13个字


2. Fan in Fan out模式



先看图:


640.png

Fan out模式:

多个goroutine从同一个通道读取数据,直到该通道关闭。OUT是一种张开的模式,所以又被称为扇出,可以用来分发任务。多个goroutine有自己的chan,这样将结果暂存下来。


Fan in模式:

1个goroutine从多个通道读取数据,直到这些通道关闭。IN是一种收敛的模式,所以又被称为扇入,用来收集处理的结果。这样我们可以从最后这一个chan直接处理数据了。


3. 实战



package main
import (
 "fmt"
 "runtime"
 "sync"
 "time"
)
func main() {
 done := make(chan interface{})
 defer close(done)
 start := time.Now()
 //生成随机数切片
 genSlice := func() []int {
  sl := make([]int, 0)
  for i := 0; i< 1000000; i++ {
   sl = append(sl, i)
  }
  return sl
 }
// 把上面生成的slice转换为chan序列
 gen := func(done <- chan interface{}, nums ...int) <- chan int {
  intStream := make(chan int)
  go func() {
   defer close(intStream)
   for _, n := range nums {
    select {
    case <-done:
     return
    case intStream <- n:
    }
   }
  }()
  return intStream
 }
  // 对读取的数字进行相加操作
 sq := func (done <-chan interface{}, in <-chan int) <-chan int {
  out := make(chan int)
  go func() {
   defer close(out)
   for n := range in {
    select {
     case out <- n + n:
     case <-done:
      return
    }
   }
  }()
  return out
 }
// 获取cpu核数 多少核数运行多少goroutine 即Fan out模式
 nCpu := runtime.NumCPU()
 finders := make([]<-chan int, nCpu)
 in := gen(done, genSlice()...)
 for i:=0; i<nCpu; i++ {
  finders[i] = sq(done, in)
 }
 fmt.Println("cpu numbers is ", nCpu)
//Fan in模式
 fanIn := func(done chan interface{}, chans ...<-chan int) <- chan int{
  var wg = sync.WaitGroup{}
  multiPlexedStream := make(chan int)
  multiplex := func(c <- chan int) {
   defer wg.Done()
   for i:=range c {
    select {
    case <-done:
     return
    case multiPlexedStream <- i:
    }
   }
  }
  wg.Add(len(chans))
  for _, c := range chans {
   go multiplex(c)
  }
  go func() {
   wg.Wait()
   close(multiPlexedStream)
  }()
  return multiPlexedStream
 }
  // 
 for i:=range fanIn(done, finders...) {
  fmt.Println(i)
 }
 fmt.Println(time.Since(start))
}

我们看到就是nCpu的goroutine去调用sq执行操作,返回各自的chan,最后通过range操作将多个chan扇入到一个chan中。


4. 长话短说



我通过性能分析工具试了下运行上面代码需要多久?答案是5s,但是我用普通模式耗时是3s,普通模式如下:


for i := range sq(done, gen(done, genSlice()...)) {
  fmt.Println(i)
 }
 fmt.Println(time.Since(start))

普通模式没有扇出,没有扇入,运行时间很快,为什么呢?我们当前程序的瓶颈在FAN-IN,sq函数很快就完成,multiplex函数它把1000000个数据写入到1个通道的时候出现了瓶颈,适当使用带缓冲通道可以提高程序性能,例如multiPlexedStream := make(chan int, 10000)


5. 优势



FAN模式能提高Golang并发的性能(利用CPU核数),如果想以后运用自如,用到自己的项目中去,还是要多写去尝试一下,因为有时候Fan模式不一定是最优的。


6. 参考



google官方:https://blog.golang.org/pipelines

sf:https://segmentfault.com/a/1190000017182416


7. 关注公众号



 微信公众号:堆栈future

相关文章
|
7月前
|
物联网 Go 网络性能优化
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式【1月更文挑战第21天】【1月更文挑战第104篇】
508 1
|
3月前
|
Go
Golang生成随机数案例实战
关于如何使用Go语言生成随机数的三个案例教程。
200 91
Golang生成随机数案例实战
|
7月前
|
NoSQL 测试技术 Go
【Golang】国密SM2公钥私钥序列化到redis中并加密解密实战_sm2反编(1)
【Golang】国密SM2公钥私钥序列化到redis中并加密解密实战_sm2反编(1)
|
3月前
|
Linux Go 开发工具
Golang各平台环境搭建实战
这篇文章详细介绍了如何在Windows、Linux和Mac平台上搭建Golang开发环境,包括下载和安装Go SDK、配置环境变量、安装开发工具如Visual Studio Code和Go扩展,以及如何编写和运行第一个Go程序。
131 3
|
4月前
|
NoSQL Java 测试技术
Golang内存分析工具gctrace和pprof实战
文章详细介绍了Golang的两个内存分析工具gctrace和pprof的使用方法,通过实例分析展示了如何通过gctrace跟踪GC的不同阶段耗时与内存量对比,以及如何使用pprof进行内存分析和调优。
107 0
Golang内存分析工具gctrace和pprof实战
|
5月前
|
测试技术 Go
golang 的重试弹性模式
Golang 中的重试机制实现了一个名为 `Retrier` 的结构体,用于实现弹性模式。`Retrier` 创建时需要指定重试间隔(如常量间隔或指数递增间隔)和错误分类器。分类器决定了哪些错误应被重试。默认情况下,如果未提供分类器,则使用默认分类器,它简单地将非 nil 错误标记为应重试。提供了三种分类器:默认、白名单和黑名单。`Run` 和 `RunCtx` 是执行重试的函数,后者接受上下文以便处理超时。通过 `calcSleep` 计算带有随机抖动的休眠时间,增加重试的不可预测性,减少并发冲突。如果达到最大重试次数或上下文超时,重试将停止。
|
5月前
|
测试技术 Go
golang 的重试弹性模式怎么设计?
Golang的可重构弹性模式通过`Retrier`实现了重试逻辑。创建`Retrier`需指定重试间隔(隐含重试次数)及错误分类器,决定哪些错误需重试。示例代码展示了如何创建一个重试器并执行带有重试逻辑的工作函数。`Retrier`结构体包含重试间隔、分类器等字段。
|
7月前
|
存储 测试技术 Go
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
KisFlow项目源码位于&lt;https://github.com/aceld/kis-flow,初始阶段涉及项目构建和基础模块定义。首先在GitHub创建仓库,克隆到本地。项目目录包括`common/`, `example/`, `function/`, `conn/`, `config/`, `flow/`, 和 `kis/`。`go.mod`用于包管理,`KisLogger`接口定义了日志功能,提供不同级别的日志方法。默认日志对象`kisDefaultLogger`打印到标准输出。
670 9
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
|
7月前
|
前端开发 Go
Golang深入浅出之-Go语言中的异步编程与Future/Promise模式
【5月更文挑战第3天】Go语言通过goroutines和channels实现异步编程,虽无内置Future/Promise,但可借助其特性模拟。本文探讨了如何使用channel实现Future模式,提供了异步获取URL内容长度的示例,并警示了Channel泄漏、错误处理和并发控制等常见问题。为避免这些问题,建议显式关闭channel、使用context.Context、并发控制机制及有效传播错误。理解并应用这些技巧能提升Go语言异步编程的效率和健壮性。
374 5
Golang深入浅出之-Go语言中的异步编程与Future/Promise模式
|
7月前
|
JSON JavaScript 前端开发
Golang深入浅出之-Go语言JSON处理:编码与解码实战
【4月更文挑战第26天】本文探讨了Go语言中处理JSON的常见问题及解决策略。通过`json.Marshal`和`json.Unmarshal`进行编码和解码,同时指出结构体标签、时间处理、omitempty使用及数组/切片区别等易错点。建议正确使用结构体标签,自定义处理`time.Time`,明智选择omitempty,并理解数组与切片差异。文中提供基础示例及时间类型处理的实战代码,帮助读者掌握JSON操作。
183 1
Golang深入浅出之-Go语言JSON处理:编码与解码实战