在并发系统中,Fan-out / Fan-in 模式是一种经典的设计方式,用于在多个 goroutine 之间进行任务分发和结果聚合,常用于提高处理吞吐量和并发能力。
一、什么是 Fan-out / Fan-in 模式?
- • Fan-out(扇出):将任务从一个入口分发给多个 worker 并发执行。
- • Fan-in(扇入):将多个 worker 的结果汇聚到一个通道中进行统一处理。
这种模式适用于“多产一收”的数据处理流程,如数据抓取、批量计算等。
二、基本结构图
┌────────────┐ │ 任务生产者 │ └────┬───────┘ │ Fan-out ┌──────┴──────┐ ▼ ▼ ▼ Worker Worker Worker │ │ │ └──────┬──────┘ Fan-in │ ┌─────▼─────┐ │ 结果处理器 │ └───────────┘
三、代码示例
package main import ( "fmt" "math/rand" "sync" "time" ) func producer(count int) <-chan int { out := make(chan int) go func() { for i := 0; i < count; i++ { out <- i } close(out) }() return out } func worker(id int, in <-chan int) <-chan int { out := make(chan int) go func() { for job := range in { time.Sleep(time.Millisecond * time.Duration(rand.Intn(500))) // 模拟处理 fmt.Printf("Worker %d processed job %d\n", id, job) out <- job * 2 } close(out) }() return out } func merge(channels ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) output := func(c <-chan int) { for val := range c { out <- val } wg.Done() } wg.Add(len(channels)) for _, c := range channels { go output(c) } go func() { wg.Wait() close(out) }() return out } func main() { rand.Seed(time.Now().UnixNano()) input := producer(10) // Fan-out:启动3个worker处理任务 w1 := worker(1, input) w2 := worker(2, input) w3 := worker(3, input) // Fan-in:合并3个worker输出 result := merge(w1, w2, w3) for res := range result { fmt.Println("结果:", res) } }
四、应用场景
Fan-out / Fan-in 非常适合如下场景:
应用场景 | 示例 |
并发抓取网页 | 多个 URL 同时请求并聚合结果 |
批量图像处理 | 多图片缩放或加水印 |
数据清洗与计算 | 并发处理 CSV/日志数据 |
大量任务排队处理 | 多任务分发并收集结果 |
五、注意事项
✅ 优点:
- • 利用多核并发,显著提高处理效率;
- • 模块清晰,生产者-工作者-聚合器分离;
- • 易于扩展和监控。
⚠️ 注意事项:
- • 输入通道必须是“广播型”,即可被多个 worker 消费;
- • 合并函数
merge
要注意关闭输出通道; - • worker 中如有异常(如 panic)应提前恢复;
- • 可加
context
实现取消控制;
六、小结
Fan-out / Fan-in 是构建并发处理流水线的核心模式,结合 goroutine 和 channel,可以构建高吞吐、高可扩展的数据处理系统。