异步 API 设计之扇入扇出模式

简介: 扇出/扇入模式是更高级 API 集成的主要内容。这些应用程序并不总是表现出相同的可用性或性能特征。

1 扇入/扇出服务

们举一个现实世界的例子,一个电子商务网站将自己与一个第三方支付网关整合在一起。 这里,网站使用支付网关的 API 来弹出支付屏幕并输入安全证书。同时,网站可能会调用另一个称为分析的 API 来记录支付的尝试。这种将一个请求分叉成多个请求的过程被称为 fan-out 扇出。在现实世界中,一个客户请求可能涉及许多扇出服务。


另一个例子是 MapReduce。Map 是一个扇入的操作,而 Reduce 是一个扇出的 操作。一个服务器可以将一个信息扇出到下一组服务(API),并忽略结果。或者可以等到这些服务器的所有响应都返回。如 如下图所示,一个传入的请求被服务器复用为转换成两个传出的请求:


image.png


扇入 fan-in 是一种操作,即两个或更多传入的请求会聚成一个请求。这种情况下,API 如何聚合来自多个后端服务的结果,并将结果即时返回给客户。

例如,想想一个酒店价格聚合器或航班票务聚合器,它从不同的数据提供者那里获取关于多个酒店或航班的请求信息并显示出来。


下图显示了扇出操作是如何结合多个请求并准备一个最终的响应,由客户端消费的。


image.png


客户端也可以是一个服务器,为更多的客户提供服务。如上图所示,左侧的服务器正在收集来自酒店 A、酒店 B 和 航空公司供应商 A,并为不同的客户准备另一个响应。


因此,扇入和扇出操作并不总是完全相互独立的。大多数情况下,它将是一个混合场景,扇入和扇出操作都是相互配合的。


请记住,对下一组服务器的扇出操作可以是异步的。也是如此。对于扇入请求来说,这可能不是真的。扇入操作有时被称为 API 调用。


2 Go 语言实现扇入/扇出模式


Fan-out:多个 goroutine 从同一个通道读取数据,直到该通道关闭。OUT 是一种张开的模式,所以又被称为扇出,可以用来分发任务。

Fan-in:1 个 goroutine 从多个通道读取数据,直到这些通道关闭。IN 是一种收敛的模式,所以又被称为扇入,用来收集处理的结果。


image.png

