开发者社区> 问答> 正文

技术实践——教你用100行写一个 go 的协程池 (任务池)!!!

要解决这个问题, 要思考两个问题

  • goroutine 的数量如何限制, goroutine 如何重用

  • 任务如何执行

goroutine 的数量如何限制, goroutine 如何重用
说到限制和重用, 那么最先想到的就是池化。比如 TCP 连接池, 线程池, 都是有效限制、重用资源的最好实践。所以, 我们可以创建一个 goroutine 池, 用来管理 goroutine。

任务如何执行 在使用原生 goroutine 的场景中, 运行一个任务直接启动一个 goroutine 来运行, 在池化的场景而言, 任务也是要在 goroutine 中执行, 但是任务需要任务池来放入 goroutine。

生产者消费者模型
在连接池中, 连接在使用时从池中取出, 用完后放入池中。对于 goroutine 而言, goroutine 通过语言关键字启动, 无法像连接一样操作。那么如何让 goroutine 可以执行任务, 且执行后可以重新用来执行其它任务呢?这里就需要使用生产者消费者模型了:

生产者 --(生产任务)--> 队列 --(消费任务)--> 消费者

用来执行任务的 goroutine 可以作为消费者, 操作任务池的 goroutine 作为生产者, 而队列则可以使用 go 的 buffer channel, 任务池的建模到此结束。

加入阿里云钉钉群享福利:每周技术直播,定期群内有奖活动、大咖问答

409449315718661054.jpg

