实现 Talk is cheap. Show me the code.
任务的定义 任务要包含需要执行的函数、以及函数要传的参数, 因为参数类型、个数不确定, 这里使用可变参数和空接口的形式
type Task struct {
Handler func(v ...interface{})
Params []interface{}
}
任务池的定义 任务池的定义包括了池的容量 capacity、当前运行的 worker(goroutine)数量 runningWorkers、任务队列(channel)taskC、关闭任务池的 channel closeC 以及任务池的状态 state(运行中或已关闭, 用于安全关闭任务池)
type Pool struct {
capacity uint64
runningWorkers uint64
state int64
taskC chan *Task
closeC chan bool
}
任务池的构造函数:
var ErrInvalidPoolCap = errors.New("invalid pool cap")
const (
RUNNING = 1
STOPED = 0
)
func NewPool(capacity uint64) (*Pool, error) {
if capacity <= 0 {
return nil, ErrInvalidPoolCap
}
return &Pool{
capacity: capacity,
state: RUNNING,
// 初始化任务队列, 队列长度为容量
taskC: make(chan *Task, capacity),
closeC: make(chan bool),
}, nil
}
启动 worker
新建 run() 方法作为启动 worker 的方法:
func (p *Pool) run() {
p.runningWorkers++ // 运行中的任务加一
go func() {
defer func() {
p.runningWorkers-- // worker 结束, 运行中的任务减一
}()
for {
select { // 阻塞等待任务、结束信号到来
case task, ok := <-p.taskC: // 从 channel 中消费任务
if !ok { // 如果 channel 被关闭, 结束 worker 运行
return
}
// 执行任务
task.Handler(task.Params...)
case <-p.closeC: // 如果收到关闭信号, 结束 worker 运行
return
}
}
}()
}
上述代码中, runningWorkers 的加减直接使用了自增运算, 但是考虑到启动多个 worker 时, runningWorkers 就会有数据竞争, 所以我们使用 sync.atomic 包来保证 runningWorkers 的自增操作是原子的。
对 runningWorkers 的操作进行封装:
func (p *Pool) incRunning() { // runningWorkers + 1
atomic.AddUint64(&p.runningWorkers, 1)
}
func (p *Pool) decRunning() { // runningWorkers - 1
atomic.AddUint64(&p.runningWorkers, ^uint64(0))
}
func (p *Pool) GetRunningWorkers() uint64 {
return atomic.LoadUint64(&p.runningWorkers)
}
打铁乘热, 对于 capacity 的操作也考虑数据竞争, 封装 GetCap() 方法:
func (p *Pool) GetCap() uint64 {
return atomic.LoadUint64(&p.capacity)
}
run() 方法改造:
func (p *Pool) run() {
p.incRunning()
go func() {
defer func() {
p.decRunning()
}()
for {
select {
case task, ok := <-p.taskC:
if !ok {
return
}
task.Handler(task.Params...)
case <-p.closeC:
return
}
}
}()
}
生产任务 新建 Put() 方法用来将任务放入池中:
func (p *Pool) Put(task *Task) {
if p.GetRunningWorkers() < p.GetCap() { // 如果任务池满, 则不再创建 worker
// 创建启动一个 worker
p.run()
}
// 将任务推入队列, 等待消费
p.taskC <- task
}
任务池安全关闭 当有关闭任务池来节省 goroutine 资源的场景时, 我们需要有一个关闭任务池的方法。
直接销毁 worker 关闭 channel 并不合适, 因为此时可能还有任务在队列中没有被消费掉。要确保所有任务被安全消费后再销毁掉 worker。
首先, 在关闭任务池时, 需要先关闭掉生产任务的入口。改造 Put() 方法:
var ErrPoolAlreadyClosed = errors.New("pool already closed")
func (p *Pool) Put(task *Task) error {
if p.state == STOPED { // 如果任务池处于关闭状态, 再 put 任务会返回 ErrPoolAlreadyClosed 错误
return ErrPoolAlreadyClosed
}
if p.GetRunningWorkers() < p.GetCap() {
p.run()
}
p.taskC <- task
return nil
}
在 run() 方法中已经对 closeC 进行了监听, 销毁 worker 只需等待任务被消费完后向 closeC 发出信号。Close() 方法如下:
func (p *Pool) Close() {
p.state = STOPED // 设置 state 为已停止
for len(p.taskC) > 0 { // 阻塞等待所有任务被 worker 消费
}
p.closeC <- true // 发送销毁 worker 信号
close(p.taskC) // 关闭任务队列
}
panic handler
每个 worker 都是一个 goroutine, 如果 goroutine 中产生了 panic, 会导致整个程序崩溃。为了保证程序的安全进行, 任务池需要对每个 worker 中的 panic 进行 recover 操作, 并提供可订制的 panic handler。
更新任务池定义:
type Pool struct {
capacity uint64
runningWorkers uint64
state int64
taskC chan *Task
closeC chan bool
PanicHandler func(interface{})
}
更新 run() 方法:
func (p *Pool) run() {
p.incRunning()
go func() {
defer func() {
p.decRunning()
if r := recover(); r != nil { // 恢复 panic
if p.PanicHandler != nil { // 如果设置了 PanicHandler, 调用
p.PanicHandler(r)
} else { // 默认处理
log.Printf("Worker panic: %s\n", r)
}
}
}()
for {
select {
case task, ok := <-p.taskC:
if !ok {
return
}
task.Handler(task.Params...)
case <-p.closeC:
return
}
}
}()
}
使用 OK, 我们的任务池就这么简单的写好了, 试试:
func main() {
// 创建任务池
pool, err := NewPool(10)
if err != nil {
panic(err)
}
for i := 0; i < 20; i++ {
// 任务放入池中
pool.Put(&Task{
Handler: func(v ...interface{}) {
fmt.Println(v)
},
Params: []interface{}{i},
})
}
time.Sleep(1e9) // 等待执行
}
详细例子见 mortar/examples
benchmark 作为协程池, 性能和内存占用的指标测试肯定是少不了的, 测试数据才是最有说服力的
测试流程 100w 次执行,原子增量操作
测试任务:
var wg = sync.WaitGroup{}
var sum int64
func demoTask(v ...interface{}) {
defer wg.Done()
for i := 0; i < 100; i++ {
atomic.AddInt64(∑, 1)
}
}
测试方法:
var runTimes = 1000000 // 原生 goroutine func BenchmarkGoroutineTimeLifeSetTimes(b *testing.B) {
for i := 0; i < runTimes; i++ {
wg.Add(1)
go demoTask2()
}
wg.Wait() // 等待执行完毕
}
// 使用协程池 func BenchmarkPoolTimeLifeSetTimes(b *testing.B) { pool, err := NewPool(20) if err != nil { b.Error(err) }
task := &Task{
Handler: demoTask2,
}
for i := 0; i < runTimes; i++ {
wg.Add(1)
pool.Put(task)
}
wg.Wait() // 等待执行完毕
} 对比结果
模式 操作时间消耗 ns/op 内存分配大小 B/op 内存分配次数 allocs/op 原生 goroutine (100w goroutine) 1596177880 103815552 240022 任务池开启 20 个 worker 20 goroutine) 1378909099 15312 89 使用任务池和原生 goroutine 性能相近(略好于原生)
使用任务池比直接 goroutine 内存分配节省 7000 倍左右, 内存分配次数减少 2700 倍左右
tips: 当任务为耗时任务时, 防止任务堆积(消费不过来)可以结合业务调整容量, 或根据业务控制每个任务的超时时间