工作中用Go: Go中异步任务怎么写

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 工作中用Go: Go中异步任务怎么写

背景介绍

在响应应用请求的过程中, 有时候会遇到比较耗时的任务, 比如给用户发送邮件, 耗时任务时间不可能控, 很可能超过 1s, 为了给用户比较好的体验, 一般会控制请求响应时间(RT, response time)在300ms内(不考虑网络波动), 甚至在 200ms 内. 面对这样的工作场景, 就需要使用异步任务进行处理. 下面将围绕 工作用Go: 异步任务怎么写这个话题展开

Go协程与异步

从一段简单的代码开始:

func TestTask(t *testing.T) {
 task()
 log.Print("req done")
}
func task() {
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}
  • 代码在 Goland 中编写, 同时也推荐使用 Goland 进行 Go 开发
  • 这里使用单测(test)演示代码:
  • 输入 test 就可以快速生成代码(Goland 中称之为 live templates, 其实就是预设好的代码片段)
  • 快速执行: 1. 点击左侧(gutter icon)的运行图标; 2. 函数上右键菜单键; 3. 快捷键 ctl-shift-R

image.png

上面使用 task() 模拟耗时 1s 的任务, 整个test代表一次请求, 执行如下:

=== RUN   TestTask
2022/11/17 20:11:15 task done
2022/11/17 20:11:15 req done
--- PASS: TestTask (1.00s)
PASS
Go基础知识: 天生并发, 使用 go 关键字就可以开新协程, 将代码放到新协程中执行
func TestTask(t *testing.T) {
 go task()
 log.Print("req done")
}
func task() {
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}
  • 只需要在 task() 前添加 go 关键字, 就可以新开一个协程, 将 task() 在新协程中执行

不过在这里, 并没有得到预期的结果:

=== RUN   TestTask
2022/11/17 20:16:08 req done
--- PASS: TestTask (0.00s)
PASS
  • task() 中的日志没有输出, 看起来像没有执行
Go基础知识: Go的代码都在协程中执行, 入口 main() 函数是主协程, 之后使用 go 关键词开的协程都是子协程, 主协程退出后, 程序会终止(exit)

也就是说上面的 TestTask()(主协程) 和 go task()(子协程)都执行了, 但是主协程执行完, 程序退出了, 子协程没执行完(或者没调度到), 就被强制退出了

简单 Go 并发: 任务编排

上面的例子, 常见有 3 种解决方案:

  • 方案1: 等子协程执行完
func TestTask(t *testing.T) {
 go task()
 time.Sleep(time.Second) // 等待子协程执行完
 log.Print("req done")
}
func task() {
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}
  • 方案2: 使用 WaitGroup
func TestTask(t *testing.T) {
 var wg sync.WaitGroup
 wg.Add(1)
 go func() {
  task()
  wg.Done()
 }()
 wg.Wait()
 log.Print("req done")
}
func task() {
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}

WaitGroup 其实很好理解, 就是同时等待一组任务完成, 它分为 3 步:

  1. Add: 总共有多少任务
  2. Done(): 表示当前任务执行完
  3. Wait(): 等待所有任务完成
  • 方案3: 使用 Go 的并发语言 chan
func TestTask(t *testing.T) {
 ch := make(chan struct{}) // 初始化 chan
 go func() {
  task()
  ch <- struct{}{} // 发送到 chan
 }()
 <-ch // 从 chan 获取
 log.Print("req done")
}
func task() {
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}
Go基础知识: 通过 chan T 就可以申明 T 类型的 chan, 供协程间进行通信; struct{} 是 Go 中 0 memory use (0 内存 占用)类型, 适合上面使用 chan 进行 控制 而不需要 数据 进行通信的情况

虽然只是3个简单的 demo code, Go 提供的 2 种并发能力都有展示:

  • 传统并发原语: 大部分集中在sync包下, 上面案例2中的 sync.WaitGroup 就是其中之一
  • Go 基于 CSP 的并发编程范式: 包括 go chan select, 上面的案例3中展示了 go+chan 的基本用法

简单 Go 并发讲完了, 那任务编排又是啥? 其实, 某等程度上, 任务编排=异步, 任务需要 分工 完成时, 也就是一个任务相对于另一个任务需要 异步处理. 而任务编排, 恰恰是 Go 语言中基于 chan 进行并发编程的强项.

Go 中有一个大的方向,就是 任务 编排用 Channel,共享资源保护用传统并发原语。

回到最初的代码, 在实际使用中, 到底使用的是哪种方案呢? 答案是 方案1. 看看接近真实场景的代码

