Go语言并发模型与模式:Fan-out / Fan-in 模式

简介: Fan-out/Fan-in 是一种经典的并发设计模式,用于任务分发与结果聚合。Fan-out 将任务分发给多个 worker 并发执行,Fan-in 将结果汇聚统一处理。适用于数据抓取、批量计算等“多产一收”场景。通过 goroutine 和 channel,可构建高效的数据处理流水线,具备高吞吐与扩展性。使用时需注意通道设计、异常处理及取消控制等问题。

 

在并发系统中,Fan-out / Fan-in 模式是一种经典的设计方式,用于在多个 goroutine 之间进行任务分发和结果聚合,常用于提高处理吞吐量和并发能力。


一、什么是 Fan-out / Fan-in 模式?

  • Fan-out(扇出):将任务从一个入口分发给多个 worker 并发执行。
  • Fan-in(扇入):将多个 worker 的结果汇聚到一个通道中进行统一处理。

这种模式适用于“多产一收”的数据处理流程,如数据抓取、批量计算等。


二、基本结构图

┌────────────┐
     │ 任务生产者 │
     └────┬───────┘
      Fan-out
  ┌──────┴──────┐
  ▼      ▼      ▼
Worker Worker Worker
  │      │      │
  └──────┬──────┘
       Fan-in
   ┌─────▼─────┐
   │ 结果处理器 │
   └───────────┘

三、代码示例

package main
import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)
func producer(count int) <-chan int {
    out := make(chan int)
    go func() {
        for i := 0; i < count; i++ {
            out <- i
        }
        close(out)
    }()
    return out
}
func worker(id int, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for job := range in {
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(500))) // 模拟处理
            fmt.Printf("Worker %d processed job %d\n", id, job)
            out <- job * 2
        }
        close(out)
    }()
    return out
}
func merge(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    output := func(c <-chan int) {
        for val := range c {
            out <- val
        }
        wg.Done()
    }
    wg.Add(len(channels))
    for _, c := range channels {
        go output(c)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}
func main() {
    rand.Seed(time.Now().UnixNano())
    input := producer(10)
    // Fan-out:启动3个worker处理任务
    w1 := worker(1, input)
    w2 := worker(2, input)
    w3 := worker(3, input)
    // Fan-in:合并3个worker输出
    result := merge(w1, w2, w3)
    for res := range result {
        fmt.Println("结果:", res)
    }
}

四、应用场景

Fan-out / Fan-in 非常适合如下场景:

应用场景 示例
并发抓取网页 多个 URL 同时请求并聚合结果
批量图像处理 多图片缩放或加水印
数据清洗与计算 并发处理 CSV/日志数据
大量任务排队处理 多任务分发并收集结果

五、注意事项

✅ 优点:

  • • 利用多核并发,显著提高处理效率;
  • • 模块清晰,生产者-工作者-聚合器分离;
  • • 易于扩展和监控。

⚠️ 注意事项:

  • • 输入通道必须是“广播型”,即可被多个 worker 消费;
  • • 合并函数 merge 要注意关闭输出通道;
  • • worker 中如有异常(如 panic)应提前恢复;
  • • 可加 context 实现取消控制;

六、小结

Fan-out / Fan-in 是构建并发处理流水线的核心模式,结合 goroutine 和 channel,可以构建高吞吐、高可扩展的数据处理系统。

 

相关文章
|
5月前
|
消息中间件 编解码 Kafka
Go语言并发模型与模式:Worker Pool 模式
Worker Pool(工作池)模式是Go语言中管理高并发任务的有效方法。通过限制 Goroutine 数量,避免资源耗尽或系统崩溃。其核心包括任务通道、工作者 Goroutine、结果通道(可选)及同步机制。示例代码展示了如何分配与处理任务,同时支持带返回值的实现。该模式适用于网络服务、批量任务处理、消息消费等场景,具有限制并发、提高稳定性和结构清晰的优点。但需注意通道关闭时机、任务取消机制及错误处理等问题。Worker Pool 是构建高效任务处理系统的强大工具。
|
Unix Linux 网络安全
【工具使用】SecureCRT的下载、安装图文详细过程介绍
【工具使用】SecureCRT的下载、安装图文详细过程介绍
1718 0
|
SQL 存储 消息中间件
大厂偏爱的Agent技术究竟是个啥
为了解释什么是Agent技术,我在网上搜了一圈,但没有找到想要的结果。反倒是搜到了不少Java Agent技术,要注意Java Agent技术指的是一种Java字节码修改技术,和本文要说的完全是两码事。 既然搜不到,我就说下自己的理解吧。Agent技术是在「客户端」机器上部署一个Agent进程,「客户端」与「服务端」的交互通过这个Agent进行代理,其中Agent与Client通常在同一主机,即可通过「localhost」进行访问。
1716 0
大厂偏爱的Agent技术究竟是个啥
|
API 开发工具 Android开发
AppsFlyer 研究(一)AppsFlyer SDK 集成(1)
AppsFlyer 研究(一)AppsFlyer SDK 集成
1992 0
|
5月前
|
存储 安全 BI
硬盘有坏道怎么办?硬盘坏道如何屏蔽?
本文简单介绍一下坏道的类型,然后介绍检测工具、坏道屏蔽方法,感兴趣的用户可以尝试。
|
5月前
|
Ubuntu 机器人 开发者
Docker环境下的ROS Noetic:Ubuntu 20.04 系统下的解决方案
这就是在Docker环境下安装ROS Noetic在Ubuntu 20.04系统的一种简单方法,希望能对你有所帮助。
631 16
|
5月前
|
人工智能 IDE 算法
通义灵码 AI IDE 上线!智能体+MCP 从手动调用工具过渡到“AI 主动调度资源”
编程智能体与 MCP 的结合,不只是“工具+助手”,而是一次范式上的跃迁——从“手动调用工具”过渡到“AI 主动调度资源”。
|
Java 应用服务中间件 API
如何安装与使用Java EE 8、Servlet 3.0及Apache Maven进行高效开发
【7月更文第1天】搭建高效Java EE 8开发环境,包括安装JDK、选择WildFly或Payara Server作为应用服务器,以及安装Apache Maven。使用Maven创建Servlet 3.0 Web项目,编写 HelloWorldServlet,打包部署到服务器,通过访问特定URL测试应用。这一流程助力开发者实现快速原型和大型项目开发。
500 0
|
供应链 搜索推荐 数据挖掘
电商ERP系统中电商API接口的应用
电商API接口在电子商务中扮演着至关重要的角色,它们允许开发者将电商功能集成到自己的应用程序中,实现商品检索、订单处理、支付、物流跟踪等功能。以下是关于电商API接口的应用:
|
缓存 NoSQL Java
【Redis】5、Redis 的分布式锁、Lua 脚本保证 Redis 命令的原子性
【Redis】5、Redis 的分布式锁、Lua 脚本保证 Redis 命令的原子性
886 0