For-Select-Done
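We should prevent any leaks in our programs. That means every goroutine left running needs to receive a signal telling it that it can exit.
The most common way to do this is to combine a for-select loop with a channel that carries a shutdown signal to the goroutine. We call it the "done" channel.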
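func printIntegers(done <-chan struct{}, intStream <-chan int) {
	for {
		select {
		case i, ok := <-intStream:
			if !ok {
				// intStream was closed: nothing more to print.
				return
			}
			fmt.Println(i)
		case <-done:
			// The done channel was closed: the goroutine may exit.
			return
		}
	}
}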
I have not fully understood this yet, so I am noting it here for now...
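As an aid to reading the loop above, here is a minimal sketch of how the done channel might be used from the calling side. It repeats printIntegers (with a closed-channel check) so that it runs by itself; runAndStop and its one-second timeout are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// printIntegers prints values from intStream until done is closed
// or intStream itself is closed.
func printIntegers(done <-chan struct{}, intStream <-chan int) {
	for {
		select {
		case i, ok := <-intStream:
			if !ok {
				return
			}
			fmt.Println(i)
		case <-done:
			return
		}
	}
}

// runAndStop (hypothetical helper) feeds n integers to printIntegers,
// closes done, and reports whether the goroutine exited within one second.
func runAndStop(n int) bool {
	done := make(chan struct{})
	intStream := make(chan int)
	exited := make(chan struct{})

	go func() {
		defer close(exited)
		printIntegers(done, intStream)
	}()

	for i := 0; i < n; i++ {
		intStream <- i // unbuffered: returns once the goroutine has received i
	}
	close(done) // the "done" signal: the goroutine may now exit

	select {
	case <-exited:
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	fmt.Println("exited:", runAndStop(3))
}
```

Closing done, rather than sending a value on it, wakes every goroutine that selects on the channel, which is why the pattern scales to many workers.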
Fan-In Pattern
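func fanIn(ctx context.Context, fetchers ...<-chan interface{}) <-chan interface{} {
	combinedFetcher := make(chan interface{})

	// One WaitGroup slot per input channel.
	var wg sync.WaitGroup
	wg.Add(len(fetchers))

	for _, f := range fetchers {
		f := f
		// Forward every value from f to the combined channel.
		go func() {
			defer wg.Done()
			for {
				select {
				case res, ok := <-f:
					if !ok {
						// f was closed: this forwarder is finished.
						return
					}
					select {
					case combinedFetcher <- res:
					case <-ctx.Done():
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	// Channel cleanup: close the output once every forwarder has returned.
	go func() {
		wg.Wait()
		close(combinedFetcher)
	}()
	return combinedFetcher
}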
Taking the First n Values from a Stream
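func takeFirstN(ctx context.Context, dataSource <-chan interface{}, n int) <-chan interface{} {
	// Output channel that carries at most n values.
	takeChannel := make(chan interface{})

	go func() {
		// Closing the output tells the consumer that no more values will come.
		defer close(takeChannel)

		for i := 0; i < n; i++ {
			select {
			case val, ok := <-dataSource:
				if !ok {
					// The source ran dry before n values were read.
					return
				}
				select {
				case takeChannel <- val:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return takeChannel
}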
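The value of this pattern shows most clearly with a source that never ends. The sketch below pairs takeFirstN with a hypothetical endless generator called naturals; firstN is also a hypothetical helper, and takeFirstN is repeated so the block is self-contained.

```go
package main

import (
	"context"
	"fmt"
)

// naturals (hypothetical helper) emits 0, 1, 2, ... until ctx is cancelled.
func naturals(ctx context.Context) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// takeFirstN forwards at most n values from dataSource, then closes its output.
func takeFirstN(ctx context.Context, dataSource <-chan interface{}, n int) <-chan interface{} {
	takeChannel := make(chan interface{})
	go func() {
		defer close(takeChannel)
		for i := 0; i < n; i++ {
			select {
			case val, ok := <-dataSource:
				if !ok {
					return
				}
				select {
				case takeChannel <- val:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return takeChannel
}

// firstN (hypothetical helper) collects the first n naturals into a slice.
func firstN(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops the otherwise endless generator
	var got []int
	for v := range takeFirstN(ctx, naturals(ctx), n) {
		got = append(got, v.(int))
	}
	return got
}

func main() {
	fmt.Println(firstN(3)) // prints [0 1 2]
}
```

Cancelling the context after the loop is what keeps the generator goroutine from leaking once the consumer has what it needs.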
Subscription Pattern
// Note: the Item type and the Fetch method of fetcher are not shown in
// these notes; both are needed for this code to compile.

// Subscription delivers fetched items to the consumer through a channel.
type Subscription interface {
	Updates() <-chan Item
}

// On the other hand, we are going to use another interface as an
// abstraction to fetch the data we need:
type Fetcher interface {
	Fetch() (Item, error)
}

// For each of these we are going to have a concrete type implementing them.

// For the subscription:
func NewSubscription(ctx context.Context, fetcher Fetcher, freq int) Subscription {
	s := &sub{
		fetcher: fetcher,
		updates: make(chan Item),
	}
	// Run the task that is supposed to fetch our data.
	go s.serve(ctx, freq)
	return s
}

type sub struct {
	fetcher Fetcher
	updates chan Item
}

func (s *sub) Updates() <-chan Item {
	return s.updates
}

// We are going to go into more details about what happens inside the serve method.

// For the fetcher:
func NewFetcher(uri string) Fetcher {
	f := &fetcher{
		uri: uri,
	}
	return f
}

type fetcher struct {
	uri string
}

// Inside the serve method
// The serve method consists of a for-select-done type of loop:
func (s *sub) serve(ctx context.Context, checkFrequency int) {
	clock := time.NewTicker(time.Duration(checkFrequency) * time.Second)
	defer clock.Stop() // release the ticker when serve returns

	type fetchResult struct {
		fetched Item
		err     error
	}
	fetchDone := make(chan fetchResult, 1)

	for {
		select {
		// Clock that triggers the fetch
		case <-clock.C:
			go func() {
				fetched, err := s.fetcher.Fetch()
				fetchDone <- fetchResult{fetched, err}
			}()
		// Case where the fetch result is ready to be consumed
		case result := <-fetchDone:
			fetched := result.fetched
			if result.err != nil {
				// Printf, not Println, so that %v is actually formatted.
				log.Printf("Fetch error: %v\nWaiting for the next iteration", result.err)
				break // leaves the select only; the for loop continues
			}
			s.updates <- fetched
		// Case where we need to close the server
		case <-ctx.Done():
			return
		}
	}
}
There is a fix for this code, which I will leave aside for now.
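To watch the subscription loop run end to end, the following is a deliberately simplified sketch, not the code above. It assumes Item is a string, uses a hypothetical counterFetcher in place of a real fetcher, ticks in milliseconds, and calls Fetch inline instead of in its own goroutine. The serve and firstUpdates functions here are illustrative names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Item is assumed to be a string here; the notes do not define it.
type Item string

// Fetcher matches the interface from the text.
type Fetcher interface {
	Fetch() (Item, error)
}

// counterFetcher is a hypothetical stand-in that returns item-1, item-2, ...
type counterFetcher struct{ calls int }

func (c *counterFetcher) Fetch() (Item, error) {
	c.calls++
	return Item(fmt.Sprintf("item-%d", c.calls)), nil
}

// serve fetches on every tick and forwards the result until ctx is cancelled.
// Simplification: Fetch is called inline, so results always arrive in order.
func serve(ctx context.Context, f Fetcher, every time.Duration, updates chan<- Item) {
	clock := time.NewTicker(every)
	defer clock.Stop()
	defer close(updates)
	for {
		select {
		case <-clock.C:
			item, err := f.Fetch()
			if err != nil {
				continue // skip this round and wait for the next tick
			}
			select {
			case updates <- item:
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

// firstUpdates subscribes, reads n items, then cancels the subscription.
func firstUpdates(n int) []Item {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	updates := make(chan Item)
	go serve(ctx, &counterFetcher{}, 10*time.Millisecond, updates)

	var got []Item
	for len(got) < n {
		got = append(got, <-updates)
	}
	return got
}

func main() {
	fmt.Println(firstUpdates(3)) // prints [item-1 item-2 item-3]
}
```

Selecting on ctx.Done() while sending the update means the loop can still shut down when no consumer is reading.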
Map Pattern
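func Map(done <-chan struct{}, inputStream <-chan int, operator func(int) int) <-chan int {
	// Output channel for the transformed values.
	mappedStream := make(chan int)

	go func() {
		// Close the output when the goroutine exits.
		defer close(mappedStream)
		for {
			select {
			case <-done:
				return
			// Read the next input value; stop when the input is closed.
			case i, ok := <-inputStream:
				if !ok {
					return
				}
				// Send the transformed value, unless done fires first.
				select {
				case <-done:
					return
				case mappedStream <- operator(i):
				}
			}
		}
	}()
	return mappedStream
}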
func main() {
	done := make(chan struct{})
	defer close(done)

	// Generates a channel sending integers from 0 to 9.
	// rangeChannel is a helper that is not shown in these notes.
	range10 := rangeChannel(done, 10)

	multiplierBy10 := func(x int) int {
		return x * 10
	}
	for num := range Map(done, range10, multiplierBy10) {
		fmt.Println(num)
	}
}
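The main function above relies on rangeChannel, which these notes do not define. One plausible implementation, offered purely as an assumption, is sketched below together with Map so the example runs by itself; collect is a hypothetical helper that drains a channel into a slice.

```go
package main

import "fmt"

// rangeChannel (assumed implementation) emits 0..n-1, then closes its output.
func rangeChannel(done <-chan struct{}, n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			select {
			case out <- i:
			case <-done:
				return
			}
		}
	}()
	return out
}

// Map applies operator to every value read from inputStream.
func Map(done <-chan struct{}, inputStream <-chan int, operator func(int) int) <-chan int {
	mappedStream := make(chan int)
	go func() {
		defer close(mappedStream)
		for {
			select {
			case <-done:
				return
			case i, ok := <-inputStream:
				if !ok {
					return
				}
				select {
				case <-done:
					return
				case mappedStream <- operator(i):
				}
			}
		}
	}()
	return mappedStream
}

// collect (hypothetical helper) drains a channel into a slice.
func collect(in <-chan int) []int {
	var got []int
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	done := make(chan struct{})
	defer close(done)
	times10 := func(x int) int { return x * 10 }
	fmt.Println(collect(Map(done, rangeChannel(done, 5), times10))) // prints [0 10 20 30 40]
}
```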
Filter Pattern
func Filter(done <-chan struct{}, inputStream <-chan int, operator func(int) bool) <-chan int {
	filteredStream := make(chan int)

	go func() {
		defer close(filteredStream)
		for {
			select {
			case <-done:
				return
			case i, ok := <-inputStream:
				if !ok {
					return
				}
				// Drop values that do not satisfy the predicate.
				if !operator(i) {
					continue
				}
				select {
				case <-done:
					return
				case filteredStream <- i:
				}
			}
		}
	}()
	return filteredStream
}
func main() {
	done := make(chan struct{})
	defer close(done)

	// Generates a channel sending integers from 0 to 9.
	// rangeChannel is a helper that is not shown in these notes.
	range10 := rangeChannel(done, 10)

	isEven := func(x int) bool {
		return x%2 == 0
	}
	for num := range Filter(done, range10, isEven) {
		fmt.Println(num)
	}
}
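Because Filter and Map both take and return a channel of int, they can be chained into a small pipeline. The sketch below is one way this might look; rangeChannel is an assumed implementation (the notes do not define it), evenTimesTen is a hypothetical helper, and Map and Filter are repeated so the block is self-contained.

```go
package main

import "fmt"

// rangeChannel (assumed implementation) emits 0..n-1, then closes its output.
func rangeChannel(done <-chan struct{}, n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			select {
			case out <- i:
			case <-done:
				return
			}
		}
	}()
	return out
}

// Map applies operator to every value read from in.
func Map(done <-chan struct{}, in <-chan int, operator func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-done:
				return
			case i, ok := <-in:
				if !ok {
					return
				}
				select {
				case <-done:
					return
				case out <- operator(i):
				}
			}
		}
	}()
	return out
}

// Filter forwards only the values for which operator returns true.
func Filter(done <-chan struct{}, in <-chan int, operator func(int) bool) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-done:
				return
			case i, ok := <-in:
				if !ok {
					return
				}
				if !operator(i) {
					continue
				}
				select {
				case <-done:
					return
				case out <- i:
				}
			}
		}
	}()
	return out
}

// evenTimesTen (hypothetical helper) keeps the even numbers below n
// and multiplies each of them by 10.
func evenTimesTen(n int) []int {
	done := make(chan struct{})
	defer close(done)
	isEven := func(x int) bool { return x%2 == 0 }
	times10 := func(x int) int { return x * 10 }

	var got []int
	for v := range Map(done, Filter(done, rangeChannel(done, n), isEven), times10) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(evenTimesTen(10)) // prints [0 20 40 60 80]
}
```

All three stages share the same done channel, so closing it once is enough to stop the whole pipeline.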
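These patterns are building blocks for larger, more complex Go applications. They address problems you are likely to run into when dealing with concurrency in Go. You can also modify and extend them, or build new patterns on top of them.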