Go-并发模式2(Patterns)

简介: Go-并发模式2(Patterns)

上篇文章,讲到了goroutine和channel,进行了简单的通信,接下来看看有哪些模式。

模式1 Generator:返回channel的函数

package main
import (
  "fmt"
  "math/rand"
  "time"
)
func main() {
  c := boring("boring!") // Function returning a channel.
  for i := 0; i < 5; i++ {
    fmt.Printf("You say: %q\n", <-c)
  }
  fmt.Println("You're boring; I'm leaving.")
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings. 
  c := make(chan string)
  go func() { // We launch the goroutine from inside the function.
    for i := 0; ; i++ {
      c <- fmt.Sprintf("%s %d", msg, i)
      time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
  }()
  return c // Return the channel to the caller.
}

boring返回一个channel,不断往里写数据。main调用,并从channel中获取数据,结果如下:

2020062310470442.png

通道做句柄

package main
import (
  "fmt"
  "math/rand"
  "time"
)
func main() {
  joe := boring("Joe")
  ann := boring("Ann")
  for i := 0; i < 5; i++ {
    fmt.Println(<-joe)
    fmt.Println(<-ann) // ann will block if joe is not ready to give a value
  }
  fmt.Println("You're both boring; I'm leaving.")
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
  c := make(chan string)
  go func() { // We launch the goroutine from inside the function.
    for i := 0; ; i++ {
      c <- fmt.Sprintf("%s: %d", msg, i)
      time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
  }()
  return c // Return the channel to the caller.
}

返回多个boring服务,channel做服务的句柄(也就是唯一标识)。返回channel时,通道没有数据会阻塞,按顺序来即可保证输出顺序。 结果如下:

2020062310470442.png

模式2 扇入(fan in):多个channel并入一个

package main
import (
  "fmt"
  "math/rand"
  "time"
)
func main() {
  c := fanIn(boring("Joe"), boring("Ann"))
  for i := 0; i < 10; i++ {
    fmt.Println(<-c) // HL
  }
  fmt.Println("You're both boring; I'm leaving.")
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
  c := make(chan string)
  go func() { // We launch the goroutine from inside the function.
    for i := 0; ; i++ {
      c <- fmt.Sprintf("%s: %d", msg, i)
      time.Sleep(time.Duration(rand.Intn(2e3)) * time.Millisecond)
    }
  }()
  return c // Return the channel to the caller.
}
func fanIn(input1, input2 <-chan string) <-chan string {
  c := make(chan string)
  go func() { for { c <- <-input1 } }()
  go func() { for { c <- <-input2 } }()
  return c
}

不考虑顺序时可以这样使用,类似开车时的汇入,“前方小心右侧车辆汇入”。

2020062310470442.png

input1、input2和c的关系如下图所示:

20210104203500268.jpg


阻塞与按序恢复

package main
import (
  "fmt"
  "math/rand"
  "time"
)
type Message struct {
  str string
  wait chan bool
}
func main() {
  c := fanIn(boring("Joe"), boring("Ann"))
  for i := 0; i < 5; i++ {
    msg1 := <-c; fmt.Println(msg1.str)
    msg2 := <-c; fmt.Println(msg2.str)
    msg1.wait <- true // block boring, false is also ok
    msg2.wait <- true
  }
  fmt.Println("You're both boring; I'm leaving.")
}
func boring(msg string) <-chan Message { // Returns receive-only channel of strings.
  c := make(chan Message)
  waitForIt := make(chan bool) // Shared between all messages.
  go func() { // We launch the goroutine from inside the function.
    for i := 0; ; i++ {
      c <- Message{ fmt.Sprintf("%s: %d", msg, i), waitForIt }
      time.Sleep(time.Duration(rand.Intn(2e3)) * time.Millisecond)
      <-waitForIt // to be blocked
    }
  }()
  return c // Return the channel to the caller.
}
func fanIn(inputs ... <-chan Message) <-chan Message {
  c := make(chan Message)
  for i := range inputs {
    input := inputs[i] // New instance of 'input' for each loop.
    go func() { for { c <- <-input } }()
  }
  return c
}

乱序到达,通过通道形成加锁同步。通道作为锁,

<-waitForIt

会阻塞,直到有数据。

主函数运行,第一句话,内存中有一个channel(称为20201204182323419.gif),存放Message类型,有20201204182323419.gif,放入20201204182323419.gif后阻塞。同样有20201204182323419.gif20201204182323419.gif

fanIn函数将两个channel的数据放到一个channel c,之后按序输出和去除阻塞(去除阻塞就是向对应的WaitForIt channel写一个数据,所以true还是false无所谓)。结果如下:

2020062310470442.png

通道关系如下:

2020062310470442.png

与Select结合

针对并发特有的控制结构。和switch很像,但每个case不是表达式而是通信,当有多个case可以时,将伪随机选择一个,所以不能依赖select来做顺序通信。

Rob给了例子,下方的select 10,篇幅原因就不展示了。

扇入(fan In)模式与Select的结合

package main
import (
  "fmt"
  "math/rand"
  "time"
)
func main() {
  c := fanIn(boring("Joe"), boring("Ann"))
  for i := 0; i < 10; i++ {
    fmt.Println(<-c)
  }
  fmt.Println("You're both boring1; I'm leaving.")
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
  c := make(chan string)
  go func() { // We launch the goroutine from inside the function.
    for i := 0; ; i++ {
      c <- fmt.Sprintf("%s: %d", msg, i)
      time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
  }()
  return c // Return the channel to the caller.
}
func fanIn(input1, input2 <-chan string) <-chan string {
  c := make(chan string)
  go func() {
    for {
      select {
      case s := <-input1:  c <- s
      case s := <-input2:  c <- s
      }
    }
  }()
  return c
}

fanIn不再使用多个goroutine了,而是使用一个goroutine,在其中有无限循环和select。

2020062310470442.png

模式3:通信超时

单次超时

package main
import (
  "fmt"
  "math/rand"
  "time"
)
func main() {
  c := boring("Joe")
  for {
    select {
    case s := <-c:
      fmt.Println(s)
    case <-time.After(1 * time.Second): // if you change it to more than 1.5 seconds it will not block, eg. 5
      fmt.Println("You're too slow.") // it's time to stop last case
      return
    }
  }
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
  c := make(chan string)
  go func() { // We launch the goroutine from inside the function.
    for i := 0; ; i++ {
      c <- fmt.Sprintf("%s: %d", msg, i)
      time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
    }
  }()
  return c // Return the channel to the caller.
}

有的时候boring服务可能是页面访问,获取资源等服务,我们并不清楚需要多长时间,但是我们有一个时间上限。这个时候可以使用库函数,time.After,到达等待时间后它会返回一个channel,这时我们可以退出程序。

2020062310470442.png

总体超时

package main
import (
  "fmt"
  "math/rand"
  "time"
)
func main() {
  c := boring("Joe")
  timeout := time.After(5 * time.Second)
  for {
    select {
    case s := <-c:
      fmt.Println(s)
    case <-timeout:
      fmt.Println("You talk too much.")
      return
    }
  }
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
  c := make(chan string)
  go func() { // We launch the goroutine from inside the function.
    for i := 0; ; i++ {
      c <- fmt.Sprintf("%s: %d", msg, i)
      time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
    }
  }()
  return c // Return the channel to the caller.
}

我们只定义一个timeout,到达后就退出。

2020062310470442.png

模式4:自定义退出

package main
import (
  "fmt"
  "math/rand"
  "time"
)
func main() {
  quit := make(chan bool)
  c := boring("Joe", quit)
  for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
  // modify by lady_killer9
  fmt.Println("You're boring!")
  quit <- true
}
func boring(msg string, quit <-chan bool) <-chan string {
  c := make(chan string)
  go func() {
    for i := 0; ; i++ {
      time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
      select {
      case c <- fmt.Sprintf("%s: %d", msg, i):
        // do nothing
      case <-quit:
        return
      }
    }
  }()
  return c
}

想退出时,在select的某个return的case对应的channel中写入数据

2020062310470442.png

安全的退出

package main
import (
  "fmt"
  "math/rand"
  "time"
)
func cleanup() {
  // added by lady_killer9
  fmt.Println("clean up ")
}
func main() {
  quit := make(chan string)
  c := boring("Joe", quit)
  for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
  quit <- "Bye!"
  fmt.Printf("Joe says: %q\n", <-quit)
}
func boring(msg string, quit chan string) <-chan string {
  c := make(chan string)
  go func() {
    for i := 0; ; i++ {
      time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
      select {
      case c <- fmt.Sprintf("%s: %d", msg, i):
        // do nothing
      case <-quit:
        cleanup()
        quit <- "See you!"
        return
      }
    }
  }()
  return c
}

有的时候,我们收到要关闭的消息后,需要进行文件关闭等清理工作,之后再告诉主程序我们清理完毕,可以退出了,防止内存泄露,文件占用等情况。

2020062310470442.png

goroutine的速度

package main
import (
  "fmt"
  "time"
)
func f(left, right chan int) {
  left <- 1 + <-right
}
func main() {
  const n = 10000
  leftmost := make(chan int)
  right := leftmost
  left := leftmost
  for i := 0; i < n; i++ {
    right = make(chan int)
    go f(left, right)
    left = right
  }
  start := time.Now()
  go func(c chan int) { c <- 1 }(right)
  fmt.Println(<-leftmost)
  fmt.Println(time.Since(start))
}

20210104203500268.jpg

类似于传话筒游戏,我们不断的右耳朵进,左耳朵出。这里使用了10000个goroutine,结果如下

2020062310470442.png

大概花了3ms多一点,就完成了10000个goroutine的通信,如果使用Python等其他语言是很难达到的,这就是goroutine,简单,高效。

接下来是一个模式使用的例子

Google Search

Google搜索是一个很好的例子,我们输入问题,然后Google发给多个后端程序进行搜索,可能是网页,图片,视频等,最后将结果进行一个汇总并返回。接下来进行一个仿造:

Google Search 1.0

package main
import (
  "fmt"
  "math/rand"
  "time"
)
type Result string
func Google(query string) (results []Result) {
  results = append(results, Web(query))
  results = append(results, Image(query))
  results = append(results, Video(query))
  return
}
var (
  Web = fakeSearch("web")
  Image = fakeSearch("image")
  Video = fakeSearch("video")
)
type Search func(query string) Result
func fakeSearch(kind string) Search {
        return func(query string) Result {
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            return Result(fmt.Sprintf("%s result for %q\n", kind, query))
        }
}
func main() {
  rand.Seed(time.Now().UnixNano())
  start := time.Now()
  results := Google("golang")
  elapsed := time.Since(start)
  fmt.Println(results)
  fmt.Println(elapsed)
}

在Google时,只是将结果放入结果队列依次放入会等待上一个结果出来

2020062310470442.png

等待太浪费时间了,我们可以使用goroutine

Google Search 2.0

package main
import (
  "fmt"
  "math/rand"
  "time"
)
type Result string
type Search func(query string) Result
var (
  Web = fakeSearch("web")
  Image = fakeSearch("image")
  Video = fakeSearch("video")
)
func Google(query string) (results []Result) {
  c := make(chan Result)
  go func() { c <- Web(query) } ()
  go func() { c <- Image(query) } ()
  go func() { c <- Video(query) } ()
  for i := 0; i < 3; i++ {
    result := <-c
    results = append(results, result)
  }
  return
}
func fakeSearch(kind string) Search {
        return func(query string) Result {
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            return Result(fmt.Sprintf("%s result for %q\n", kind, query))
        }
}
func main() {
  rand.Seed(time.Now().UnixNano())
  start := time.Now()
  results := Google("golang")
  elapsed := time.Since(start)
  fmt.Println(results)
  fmt.Println(elapsed)
}

使用goroutine就不必等待上一个结果出来

2020062310470442.png

Google Search 2.1

package main
import (
  "fmt"
  "math/rand"
  "time"
)
type Result string
type Search func(query string) Result
var (
  Web = fakeSearch("web")
  Image = fakeSearch("image")
  Video = fakeSearch("video")
)
func Google(query string) (results []Result) {
  c := make(chan Result)
  go func() { c <- Web(query) } ()
  go func() { c <- Image(query) } ()
  go func() { c <- Video(query) } ()
  timeout := time.After(80 * time.Millisecond)
  for i := 0; i < 3; i++ {
    select {
    case result := <-c:
      results = append(results, result)
    case <-timeout:
      fmt.Println("timed out")
      return
    }
  }
  return
}
func fakeSearch(kind string) Search {
        return func(query string) Result {
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            return Result(fmt.Sprintf("%s result for %q\n", kind, query))
        }
}
func main() {
  rand.Seed(time.Now().UnixNano())
  start := time.Now()
  results := Google("golang")
  elapsed := time.Since(start)
  fmt.Println(results)
  fmt.Println(elapsed)
}

在Google 2.0的fan In模式的基础上,增加了总体超时模式,超过时不再等待其他结果。

2020062310470442.png我们如何避免丢弃慢速服务器的结果呢?例如,上面的video被丢弃了

答:复制服务器。向多个副本发送请求,并使用第一个响应

Google Search 3.0

package main
import (
  "fmt"
  "math/rand"
  "time"
)
type Result string
type Search func(query string) Result
func First(query string, replicas ...Search) Result {
  c := make(chan Result)
  searchReplica := func(i int) { c <- replicas[i](query) }
  for i := range replicas {
    go searchReplica(i)
  }
  return <-c
}
func main() {
  rand.Seed(time.Now().UnixNano())
  start := time.Now()
  result := First("golang",
    fakeSearch("replica 1"),
    fakeSearch("replica 2"))
  elapsed := time.Since(start)
  fmt.Println(result)
  fmt.Println(elapsed)
}
func fakeSearch(kind string) Search {
        return func(query string) Result {
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            return Result(fmt.Sprintf("%s result for %q\n", kind, query))
        }
}

对于同一个问题,我们启用多个副本,返回最快的服务器搜索到的结果

2020062310470442.png

接下来,我们针对web、image、video都启用多个服务器进行搜索

package main
import (
  "fmt"
  "math/rand"
  "time"
)
type Result string
type Search func(query string) Result
var (
  Web1 = fakeSearch("web1")
  Web2 = fakeSearch("web2")
  Image1 = fakeSearch("image1")
  Image2 = fakeSearch("image2")
  Video1 = fakeSearch("video1")
  Video2 = fakeSearch("video2")
)
func Google(query string) (results []Result) {
  c := make(chan Result)
  go func() { c <- First(query, Web1, Web2) } ()
  go func() { c <- First(query, Image1, Image2) } ()
  go func() { c <- First(query, Video1, Video2) } ()
  timeout := time.After(80 * time.Millisecond)
  for i := 0; i < 3; i++ {
    select {
    case result := <-c:
      results = append(results, result)
    case <-timeout:
      fmt.Println("timed out")
      return
    }
  }
  return
}
func First(query string, replicas ...Search) Result {
  c := make(chan Result)
  searchReplica := func(i int) {
    c <- replicas[i](query)
  }
  for i := range replicas {
    go searchReplica(i)
  }
        return <-c
}
func fakeSearch(kind string) Search {
        return func(query string) Result {
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            return Result(fmt.Sprintf("%s result for %q\n", kind, query))
        }
}
func main() {
  rand.Seed(time.Now().UnixNano())
  start := time.Now()
  results := Google("golang")
  elapsed := time.Since(start)
  fmt.Println(results)
  fmt.Println(elapsed)
}

通过多个副本,选择最快的一个的方式,基本可以保证每种类型的结果都能在超时时间内完成。

2020062310470442.png

参考

Google IO 2012 Go Concurrency Patterns

b站中文字幕版

PPT

全部代码

更多Go相关内容:Go-Golang学习总结笔记

有问题请下方评论,转载请注明出处,并附有原文链接,谢谢!如有侵权,请及时联系。


相关文章
|
4天前
|
人工智能 Go 调度
掌握Go并发:Go语言并发编程深度解析
掌握Go并发:Go语言并发编程深度解析
|
4天前
|
SQL Go 数据库
【Go语言专栏】Go语言中的事务处理与并发控制
【4月更文挑战第30天】Go语言在数据库编程中支持事务处理和并发控制,确保ACID属性和多用户环境下的数据完整性。`database/sql`包提供事务管理,如示例所示,通过`Begin()`、`Commit()`和`Rollback()`执行和控制事务。并发控制利用Mutex、WaitGroup和Channel防止数据冲突。结合事务与并发控制,开发者可处理复杂场景,实现高效、可靠的数据库应用。
|
4天前
|
Cloud Native Go 云计算
多范式编程语言Go:并发与静态类型的结合
Go语言是Google于2007年开发的开源编程语言,旨在提高程序开发和部署的效率。它的独特特征在于结合了并发处理与静态类型系统,提供了简洁、高效、并行处理能力的编程体验。本文将探讨Go语言的特点、应用场景以及其在现代软件开发中的优势。
|
4天前
|
前端开发 Go
Golang深入浅出之-Go语言中的异步编程与Future/Promise模式
【5月更文挑战第3天】Go语言通过goroutines和channels实现异步编程,虽无内置Future/Promise,但可借助其特性模拟。本文探讨了如何使用channel实现Future模式,提供了异步获取URL内容长度的示例,并警示了Channel泄漏、错误处理和并发控制等常见问题。为避免这些问题,建议显式关闭channel、使用context.Context、并发控制机制及有效传播错误。理解并应用这些技巧能提升Go语言异步编程的效率和健壮性。
30 5
Golang深入浅出之-Go语言中的异步编程与Future/Promise模式
|
4天前
|
安全 Go
Golang深入浅出之-Go语言中的并发安全队列:实现与应用
【5月更文挑战第3天】本文探讨了Go语言中的并发安全队列,它是构建高性能并发系统的基础。文章介绍了两种实现方法:1) 使用`sync.Mutex`保护的简单队列,通过加锁解锁确保数据一致性;2) 使用通道(Channel)实现无锁队列,天生并发安全。同时,文中列举了并发编程中常见的死锁、数据竞争和通道阻塞问题,并给出了避免这些问题的策略,如明确锁边界、使用带缓冲通道、优雅处理关闭以及利用Go标准库。
26 5
|
4天前
|
存储 缓存 安全
Golang深入浅出之-Go语言中的并发安全容器:sync.Map与sync.Pool
Go语言中的`sync.Map`和`sync.Pool`是并发安全的容器。`sync.Map`提供并发安全的键值对存储,适合快速读取和少写入的情况。注意不要直接遍历Map,应使用`Range`方法。`sync.Pool`是对象池,用于缓存可重用对象,减少内存分配。使用时需注意对象生命周期管理和容量控制。在多goroutine环境下,这两个容器能提高性能和稳定性,但需根据场景谨慎使用,避免不当操作导致的问题。
35 4
|
4天前
|
安全 Go 开发者
Golang深入浅出之-Go语言中的CSP模型:深入理解并发哲学
【5月更文挑战第2天】Go语言的并发编程基于CSP模型,强调通过通信共享内存。核心概念是goroutines(轻量级线程)和channels(用于goroutines间安全数据传输)。常见问题包括数据竞争、死锁和goroutine管理。避免策略包括使用同步原语、复用channel和控制并发。示例展示了如何使用channel和`sync.WaitGroup`避免死锁。理解并发原则和正确应用CSP模型是编写高效安全并发程序的关键。
35 4
|
4天前
|
安全 Go 开发者
Golang深入浅出之-Go语言中的CSP模型:深入理解并发哲学
【5月更文挑战第1天】Go语言基于CSP理论,借助goroutines和channels实现独特的并发模型。Goroutine是轻量级线程,通过`go`关键字启动,而channels提供安全的通信机制。文章讨论了数据竞争、死锁和goroutine泄漏等问题及其避免方法,并提供了一个生产者消费者模型的代码示例。理解CSP和妥善处理并发问题对于编写高效、可靠的Go程序至关重要。
26 2
|
4天前
|
设计模式 Go 调度
Golang深入浅出之-Go语言中的并发模式:Pipeline、Worker Pool等
【5月更文挑战第1天】Go语言并发模拟能力强大,Pipeline和Worker Pool是常用设计模式。Pipeline通过多阶段处理实现高效并行,常见问题包括数据竞争和死锁,可借助通道和`select`避免。Worker Pool控制并发数,防止资源消耗,需注意任务分配不均和goroutine泄露,使用缓冲通道和`sync.WaitGroup`解决。理解和实践这些模式是提升Go并发性能的关键。
31 2
|
4天前
|
监控 安全 Go
【Go语言专栏】Go语言中的并发性能分析与优化
【4月更文挑战第30天】Go语言以其卓越的并发性能和简洁语法著称,通过goroutines和channels实现并发。并发性能分析旨在解决竞态条件、死锁和资源争用等问题,以提升多核环境下的程序效率。使用pprof等工具可检测性能瓶颈,优化策略包括减少锁范围、使用无锁数据结构、控制goroutines数量、应用worker pool和优化channel使用。理解并发模型和合理利用并发原语是编写高效并发代码的关键。