Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库

Asynq是一个Go实现的分布式任务队列和异步处理库,基于redis,类似Ruby的sidekiq和Python的celery。Go生态类似的还有machinery和goworker

微信截图_20230814183527.png

同时提供一个WebUI asynqmon,可以源码形式安装或使用Docker image, 还可以和Prometheus集成

docker run --rm  --name asynqmon -p 8080:8080  hibiken/asynqmon,如果使用的是主机上的redis,还需加上 --redis-addr=host.docker.internal:6379,否则会报错

docker run --rm  --name asynqmon -p 8080:8080  hibiken/asynqmon --redis-addr=host.docker.internal:6379

➜  asynq-demo git:(main) ✗ tree
.
├── client.go
├── const.go
├── go.mod
├── go.sum
└── server.go
0 directories, 5 files

其中const.go:

package main
const (
  redisAddr   = "127.0.0.1:6379"
  redisPasswd = ""
)
const (
  TypeExampleTask    = "shuang:asynq-task:example"
)

client.go:

package main
import (
  "encoding/json"
  "fmt"
  "log"
  "time"
  "github.com/hibiken/asynq"
)
type ExampleTaskPayload struct {
  UserID string
  Msg    string
  // 业务需要的其他字段
}
func NewExampleTask(userID string, msg string) (*asynq.Task, error) {
  payload, err := json.Marshal(ExampleTaskPayload{UserID: userID, Msg: msg})
  if err != nil {
    return nil, err
  }
  return asynq.NewTask(TypeExampleTask, payload), nil
}
var client *asynq.Client
func main() {
  client = asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr, Password: redisPasswd, DB: 0})
  defer client.Close()
  //go startExampleTask()
  startExampleTask()
  //startGithubUpdate() // 定时触发
}
func startExampleTask() {
  fmt.Println("开始执行一次性的任务")
  // 立刻执行
  task1, err := NewExampleTask("10001", "mashangzhixing!")
  if err != nil {
    log.Fatalf("could not create task: %v", err)
  }
  info, err := client.Enqueue(task1)
  if err != nil {
    log.Fatalf("could not enqueue task: %v", err)
  }
  log.Printf("task1 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)
  // 10秒后执行(定时执行)
  task2, err := NewExampleTask("10002", "10s houzhixing")
  if err != nil {
    log.Fatalf("could not create task: %v", err)
  }
  info, err = client.Enqueue(task2, asynq.ProcessIn(10*time.Second))
  if err != nil {
    log.Fatalf("could not enqueue task: %v", err)
  }
  log.Printf("task2 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)
  // 30s后执行(定时执行)
  task3, err := NewExampleTask("10003", "30s houzhixing")
  if err != nil {
    log.Fatalf("could not create task: %v", err)
  }
  theTime := time.Now().Add(30 * time.Second)
  info, err = client.Enqueue(task3, asynq.ProcessAt(theTime))
  if err != nil {
    log.Fatalf("could not enqueue task: %v", err)
  }
  log.Printf("task3 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)
}

server.go:

package main
import (
  "context"
  "encoding/json"
  "fmt"
  "time"
  "github.com/davecgh/go-spew/spew"
  "github.com/hibiken/asynq"
)
var AsynqServer *asynq.Server // 异步任务server
func initTaskServer() error {
  // 初始化异步任务服务端
  AsynqServer = asynq.NewServer(
    asynq.RedisClientOpt{
      Addr:     redisAddr,
      Password: redisPasswd, //与client对应
      DB:       0,
    },
    asynq.Config{
      // Specify how many concurrent workers to use
      Concurrency: 100,
      // Optionally specify multiple queues with different priority.
      Queues: map[string]int{
        "critical": 6,
        "default":  3,
        "low":      1,
      },
      // See the godoc for other configuration options
    },
  )
  return nil
}
func main() {
  initTaskServer()
  mux := asynq.NewServeMux()
  mux.HandleFunc(TypeExampleTask, HandleExampleTask)
  // ...register other handlers...
  if err := AsynqServer.Run(mux); err != nil {
    fmt.Printf("could not run asynq server: %v", err)
  }
}
func HandleExampleTask(ctx context.Context, t *asynq.Task) error {
  res := make(map[string]string)
  spew.Dump("t.Payload() is:", t.Payload())
  err := json.Unmarshal(t.Payload(), &res)
  if err != nil {
    fmt.Printf("rum session, can not parse payload: %s,  err: %v", t.Payload(), err)
    return nil
  }
  //-----------具体处理逻辑------------
  spew.Println("拿到的入参为:", res, "接下来将进行具体处理")
  fmt.Println()
  // 模拟具体的处理
  time.Sleep(5 * time.Second)
  fmt.Println("--------------处理了5s,处理完成-----------------")
  return nil
}

执行redis-server


清除redis中所有的key:


执行docker run --rm  --name asynqmon -p 8080:8080  hibiken/asynqmon --redis-addr=host.docker.internal:6379

微信截图_20230814183811.png

执行 go run client.go const.go (生产者,产生消息放入队列)

微信截图_20230814183822.png

此时能看到redis中多个几个key

微信截图_20230814195251.png

同时管理后台能看到队列的信息

微信截图_20230814195303.png

执行 go run server.go const.go (消费者,消费队列中的消息)

image.png

可以看到都被处理了

微信截图_20230814195406.png

此时redis中的key:

微信截图_20230814195447.png

此处的业务处理为模拟,实际可能是某个被触发后不需要马上执行的操作




实际试一下。通过一个定时器(24h执行一次),触发代码每天向github push当天的代码等内容。收到触发后无需马上执行(可能当时其他请求量高,机器资源紧张),可以先放入队列,延迟30min后实际去执行。

完整Demo  push github的功能没有完全实现


另外可以配置队列的优先级,asynq队列如何配置队列优先级

  // 初始化异步任务服务端
  AsynqServer = asynq.NewServer(
    asynq.RedisClientOpt{
      Addr:     redisAddr,
      Password: redisPasswd, //与client对应
      DB:       0,
    },
    asynq.Config{
      // Specify how many concurrent workers to use
      Concurrency: 100,
      // Optionally specify multiple queues with different priority.
      Queues: map[string]int{
        "critical": 6,//关键队列中的任务将被处理 60% 的时间
        "default":  3,//默认队列中的任务将被处理 30% 的时间
        "low":      1,//低队列中的任务将被处理 10% 的时间
      },
      // See the godoc for other configuration options
    },
  )
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
打赏
0
1
0
0
373
分享
相关文章
通义灵码 Rules 库合集来了,覆盖Java、TypeScript、Python、Go、JavaScript 等
通义灵码新上的外挂 Project Rules 获得了开发者的一致好评:最小成本适配我的开发风格、相当把团队经验沉淀下来,是个很好功能……
205 72
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
104 0
分布式爬虫框架Scrapy-Redis实战指南
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
532 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
|
2月前
|
Springboot使用Redis实现分布式锁
通过这些步骤和示例,您可以系统地了解如何在Spring Boot中使用Redis实现分布式锁,并在实际项目中应用。希望这些内容对您的学习和工作有所帮助。
224 83
|
8天前
|
新一代 Cron-Job分布式调度平台,v1.0.8版本发布,支持Go执行器SDK!
现代化的Cron-Job分布式任务调度平台,支持Go语言执行器SDK,多项核心优势优于其他调度平台。
32 8
|
1月前
|
go-carbon v2.6.0 重大版本更新,轻量级、语义化、对开发者友好的 golang 时间处理库
carbon 是一个轻量级、语义化、对开发者友好的 Golang 时间处理库,提供了对时间穿越、时间差值、时间极值、时间判断、星座、星座、农历、儒略日 / 简化儒略日、波斯历 / 伊朗历的支持
64 3
Redis分布式锁如何实现 ?
Redis分布式锁主要依靠一个SETNX指令实现的 , 这条命令的含义就是“SET if Not Exists”,即不存在的时候才会设置值。 只有在key不存在的情况下,将键key的值设置为value。如果key已经存在,则SETNX命令不做任何操作。 这个命令的返回值如下。 ● 命令在设置成功时返回1。 ● 命令在设置失败时返回0。 假设此时有线程A和线程B同时访问临界区代码,假设线程A首先执行了SETNX命令,并返回结果1,继续向下执行。而此时线程B再次执行SETNX命令时,返回的结果为0,则线程B不能继续向下执行。只有当线程A执行DELETE命令将设置的锁状态删除时,线程B才会成功执行S
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
本文介绍了如何通过Lua脚本在Redis中实现分布式锁的原子性操作,避免并发问题。首先讲解了Lua脚本的基本概念及其在Redis中的使用方法,包括通过`eval`指令执行Lua脚本和通过`script load`指令缓存脚本。接着详细展示了如何用Lua脚本实现加锁、解锁及可重入锁的功能,确保同一线程可以多次获取锁而不发生死锁。最后,通过代码示例演示了如何在实际业务中调用这些Lua脚本,确保锁操作的原子性和安全性。
143 6
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
Redis,分布式缓存演化之路
本文介绍了基于Redis的分布式缓存演化,探讨了分布式锁和缓存一致性问题及其解决方案。首先分析了本地缓存和分布式缓存的区别与优劣,接着深入讲解了分布式远程缓存带来的并发、缓存失效(穿透、雪崩、击穿)等问题及应对策略。文章还详细描述了如何使用Redis实现分布式锁,确保高并发场景下的数据一致性和系统稳定性。最后,通过双写模式和失效模式讨论了缓存一致性问题,并提出了多种解决方案,如引入Canal中间件等。希望这些内容能为读者在设计分布式缓存系统时提供有价值的参考。感谢您的阅读!
156 6
Redis,分布式缓存演化之路
|
2月前
|
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
本文深入探讨了基于Redis实现分布式锁时遇到的细节问题及解决方案。首先,针对锁续期问题,提出了通过独立服务、获取锁进程自己续期和异步线程三种方式,并详细介绍了如何利用Lua脚本和守护线程实现自动续期。接着,解决了锁阻塞问题,引入了带超时时间的`tryLock`机制,确保在高并发场景下不会无限等待锁。最后,作为知识扩展,讲解了RedLock算法原理及其在实际业务中的局限性。文章强调,在并发量不高的场景中手写分布式锁可行,但推荐使用更成熟的Redisson框架来实现分布式锁,以保证系统的稳定性和可靠性。
86 0
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
下一篇
oss创建bucket
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等