func TestTrace(t *testing.T) {
 for { // 服务以 daemon 的方式持续运行
  // 不断处理用户的请求
  {
   go task()
   log.Print("req done")
  }
 }
}
func task() {
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}

也就是真实场景下, 主协程所在的 server 会一直常驻, 请求(request)所有的子协程不用担心还没执行完就被强制退出了

避坑: 野生 Goroutine

在继续讲解之前, 一定要提一下使用 go 开协程的一个, 或者说一个非常重要的基础知识:

Go基础知识: panic只对当前goroutine的defer有效

Go中出现 panic(), 程序会立即终止:

func TestPanic(t *testing.T) {
 panic("panic")
 log.Print("end")
}
=== RUN   TestPanic
--- FAIL: TestPanic (0.00s)
panic: panic [recovered]
 panic: panic
goroutine 118 [running]:
testing.tRunner.func1.2({0x103e15940, 0x10405c208})
 /opt/homebrew/opt/go/libexec/src/testing/testing.go:1396 +0x1c8
testing.tRunner.func1()
 /opt/homebrew/opt/go/libexec/src/testing/testing.go:1399 +0x378
panic({0x103e15940, 0x10405c208})
 /opt/homebrew/opt/go/libexec/src/runtime/panic.go:884 +0x204
testing.tRunner(0x14000603040, 0x104058678)
 /opt/homebrew/opt/go/libexec/src/testing/testing.go:1446 +0x10c
created by testing.(*T).Run
 /opt/homebrew/opt/go/libexec/src/testing/testing.go:1493 +0x300
Process finished with the exit code 1
  • 可以看到, panic 后程序直接退出, panic 后的 log.Print("end") 并没有执行

当然, 想要程序健壮一些, panic 是可以 吃掉 的:

func TestPanic(t *testing.T) {
 defer func() {
  if r := recover(); r != nil {
   log.Print(r)
  }
 }()
 panic("panic")
 log.Print("end")
}
=== RUN   TestPanic
2022/11/17 22:25:08 panic
--- PASS: TestPanic (0.00s)
PASS

使用 recover()panic() 进行恢复, 程序就不会崩掉(exit)

但是, 一定要注意:

panic只对当前goroutine的defer有效!

panic只对当前goroutine的defer有效!

panic只对当前goroutine的defer有效!

重要的事情说三遍.

func TestPanic(t *testing.T) {
 defer func() {
  if r := recover(); r != nil {
   log.Print(r)
  }
 }()
 go func() {
  panic("panic")
 }()
 log.Print("end")
}
=== RUN   TestPanic
panic: panic
goroutine 88 [running]:
Process finished with the exit code 1

而 go 里面开协程又是如此的方便, 简单一个 go 关键字即可, 所以大家给这种情况起了个外号: 野生 Goroutine. 最简单的做法就是对协程进行一次封装, 比如这样:

package gox
// Run start with a goroutine
func Run(fn func()) {
 go func() {
  defer func() {
   if r := recover(); r != nil {
    log.Print(r)
   }
  }()
  fn()
 }()
}

原本的 go task(), 使用 gox.Run(task)进行替换, 就可以 task 出现 panic 的时候, 程序还能恢复

Trace: 异步任务还能进行链路追踪么?

随着可观测技术的不断演进, 基建上的不断提升, 链路追踪技术也进行了演进:

  • trace1.0: opentracing jaeger 等
  • trace2.0: otel

当用户请求进来时, 可以通过 traceId 串联起用户的完整调用链, 监控和排查问题能力大大增强!

{
    "code": 200,
    "status": 200,
    "msg": "成功",
    "errors": null,
    "data": "env-t0",
    "timestamp": 1668696256,
    "traceId": "0a340332637648c0009b4ca2a51bbb85"
}

trace 通过请求(request)中的 context, 不断向下传递, 从而将当前请求的所有调用通过同一个 traceId 串联起来

func TestTrace(t *testing.T) {
 Op1(ctx) // 比如操作了 DB
 Op2(ctx) // 比如操作了 cache
 Task(ctx)
 log.Print("req done")
}
func Task(ctx context.Context) {
 // 使用自定义span, 将当前操作上报到trace
 _, span := otel.GetTracerProvider().Tracer("task").Start(ctx, "xxxTask")
 defer span.End()
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}

如同上面演示的 demo code 演示:

  • 通过 ctx, 将当前请求(request)的所有操作使用同一个 traceId 串起来
  • otel 默认了很多操作的 trace 上报, 比如 mysql/redis/kafka 等等, 也可以使用自定义 span 的方式进行新增

