背景介绍
在响应应用请求的过程中, 有时候会遇到比较耗时的任务, 比如给用户发送邮件, 耗时任务时间不可能控, 很可能超过 1s, 为了给用户比较好的体验, 一般会控制请求响应时间(RT, response time)在300ms内(不考虑网络波动), 甚至在 200ms 内. 面对这样的工作场景, 就需要使用异步任务进行处理. 下面将围绕 工作用Go: 异步任务怎么写
这个话题展开
Go协程与异步
从一段简单的代码开始:
func TestTask(t *testing.T) { task() log.Print("req done") } func task() { // 模拟耗时任务 time.Sleep(time.Second) log.Print("task done") }
- 代码在 Goland 中编写, 同时也推荐使用 Goland 进行 Go 开发
- 这里使用单测(
test
)演示代码:
- 输入
test
就可以快速生成代码(Goland 中称之为live templates
, 其实就是预设好的代码片段) - 快速执行: 1. 点击左侧(gutter icon)的运行图标; 2. 函数上右键菜单键; 3. 快捷键
ctl-shift-R
上面使用 task()
模拟耗时 1s 的任务, 整个test代表一次请求, 执行如下:
=== RUN TestTask 2022/11/17 20:11:15 task done 2022/11/17 20:11:15 req done --- PASS: TestTask (1.00s) PASS
Go基础知识: 天生并发, 使用
go
关键字就可以开新协程, 将代码放到新协程中执行
func TestTask(t *testing.T) { go task() log.Print("req done") } func task() { // 模拟耗时任务 time.Sleep(time.Second) log.Print("task done") }
- 只需要在
task()
前添加go
关键字, 就可以新开一个协程, 将task()
在新协程中执行
不过在这里, 并没有得到预期的结果:
=== RUN TestTask 2022/11/17 20:16:08 req done --- PASS: TestTask (0.00s) PASS
task()
中的日志没有输出, 看起来像没有执行
Go基础知识: Go的代码都在协程中执行, 入口main()
函数是主协程, 之后使用go
关键词开的协程都是子协程, 主协程退出后, 程序会终止(exit)
也就是说上面的 TestTask()
(主协程) 和 go task()
(子协程)都执行了, 但是主协程执行完, 程序退出了, 子协程没执行完(或者没调度到), 就被强制退出了
简单 Go 并发: 任务编排
上面的例子, 常见有 3 种解决方案:
- 方案1: 等子协程执行完
func TestTask(t *testing.T) { go task() time.Sleep(time.Second) // 等待子协程执行完 log.Print("req done") } func task() { // 模拟耗时任务 time.Sleep(time.Second) log.Print("task done") }
- 方案2: 使用
WaitGroup
func TestTask(t *testing.T) { var wg sync.WaitGroup wg.Add(1) go func() { task() wg.Done() }() wg.Wait() log.Print("req done") } func task() { // 模拟耗时任务 time.Sleep(time.Second) log.Print("task done") }
WaitGroup
其实很好理解, 就是同时等待一组任务完成, 它分为 3 步:
Add
: 总共有多少任务Done()
: 表示当前任务执行完Wait()
: 等待所有任务完成
- 方案3: 使用 Go 的并发语言
chan
func TestTask(t *testing.T) { ch := make(chan struct{}) // 初始化 chan go func() { task() ch <- struct{}{} // 发送到 chan }() <-ch // 从 chan 获取 log.Print("req done") } func task() { // 模拟耗时任务 time.Sleep(time.Second) log.Print("task done") }
Go基础知识: 通过chan T
就可以申明T
类型的 chan, 供协程间进行通信;struct{}
是 Go 中0 memory use
(0 内存 占用)类型, 适合上面使用 chan 进行 控制 而不需要 数据 进行通信的情况
虽然只是3个简单的 demo code, Go 提供的 2 种并发能力都有展示:
- 传统并发原语: 大部分集中在
sync
包下, 上面案例2中的sync.WaitGroup
就是其中之一 - Go 基于 CSP 的并发编程范式: 包括
go chan select
, 上面的案例3中展示了go+chan
的基本用法
简单 Go 并发讲完了, 那任务编排又是啥? 其实, 某等程度上, 任务编排=异步
, 任务需要 分工 完成时, 也就是一个任务相对于另一个任务需要 异步处理. 而任务编排, 恰恰是 Go 语言中基于 chan 进行并发编程的强项.
Go 中有一个大的方向,就是 任务 编排用 Channel,共享资源保护用传统并发原语。
回到最初的代码, 在实际使用中, 到底使用的是哪种方案呢? 答案是 方案1. 看看接近真实场景的代码
func TestTrace(t *testing.T) { for { // 服务以 daemon 的方式持续运行 // 不断处理用户的请求 { go task() log.Print("req done") } } } func task() { // 模拟耗时任务 time.Sleep(time.Second) log.Print("task done") }
也就是真实场景下, 主协程所在的 server 会一直常驻, 请求(request)所有的子协程不用担心还没执行完就被强制退出了
避坑: 野生 Goroutine
在继续讲解之前, 一定要提一下使用 go
开协程的一个坑, 或者说一个非常重要的基础知识:
Go基础知识: panic只对当前goroutine的defer有效
Go中出现 panic()
, 程序会立即终止:
func TestPanic(t *testing.T) { panic("panic") log.Print("end") }
=== RUN TestPanic --- FAIL: TestPanic (0.00s) panic: panic [recovered] panic: panic goroutine 118 [running]: testing.tRunner.func1.2({0x103e15940, 0x10405c208}) /opt/homebrew/opt/go/libexec/src/testing/testing.go:1396 +0x1c8 testing.tRunner.func1() /opt/homebrew/opt/go/libexec/src/testing/testing.go:1399 +0x378 panic({0x103e15940, 0x10405c208}) /opt/homebrew/opt/go/libexec/src/runtime/panic.go:884 +0x204 testing.tRunner(0x14000603040, 0x104058678) /opt/homebrew/opt/go/libexec/src/testing/testing.go:1446 +0x10c created by testing.(*T).Run /opt/homebrew/opt/go/libexec/src/testing/testing.go:1493 +0x300 Process finished with the exit code 1
- 可以看到,
panic
后程序直接退出,panic
后的log.Print("end")
并没有执行
当然, 想要程序健壮一些, panic
是可以 吃掉
的:
func TestPanic(t *testing.T) { defer func() { if r := recover(); r != nil { log.Print(r) } }() panic("panic") log.Print("end") }
=== RUN TestPanic 2022/11/17 22:25:08 panic --- PASS: TestPanic (0.00s) PASS
使用 recover()
对 panic()
进行恢复, 程序就不会崩掉(exit)
但是, 一定要注意:
panic只对当前goroutine的defer有效!
panic只对当前goroutine的defer有效!
panic只对当前goroutine的defer有效!
重要的事情说三遍.
func TestPanic(t *testing.T) { defer func() { if r := recover(); r != nil { log.Print(r) } }() go func() { panic("panic") }() log.Print("end") }
=== RUN TestPanic panic: panic goroutine 88 [running]: Process finished with the exit code 1
而 go 里面开协程又是如此的方便, 简单一个 go
关键字即可, 所以大家给这种情况起了个外号: 野生 Goroutine. 最简单的做法就是对协程进行一次封装, 比如这样:
package gox // Run start with a goroutine func Run(fn func()) { go func() { defer func() { if r := recover(); r != nil { log.Print(r) } }() fn() }() }
原本的 go task()
, 使用 gox.Run(task)
进行替换, 就可以 task 出现 panic 的时候, 程序还能恢复
Trace: 异步任务还能进行链路追踪么?
随着可观测技术的不断演进, 基建上的不断提升, 链路追踪技术也进行了演进:
- trace1.0: opentracing jaeger 等
- trace2.0: otel
当用户请求进来时, 可以通过 traceId
串联起用户的完整调用链, 监控和排查问题能力大大增强!
{ "code": 200, "status": 200, "msg": "成功", "errors": null, "data": "env-t0", "timestamp": 1668696256, "traceId": "0a340332637648c0009b4ca2a51bbb85" }
trace 通过请求(request)中的 context
, 不断向下传递, 从而将当前请求的所有调用通过同一个 traceId 串联起来
func TestTrace(t *testing.T) { Op1(ctx) // 比如操作了 DB Op2(ctx) // 比如操作了 cache Task(ctx) log.Print("req done") } func Task(ctx context.Context) { // 使用自定义span, 将当前操作上报到trace _, span := otel.GetTracerProvider().Tracer("task").Start(ctx, "xxxTask") defer span.End() // 模拟耗时任务 time.Sleep(time.Second) log.Print("task done") }
如同上面演示的 demo code 演示:
- 通过
ctx
, 将当前请求(request)的所有操作使用同一个 traceId 串起来 - otel 默认了很多操作的 trace 上报, 比如 mysql/redis/kafka 等等, 也可以使用自定义
span
的方式进行新增
如果要进行耗时任务异步处理, 直觉上直接 go
一下:
func TestTrace(t *testing.T) { Op1(ctx) // 比如操作了 DB Op2(ctx) // 比如操作了 cache go Task(ctx) log.Print("req done") }
这时候脑海中陡然蹦出一个声音: 野生Goroutine
func TestTrace(t *testing.T) { Op1(ctx) // 比如操作了 DB Op2(ctx) // 比如操作了 cache gox.RunCtx(ctx, Task) // 在 gox.Run 的基础上, 添加 ctx 支持 log.Print("req done") }
可是等测试一下, 就会发现, task()
并没有执行!
细心的小伙伴就会发现, 这和开始的例子有点像呀, 而且对比下就会知道, 此处多了一个 ctx
:
func TestTask(t *testing.T) { go task(ctx) log.Print("req done") } func task() { // 模拟耗时任务 time.Sleep(time.Second) log.Print("task done") }
- 没有 ctx 的时候, 因为主协程一直在, 子协程可以处理完任务在退出, 也就是子协程的生命周期都在主协程内
- 有 ctx 的时候, 由于 ctx 的存在, 请求(request)中的协程需要接受 ctx 控制, 异步处理后, 请求也就结束了(上面
log.Print("req done")
模拟的部分), 这时 ctx 就会控制子协程一起结束掉, 也就是子协程的生命周期都在当前请求内
于是, 又有了 2 种处理办法:
- 简单做法, 就像上面一样, 没有 ctx, 就没有问题了嘛. 如果用一句话来概括这种方法:
面试官: 你可以回家等消息了
- 既然又要执行异步任务, 又要有 trace, 那把 trace 继续传下, 用一个新的 ctx 就好了嘛
上代码:
- 复制 ctx, 把 trace 继续传下去
package ctxkit // Clone 复制 ctx 中对应 key 的值,移除父级 cancel。 func Clone(preCtx context.Context, keys ...interface{}) context.Context { newCtx := context.Background() // 从 pctx 开启一个子 span,来传递 traceId _, ospan := otel.GetTracerProvider(). Tracer(trace_in.InstrumentationPrefix+"/ctxkit"). Start(preCtx, "ctxkit.Clone", otel_trace.WithAttributes( trace_attr.AttrAsyncFlag.Int(1), // 标记为异步 )) defer ospan.End() newCtx = trace.ContextWithSpan(newCtx, ospan) return ctxClone(newCtx, preCtx, keys...) } // CloneWithoutSpan 功能同 Clone,但不会创建 trace span,建议在大批数据 for 循环之前使用,避免 span 链路过长。 func CloneWithoutSpan(preCtx context.Context, keys ...interface{}) context.Context { tid := trace_in.GetOtelTraceId(preCtx) if tid == "" { tid = trace_in.FakeTraceId() } newCtx := context.WithValue(context.Background(), ictx.CtxKeyFakeTraceId, tid) return ctxClone(newCtx, preCtx, keys...) } func ctxClone(baseCtx, preCtx context.Context, keys ...interface{}) context.Context { for _, key := range _ctxKeys { if v := preCtx.Value(key); v != nil { baseCtx = context.WithValue(baseCtx, key, v) } } keys = append(keys, _strKeys...) for _, key := range keys { if v := preCtx.Value(key); v != nil { baseCtx = context.WithValue(baseCtx, key, v) //nolint } } return baseCtx }
- 实际使用
func TestTask(t *testing.T) { nexCtx := ctxkit.Clone(ctx) go task(newCtx) log.Print("req done") } func task() { // 模拟耗时任务 time.Sleep(time.Second) log.Print("task done") }
异步任务: 能否更优雅点
如果是从请求过来的, 请求中自带 trace, 并会在请求(request)的初始化的时候建 trace 写入到请求的 ctx 中, 那如果直接执行一个异步任务呢?
那就需要手动初始化 trace 了.
上代码:
- 封装异步任务(job): 封装trace -> clone ctx -> 指标收集(jobMetricsWrap) -> 野生Goroutine捕获
package job // AsyncJob 异步任务。 // name: 任务名。 // return: waitFunc,调用可以等待任务完成。 func AsyncJob(ctx context.Context, name string, fn func(ctx context.Context) error, opts ...Option) func() { ctx = tel_in.CtxAdjuster(ctx) // 初始化 trace newCtx := ctxkit.Clone(ctx) wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() // 指标收集 jobMetricsWrap(newCtx, fn, applyOption(name, true, opts...)) }() return wg.Wait }
- 实际使用:
func TestJob(t *testing.T) { ctx := context.Background() // 异步任务 // 逻辑在协程中执行,已包装 recover 逻辑 wait := job.AsyncJob(ctx, "your_task_name", func(ctx context.Context) error { // 内部处理使用传入的 ctx,已经执行过 citkit.Clone return doAsyncTask(ctx) }) wait() // 如果需要等待任务结束则调用 wait,不需要则忽略返回值 } func doAsyncTask(ctx context.Context) error { logs.InfoCtx(ctx, "async task done") return nil }
=== RUN TestJob 2022-11-18T10:18:39.014+0800 INFO tests/async_job_test.go:250 async task done {"traceId": "0a9599556376eb7fd7fb497adacbf712"} --- PASS: TestJob (0.00s) PASS
PS: 这里需要查看效果, 所以调用了wait()
等待异步任务结束, 实际使用可以直接使用job.AsyncJob()
或者_ = job.AsyncJob()
最后一起来看看 trace 使用的效果: 此处省略截图
Asynq: 专业异步任务框架
如果只是 异步一下, 上面讲解的内容也基本够用了; 如果有重度异步任务使用, 就得考虑专业的异步任务队列框架了, Go 中可以选择 Async
Features
- Guaranteed at least one execution of a task
- Scheduling of tasks
- Retries of failed tasks
- Automatic recovery of tasks in the event of a worker crash
- Weighted priority queues
- Strict priority queues
- Low latency to add a task since writes are fast in Redis
- De-duplication of tasks using unique option
- Allow timeout and deadline per task
- Allow aggregating group of tasks to batch multiple successive operations
- Flexible handler interface with support for middlewares
Ability to pause queue
to stop processing tasks from the queue- Periodic Tasks
- Support Redis Cluster for automatic sharding and high availability
- Support Redis Sentinels for high availability
- Integration with Prometheus to collect and visualize queue metrics
Web UI
to inspect and remote-control queues and tasksCLI
to inspect and remote-control queues and tasks
实际使用
使用的 demo 就不贴了, asynq 的文档很详细, 说一下具体实践中遇到的 2个 case:
- 使用
web UI
: 处于安全考虑, 设置了ReadOnly
h := asynqmon.New(asynqmon.Options{ RootPath: "/monitoring", // RootPath specifies the root for asynqmon app RedisConnOpt: tasks.GetRedis(), ReadOnly: true, // admin web can't operation }) r := mux.NewRouter() r.PathPrefix(h.RootPath()).Handler(h) srv := &http.Server{ Handler: r, Addr: ":8080", }
PS: 使用
web UI
由于涉及到使用新的端口, 而应用部署已经上 k8s 了, 如何顺利访问就需要一系列运维操作, 留个坑, 以后有机会再填
- 测试环境OK, 线上报错:
recoverer: could not move task to archive: INTERNAL_ERROR: redis eval error: ERR 'asynq:{}:t:' and 'asynq:{}:active' not in the same slot
对比发现, 是测试和线上使用的不同类型的 redis 实例导致的, 搜索云服务的帮助文档:
对比项 | 单机/主备 | Proxy集群 | Cluster集群 |
兼容Redis版本 | 兼容社区Redis 3.0、4.0、5.0。Redis 6.0兼容社区KeyDB(当前只支持主备实例)。可在购买实例时选择版本号。 | 兼容社区3.0、4.0和5.0版本。 | 兼容开源社区4.0/5.0版本。可在购买集群实例时选择版本号。 |
特性支持 | 支持event notify。支持pipeline。 | 支持pipeline、mset、mget。支持scan、keys、slowlog。支持发布订阅。 | 支持event notify。支持brpop、blpop、brpoplpush。支持发布订阅。 |
特性限制 | 单机不支持持久化。 | lua脚本受限使用,所有的key必须在同一个slot,否则会报错,建议使用hashtag技术。多个key的命令中,所有key必须属于同一个slot,否则会报错,建议使用hashtag技术。不支持event notify用法。 | lua脚本受限使用,所有的key必须在同一个slot,建议使用hashtag技术。需要客户端SDK支持redis cluster协议,需要能够处理"-MOVED"响应。使用pipeline、mset/mget模式时,所有key必须属于同一个slot,否则报错,建议使用hashtag技术。使用event notify时,需要建立与每个redis-server的连接,分别处理每个连接上的事件。执行scan、keys等遍历类或者全局类命令时,需要对每个redis-server分别执行该命令。 |
客户端协议 | 使用传统Redis客户端即可。 | 使用传统Redis客户端即可,不需要支持Redis Cluster协议。 | 需要客户端支持Redis Cluster协议。 |
命令限制 | 单机和主备实例不支持的Redis命令,请参考表 Redis4.0单机和主备禁用命令和表 Redis 5.0单机和主备禁用命令。 | Proxy集群实例不支持的Redis命令,请参考表 Redis3.0 Proxy集群实例禁用命令、表8和表8。 | Cluster集群不支持的Redis命令,请参考表 Redis4.0 Cluster集群禁用命令和表 Redis5.0 Cluster集群禁用命令。 |
副本数 | 单机实例为单副本,只有一个节点。主备实例为双副本,目前Redis 3.0、Redis 6.0主备不支持自定义副本数,默认为一主一从的架构。在创建Redis 4.0、5.0主备实例时,支持自定义副本数,形成一主多从的架构。 | 每个集群分片都为双副本,但不支持为分片新增副本,每个分片是一主一从的架构。 | 每个集群分片默认为双副本,支持自定义副本数,可以是一主多从的架构。在创建实例时,也可以定义为单副本,单副本表示实例只有主节点,无法保障数据高可靠。 |
集群架构实例的命令限制: 如需在集群架构实例中执行下述受限制的命令,
请使用hash tag确保命令所要操作的key都分布在1个hash slot中
但是查看 asqnq 源码: 以 enqueue
操作为例, lua 操作中的部分 key 无法通过外部添加 hash tag
// github.com/hibiken/asynq/internal/rdb/rdb.go // enqueueCmd enqueues a given task message. // // Input: // KEYS[1] -> asynq:{<qname>}:t:<task_id> // KEYS[2] -> asynq:{<qname>}:pending // -- // ARGV[1] -> task message data // ARGV[2] -> task ID // ARGV[3] -> current unix time in nsec // // Output: // Returns 1 if successfully enqueued // Returns 0 if task ID already exists var enqueueCmd = redis.NewScript(` if redis.call("EXISTS", KEYS[1]) == 1 then return 0 end redis.call("HSET", KEYS[1], "msg", ARGV[1], "state", "pending", "pending_since", ARGV[3]) redis.call("LPUSH", KEYS[2], ARGV[2]) return 1 `)
最终, 通过使用线上另一台主从版redis解决问题
写在最后
到这里, 工作用Go: 异步任务怎么写
就暂时告一段落了, 这个过程中:
- 一些计算机基础概念的理解: 同步与异步, 异步与任务编排, 协程与异步, 协程与生命周期
- 一些 Go 语言的基础知识以及基础不牢地动山摇的坑: 野生Goroutine, panic&recover
- 可观测的实践之一: trace
- 专业的异步任务框架 Asynq 以及踩坑记
一起拥抱变化, 直面问题和挑战, 不断精进, 我们下个话题再见👋🏻.