package main
import (
  "context"
  "log"
  "sync"
  "time"
)
// Task 包含任务编号及任务所需时长
type Task struct {
  Number int
  Cost   time.Duration
}
// task channel 生成器
func taskChannelGerenator(ctx context.Context, taskList []Task) <-chan Task {
  taskCh := make(chan Task)
  go func() {
    defer close(taskCh)
    for _, task := range taskList {
      select {
      case <-ctx.Done():
        return
      case taskCh <- task:
      }
    }
  }()
  return taskCh
}
// doTask 处理并返回已处理的任务编号作为通道的函数
func doTask(ctx context.Context, taskCh <-chan Task) <-chan int {
  doneTaskCh := make(chan int)
  go func() {
    defer close(doneTaskCh)
    for task := range taskCh {
      select {
      case <-ctx.Done():
        return
      default:
        log.Printf("do task number: %d\n", task.Number)
        // task 任务处理
        // 根据任务耗时休眠
        time.Sleep(task.Cost)
        doneTaskCh <- task.Number // 已处理任务的编号放入通道
      }
    }
  }()
  return doneTaskCh
}
// `fan-in` 意味着将多个数据流复用或合并成一个流。
// merge 函数接收参数传递的多个通道 “taskChs”,并返回单个通道 “<-chan int”
func merge(ctx context.Context, taskChs []<-chan int) <-chan int {
  var wg sync.WaitGroup
  mergedTaskCh := make(chan int)
  mergeTask := func(taskCh <-chan int) {
    defer wg.Done()
    for t := range taskCh {
      select {
      case <-ctx.Done():
        return
      case mergedTaskCh <- t:
      }
    }
  }
  wg.Add(len(taskChs))
  for _, taskCh := range taskChs {
    go mergeTask(taskCh)
  }
  // 等待所有任务处理完毕
  go func() {
    wg.Wait()
    close(mergedTaskCh)
  }()
  return mergedTaskCh
}
func main() {
  start := time.Now()
  // 使用 context 来防止 goroutine 泄漏,即使在处理过程中被中断
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()
  // taskList 定义每个任务及其成本
  taskList := []Task{
    Task{1, 1 * time.Second},
    Task{2, 7 * time.Second},
    Task{3, 2 * time.Second},
    Task{4, 3 * time.Second},
    Task{5, 5 * time.Second},
    Task{6, 3 * time.Second},
  }
  // taskChannelGerenator 是一个函数,它接收一个 taskList 并将其转换为 Task 类型的通道
  // 执行结果(int slice channel)存储在 worker 中
  // 由于 doTask 的结果是一个通道,被分给了多个 worker,这就对应了 fan-out 处理
  taskCh := taskChannelGerenator(ctx, taskList)
  numWorkers := 4
  workers := make([]<-chan int, numWorkers)
  for i := 0; i < numWorkers; i++ {
    workers[i] = doTask(ctx, taskCh)  // doTask 处理并返回已处理的任务编号作为通道的函数
  }
  count := 0
  for d := range merge(ctx, workers) { // merge 从中读取已处理的任务编号
    count++
    log.Printf("done task number: %d\n", d)
  }
  log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds())
}
相关文章
|
2月前
|
缓存 监控 API
探索微服务架构中的API网关模式
【10月更文挑战第5天】随着微服务架构的兴起,企业纷纷采用这一模式构建复杂应用。在这种架构下,应用被拆分成若干小型、独立的服务,每个服务围绕特定业务功能构建并通过HTTP协议协作。随着服务数量增加,统一管理这些服务间的交互变得至关重要。API网关作为微服务架构的关键组件,承担起路由请求、聚合数据、处理认证与授权等功能。本文通过一个在线零售平台的具体案例,探讨API网关的优势及其实现细节,展示其在简化客户端集成、提升安全性和性能方面的关键作用。
78 2
|
2月前
|
存储 缓存 监控
探索微服务架构中的API网关模式
【10月更文挑战第1天】探索微服务架构中的API网关模式
98 2
|
4月前
|
JavaScript 前端开发 API
[译] 用 Vue 3 Composition API 实现 React Context/Provider 模式
[译] 用 Vue 3 Composition API 实现 React Context/Provider 模式
|
1月前
|
缓存 负载均衡 JavaScript
探索微服务架构下的API网关模式
【10月更文挑战第37天】在微服务架构的海洋中,API网关犹如一座灯塔,指引着服务的航向。它不仅是客户端请求的集散地,更是后端微服务的守门人。本文将深入探讨API网关的设计哲学、核心功能以及它在微服务生态中扮演的角色,同时通过实际代码示例,揭示如何实现一个高效、可靠的API网关。
|
1月前
|
缓存 监控 API
探索微服务架构中的API网关模式
随着微服务架构的兴起,API网关成为管理和服务间交互的关键组件。本文通过在线零售公司的案例,探讨了API网关在路由管理、认证授权、限流缓存、日志监控和协议转换等方面的优势,并详细介绍了使用Kong实现API网关的具体步骤。
50 3
|
1月前
|
存储 缓存 监控
探索微服务架构中的API网关模式
探索微服务架构中的API网关模式
53 2
|
3月前
|
存储 API 数据库
如何使用 ef core 的 code first(fluent api)模式实现自定义类型转换器?
本文介绍了如何在 EF Core 的 Code First 模式下使用自定义类型转换器实现 JsonDocument 和 DateTime 类型到 SQLite 数据库的正确映射。通过自定义 ValueConverter,实现了数据类型的转换,并展示了完整的项目结构和代码实现,包括实体类定义、DbContext 配置、Repositories 仓储模式及数据库应用迁移(Migrations)操作。
76 6
如何使用 ef core 的 code first(fluent api)模式实现自定义类型转换器?
|
3月前
|
JSON 监控 安全
探索微服务架构中的API网关模式
【9月更文挑战第22天】在微服务架构的海洋中,API网关如同一位智慧的守门人,不仅管理着服务的进出,还维护着整个系统的秩序。本文将带你一探究竟,看看这位守门人是如何工作的,以及它为何成为现代云原生应用不可或缺的一部分。从流量控制到安全防护,再到服务聚合,我们将一起解锁API网关的秘密。
|
2月前
|
API C#
异步轮询 Web API 的实现与 C# 示例
异步轮询 Web API 的实现与 C# 示例
90 0
|
4月前
|
设计模式 API Go
REST API设计模式和反模式
REST API设计模式和反模式