如果要进行耗时任务异步处理, 直觉上直接 go 一下:

func TestTrace(t *testing.T) {
 Op1(ctx) // 比如操作了 DB
 Op2(ctx) // 比如操作了 cache
 go Task(ctx)
 log.Print("req done")
}

这时候脑海中陡然蹦出一个声音: 野生Goroutine

func TestTrace(t *testing.T) {
 Op1(ctx) // 比如操作了 DB
 Op2(ctx) // 比如操作了 cache
 gox.RunCtx(ctx, Task) // 在 gox.Run 的基础上, 添加 ctx 支持
 log.Print("req done")
}

可是等测试一下, 就会发现, task() 并没有执行!

细心的小伙伴就会发现, 这和开始的例子有点像呀, 而且对比下就会知道, 此处多了一个 ctx:

func TestTask(t *testing.T) {
 go task(ctx)
 log.Print("req done")
}
func task() {
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}
  • 没有 ctx 的时候, 因为主协程一直在, 子协程可以处理完任务在退出, 也就是子协程的生命周期都在主协程内
  • 有 ctx 的时候, 由于 ctx 的存在, 请求(request)中的协程需要接受 ctx 控制, 异步处理后, 请求也就结束了(上面log.Print("req done")模拟的部分), 这时 ctx 就会控制子协程一起结束掉, 也就是子协程的生命周期都在当前请求内

于是, 又有了 2 种处理办法:

  • 简单做法, 就像上面一样, 没有 ctx, 就没有问题了嘛. 如果用一句话来概括这种方法: 面试官: 你可以回家等消息了
  • 既然又要执行异步任务, 又要有 trace, 那把 trace 继续传下, 用一个新的 ctx 就好了嘛

上代码:

  • 复制 ctx, 把 trace 继续传下去
package ctxkit
// Clone 复制 ctx 中对应 key 的值,移除父级 cancel。
func Clone(preCtx context.Context, keys ...interface{}) context.Context {
 newCtx := context.Background()
 // 从 pctx 开启一个子 span,来传递 traceId
 _, ospan := otel.GetTracerProvider().
  Tracer(trace_in.InstrumentationPrefix+"/ctxkit").
  Start(preCtx, "ctxkit.Clone", otel_trace.WithAttributes(
   trace_attr.AttrAsyncFlag.Int(1), // 标记为异步
  ))
 defer ospan.End()
 newCtx = trace.ContextWithSpan(newCtx, ospan)
 return ctxClone(newCtx, preCtx, keys...)
}
// CloneWithoutSpan  功能同 Clone,但不会创建 trace span,建议在大批数据 for 循环之前使用,避免 span 链路过长。
func CloneWithoutSpan(preCtx context.Context, keys ...interface{}) context.Context {
 tid := trace_in.GetOtelTraceId(preCtx)
 if tid == "" {
  tid = trace_in.FakeTraceId()
 }
 newCtx := context.WithValue(context.Background(), ictx.CtxKeyFakeTraceId, tid)
 return ctxClone(newCtx, preCtx, keys...)
}
func ctxClone(baseCtx, preCtx context.Context, keys ...interface{}) context.Context {
 for _, key := range _ctxKeys {
  if v := preCtx.Value(key); v != nil {
   baseCtx = context.WithValue(baseCtx, key, v)
  }
 }
 keys = append(keys, _strKeys...)
 for _, key := range keys {
  if v := preCtx.Value(key); v != nil {
   baseCtx = context.WithValue(baseCtx, key, v) //nolint
  }
 }
 return baseCtx
}
  • 实际使用
func TestTask(t *testing.T) {
 nexCtx := ctxkit.Clone(ctx)
 go task(newCtx)
 log.Print("req done")
}
func task() {
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}

异步任务: 能否更优雅点

如果是从请求过来的, 请求中自带 trace, 并会在请求(request)的初始化的时候建 trace 写入到请求的 ctx 中, 那如果直接执行一个异步任务呢?

那就需要手动初始化 trace 了.

上代码:

  • 封装异步任务(job): 封装trace -> clone ctx -> 指标收集(jobMetricsWrap) -> 野生Goroutine捕获
package job
// AsyncJob 异步任务。
// name: 任务名。
// return: waitFunc,调用可以等待任务完成。
func AsyncJob(ctx context.Context, name string, fn func(ctx context.Context) error, opts ...Option) func() {
 ctx = tel_in.CtxAdjuster(ctx) // 初始化 trace
 newCtx := ctxkit.Clone(ctx)
wg := sync.WaitGroup{}
 wg.Add(1)
 go func() {
  defer wg.Done()
  // 指标收集
  jobMetricsWrap(newCtx, fn, applyOption(name, true, opts...))
 }()
 return wg.Wait
}
  • 实际使用:
func TestJob(t *testing.T) {
 ctx := context.Background()
 // 异步任务
 // 逻辑在协程中执行,已包装 recover 逻辑
 wait := job.AsyncJob(ctx, "your_task_name", func(ctx context.Context) error {
  // 内部处理使用传入的 ctx,已经执行过 citkit.Clone
  return doAsyncTask(ctx)
 })
 wait() // 如果需要等待任务结束则调用 wait,不需要则忽略返回值
}
func doAsyncTask(ctx context.Context) error {
 logs.InfoCtx(ctx, "async task done")
 return nil
}
=== RUN   TestJob
2022-11-18T10:18:39.014+0800 INFO tests/async_job_test.go:250 async task done {"traceId": "0a9599556376eb7fd7fb497adacbf712"}
--- PASS: TestJob (0.00s)
PASS
PS: 这里需要查看效果, 所以调用了 wait() 等待异步任务结束, 实际使用可以直接使用 job.AsyncJob() 或者 _ = job.AsyncJob()

最后一起来看看 trace 使用的效果: 此处省略截图 


Asynq: 专业异步任务框架

如果只是 异步一下, 上面讲解的内容也基本够用了; 如果有重度异步任务使用, 就得考虑专业的异步任务队列框架了, Go 中可以选择 Async

Features


实际使用

使用的 demo 就不贴了, asynq 的文档很详细, 说一下具体实践中遇到的 2个 case:

  • 使用 web UI: 处于安全考虑, 设置了 ReadOnly
h := asynqmon.New(asynqmon.Options{
   RootPath:     "/monitoring", // RootPath specifies the root for asynqmon app
   RedisConnOpt: tasks.GetRedis(),
   ReadOnly:     true, // admin web can't operation
})
r := mux.NewRouter()
r.PathPrefix(h.RootPath()).Handler(h)
srv := &http.Server{
   Handler: r,
   Addr:    ":8080",
}
PS: 使用 web UI 由于涉及到使用新的端口, 而应用部署已经上 k8s 了, 如何顺利访问就需要一系列运维操作, 留个坑, 以后有机会再填
  • 测试环境OK, 线上报错: recoverer: could not move task to archive: INTERNAL_ERROR: redis eval error: ERR 'asynq:{}:t:' and 'asynq:{}:active' not in the same slot

对比发现, 是测试和线上使用的不同类型的 redis 实例导致的, 搜索云服务的帮助文档:

Redis实例类型差异

对比项 单机/主备 Proxy集群 Cluster集群
兼容Redis版本 兼容社区Redis 3.0、4.0、5.0。Redis 6.0兼容社区KeyDB(当前只支持主备实例)。可在购买实例时选择版本号。 兼容社区3.0、4.0和5.0版本。 兼容开源社区4.0/5.0版本。可在购买集群实例时选择版本号。
特性支持 支持event notify。支持pipeline。 支持pipeline、mset、mget。支持scan、keys、slowlog。支持发布订阅。 支持event notify。支持brpop、blpop、brpoplpush。支持发布订阅。
特性限制 单机不支持持久化。 lua脚本受限使用,所有的key必须在同一个slot,否则会报错,建议使用hashtag技术。多个key的命令中,所有key必须属于同一个slot,否则会报错,建议使用hashtag技术。不支持event notify用法。 lua脚本受限使用,所有的key必须在同一个slot,建议使用hashtag技术。需要客户端SDK支持redis cluster协议,需要能够处理"-MOVED"响应。使用pipeline、mset/mget模式时,所有key必须属于同一个slot,否则报错,建议使用hashtag技术。使用event notify时,需要建立与每个redis-server的连接,分别处理每个连接上的事件。执行scan、keys等遍历类或者全局类命令时,需要对每个redis-server分别执行该命令。
客户端协议 使用传统Redis客户端即可。 使用传统Redis客户端即可,不需要支持Redis Cluster协议。 需要客户端支持Redis Cluster协议。
命令限制 单机和主备实例不支持的Redis命令,请参考表 Redis4.0单机和主备禁用命令表 Redis 5.0单机和主备禁用命令 Proxy集群实例不支持的Redis命令,请参考表 Redis3.0 Proxy集群实例禁用命令表8表8 Cluster集群不支持的Redis命令,请参考表 Redis4.0 Cluster集群禁用命令表 Redis5.0 Cluster集群禁用命令
副本数 单机实例为单副本,只有一个节点。主备实例为双副本,目前Redis 3.0、Redis 6.0主备不支持自定义副本数,默认为一主一从的架构。在创建Redis 4.0、5.0主备实例时,支持自定义副本数,形成一主多从的架构。 每个集群分片都为双副本,但不支持为分片新增副本,每个分片是一主一从的架构。 每个集群分片默认为双副本,支持自定义副本数,可以是一主多从的架构。在创建实例时,也可以定义为单副本,单副本表示实例只有主节点,无法保障数据高可靠。

