Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库

Asynq是一个Go实现的分布式任务队列和异步处理库,基于redis,类似Ruby的sidekiq和Python的celery。Go生态类似的还有machinery和goworker

微信截图_20230814183527.png

同时提供一个WebUI asynqmon,可以源码形式安装或使用Docker image, 还可以和Prometheus集成

docker run --rm  --name asynqmon -p 8080:8080  hibiken/asynqmon,如果使用的是主机上的redis,还需加上 --redis-addr=host.docker.internal:6379,否则会报错

docker run --rm  --name asynqmon -p 8080:8080  hibiken/asynqmon --redis-addr=host.docker.internal:6379

➜  asynq-demo git:(main) ✗ tree
.
├── client.go
├── const.go
├── go.mod
├── go.sum
└── server.go
0 directories, 5 files

其中const.go:

package main
const (
  redisAddr   = "127.0.0.1:6379"
  redisPasswd = ""
)
const (
  TypeExampleTask    = "shuang:asynq-task:example"
)

client.go:

package main
import (
  "encoding/json"
  "fmt"
  "log"
  "time"
  "github.com/hibiken/asynq"
)
type ExampleTaskPayload struct {
  UserID string
  Msg    string
  // 业务需要的其他字段
}
func NewExampleTask(userID string, msg string) (*asynq.Task, error) {
  payload, err := json.Marshal(ExampleTaskPayload{UserID: userID, Msg: msg})
  if err != nil {
    return nil, err
  }
  return asynq.NewTask(TypeExampleTask, payload), nil
}
var client *asynq.Client
func main() {
  client = asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr, Password: redisPasswd, DB: 0})
  defer client.Close()
  //go startExampleTask()
  startExampleTask()
  //startGithubUpdate() // 定时触发
}
func startExampleTask() {
  fmt.Println("开始执行一次性的任务")
  // 立刻执行
  task1, err := NewExampleTask("10001", "mashangzhixing!")
  if err != nil {
    log.Fatalf("could not create task: %v", err)
  }
  info, err := client.Enqueue(task1)
  if err != nil {
    log.Fatalf("could not enqueue task: %v", err)
  }
  log.Printf("task1 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)
  // 10秒后执行(定时执行)
  task2, err := NewExampleTask("10002", "10s houzhixing")
  if err != nil {
    log.Fatalf("could not create task: %v", err)
  }
  info, err = client.Enqueue(task2, asynq.ProcessIn(10*time.Second))
  if err != nil {
    log.Fatalf("could not enqueue task: %v", err)
  }
  log.Printf("task2 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)
  // 30s后执行(定时执行)
  task3, err := NewExampleTask("10003", "30s houzhixing")
  if err != nil {
    log.Fatalf("could not create task: %v", err)
  }
  theTime := time.Now().Add(30 * time.Second)
  info, err = client.Enqueue(task3, asynq.ProcessAt(theTime))
  if err != nil {
    log.Fatalf("could not enqueue task: %v", err)
  }
  log.Printf("task3 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)
}

server.go:

package main
import (
  "context"
  "encoding/json"
  "fmt"
  "time"
  "github.com/davecgh/go-spew/spew"
  "github.com/hibiken/asynq"
)
var AsynqServer *asynq.Server // 异步任务server
func initTaskServer() error {
  // 初始化异步任务服务端
  AsynqServer = asynq.NewServer(
    asynq.RedisClientOpt{
      Addr:     redisAddr,
      Password: redisPasswd, //与client对应
      DB:       0,
    },
    asynq.Config{
      // Specify how many concurrent workers to use
      Concurrency: 100,
      // Optionally specify multiple queues with different priority.
      Queues: map[string]int{
        "critical": 6,
        "default":  3,
        "low":      1,
      },
      // See the godoc for other configuration options
    },
  )
  return nil
}
func main() {
  initTaskServer()
  mux := asynq.NewServeMux()
  mux.HandleFunc(TypeExampleTask, HandleExampleTask)
  // ...register other handlers...
  if err := AsynqServer.Run(mux); err != nil {
    fmt.Printf("could not run asynq server: %v", err)
  }
}
func HandleExampleTask(ctx context.Context, t *asynq.Task) error {
  res := make(map[string]string)
  spew.Dump("t.Payload() is:", t.Payload())
  err := json.Unmarshal(t.Payload(), &res)
  if err != nil {
    fmt.Printf("rum session, can not parse payload: %s,  err: %v", t.Payload(), err)
    return nil
  }
  //-----------具体处理逻辑------------
  spew.Println("拿到的入参为:", res, "接下来将进行具体处理")
  fmt.Println()
  // 模拟具体的处理
  time.Sleep(5 * time.Second)
  fmt.Println("--------------处理了5s,处理完成-----------------")
  return nil
}

执行redis-server


清除redis中所有的key:


执行docker run --rm  --name asynqmon -p 8080:8080  hibiken/asynqmon --redis-addr=host.docker.internal:6379

微信截图_20230814183811.png

执行 go run client.go const.go (生产者,产生消息放入队列)

微信截图_20230814183822.png

此时能看到redis中多个几个key

微信截图_20230814195251.png

同时管理后台能看到队列的信息

微信截图_20230814195303.png

执行 go run server.go const.go (消费者,消费队列中的消息)

image.png

可以看到都被处理了

微信截图_20230814195406.png

此时redis中的key:

微信截图_20230814195447.png

此处的业务处理为模拟,实际可能是某个被触发后不需要马上执行的操作




实际试一下。通过一个定时器(24h执行一次),触发代码每天向github push当天的代码等内容。收到触发后无需马上执行(可能当时其他请求量高,机器资源紧张),可以先放入队列,延迟30min后实际去执行。

完整Demo  push github的功能没有完全实现


另外可以配置队列的优先级,asynq队列如何配置队列优先级

  // 初始化异步任务服务端
  AsynqServer = asynq.NewServer(
    asynq.RedisClientOpt{
      Addr:     redisAddr,
      Password: redisPasswd, //与client对应
      DB:       0,
    },
    asynq.Config{
      // Specify how many concurrent workers to use
      Concurrency: 100,
      // Optionally specify multiple queues with different priority.
      Queues: map[string]int{
        "critical": 6,//关键队列中的任务将被处理 60% 的时间
        "default":  3,//默认队列中的任务将被处理 30% 的时间
        "low":      1,//低队列中的任务将被处理 10% 的时间
      },
      // See the godoc for other configuration options
    },
  )
目录
相关文章
|
4月前
|
存储 负载均衡 NoSQL
【赵渝强老师】Redis Cluster分布式集群
Redis Cluster是Redis的分布式存储解决方案,通过哈希槽(slot)实现数据分片,支持水平扩展,具备高可用性和负载均衡能力,适用于大规模数据场景。
341 2
|
4月前
|
存储 缓存 NoSQL
【📕分布式锁通关指南 12】源码剖析redisson如何利用Redis数据结构实现Semaphore和CountDownLatch
本文解析 Redisson 如何通过 Redis 实现分布式信号量(RSemaphore)与倒数闩(RCountDownLatch),利用 Lua 脚本与原子操作保障分布式环境下的同步控制,帮助开发者更好地理解其原理与应用。
270 6
|
3月前
|
消息中间件 缓存 NoSQL
Redis各类数据结构详细介绍及其在Go语言Gin框架下实践应用
这只是利用Go语言和Gin框架与Redis交互最基础部分展示;根据具体业务需求可能需要更复杂查询、事务处理或订阅发布功能实现更多高级特性应用场景。
284 86
|
5月前
|
存储 缓存 NoSQL
Redis核心数据结构与分布式锁实现详解
Redis 是高性能键值数据库,支持多种数据结构,如字符串、列表、集合、哈希、有序集合等,广泛用于缓存、消息队列和实时数据处理。本文详解其核心数据结构及分布式锁实现,帮助开发者提升系统性能与并发控制能力。
|
3月前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
215 0
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
3月前
|
缓存 NoSQL 关系型数据库
Redis缓存和分布式锁
Redis 是一种高性能的键值存储系统,广泛用于缓存、消息队列和内存数据库。其典型应用包括缓解关系型数据库压力,通过缓存热点数据提高查询效率,支持高并发访问。此外,Redis 还可用于实现分布式锁,解决分布式系统中的资源竞争问题。文章还探讨了缓存的更新策略、缓存穿透与雪崩的解决方案,以及 Redlock 算法等关键技术。
|
5月前
|
NoSQL Redis
Lua脚本协助Redis分布式锁实现命令的原子性
利用Lua脚本确保Redis操作的原子性是分布式锁安全性的关键所在,可以大幅减少由于网络分区、客户端故障等导致的锁无法正确释放的情况,从而在分布式系统中保证数据操作的安全性和一致性。在将这些概念应用于生产环境前,建议深入理解Redis事务与Lua脚本的工作原理以及分布式锁的可能问题和解决方案。
208 8
|
6月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
1598 7
|
7月前
|
NoSQL 算法 安全
redis分布式锁在高并发场景下的方案设计与性能提升
本文探讨了Redis分布式锁在主从架构下失效的问题及其解决方案。首先通过CAP理论分析,Redis遵循AP原则,导致锁可能失效。针对此问题,提出两种解决方案:Zookeeper分布式锁(追求CP一致性)和Redlock算法(基于多个Redis实例提升可靠性)。文章还讨论了可能遇到的“坑”,如加从节点引发超卖问题、建议Redis节点数为奇数以及持久化策略对锁的影响。最后,从性能优化角度出发,介绍了减少锁粒度和分段锁的策略,并结合实际场景(如下单重复提交、支付与取消订单冲突)展示了分布式锁的应用方法。
522 3
|
7月前
|
存储 NoSQL Java
从扣减库存场景来讲讲redis分布式锁中的那些“坑”
本文从一个简单的库存扣减场景出发,深入分析了高并发下的超卖问题,并逐步优化解决方案。首先通过本地锁解决单机并发问题,但集群环境下失效;接着引入Redis分布式锁,利用SETNX命令实现加锁,但仍存在死锁、锁过期等隐患。文章详细探讨了通过设置唯一标识、续命机制等方法完善锁的可靠性,并最终引出Redisson工具,其内置的锁续命和原子性操作极大简化了分布式锁的实现。最后,作者剖析了Redisson源码,揭示其实现原理,并预告后续关于主从架构下分布式锁的应用与性能优化内容。
343 0

热门文章

最新文章