异步 API 设计之扇入扇出模式

简介: 扇出/扇入模式是更高级 API 集成的主要内容。这些应用程序并不总是表现出相同的可用性或性能特征。

1 扇入/扇出服务

们举一个现实世界的例子,一个电子商务网站将自己与一个第三方支付网关整合在一起。 这里,网站使用支付网关的 API 来弹出支付屏幕并输入安全证书。同时,网站可能会调用另一个称为分析的 API 来记录支付的尝试。这种将一个请求分叉成多个请求的过程被称为 fan-out 扇出。在现实世界中,一个客户请求可能涉及许多扇出服务。


另一个例子是 MapReduce。Map 是一个扇入的操作,而 Reduce 是一个扇出的 操作。一个服务器可以将一个信息扇出到下一组服务(API),并忽略结果。或者可以等到这些服务器的所有响应都返回。如 如下图所示,一个传入的请求被服务器复用为转换成两个传出的请求:


image.png


扇入 fan-in 是一种操作,即两个或更多传入的请求会聚成一个请求。这种情况下,API 如何聚合来自多个后端服务的结果,并将结果即时返回给客户。

例如,想想一个酒店价格聚合器或航班票务聚合器,它从不同的数据提供者那里获取关于多个酒店或航班的请求信息并显示出来。


下图显示了扇出操作是如何结合多个请求并准备一个最终的响应,由客户端消费的。


image.png


客户端也可以是一个服务器,为更多的客户提供服务。如上图所示,左侧的服务器正在收集来自酒店 A、酒店 B 和 航空公司供应商 A,并为不同的客户准备另一个响应。


因此,扇入和扇出操作并不总是完全相互独立的。大多数情况下,它将是一个混合场景,扇入和扇出操作都是相互配合的。


请记住,对下一组服务器的扇出操作可以是异步的。也是如此。对于扇入请求来说,这可能不是真的。扇入操作有时被称为 API 调用。


2 Go 语言实现扇入/扇出模式


Fan-out:多个 goroutine 从同一个通道读取数据,直到该通道关闭。OUT 是一种张开的模式,所以又被称为扇出,可以用来分发任务。

Fan-in:1 个 goroutine 从多个通道读取数据,直到这些通道关闭。IN 是一种收敛的模式,所以又被称为扇入,用来收集处理的结果。


image.png

