1 扇入/扇出服务
们举一个现实世界的例子,一个电子商务网站将自己与一个第三方支付网关整合在一起。 这里,网站使用支付网关的 API 来弹出支付屏幕并输入安全证书。同时,网站可能会调用另一个称为分析的 API 来记录支付的尝试。这种将一个请求分叉成多个请求的过程被称为 fan-out 扇出。在现实世界中,一个客户请求可能涉及许多扇出服务。
另一个例子是 MapReduce。Map 是一个扇入的操作,而 Reduce 是一个扇出的 操作。一个服务器可以将一个信息扇出到下一组服务(API),并忽略结果。或者可以等到这些服务器的所有响应都返回。如 如下图所示,一个传入的请求被服务器复用为转换成两个传出的请求:
扇入 fan-in 是一种操作,即两个或更多传入的请求会聚成一个请求。这种情况下,API 如何聚合来自多个后端服务的结果,并将结果即时返回给客户。
例如,想想一个酒店价格聚合器或航班票务聚合器,它从不同的数据提供者那里获取关于多个酒店或航班的请求信息并显示出来。
下图显示了扇出操作是如何结合多个请求并准备一个最终的响应,由客户端消费的。
客户端也可以是一个服务器,为更多的客户提供服务。如上图所示,左侧的服务器正在收集来自酒店 A、酒店 B 和 航空公司供应商 A,并为不同的客户准备另一个响应。
因此,扇入和扇出操作并不总是完全相互独立的。大多数情况下,它将是一个混合场景,扇入和扇出操作都是相互配合的。
请记住,对下一组服务器的扇出操作可以是异步的。也是如此。对于扇入请求来说,这可能不是真的。扇入操作有时被称为 API 调用。
2 Go 语言实现扇入/扇出模式
Fan-out:多个 goroutine 从同一个通道读取数据,直到该通道关闭。OUT 是一种张开的模式,所以又被称为扇出,可以用来分发任务。
Fan-in:1 个 goroutine 从多个通道读取数据,直到这些通道关闭。IN 是一种收敛的模式,所以又被称为扇入,用来收集处理的结果。
package main import ( "context" "log" "sync" "time" ) // Task 包含任务编号及任务所需时长 type Task struct { Number int Cost time.Duration } // task channel 生成器 func taskChannelGerenator(ctx context.Context, taskList []Task) <-chan Task { taskCh := make(chan Task) go func() { defer close(taskCh) for _, task := range taskList { select { case <-ctx.Done(): return case taskCh <- task: } } }() return taskCh } // doTask 处理并返回已处理的任务编号作为通道的函数 func doTask(ctx context.Context, taskCh <-chan Task) <-chan int { doneTaskCh := make(chan int) go func() { defer close(doneTaskCh) for task := range taskCh { select { case <-ctx.Done(): return default: log.Printf("do task number: %d\n", task.Number) // task 任务处理 // 根据任务耗时休眠 time.Sleep(task.Cost) doneTaskCh <- task.Number // 已处理任务的编号放入通道 } } }() return doneTaskCh } // `fan-in` 意味着将多个数据流复用或合并成一个流。 // merge 函数接收参数传递的多个通道 “taskChs”,并返回单个通道 “<-chan int” func merge(ctx context.Context, taskChs []<-chan int) <-chan int { var wg sync.WaitGroup mergedTaskCh := make(chan int) mergeTask := func(taskCh <-chan int) { defer wg.Done() for t := range taskCh { select { case <-ctx.Done(): return case mergedTaskCh <- t: } } } wg.Add(len(taskChs)) for _, taskCh := range taskChs { go mergeTask(taskCh) } // 等待所有任务处理完毕 go func() { wg.Wait() close(mergedTaskCh) }() return mergedTaskCh } func main() { start := time.Now() // 使用 context 来防止 goroutine 泄漏,即使在处理过程中被中断 ctx, cancel := context.WithCancel(context.Background()) defer cancel() // taskList 定义每个任务及其成本 taskList := []Task{ Task{1, 1 * time.Second}, Task{2, 7 * time.Second}, Task{3, 2 * time.Second}, Task{4, 3 * time.Second}, Task{5, 5 * time.Second}, Task{6, 3 * time.Second}, } // taskChannelGerenator 是一个函数,它接收一个 taskList 并将其转换为 Task 类型的通道 // 执行结果(int slice channel)存储在 worker 中 // 由于 doTask 的结果是一个通道,被分给了多个 worker,这就对应了 fan-out 处理 taskCh := taskChannelGerenator(ctx, taskList) numWorkers := 4 workers := make([]<-chan int, numWorkers) for i := 0; i < numWorkers; i++ { workers[i] = doTask(ctx, taskCh) // doTask 处理并返回已处理的任务编号作为通道的函数 } count := 0 for d := range merge(ctx, workers) { // merge 从中读取已处理的任务编号 count++ log.Printf("done task number: %d\n", d) } log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds()) }