让消费数据处理更快版本2(有并发控制)-一次性并发获取或者初始化任务最快有效方式

简介: 让消费数据处理更快版本2(有并发控制)-一次性并发获取或者初始化任务最快有效方式

让消费数据处理更快版本2



一次性获取或者初始化所有数据并发模式


上个版本Go让消费速度更快我们重点有个LOOP,那就表明它是在后台一直运行的,那如果我们想一次性执行并发获取任务或者初始化任务,那该如何做呢?里面又有哪些需要注意的细节呢?show me the code,don‘t just say say!


代码


package main
import (
 "context"
 "fmt"
 "time"
)
// TaskRunner 运行task
type TaskRunner struct {
}
// Task 任务对象
type Task struct {
 Name  string
 Age  string
 IDCard  string
 Addr    string
}
func (taskRunner *TaskRunner)getConcurrency() int {
 //qps, _ := config.YAMLInt("qps") // 从配置文件获取qps
  qps := 5 //这里先写死 上线替换成从配置文件获取
 if qps <= 0 {
  return 5
 }
 return qps
}
func  (taskRunner *TaskRunner)Run(ctx context.Context, task Task) {
 // 这里你就可以对task做处理
 // 比如请求第三方接口
 // 比如插入数据
 // 比如通过task查询数据然后组装数据发送给kafka等消息队列
 // 比如发送通知等
 fmt.Println("消费结束", task.Name)
}
func (taskRunner *TaskRunner)initTask(ctx context.Context, tasks []Task) (interface{}, error) {
 if len(tasks) == 0 {
  return nil, nil
 }
 // 获取并发
 channelSize := taskRunner.getConcurrency()
 // 初始化并发pool
 pool := make(chan Task, channelSize)
 // 初始化并发goroutine
 for i := 0; i < channelSize; i++ {
  go func(id int) {
   start := time.Now()
   fmt.Println("初始化任务开始, runner_id=", id)
   for v := range pool { // 消费task
    taskRunner.Run(ctx, v)
   }
   fmt.Println("初始化任务结束, runner_id=", id, " duration=", time.Since(start))
  }(i)
 }
 // 塞数据
 go func() {
  for _, v := range tasks {
   pool <- v // 当然在往pool赛数据之前 你可以针对task v做些额外的初始化或者改变值行为 比如v.Addr="北京"改为v.Addr="上海"
  }
  close(pool)
 }()
 return tasks, nil
}
func main() {
 // 测试demo
 var tasks []Task
 for i:=0; i< 10; i++ {
  task := Task{
   Name: fmt.Sprintf("我是第%d个任务", i+1),
  }
  tasks = append(tasks, task)
 }
 runner := TaskRunner{}
 runner.initTask(context.Background(), tasks)
 time.Sleep(100*time.Second)
}


运行结果:


初始化任务开始, runner_id= 2
消费结束 我是第1个任务
消费结束 我是第2个任务
消费结束 我是第3个任务
消费结束 我是第4个任务
消费结束 我是第5个任务
消费结束 我是第6个任务
消费结束 我是第7个任务
消费结束 我是第8个任务
消费结束 我是第9个任务
消费结束 我是第10个任务
初始化任务结束, runner_id= 2  duration= 39.417µs
初始化任务开始, runner_id= 4
初始化任务结束, runner_id= 4  duration= 1.166µs
初始化任务开始, runner_id= 0
初始化任务开始, runner_id= 1
初始化任务开始, runner_id= 3
初始化任务结束, runner_id= 1  duration= 69.042µs
初始化任务结束, runner_id= 3  duration= 15.041µs
初始化任务结束, runner_id= 0  duration= 58.042µs


小结


这套模板主要是在web项目接口中或者说一直运行的异步任务中针对一次性初始化任务比较常见,大家可以拿来直接用,简单高效,欢迎有兴趣的同学一起交流哈。

目录
打赏
0
0
0
0
5
分享
相关文章
线程池关闭时未完成的任务如何保证数据的一致性?
保证线程池关闭时未完成任务的数据一致性需要综合运用多种方法和机制。通过备份与恢复、事务管理、任务状态记录与恢复、数据同步与协调、错误处理与补偿、监控与预警等手段的结合,以及结合具体业务场景进行分析和制定策略,能够最大程度地确保数据的一致性,保障系统的稳定运行和业务的顺利开展。同时,不断地优化和改进这些方法和机制,也是提高系统性能和可靠性的重要途径。
143 62
Lettuce的特性和内部实现问题之在同步调用模式下,业务线程是如何拿到结果数据的
Lettuce的特性和内部实现问题之在同步调用模式下,业务线程是如何拿到结果数据的
实时计算 Flink版产品使用问题之如何确保多并发sink同时更新Redis值时,数据能按事件时间有序地更新并且保持一致性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
通用快照方案问题之调整Hystrix的信号量隔离模式的并发限制如何解决
通用快照方案问题之调整Hystrix的信号量隔离模式的并发限制如何解决
51 0
|
9月前
线程间的同步的方式有哪些
线程间的同步的方式有哪些
使用队列和事务实现采集数据实例流程
使用队列和事务实现采集数据实例流程
95 0
并发模式
并发模式并不是一种函数的运用、亦或者实际存在的东西。他是前人对于并发场景的运用总结与经验。他与23中设计模式一样。好啦,话不多说。开干
173 0
并发与并行 同步或异步
我们都知道,程序猿是一种逻辑性极强的生物,他们不擅言辞,不擅表达,但是他们能够用一种神秘的语言与机器进行沟通,知道怎么让机器听他们的。
118 0

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等