package main
import (
  "context"
  "log"
  "sync"
  "time"
)
// Task 包含任务编号及任务所需时长
type Task struct {
  Number int
  Cost   time.Duration
}
// task channel 生成器
func taskChannelGerenator(ctx context.Context, taskList []Task) <-chan Task {
  taskCh := make(chan Task)
  go func() {
    defer close(taskCh)
    for _, task := range taskList {
      select {
      case <-ctx.Done():
        return
      case taskCh <- task:
      }
    }
  }()
  return taskCh
}
// doTask 处理并返回已处理的任务编号作为通道的函数
func doTask(ctx context.Context, taskCh <-chan Task) <-chan int {
  doneTaskCh := make(chan int)
  go func() {
    defer close(doneTaskCh)
    for task := range taskCh {
      select {
      case <-ctx.Done():
        return
      default:
        log.Printf("do task number: %d\n", task.Number)
        // task 任务处理
        // 根据任务耗时休眠
        time.Sleep(task.Cost)
        doneTaskCh <- task.Number // 已处理任务的编号放入通道
      }
    }
  }()
  return doneTaskCh
}
// `fan-in` 意味着将多个数据流复用或合并成一个流。
// merge 函数接收参数传递的多个通道 “taskChs”,并返回单个通道 “<-chan int”
func merge(ctx context.Context, taskChs []<-chan int) <-chan int {
  var wg sync.WaitGroup
  mergedTaskCh := make(chan int)
  mergeTask := func(taskCh <-chan int) {
    defer wg.Done()
    for t := range taskCh {
      select {
      case <-ctx.Done():
        return
      case mergedTaskCh <- t:
      }
    }
  }
  wg.Add(len(taskChs))
  for _, taskCh := range taskChs {
    go mergeTask(taskCh)
  }
  // 等待所有任务处理完毕
  go func() {
    wg.Wait()
    close(mergedTaskCh)
  }()
  return mergedTaskCh
}
func main() {
  start := time.Now()
  // 使用 context 来防止 goroutine 泄漏,即使在处理过程中被中断
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()
  // taskList 定义每个任务及其成本
  taskList := []Task{
    Task{1, 1 * time.Second},
    Task{2, 7 * time.Second},
    Task{3, 2 * time.Second},
    Task{4, 3 * time.Second},
    Task{5, 5 * time.Second},
    Task{6, 3 * time.Second},
  }
  // taskChannelGerenator 是一个函数,它接收一个 taskList 并将其转换为 Task 类型的通道
  // 执行结果(int slice channel)存储在 worker 中
  // 由于 doTask 的结果是一个通道,被分给了多个 worker,这就对应了 fan-out 处理
  taskCh := taskChannelGerenator(ctx, taskList)
  numWorkers := 4
  workers := make([]<-chan int, numWorkers)
  for i := 0; i < numWorkers; i++ {
    workers[i] = doTask(ctx, taskCh)  // doTask 处理并返回已处理的任务编号作为通道的函数
  }
  count := 0
  for d := range merge(ctx, workers) { // merge 从中读取已处理的任务编号
    count++
    log.Printf("done task number: %d\n", d)
  }
  log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds())
}
相关文章
|
1月前
|
缓存 中间件 API
|
7月前
|
API
api一键自动合约跟单模式 | 程序化交易系统开发讲解【附样板源码实例分析】
“量化交易”有着两层含义:一是从狭义上来讲,是指量化交易的内容,将交易条件转变成为程序,自动下单;二是从广义上来讲,是指系统交易方法,就是一个整合的交易系统。
|
8月前
|
存储 API 流计算
Flink DataStream API-概念、模式、作业流程和程序
前几篇介绍了Flink的入门、架构原理、安装等,相信你对Flink已经了解入门。接下来开始介绍Flink DataStream API内容,先介绍DataStream API基本概念和使用,然后介绍核心概念,最后再介绍经典案例和代码实现。本篇内容:Flink DataStream API的概念、模式、作业流程和程序。
Flink DataStream API-概念、模式、作业流程和程序
|
3月前
|
Web App开发 前端开发 测试技术
【Web API系列】使用异步剪贴板API(async clipboard)的图像的编程复制和粘贴
【Web API系列】使用异步剪贴板API(async clipboard)的图像的编程复制和粘贴
75 1
|
3月前
|
消息中间件 Kafka API
Kafka - 异步/同步发送API
Kafka - 异步/同步发送API
45 0
|
8月前
|
消息中间件 负载均衡 Kafka
Kafka学习---2、kafka生产者、异步和同步发送API、分区、生产经验(一)
Kafka学习---2、kafka生产者、异步和同步发送API、分区、生产经验(一)
|
8月前
|
消息中间件 缓存 Kafka
Kafka学习---2、kafka生产者、异步和同步发送API、分区、生产经验(二)
Kafka学习---2、kafka生产者、异步和同步发送API、分区、生产经验(二)
|
9月前
|
开发框架 API 数据安全/隐私保护
合约跟单带单模式API对接平台开发部署指南(附源码实例分析)
合约跟单带单模式API对接平台开发部署指南(附源码实例分析)
|
9月前
|
Kubernetes API 容器
Kubernetes 控制器模式为何会依赖声明式 API?
Kubernetes 控制器模式依赖声明式的 API。另外一种常见的 API 类型是命令式 API。
53 0
|
10月前
|
XML JSON 缓存
电商无货源模式中API的应用
首先,无货源不代表真的没有货,那如何上架,上哪些产品呢?这里就需要用到一些数据,比如淘宝的热销榜产品,某个店铺的商品详情,等等……