让消费数据处理更快版本2(有并发控制)-一次性并发获取或者初始化任务最快有效方式

简介: 让消费数据处理更快版本2(有并发控制)-一次性并发获取或者初始化任务最快有效方式

让消费数据处理更快版本2



一次性获取或者初始化所有数据并发模式


上个版本Go让消费速度更快我们重点有个LOOP,那就表明它是在后台一直运行的,那如果我们想一次性执行并发获取任务或者初始化任务,那该如何做呢?里面又有哪些需要注意的细节呢?show me the code,don‘t just say say!


代码


package main
import (
 "context"
 "fmt"
 "time"
)
// TaskRunner 运行task
type TaskRunner struct {
}
// Task 任务对象
type Task struct {
 Name  string
 Age  string
 IDCard  string
 Addr    string
}
func (taskRunner *TaskRunner)getConcurrency() int {
 //qps, _ := config.YAMLInt("qps") // 从配置文件获取qps
  qps := 5 //这里先写死 上线替换成从配置文件获取
 if qps <= 0 {
  return 5
 }
 return qps
}
func  (taskRunner *TaskRunner)Run(ctx context.Context, task Task) {
 // 这里你就可以对task做处理
 // 比如请求第三方接口
 // 比如插入数据
 // 比如通过task查询数据然后组装数据发送给kafka等消息队列
 // 比如发送通知等
 fmt.Println("消费结束", task.Name)
}
func (taskRunner *TaskRunner)initTask(ctx context.Context, tasks []Task) (interface{}, error) {
 if len(tasks) == 0 {
  return nil, nil
 }
 // 获取并发
 channelSize := taskRunner.getConcurrency()
 // 初始化并发pool
 pool := make(chan Task, channelSize)
 // 初始化并发goroutine
 for i := 0; i < channelSize; i++ {
  go func(id int) {
   start := time.Now()
   fmt.Println("初始化任务开始, runner_id=", id)
   for v := range pool { // 消费task
    taskRunner.Run(ctx, v)
   }
   fmt.Println("初始化任务结束, runner_id=", id, " duration=", time.Since(start))
  }(i)
 }
 // 塞数据
 go func() {
  for _, v := range tasks {
   pool <- v // 当然在往pool赛数据之前 你可以针对task v做些额外的初始化或者改变值行为 比如v.Addr="北京"改为v.Addr="上海"
  }
  close(pool)
 }()
 return tasks, nil
}
func main() {
 // 测试demo
 var tasks []Task
 for i:=0; i< 10; i++ {
  task := Task{
   Name: fmt.Sprintf("我是第%d个任务", i+1),
  }
  tasks = append(tasks, task)
 }
 runner := TaskRunner{}
 runner.initTask(context.Background(), tasks)
 time.Sleep(100*time.Second)
}


运行结果:


初始化任务开始, runner_id= 2
消费结束 我是第1个任务
消费结束 我是第2个任务
消费结束 我是第3个任务
消费结束 我是第4个任务
消费结束 我是第5个任务
消费结束 我是第6个任务
消费结束 我是第7个任务
消费结束 我是第8个任务
消费结束 我是第9个任务
消费结束 我是第10个任务
初始化任务结束, runner_id= 2  duration= 39.417µs
初始化任务开始, runner_id= 4
初始化任务结束, runner_id= 4  duration= 1.166µs
初始化任务开始, runner_id= 0
初始化任务开始, runner_id= 1
初始化任务开始, runner_id= 3
初始化任务结束, runner_id= 1  duration= 69.042µs
初始化任务结束, runner_id= 3  duration= 15.041µs
初始化任务结束, runner_id= 0  duration= 58.042µs


小结


这套模板主要是在web项目接口中或者说一直运行的异步任务中针对一次性初始化任务比较常见,大家可以拿来直接用,简单高效,欢迎有兴趣的同学一起交流哈。

相关文章
|
6月前
|
分布式计算 关系型数据库 MySQL
DataWork数据处理问题之调整并发数量如何解决
DataWork数据处理是指使用DataWorks平台进行数据开发、数据处理和数据治理的活动;本合集将涵盖DataWork数据处理的工作流程、工具使用和问题排查,帮助用户提高数据处理的效率和质量。
|
2月前
|
canal 缓存 NoSQL
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
根据对一致性的要求程度,提出多种解决方案:同步删除、同步删除+可靠消息、延时双删、异步监听+可靠消息、多重保障方案
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
|
3月前
|
NoSQL Redis
Lettuce的特性和内部实现问题之在同步调用模式下,业务线程是如何拿到结果数据的
Lettuce的特性和内部实现问题之在同步调用模式下,业务线程是如何拿到结果数据的
|
3月前
|
存储 Kubernetes 测试技术
阿里云块存储问题之生产代码与测试代码需要同步原子提交如何解决
阿里云块存储问题之生产代码与测试代码需要同步原子提交如何解决
37 0
|
4月前
|
Java
通用快照方案问题之调整Hystrix的信号量隔离模式的并发限制如何解决
通用快照方案问题之调整Hystrix的信号量隔离模式的并发限制如何解决
29 0
|
5月前
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何确保多并发sink同时更新Redis值时,数据能按事件时间有序地更新并且保持一致性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
消息中间件 缓存 并行计算
每秒钟承载600万订单级别的无锁并行计算框架 Disruptor学习
每秒钟承载600万订单级别的无锁并行计算框架 Disruptor学习
|
6月前
|
缓存 NoSQL Java
【亮剑】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护,如何使用注解来实现 Redis 分布式锁的功能?
【4月更文挑战第30天】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护。基于 Redis 的分布式锁利用 SETNX 或 SET 命令实现,并考虑自动过期、可重入及原子性以确保可靠性。在 Java Spring Boot 中,可通过 `@EnableCaching`、`@Cacheable` 和 `@CacheEvict` 注解轻松实现 Redis 分布式锁功能。
122 0
|
6月前
|
SQL 安全 算法
在高并发情况下,如何做到安全的修改同一行数据?
在高并发情况下,如何做到安全的修改同一行数据?
|
11月前
|
NoSQL Cloud Native Redis
【性能优化下】组织结构同步优化二,全量同步/增量同步,断点续传实现方式
【性能优化下】组织结构同步优化二,全量同步/增量同步,断点续传实现方式