Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库

Asynq是一个Go实现的分布式任务队列和异步处理库,基于redis,类似Ruby的sidekiq和Python的celery。Go生态类似的还有machinery和goworker

微信截图_20230814183527.png

同时提供一个WebUI asynqmon,可以源码形式安装或使用Docker image, 还可以和Prometheus集成

docker run --rm  --name asynqmon -p 8080:8080  hibiken/asynqmon,如果使用的是主机上的redis,还需加上 --redis-addr=host.docker.internal:6379,否则会报错

docker run --rm  --name asynqmon -p 8080:8080  hibiken/asynqmon --redis-addr=host.docker.internal:6379

➜  asynq-demo git:(main) ✗ tree
.
├── client.go
├── const.go
├── go.mod
├── go.sum
└── server.go
0 directories, 5 files

其中const.go:

package main
const (
  redisAddr   = "127.0.0.1:6379"
  redisPasswd = ""
)
const (
  TypeExampleTask    = "shuang:asynq-task:example"
)

client.go:

package main
import (
  "encoding/json"
  "fmt"
  "log"
  "time"
  "github.com/hibiken/asynq"
)
type ExampleTaskPayload struct {
  UserID string
  Msg    string
  // 业务需要的其他字段
}
func NewExampleTask(userID string, msg string) (*asynq.Task, error) {
  payload, err := json.Marshal(ExampleTaskPayload{UserID: userID, Msg: msg})
  if err != nil {
    return nil, err
  }
  return asynq.NewTask(TypeExampleTask, payload), nil
}
var client *asynq.Client
func main() {
  client = asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr, Password: redisPasswd, DB: 0})
  defer client.Close()
  //go startExampleTask()
  startExampleTask()
  //startGithubUpdate() // 定时触发
}
func startExampleTask() {
  fmt.Println("开始执行一次性的任务")
  // 立刻执行
  task1, err := NewExampleTask("10001", "mashangzhixing!")
  if err != nil {
    log.Fatalf("could not create task: %v", err)
  }
  info, err := client.Enqueue(task1)
  if err != nil {
    log.Fatalf("could not enqueue task: %v", err)
  }
  log.Printf("task1 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)
  // 10秒后执行(定时执行)
  task2, err := NewExampleTask("10002", "10s houzhixing")
  if err != nil {
    log.Fatalf("could not create task: %v", err)
  }
  info, err = client.Enqueue(task2, asynq.ProcessIn(10*time.Second))
  if err != nil {
    log.Fatalf("could not enqueue task: %v", err)
  }
  log.Printf("task2 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)
  // 30s后执行(定时执行)
  task3, err := NewExampleTask("10003", "30s houzhixing")
  if err != nil {
    log.Fatalf("could not create task: %v", err)
  }
  theTime := time.Now().Add(30 * time.Second)
  info, err = client.Enqueue(task3, asynq.ProcessAt(theTime))
  if err != nil {
    log.Fatalf("could not enqueue task: %v", err)
  }
  log.Printf("task3 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)
}

server.go:

package main
import (
  "context"
  "encoding/json"
  "fmt"
  "time"
  "github.com/davecgh/go-spew/spew"
  "github.com/hibiken/asynq"
)
var AsynqServer *asynq.Server // 异步任务server
func initTaskServer() error {
  // 初始化异步任务服务端
  AsynqServer = asynq.NewServer(
    asynq.RedisClientOpt{
      Addr:     redisAddr,
      Password: redisPasswd, //与client对应
      DB:       0,
    },
    asynq.Config{
      // Specify how many concurrent workers to use
      Concurrency: 100,
      // Optionally specify multiple queues with different priority.
      Queues: map[string]int{
        "critical": 6,
        "default":  3,
        "low":      1,
      },
      // See the godoc for other configuration options
    },
  )
  return nil
}
func main() {
  initTaskServer()
  mux := asynq.NewServeMux()
  mux.HandleFunc(TypeExampleTask, HandleExampleTask)
  // ...register other handlers...
  if err := AsynqServer.Run(mux); err != nil {
    fmt.Printf("could not run asynq server: %v", err)
  }
}
func HandleExampleTask(ctx context.Context, t *asynq.Task) error {
  res := make(map[string]string)
  spew.Dump("t.Payload() is:", t.Payload())
  err := json.Unmarshal(t.Payload(), &res)
  if err != nil {
    fmt.Printf("rum session, can not parse payload: %s,  err: %v", t.Payload(), err)
    return nil
  }
  //-----------具体处理逻辑------------
  spew.Println("拿到的入参为:", res, "接下来将进行具体处理")
  fmt.Println()
  // 模拟具体的处理
  time.Sleep(5 * time.Second)
  fmt.Println("--------------处理了5s,处理完成-----------------")
  return nil
}

执行redis-server


清除redis中所有的key:


执行docker run --rm  --name asynqmon -p 8080:8080  hibiken/asynqmon --redis-addr=host.docker.internal:6379

微信截图_20230814183811.png

执行 go run client.go const.go (生产者,产生消息放入队列)

微信截图_20230814183822.png

此时能看到redis中多个几个key

微信截图_20230814195251.png

同时管理后台能看到队列的信息

微信截图_20230814195303.png

执行 go run server.go const.go (消费者,消费队列中的消息)

image.png

可以看到都被处理了

微信截图_20230814195406.png

此时redis中的key:

微信截图_20230814195447.png

此处的业务处理为模拟,实际可能是某个被触发后不需要马上执行的操作




实际试一下。通过一个定时器(24h执行一次),触发代码每天向github push当天的代码等内容。收到触发后无需马上执行(可能当时其他请求量高,机器资源紧张),可以先放入队列,延迟30min后实际去执行。

完整Demo  push github的功能没有完全实现


另外可以配置队列的优先级,asynq队列如何配置队列优先级

  // 初始化异步任务服务端
  AsynqServer = asynq.NewServer(
    asynq.RedisClientOpt{
      Addr:     redisAddr,
      Password: redisPasswd, //与client对应
      DB:       0,
    },
    asynq.Config{
      // Specify how many concurrent workers to use
      Concurrency: 100,
      // Optionally specify multiple queues with different priority.
      Queues: map[string]int{
        "critical": 6,//关键队列中的任务将被处理 60% 的时间
        "default":  3,//默认队列中的任务将被处理 30% 的时间
        "low":      1,//低队列中的任务将被处理 10% 的时间
      },
      // See the godoc for other configuration options
    },
  )
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
编解码 NoSQL Java
使用Spring Boot + Redis 队列实现视频文件上传及FFmpeg转码的技术分享
【8月更文挑战第30天】在当前的互联网应用中,视频内容的处理与分发已成为不可或缺的一部分。对于视频平台而言,高效、稳定地处理用户上传的视频文件,并对其进行转码以适应不同设备的播放需求,是提升用户体验的关键。本文将围绕使用Spring Boot结合Redis队列技术来实现视频文件上传及FFmpeg转码的过程,分享一系列技术干货。
200 3
|
5天前
|
算法 关系型数据库 MySQL
分布式唯一ID生成:深入理解Snowflake算法在Go中的实现
在分布式系统中,确保每个节点生成的 ID 唯一且高效至关重要。Snowflake 算法由 Twitter 开发,通过 64 位 long 型数字生成全局唯一 ID,包括 1 位标识位、41 位时间戳、10 位机器 ID 和 12 位序列号。该算法具备全局唯一性、递增性、高可用性和高性能,适用于高并发场景,如电商促销时的大量订单生成。本文介绍了使用 Go 语言的 `bwmarrin/snowflake` 和 `sony/sonyflake` 库实现 Snowflake 算法的方法。
18 1
分布式唯一ID生成:深入理解Snowflake算法在Go中的实现
|
10天前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
31 6
|
1月前
|
消息中间件 存储 NoSQL
如何用Redis实现延迟队列?
综上所述,通过Redis的有序集合和一些基本命令,我们可以轻松地构建出功能完善的延迟队列系统。根据具体需求,可以进一步优化和扩展,以满足高性能和高可靠性的业务需求。
33 1
|
2月前
|
NoSQL Go API
go语言操作Redis
go语言操作Redis
|
3月前
|
监控 Go API
带你十天轻松搞定 Go 微服务之大结局(分布式事务)
带你十天轻松搞定 Go 微服务之大结局(分布式事务)
|
3月前
|
存储 NoSQL 算法
Go 分布式令牌桶限流 + 兜底保障
Go 分布式令牌桶限流 + 兜底保障
|
3月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
90 2
|
3月前
|
消息中间件 存储 NoSQL
redis实战——go-redis的使用与redis基础数据类型的使用场景(一)
本文档介绍了如何使用 Go 语言中的 `go-redis` 库操作 Redis 数据库
175 0
redis实战——go-redis的使用与redis基础数据类型的使用场景(一)
|
2月前
|
消息中间件 NoSQL Go
PHP转Go系列 | ThinkPHP与Gin框架之Redis延时消息队列技术实践
【9月更文挑战第7天】在从 PHP 的 ThinkPHP 框架迁移到 Go 的 Gin 框架时,涉及 Redis 延时消息队列的技术实践主要包括:理解延时消息队列概念,其能在特定时间处理消息,适用于定时任务等场景;在 ThinkPHP 中使用 Redis 实现延时队列;在 Gin 中结合 Go 的 Redis 客户端库实现类似功能;Go 具有更高性能和简洁性,适合处理大量消息。迁移过程中需考虑业务需求及系统稳定性。