集群架构实例的命令限制: 如需在集群架构实例中执行下述受限制的命令,请使用hash tag确保命令所要操作的key都分布在1个hash slot中

但是查看 asqnq 源码: 以 enqueue 操作为例, lua 操作中的部分 key 无法通过外部添加 hash tag

// github.com/hibiken/asynq/internal/rdb/rdb.go
// enqueueCmd enqueues a given task message.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:pending
// --
// ARGV[1] -> task message data
// ARGV[2] -> task ID
// ARGV[3] -> current unix time in nsec
//
// Output:
// Returns 1 if successfully enqueued
// Returns 0 if task ID already exists
var enqueueCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 1 then
 return 0
end
redis.call("HSET", KEYS[1],
           "msg", ARGV[1],
           "state", "pending",
           "pending_since", ARGV[3])
redis.call("LPUSH", KEYS[2], ARGV[2])
return 1
`)

最终, 通过使用线上另一台主从版redis解决问题

写在最后

到这里, 工作用Go: 异步任务怎么写 就暂时告一段落了, 这个过程中:

  • 一些计算机基础概念的理解: 同步与异步, 异步与任务编排, 协程与异步, 协程与生命周期
  • 一些 Go 语言的基础知识以及基础不牢地动山摇的坑: 野生Goroutine, panic&recover
  • 可观测的实践之一: trace
  • 专业的异步任务框架 Asynq 以及踩坑记

一起拥抱变化, 直面问题和挑战, 不断精进, 我们下个话题再见👋🏻.

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
4月前
|
Go
go的并发初体验、加锁、异步
go的并发初体验、加锁、异步
|
2月前
|
Go 数据库 UED
[go 面试] 同步与异步:程序执行方式的不同之处
[go 面试] 同步与异步:程序执行方式的不同之处
|
2月前
|
存储 Go 调度
go-zero 如何应对海量定时/延迟任务?
go-zero 如何应对海量定时/延迟任务?
|
4月前
|
并行计算 Go 数据处理
掌握Go语言:Go 并发编程,轻松应对大规模任务处理和高并发请求(34)
掌握Go语言:Go 并发编程,轻松应对大规模任务处理和高并发请求(34)
|
4月前
|
Go
go之channel任意任务完成、全部任务完成退出
go之channel任意任务完成、全部任务完成退出
|
5月前
|
NoSQL Go Redis
Go异步任务处理解决方案:Asynq
Go异步任务处理解决方案:Asynq
273 1
Go异步任务处理解决方案:Asynq
|
5月前
|
Go 开发者
Go语言带缓冲通道:异步通信的艺术
Go语言带缓冲通道:异步通信的艺术
51 0
|
监控 NoSQL 数据可视化
一文带您了解Go异步任务处理解决方案:Asynq
一文带您了解Go异步任务处理解决方案:Asynq
496 0
|
安全 Go API
MoE 系列(四)|Go 扩展的异步模式
上一篇我们体验了用 Istio 做控制面,给 Go 扩展推送配置,这次我们来体验一下,在 Go 扩展的异步模式下,对 Goroutine 等全部 Go 特性的支持。
|
Go 调度 开发者
并发与并行,同步和异步,Go lang1.18入门精炼教程,由白丁入鸿儒,Go lang并发编程之GoroutineEP13
如果说Go lang是静态语言中的皇冠,那么,Goroutine就是并发编程方式中的钻石。Goroutine是Go语言设计体系中最核心的精华,它非常轻量,一个 Goroutine 只占几 KB,并且这几 KB 就足够 Goroutine 运行完,这就能在有限的内存空间内支持大量 Goroutine协程任务,方寸之间,运筹帷幄,用极少的成本获取最高的效率,支持了更多的并发,毫无疑问,Goroutine是比Python的协程原理事件循环更高级的并发异步编程方式。
并发与并行,同步和异步,Go lang1.18入门精炼教程,由白丁入鸿儒,Go lang并发编程之GoroutineEP13