让消费数据处理更快版本2
一次性获取或者初始化所有数据并发模式
上个版本Go让消费速度更快我们重点有个LOOP,那就表明它是在后台一直运行的,那如果我们想一次性执行并发获取任务或者初始化任务,那该如何做呢?里面又有哪些需要注意的细节呢?show me the code,don‘t just say say!
代码
package main import ( "context" "fmt" "time" ) // TaskRunner 运行task type TaskRunner struct { } // Task 任务对象 type Task struct { Name string Age string IDCard string Addr string } func (taskRunner *TaskRunner)getConcurrency() int { //qps, _ := config.YAMLInt("qps") // 从配置文件获取qps qps := 5 //这里先写死 上线替换成从配置文件获取 if qps <= 0 { return 5 } return qps } func (taskRunner *TaskRunner)Run(ctx context.Context, task Task) { // 这里你就可以对task做处理 // 比如请求第三方接口 // 比如插入数据 // 比如通过task查询数据然后组装数据发送给kafka等消息队列 // 比如发送通知等 fmt.Println("消费结束", task.Name) } func (taskRunner *TaskRunner)initTask(ctx context.Context, tasks []Task) (interface{}, error) { if len(tasks) == 0 { return nil, nil } // 获取并发 channelSize := taskRunner.getConcurrency() // 初始化并发pool pool := make(chan Task, channelSize) // 初始化并发goroutine for i := 0; i < channelSize; i++ { go func(id int) { start := time.Now() fmt.Println("初始化任务开始, runner_id=", id) for v := range pool { // 消费task taskRunner.Run(ctx, v) } fmt.Println("初始化任务结束, runner_id=", id, " duration=", time.Since(start)) }(i) } // 塞数据 go func() { for _, v := range tasks { pool <- v // 当然在往pool赛数据之前 你可以针对task v做些额外的初始化或者改变值行为 比如v.Addr="北京"改为v.Addr="上海" } close(pool) }() return tasks, nil } func main() { // 测试demo var tasks []Task for i:=0; i< 10; i++ { task := Task{ Name: fmt.Sprintf("我是第%d个任务", i+1), } tasks = append(tasks, task) } runner := TaskRunner{} runner.initTask(context.Background(), tasks) time.Sleep(100*time.Second) }
运行结果:
初始化任务开始, runner_id= 2 消费结束 我是第1个任务 消费结束 我是第2个任务 消费结束 我是第3个任务 消费结束 我是第4个任务 消费结束 我是第5个任务 消费结束 我是第6个任务 消费结束 我是第7个任务 消费结束 我是第8个任务 消费结束 我是第9个任务 消费结束 我是第10个任务 初始化任务结束, runner_id= 2 duration= 39.417µs 初始化任务开始, runner_id= 4 初始化任务结束, runner_id= 4 duration= 1.166µs 初始化任务开始, runner_id= 0 初始化任务开始, runner_id= 1 初始化任务开始, runner_id= 3 初始化任务结束, runner_id= 1 duration= 69.042µs 初始化任务结束, runner_id= 3 duration= 15.041µs 初始化任务结束, runner_id= 0 duration= 58.042µs
小结
这套模板主要是在web项目接口中或者说一直运行的异步任务中针对一次性初始化任务比较常见,大家可以拿来直接用,简单高效,欢迎有兴趣的同学一起交流哈。