Go语言学习编程实践:五种模式解决go中的并发问题

简介: Go语言学习编程实践:五种模式解决go中的并发问题

For-Select-Done


我们应该防止程序中发生任何泄露。所以我们应该对于留在程序中的go例程发送信号,让它知道它可以退出。

最常见的就是将for-select循环与通道结合起来,向go程序发送一个关闭信号。我们称它为“完成”通道。

func printIntegers(done <-chan struct{}, intStream <-chan int) {
  for{
     select {
     case i := <-intStream:
     fmt.Println(i)
     case <-done:
     return
     }
   }
}

没有看懂,先记在这里……


扇入模式


1f2a0f51806742fc922ed38b724c8dbd.png

func fanIn(ctx context.Context, fetchers ...<-chan interface{}) <-chan interface{} {
 combinedFetcher := make(chan interface{})
 // 1
 var wg sync.WaitGroup
 wg.Add(len(fetchers))
 // 2
 for _, f := range fetchers {
 f := f
 go func() {
 // 3
 defer wg.Done()
 for{
 select{
 case res := <-f:
 combinedFetcher <- res
 case <-ctx.Done():
 return
 }
 }
 }()
 }
 // 4
 // Channel cleanup
 go func() {
 wg.Wait()
 close(combinedFetcher)
 } ()
 return combinedFetcher
}


从流中获取前 n 个值


7f1f26d541414417a8720827abe02f86.png

func takeFirstN(ctx context.Context, dataSource <-chan interface{}, n int) <-chan interface{} {
 // 1
 takeChannel := make(chan interface{})
 // 2
 go func() {
 defer close(takeChannel)
 // 3 
 for i := 0; i< n; i++ {
 select {
 case val, ok := <-dataSource:
 if !ok{
 return
 }
 takeChannel <- val
 case <-ctx.Done():
 return
 }
 }
 }()
 return takeChannel
}


订阅模式


type Subscription interface {
 Updates() <-chan Item
}
//On the other hand, we are going to use another interface as an abstraction to fetch the data we 
//need:
//另一方面,我们将使用另一个接口作为抽象来获取我们需要的数据。
type Fetcher interface {
 Fetch() (Item, error)
}
//For each of these we are going to have a concrete type implementing them.
//对于其中的每一个,我们都将有一个具体的类型来实现它们。
//For the subscription:
//对于订阅来说。
func NewSubscription(ctx context.Context, fetcher Fetcher, freq int) Subscription {
 s := &sub{
 fetcher: fetcher,
 updates: make(chan Item),
 }
// Running the Task Supposed to fetch our data
 go s.serve(ctx, freq)
 return s
}
type sub struct {
 fetcher Fetcher
 updates chan Item
}
func (s *sub) Updates() <-chan Item {
 return s.updates
}
//We are going to go into more details about what happens inside the serve method.
我们将更详细地介绍服务方法内部发生的事情。
//For the fetcher:
//对于取物者来说。
func NewFetcher(uri string) Fetcher {
 f := &fetcher{
 uri: uri,
 }
 return f
}
type fetcher struct {
 uri string
}
//Inside the serve method
//The serve method consists of a for-select-done type of loop:
//服务方法由一个 for-select-done 类型的循环组成。
func (s *sub) serve(ctx context.Context, checkFrequency int) {
 clock := time.NewTicker(time.Duration(checkFrequency) * time.Second)
 type fetchResult struct {
 fetched Item
 err error
 }
 fetchDone := make(chan fetchResult, 1)
 for {
 select {
 // Clock that triggers the fetch
 case <-clock.C:
 go func() {
fetched, err := s.fetcher.Fetch()
fetchDone <- fetchResult{fetched, err}
 }()
 // Case where the fetch result is
 // Ready to be consumed
 case result := <-fetchDone:
 fetched := result.fetched
 if result.err != nil {
 log.Println("Fetch error: %v \n Waiting the next iteration", result.err.Error())
break
 }
 s.updates <-fetched
 // Case where we need to close the server
 case <-ctx.Done():
 return
 }
 }
}


有一个修正,暂时先不提。


地图模式