展开
收起
钉群小二 2020-01-09 13:30:22 5732 0
2 条回答
写回答
取消 提交回答
  • 不定期更新钉群圈子活动等内容

    实现
    Talk is cheap. Show me the code.

    任务的定义
    任务要包含需要执行的函数、以及函数要传的参数, 因为参数类型、个数不确定, 这里使用可变参数和空接口的形式

    type Task struct {
        Handler func(v ...interface{})
        Params  []interface{}
    }
    
    

    任务池的定义
    任务池的定义包括了池的容量 capacity、当前运行的 worker(goroutine)数量 runningWorkers、任务队列(channel)taskC、关闭任务池的 channel closeC 以及任务池的状态 state(运行中或已关闭, 用于安全关闭任务池)

    type Pool struct {
        capacity       uint64
        runningWorkers uint64
        state          int64
        taskC          chan *Task
        closeC         chan bool
    }
    
    

    任务池的构造函数:

    var ErrInvalidPoolCap = errors.New("invalid pool cap")
    
    const (
        RUNNING = 1
        STOPED = 0
    )
    
    func NewPool(capacity uint64) (*Pool, error) {
        if capacity <= 0 {
            return nil, ErrInvalidPoolCap
        }
        return &Pool{
            capacity: capacity,
            state:    RUNNING,
            // 初始化任务队列, 队列长度为容量
            taskC:    make(chan *Task, capacity),
            closeC:   make(chan bool),
        }, nil
    }
    

    启动 worker

    新建 run() 方法作为启动 worker 的方法:

    func (p *Pool) run() {
        p.runningWorkers++ // 运行中的任务加一
    
        go func() {
            defer func() {
                p.runningWorkers-- // worker 结束, 运行中的任务减一
            }()
    
            for {
                select { // 阻塞等待任务、结束信号到来
                case task, ok := <-p.taskC: // 从 channel 中消费任务
                    if !ok { // 如果 channel 被关闭, 结束 worker 运行
                        return
                    }
                    // 执行任务
                    task.Handler(task.Params...)
                case <-p.closeC: // 如果收到关闭信号, 结束 worker 运行
                    return
                }
            }
        }()
    }
    

    上述代码中, runningWorkers 的加减直接使用了自增运算, 但是考虑到启动多个 worker 时, runningWorkers 就会有数据竞争, 所以我们使用 sync.atomic 包来保证 runningWorkers 的自增操作是原子的。

    对 runningWorkers 的操作进行封装:

    func (p *Pool) incRunning() { // runningWorkers + 1
        atomic.AddUint64(&p.runningWorkers, 1)
    }
    
    func (p *Pool) decRunning() { // runningWorkers - 1
        atomic.AddUint64(&p.runningWorkers, ^uint64(0))
    }
    
    func (p *Pool) GetRunningWorkers() uint64 {
        return atomic.LoadUint64(&p.runningWorkers)
    }
    
    

    打铁乘热, 对于 capacity 的操作也考虑数据竞争, 封装 GetCap() 方法:

    func (p *Pool) GetCap() uint64 {
        return atomic.LoadUint64(&p.capacity)
    }
    run() 方法改造:
    
    func (p *Pool) run() {
        p.incRunning()
    
        go func() {
            defer func() {
                p.decRunning()
            }()
    
            for {
                select {
                case task, ok := <-p.taskC:
                    if !ok {
                        return
                    }
                    task.Handler(task.Params...)
                case <-p.closeC:
                    return
                }
            }
        }()
    }
    

    生产任务
    新建 Put() 方法用来将任务放入池中:

    func (p *Pool) Put(task *Task) {
    
        if p.GetRunningWorkers() < p.GetCap() { // 如果任务池满, 则不再创建 worker
            // 创建启动一个 worker
            p.run()
        }
        // 将任务推入队列, 等待消费
        p.taskC <- task
    }
    
    

    任务池安全关闭
    当有关闭任务池来节省 goroutine 资源的场景时, 我们需要有一个关闭任务池的方法。

    直接销毁 worker 关闭 channel 并不合适, 因为此时可能还有任务在队列中没有被消费掉。要确保所有任务被安全消费后再销毁掉 worker。

    首先, 在关闭任务池时, 需要先关闭掉生产任务的入口。改造 Put() 方法:

    var ErrPoolAlreadyClosed = errors.New("pool already closed")
    
    func (p *Pool) Put(task *Task) error {
    
        if p.state == STOPED { // 如果任务池处于关闭状态, 再 put 任务会返回 ErrPoolAlreadyClosed 错误
            return ErrPoolAlreadyClosed
        }
        
        if p.GetRunningWorkers() < p.GetCap() { 
            p.run()
        }
    
        p.taskC <- task
        
        return nil
    }
    

    在 run() 方法中已经对 closeC 进行了监听, 销毁 worker 只需等待任务被消费完后向 closeC 发出信号。Close() 方法如下:

    func (p *Pool) Close() {
        p.state = STOPED // 设置 state 为已停止
    
        for len(p.taskC) > 0 { // 阻塞等待所有任务被 worker 消费
        }
    
        p.closeC <- true // 发送销毁 worker 信号
        close(p.taskC) // 关闭任务队列
    }
    
    

    panic handler

    每个 worker 都是一个 goroutine, 如果 goroutine 中产生了 panic, 会导致整个程序崩溃。为了保证程序的安全进行, 任务池需要对每个 worker 中的 panic 进行 recover 操作, 并提供可订制的 panic handler。

    更新任务池定义:

    type Pool struct {
        capacity       uint64
        runningWorkers uint64
        state          int64
        taskC          chan *Task
        closeC         chan bool
        PanicHandler   func(interface{})
    }
    
    

    更新 run() 方法:

    func (p *Pool) run() {
        p.incRunning()
    
        go func() {
            defer func() {
                p.decRunning()
                if r := recover(); r != nil { // 恢复 panic
                    if p.PanicHandler != nil { // 如果设置了 PanicHandler, 调用
                        p.PanicHandler(r)
                    } else { // 默认处理
                        log.Printf("Worker panic: %s\n", r)
                    }
                }
            }()
    
            for {
                select {
                case task, ok := <-p.taskC:
                    if !ok {
                        return
                    }
                    task.Handler(task.Params...)
                case <-p.closeC:
                    return
                }
            }
        }()
    }
    

    使用 OK, 我们的任务池就这么简单的写好了, 试试:

    func main() {
        // 创建任务池
        pool, err := NewPool(10)
        if err != nil {
            panic(err)
        }
    
        for i := 0; i < 20; i++ {
            // 任务放入池中
            pool.Put(&Task{
                Handler: func(v ...interface{}) {
                    fmt.Println(v)
                },
                Params: []interface{}{i},
            })
        }
    
        time.Sleep(1e9) // 等待执行
    }
    

    详细例子见 mortar/examples

    benchmark 作为协程池, 性能和内存占用的指标测试肯定是少不了的, 测试数据才是最有说服力的

    测试流程 100w 次执行,原子增量操作

    测试任务:

    var wg = sync.WaitGroup{}
    
    var sum int64
    
    func demoTask(v ...interface{}) {
        defer wg.Done()
        for i := 0; i < 100; i++ {
            atomic.AddInt64(∑, 1)
        }
    }
    
    

    测试方法:

    var runTimes = 1000000 // 原生 goroutine func BenchmarkGoroutineTimeLifeSetTimes(b *testing.B) {

    for i := 0; i < runTimes; i++ {
        wg.Add(1)
        go demoTask2()
    }
    wg.Wait() // 等待执行完毕
    

    }

    // 使用协程池 func BenchmarkPoolTimeLifeSetTimes(b *testing.B) { pool, err := NewPool(20) if err != nil { b.Error(err) }

    task := &Task{
        Handler: demoTask2,
    }
    
    for i := 0; i < runTimes; i++ {
        wg.Add(1)
        pool.Put(task)
    }
    
    wg.Wait() // 等待执行完毕
    

    } 对比结果

    模式 操作时间消耗 ns/op 内存分配大小 B/op 内存分配次数 allocs/op 原生 goroutine (100w goroutine) 1596177880 103815552 240022 任务池开启 20 个 worker 20 goroutine) 1378909099 15312 89 使用任务池和原生 goroutine 性能相近(略好于原生)

    使用任务池比直接 goroutine 内存分配节省 7000 倍左右, 内存分配次数减少 2700 倍左右

    tips: 当任务为耗时任务时, 防止任务堆积(消费不过来)可以结合业务调整容量, 或根据业务控制每个任务的超时时间

    2020-01-09 14:02:09
    赞同 展开评论 打赏
  • 技术架构师 阿里云开发者社区技术专家博主 CSDN签约专栏技术博主 掘金签约技术博主 云安全联盟专家 众多开源代码库Commiter

    厉害厉害

    2020-01-09 13:58:42
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
gohbase :HBase go客户端 立即下载
Go构建日请求千亿级微服务实践 立即下载
fibjs 模块重构从回调到协程--陈垒 立即下载