本文为 工作用Go: 异步任务怎么写 系列的第6篇
如果只是 异步一下, 上面讲解的内容也基本够用了; 如果有重度异步任务使用, 就得考虑专业的异步任务队列框架了, Go 中可以选择 Async
Asynq Features
- Guaranteed at least one execution of a task
- Scheduling of tasks
- Retries of failed tasks
- Automatic recovery of tasks in the event of a worker crash
- Weighted priority queues
- Strict priority queues
- Low latency to add a task since writes are fast in Redis
- De-duplication of tasks using unique option
- Allow timeout and deadline per task
- Allow aggregating group of tasks to batch multiple successive operations
- Flexible handler interface with support for middlewares
Ability to pause queue
to stop processing tasks from the queue- Periodic Tasks
- Support Redis Cluster for automatic sharding and high availability
- Support Redis Sentinels for high availability
- Integration with Prometheus to collect and visualize queue metrics
Web UI
to inspect and remote-control queues and tasksCLI
to inspect and remote-control queues and tasks
整体架构图
实际使用
使用的 demo 就不贴了, asynq 的文档很详细, 说一下具体实践中遇到的 2个 case:
- 使用
web UI
: 处于安全考虑, 设置了ReadOnly
h :=asynqmon.New(asynqmon.Options{ RootPath: "/monitoring", // RootPath specifies the root for asynqmon appRedisConnOpt: tasks.GetRedis(), ReadOnly: true, // admin web can't operation}) r :=mux.NewRouter() r.PathPrefix(h.RootPath()).Handler(h) srv :=&http.Server{ Handler: r, Addr: ":8080", }
PS: 使用
web UI
由于涉及到使用新的端口, 而应用部署已经上 k8s 了, 如何顺利访问就需要一系列运维操作, 留个坑, 以后有机会再填
- 测试环境OK, 线上报错:
recoverer: could not move task to archive: INTERNAL_ERROR: redis eval error: ERR 'asynq:{}:t:' and 'asynq:{}:active' not in the same slot
对比发现, 是测试和线上使用的不同类型的 redis 实例导致的, 搜索云服务的帮助文档:
Redis实例类型差异
对比项 |
单机/主备 |
Proxy集群 |
Cluster集群 |
兼容Redis版本 |
兼容社区Redis 3.0、4.0、5.0。 Redis 6.0兼容社区KeyDB(当前只支持主备实例)。 可在购买实例时选择版本号。 |
兼容社区3.0、4.0和5.0版本。 |
兼容开源社区4.0/5.0版本。 可在购买集群实例时选择版本号。 |
特性支持 |
|
|
|
特性限制 |
单机不支持持久化。 |
||
客户端协议 |
使用传统Redis客户端即可。 |
使用传统Redis客户端即可,不需要支持Redis Cluster协议。 |
需要客户端支持Redis Cluster协议。 |
命令限制 |
单机和主备实例不支持的Redis命令,请参考表 Redis4.0单机和主备禁用命令和表 Redis 5.0单机和主备禁用命令。 |
Proxy集群实例不支持的Redis命令,请参考表 Redis3.0 Proxy集群实例禁用命令、表8和表8。 |
Cluster集群不支持的Redis命令,请参考表 Redis4.0 Cluster集群禁用命令和表 Redis5.0 Cluster集群禁用命令。 |
副本数 |
单机实例为单副本,只有一个节点。 主备实例为双副本,目前Redis 3.0、Redis 6.0主备不支持自定义副本数,默认为一主一从的架构。在创建Redis 4.0、5.0主备实例时,支持自定义副本数,形成一主多从的架构。 |
每个集群分片都为双副本,但不支持为分片新增副本,每个分片是一主一从的架构。 |
每个集群分片默认为双副本,支持自定义副本数,可以是一主多从的架构。在创建实例时,也可以定义为单副本,单副本表示实例只有主节点,无法保障数据高可靠。 |
集群架构实例的命令限制: 如需在集群架构实例中执行下述受限制的命令,请使用hash tag确保命令所要操作的key都分布在1个hash slot中
但是查看 asqnq 源码: 以 enqueue
操作为例, lua 操作中的部分 key 无法通过外部添加 hash tag
// github.com/hibiken/asynq/internal/rdb/rdb.go// enqueueCmd enqueues a given task message.//// Input:// KEYS[1] -> asynq:{<qname>}:t:<task_id>// KEYS[2] -> asynq:{<qname>}:pending// --// ARGV[1] -> task message data// ARGV[2] -> task ID// ARGV[3] -> current unix time in nsec//// Output:// Returns 1 if successfully enqueued// Returns 0 if task ID already existsvarenqueueCmd=redis.NewScript(`if redis.call("EXISTS", KEYS[1]) == 1 thenreturn 0endredis.call("HSET", KEYS[1],"msg", ARGV[1],"state", "pending","pending_since", ARGV[3])redis.call("LPUSH", KEYS[2], ARGV[2])return 1`)
最终, 通过使用线上另一台主从版redis解决问题
写在最后
到这里, 工作用Go: 异步任务怎么写
就暂时告一段落了, 这个过程中:
- 一些计算机基础概念的理解: 同步与异步, 异步与任务编排, 协程与异步, 协程与生命周期
- 一些 Go 语言的基础知识以及基础不牢地动山摇的坑: 野生Goroutine, panic&recover
- 可观测的实践之一: trace
- 专业的异步任务框架 Asynq 以及踩坑记
一起拥抱变化, 直面问题和挑战, 不断精进, 我们下个话题再见👋🏻.