func Map(done <-chan struct{}, inputStream <-chan int, operator func(int)int) <-chan int {
 // 1
 mappedStream := make(chan int)
 go func() {
 defer close(mappedStream)
 // 2
 for {
 select {
 case <-done:
 return
 // 3
 case i, ok := <-inputStream:
 if !ok { return }
 //4
 select {
 case <-done:
 return
 case mappedStream <- operator(i):
 }
 }
 }
 }()
 return mappedStream
func main() {
 done := make(chan struct{})
 defer close(done)
 // Generates a channel sending integers
 // From 0 to 9
 range10 := rangeChannel(done, 10)
 multiplierBy10 := func(x int) int {
 return x * 10
 }
 for num := range Map(done, range10, multiplierBy10) {
 fmt.Println(num)
 }
}


过滤模式


1.png

func Filter(done <-chan struct{}, inputStream <-chan int, operator func(int)bool) <-chan int {
 filteredStream := make(chan int)
 go func() {
 defer close(filteredStream)
 for {
 select {
 case <-done:
 return
 case i, ok := <-inputStream:
 if !ok {
 return
 }
 if !operator(i) { break }
 select {
 case <-done:
 return
 case filteredStream <- i:
 }
 }
 }
 }()
 return filteredStream
}
func main() {
 done := make(chan struct{})
 defer close(done)
 // Generates a channel sending integers
 // From 0 to 9
 range10 := rangeChannel(done, 10)
 isEven := func(x int) bool {
 return x % 2 == 0
 }
 for num := range Filter(done, range10, isEven) {
 fmt.Println(num)
 }
}

这五个模式是构建更大、更复杂的 Golang 应用程序的基石。这些解决方案可以解决你在处

理 GO 中的并发问题时可能遇到的问题。此外,你还可以在此基础上修改、扩展和创建新的

模式。

相关文章
|
13天前
|
Go 开发工具
百炼-千问模型通过openai接口构建assistant 等 go语言
由于阿里百炼平台通义千问大模型没有完善的go语言兼容openapi示例,并且官方答复assistant是不兼容openapi sdk的。 实际使用中发现是能够支持的,所以自己写了一个demo test示例,给大家做一个参考。
|
13天前
|
程序员 Go
go语言中结构体(Struct)
go语言中结构体(Struct)
90 71
|
12天前
|
存储 Go 索引
go语言中的数组(Array)
go语言中的数组(Array)
98 67
|
13天前
|
存储 Go
go语言中映射
go语言中映射
31 11
|
5天前
|
Go 数据安全/隐私保护 UED
优化Go语言中的网络连接:设置代理超时参数
优化Go语言中的网络连接:设置代理超时参数
|
7月前
|
开发框架 安全 中间件
Go语言开发小技巧&易错点100例(十二)
Go语言开发小技巧&易错点100例(十二)
80 1
|
16天前
|
开发框架 Go 计算机视觉
纯Go语言开发人脸检测、瞳孔/眼睛定位与面部特征检测插件-助力GoFly快速开发框架
开发纯go插件的原因是因为目前 Go 生态系统中几乎所有现有的人脸检测解决方案都是纯粹绑定到一些 C/C++ 库,如 OpenCV 或 dlib,但通过 cgo 调用 C 程序会引入巨大的延迟,并在性能方面产生显著的权衡。此外,在许多情况下,在各种平台上安装 OpenCV 是很麻烦的。使用纯Go开发的插件不仅在开发时方便,在项目部署和项目维护也能省很多时间精力。
|
1月前
|
Go 数据安全/隐私保护 开发者
Go语言开发
【10月更文挑战第26天】Go语言开发
38 3
|
1月前
|
Java 程序员 Go
Go语言的开发
【10月更文挑战第25天】Go语言的开发
32 3
|
4月前
|
JSON 中间件 Go
go语言后端开发学习(四) —— 在go项目中使用Zap日志库
本文详细介绍了如何在Go项目中集成并配置Zap日志库。首先通过`go get -u go.uber.org/zap`命令安装Zap,接着展示了`Logger`与`Sugared Logger`两种日志记录器的基本用法。随后深入探讨了Zap的高级配置,包括如何将日志输出至文件、调整时间格式、记录调用者信息以及日志分割等。最后,文章演示了如何在gin框架中集成Zap,通过自定义中间件实现了日志记录和异常恢复功能。通过这些步骤,读者可以掌握Zap在实际项目中的应用与定制方法
156 1
go语言后端开发学习(四) —— 在go项目中使用Zap日志库