在Go语言高并发程序中,若每来一个任务就新建一个 Goroutine,不加控制地并发可能会导致资源耗尽甚至系统崩溃。Worker Pool(工作池)模式可以有效地限制并发数量,实现资源的可控利用。
一、什么是 Worker Pool 模式
Worker Pool 模式通过固定数量的工作者(Worker Goroutines)来消费任务通道中的任务,从而达到控制并发数的目的。
组成要素包括:
- • 任务通道(Jobs):任务的来源;
- • Worker(工作者):处理任务的 Goroutine;
- • 结果通道(可选):传递任务执行结果;
- • 同步机制:如
sync.WaitGroup
等,等待所有任务完成。
二、基本实现示例
package main import ( "fmt" "sync" "time" ) func worker(id int, jobs <-chan int, wg *sync.WaitGroup) { defer wg.Done() for job := range jobs { fmt.Printf("Worker %d started job %d\n", id, job) time.Sleep(time.Second) // 模拟耗时任务 fmt.Printf("Worker %d finished job %d\n", id, job) } } func main() { const numJobs = 5 const numWorkers = 3 jobs := make(chan int, numJobs) var wg sync.WaitGroup // 启动固定数量的 worker for w := 1; w <= numWorkers; w++ { wg.Add(1) go worker(w, jobs, &wg) } // 分发任务 for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) // 所有任务已分发完毕 wg.Wait() // 等待所有 worker 执行完毕 }
输出示例(可能顺序不同):
Worker 1 started job 1 Worker 2 started job 2 Worker 3 started job 3 Worker 1 finished job 1 Worker 1 started job 4 Worker 2 finished job 2 Worker 2 started job 5 ...
三、带返回值的 Worker Pool(使用结果通道)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) { defer wg.Done() for job := range jobs { time.Sleep(time.Millisecond * 500) results <- job * 2 } } func main() { jobs := make(chan int, 5) results := make(chan int, 5) var wg sync.WaitGroup for w := 1; w <= 2; w++ { wg.Add(1) go worker(w, jobs, results, &wg) } for j := 1; j <= 5; j++ { jobs <- j } close(jobs) go func() { wg.Wait() close(results) }() for res := range results { fmt.Println("结果:", res) } }
四、使用场景
Worker Pool 模式适用于以下场景:
应用类型 | 示例 |
网络服务 | HTTP 请求处理、代理服务器 |
批量任务 | 图片处理、数据转码 |
消息消费 | Kafka、RabbitMQ 消费者池 |
异步处理 | 日志收集、邮件发送 |
五、优点与注意事项
✅ 优点:
- • 限制并发数,防止系统资源耗尽;
- • 提高任务执行效率与稳定性;
- • 结构清晰,便于扩展与维护。
⚠️ 注意事项:
- • 注意关闭通道的时机,避免死锁;
- • 如果任务较重,可配合
context
实现取消; - • 防止 Worker 泄漏或 panic 未捕获导致崩溃;
- • 可加入错误通道收集失败任务。
六、小结
Worker Pool 是高并发 Go 程序中一种简单而强大的模式,通过它我们可以有效管理 goroutine 的数量,构建健壮、可扩展的任务处理系统。