Go语言并发模式:扇出/扇入的实用技巧
在Go并发编程中,扇出/扇入模式是处理大量任务的利器。今天分享一个实际项目中验证过的实现技巧。
问题场景
假设我们需要并发处理1000个URL,每个URL请求耗时不等,需要控制并发数并收集所有结果。扇出阶段分发任务,扇入阶段聚合结果。
实现方案
func worker(id int, jobs <-chan string, results chan<- string) {
for job := range jobs {
// 模拟耗时操作
result := fmt.Sprintf("worker %d processed %s", id, job)
results <- result
}
}
func fanIn(results ...<-chan string) <-chan string {
var wg sync.WaitGroup
out := make(chan string)
output := func(c <-chan string) {
for v := range c {
out <- v
}
wg.Done()
}
wg.Add(len(results))
for _, c := range results {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
关键技巧
- 有界队列:使用带缓冲的channel避免goroutine阻塞
- 优雅退出:通过sync.WaitGroup确保所有worker完成后再关闭结果通道
- 错误处理:建议在results channel中传递结构体,包含error字段
性能优化
- 根据CPU核心数设置worker池大小
- 使用context超时控制避免goroutine泄漏
- 考虑使用
semaphore库实现更精细的并发控制
这个模式在我处理百万级日志分析时表现优异,将处理时间从小时级降至分钟级。掌握扇出/扇入,Go并发编程将如虎添翼。