Asynq是一个Go实现的分布式任务队列和异步处理库,基于redis,类似Ruby的sidekiq和Python的celery。Go生态类似的还有machinery和goworker
同时提供一个WebUI asynqmon,可以源码形式安装或使用Docker image, 还可以和Prometheus集成
docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon
,如果使用的是主机上的redis,还需加上 --redis-addr=host.docker.internal:6379
,否则会报错
即 docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon --redis-addr=host.docker.internal:6379
➜ asynq-demo git:(main) ✗ tree . ├── client.go ├── const.go ├── go.mod ├── go.sum └── server.go 0 directories, 5 files
其中const.go:
package main const ( redisAddr = "127.0.0.1:6379" redisPasswd = "" ) const ( TypeExampleTask = "shuang:asynq-task:example" )
client.go:
package main import ( "encoding/json" "fmt" "log" "time" "github.com/hibiken/asynq" ) type ExampleTaskPayload struct { UserID string Msg string // 业务需要的其他字段 } func NewExampleTask(userID string, msg string) (*asynq.Task, error) { payload, err := json.Marshal(ExampleTaskPayload{UserID: userID, Msg: msg}) if err != nil { return nil, err } return asynq.NewTask(TypeExampleTask, payload), nil } var client *asynq.Client func main() { client = asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr, Password: redisPasswd, DB: 0}) defer client.Close() //go startExampleTask() startExampleTask() //startGithubUpdate() // 定时触发 } func startExampleTask() { fmt.Println("开始执行一次性的任务") // 立刻执行 task1, err := NewExampleTask("10001", "mashangzhixing!") if err != nil { log.Fatalf("could not create task: %v", err) } info, err := client.Enqueue(task1) if err != nil { log.Fatalf("could not enqueue task: %v", err) } log.Printf("task1 -> enqueued task: id=%s queue=%s", info.ID, info.Queue) // 10秒后执行(定时执行) task2, err := NewExampleTask("10002", "10s houzhixing") if err != nil { log.Fatalf("could not create task: %v", err) } info, err = client.Enqueue(task2, asynq.ProcessIn(10*time.Second)) if err != nil { log.Fatalf("could not enqueue task: %v", err) } log.Printf("task2 -> enqueued task: id=%s queue=%s", info.ID, info.Queue) // 30s后执行(定时执行) task3, err := NewExampleTask("10003", "30s houzhixing") if err != nil { log.Fatalf("could not create task: %v", err) } theTime := time.Now().Add(30 * time.Second) info, err = client.Enqueue(task3, asynq.ProcessAt(theTime)) if err != nil { log.Fatalf("could not enqueue task: %v", err) } log.Printf("task3 -> enqueued task: id=%s queue=%s", info.ID, info.Queue) }
server.go:
package main import ( "context" "encoding/json" "fmt" "time" "github.com/davecgh/go-spew/spew" "github.com/hibiken/asynq" ) var AsynqServer *asynq.Server // 异步任务server func initTaskServer() error { // 初始化异步任务服务端 AsynqServer = asynq.NewServer( asynq.RedisClientOpt{ Addr: redisAddr, Password: redisPasswd, //与client对应 DB: 0, }, asynq.Config{ // Specify how many concurrent workers to use Concurrency: 100, // Optionally specify multiple queues with different priority. Queues: map[string]int{ "critical": 6, "default": 3, "low": 1, }, // See the godoc for other configuration options }, ) return nil } func main() { initTaskServer() mux := asynq.NewServeMux() mux.HandleFunc(TypeExampleTask, HandleExampleTask) // ...register other handlers... if err := AsynqServer.Run(mux); err != nil { fmt.Printf("could not run asynq server: %v", err) } } func HandleExampleTask(ctx context.Context, t *asynq.Task) error { res := make(map[string]string) spew.Dump("t.Payload() is:", t.Payload()) err := json.Unmarshal(t.Payload(), &res) if err != nil { fmt.Printf("rum session, can not parse payload: %s, err: %v", t.Payload(), err) return nil } //-----------具体处理逻辑------------ spew.Println("拿到的入参为:", res, "接下来将进行具体处理") fmt.Println() // 模拟具体的处理 time.Sleep(5 * time.Second) fmt.Println("--------------处理了5s,处理完成-----------------") return nil }
执行redis-server
清除redis中所有的key:
执行docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon --redis-addr=host.docker.internal:6379
执行 go run client.go const.go
(生产者,产生消息放入队列)
此时能看到redis中多个几个key
同时管理后台能看到队列的信息
执行 go run server.go const.go
(消费者,消费队列中的消息)
可以看到都被处理了
此时redis中的key:
此处的业务处理为模拟,实际可能是某个被触发后不需要马上执行的操作
实际试一下。通过一个定时器(24h执行一次),触发代码每天向github push当天的代码等内容。收到触发后无需马上执行(可能当时其他请求量高,机器资源紧张),可以先放入队列,延迟30min后实际去执行。
完整Demo push github的功能没有完全实现
另外可以配置队列的优先级,asynq队列如何配置队列优先级
// 初始化异步任务服务端 AsynqServer = asynq.NewServer( asynq.RedisClientOpt{ Addr: redisAddr, Password: redisPasswd, //与client对应 DB: 0, }, asynq.Config{ // Specify how many concurrent workers to use Concurrency: 100, // Optionally specify multiple queues with different priority. Queues: map[string]int{ "critical": 6,//关键队列中的任务将被处理 60% 的时间 "default": 3,//默认队列中的任务将被处理 30% 的时间 "low": 1,//低队列中的任务将被处理 10% 的时间 }, // See the godoc for other configuration options }, )