让消费数据处理更快版本2(有并发控制)-一次性并发获取或者初始化任务最快有效方式

简介: 让消费数据处理更快版本2(有并发控制)-一次性并发获取或者初始化任务最快有效方式

让消费数据处理更快版本2



一次性获取或者初始化所有数据并发模式


上个版本Go让消费速度更快我们重点有个LOOP,那就表明它是在后台一直运行的,那如果我们想一次性执行并发获取任务或者初始化任务,那该如何做呢?里面又有哪些需要注意的细节呢?show me the code,don‘t just say say!


代码


package main
import (
 "context"
 "fmt"
 "time"
)
// TaskRunner 运行task
type TaskRunner struct {
}
// Task 任务对象
type Task struct {
 Name  string
 Age  string
 IDCard  string
 Addr    string
}
func (taskRunner *TaskRunner)getConcurrency() int {
 //qps, _ := config.YAMLInt("qps") // 从配置文件获取qps
  qps := 5 //这里先写死 上线替换成从配置文件获取
 if qps <= 0 {
  return 5
 }
 return qps
}
func  (taskRunner *TaskRunner)Run(ctx context.Context, task Task) {
 // 这里你就可以对task做处理
 // 比如请求第三方接口
 // 比如插入数据
 // 比如通过task查询数据然后组装数据发送给kafka等消息队列
 // 比如发送通知等
 fmt.Println("消费结束", task.Name)
}
func (taskRunner *TaskRunner)initTask(ctx context.Context, tasks []Task) (interface{}, error) {
 if len(tasks) == 0 {
  return nil, nil
 }
 // 获取并发
 channelSize := taskRunner.getConcurrency()
 // 初始化并发pool
 pool := make(chan Task, channelSize)
 // 初始化并发goroutine
 for i := 0; i < channelSize; i++ {
  go func(id int) {
   start := time.Now()
   fmt.Println("初始化任务开始, runner_id=", id)
   for v := range pool { // 消费task
    taskRunner.Run(ctx, v)
   }
   fmt.Println("初始化任务结束, runner_id=", id, " duration=", time.Since(start))
  }(i)
 }
 // 塞数据
 go func() {
  for _, v := range tasks {
   pool <- v // 当然在往pool赛数据之前 你可以针对task v做些额外的初始化或者改变值行为 比如v.Addr="北京"改为v.Addr="上海"
  }
  close(pool)
 }()
 return tasks, nil
}
func main() {
 // 测试demo
 var tasks []Task
 for i:=0; i< 10; i++ {
  task := Task{
   Name: fmt.Sprintf("我是第%d个任务", i+1),
  }
  tasks = append(tasks, task)
 }
 runner := TaskRunner{}
 runner.initTask(context.Background(), tasks)
 time.Sleep(100*time.Second)
}


运行结果:


初始化任务开始, runner_id= 2
消费结束 我是第1个任务
消费结束 我是第2个任务
消费结束 我是第3个任务
消费结束 我是第4个任务
消费结束 我是第5个任务
消费结束 我是第6个任务
消费结束 我是第7个任务
消费结束 我是第8个任务
消费结束 我是第9个任务
消费结束 我是第10个任务
初始化任务结束, runner_id= 2  duration= 39.417µs
初始化任务开始, runner_id= 4
初始化任务结束, runner_id= 4  duration= 1.166µs
初始化任务开始, runner_id= 0
初始化任务开始, runner_id= 1
初始化任务开始, runner_id= 3
初始化任务结束, runner_id= 1  duration= 69.042µs
初始化任务结束, runner_id= 3  duration= 15.041µs
初始化任务结束, runner_id= 0  duration= 58.042µs


小结


这套模板主要是在web项目接口中或者说一直运行的异步任务中针对一次性初始化任务比较常见,大家可以拿来直接用,简单高效,欢迎有兴趣的同学一起交流哈。

相关文章
|
3月前
|
分布式计算 关系型数据库 MySQL
DataWork数据处理问题之调整并发数量如何解决
DataWork数据处理是指使用DataWorks平台进行数据开发、数据处理和数据治理的活动;本合集将涵盖DataWork数据处理的工作流程、工具使用和问题排查,帮助用户提高数据处理的效率和质量。
|
22天前
|
运维 分布式计算 算法
函数计算产品使用问题之如何使用重试机制
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
5天前
|
NoSQL Redis
Lettuce的特性和内部实现问题之在同步调用模式下,业务线程是如何拿到结果数据的
Lettuce的特性和内部实现问题之在同步调用模式下,业务线程是如何拿到结果数据的
|
5天前
|
存储 Kubernetes 测试技术
阿里云块存储问题之生产代码与测试代码需要同步原子提交如何解决
阿里云块存储问题之生产代码与测试代码需要同步原子提交如何解决
16 0
|
1月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之全量和增量同步数据的一致性、不丢失和不重复读取可以通过什么方式保证
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
27天前
|
Java
通用快照方案问题之调整Hystrix的信号量隔离模式的并发限制如何解决
通用快照方案问题之调整Hystrix的信号量隔离模式的并发限制如何解决
12 0
|
30天前
|
存储 缓存 NoSQL
架构设计篇问题之在数据割接过程中,多线程处理会导致数据错乱和重复问题如何解决
架构设计篇问题之在数据割接过程中,多线程处理会导致数据错乱和重复问题如何解决
|
2月前
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何确保多并发sink同时更新Redis值时,数据能按事件时间有序地更新并且保持一致性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
消息中间件 缓存 并行计算
每秒钟承载600万订单级别的无锁并行计算框架 Disruptor学习
每秒钟承载600万订单级别的无锁并行计算框架 Disruptor学习
|
3月前
|
SQL 安全 算法
在高并发情况下,如何做到安全的修改同一行数据?
在高并发情况下,如何做到安全的修改同一行数据?