本文为 工作用Go: 异步任务怎么写 系列的第4篇
随着可观测技术的不断演进, 基建上的不断提升, 链路追踪技术也进行了演进
- trace1.0: opentracing jaeger 等
- trace2.0: otel
当用户请求进来时, 可以通过 traceId
串联起用户的完成调用链, 监控和排查问题能力大大增强!
{ "code": 200, "status": 200, "msg": "成功", "errors": null, "data": "env-t0", "timestamp": 1668696256, "traceId": "0a340332637648c0009b4ca2a51bbb85"}
trace 通过请求(request)中的 context
, 不断向下传递, 从而将当前请求的所用调用通过同一个 traceId 串联起来
funcTestTrace(t*testing.T) { Op1(ctx) // 比如操作了 DBOp2(ctx) // 比如操作了 cacheTask(ctx) log.Print("req done") } funcTask(ctxcontext.Context) { // 使用自定义span, 将当前操作上报到trace_, span :=otel.GetTracerProvider().Tracer("task").Start(ctx, "xxxTask") deferspan.End() // 模拟耗时任务time.Sleep(time.Second) log.Print("task done") }
如同上面演示的 demo code:
- 通过
ctx
, 将当前请求(request)的所有操作使用同一个 traceId 串起来 - otel 默认了很多操作的 trace 上报, 比如 mysql/redis/kafka 等等, 也可以使用自定义
span
的方式进行新增
如果要进行耗时任务异步处理, 直觉上直接 go
一下:
funcTestTrace(t*testing.T) { Op1(ctx) // 比如操作了 DBOp2(ctx) // 比如操作了 cachegoTask(ctx) log.Print("req done") }
这时候脑海中陡然蹦出一个声音: 野生Goroutine
funcTestTrace(t*testing.T) { Op1(ctx) // 比如操作了 DBOp2(ctx) // 比如操作了 cachegox.RunCtx(ctx, Task) // 在 gox.Run 的基础上, 添加 ctx 支持log.Print("req done") }
可是等测试一下, 就会发现, task()
并没有执行!
细心的小伙伴就会发现, 这和开始的例子有点像呀, 而且对比下就会知道, 此处多了一个 ctx
:
funcTestTask(t*testing.T) { gotask(ctx) log.Print("req done") } functask() { // 模拟耗时任务time.Sleep(time.Second) log.Print("task done") }
- 没有 ctx 的时候, 因为主协程一直在, 子协程可以处理完任务后退出, 也就是子协程的生命周期都在主协程内
- 有 ctx 的时候, 由于 ctx 的存在, 请求(request)中主协程需要接受 ctx 控制, 异步处理后, 请求也就结束了(上面
log.Print("req done")
模拟的部分), 这是 ctx 就会控制子协程一起结束掉, 也就是子协程的生命周期都在当前请求的协程内
于是, 又有了 2 种处理办法:
- 简单做法, 就像上面一样, 没有 ctx, 就没有问题了嘛. 如果用一句话来概括这种方法:
面试官: 你可以回家等消息了
- 既然又要执行异步任务, 又要有 trace, 那把 trace 继续传下去, 用一个新的 ctx 就好了嘛
上代码:
- 复制 ctx, 把 trace 继续传下去
packagectxkit// Clone 复制 ctx 中对应 key 的值,移除父级 cancel。funcClone(preCtxcontext.Context, keys...interface{}) context.Context { newCtx :=context.Background() // 从 pctx 开启一个子 span,来传递 traceId_, ospan :=otel.GetTracerProvider().Tracer(trace_in.InstrumentationPrefix+"/ctxkit").Start(preCtx, "ctxkit.Clone", otel_trace.WithAttributes( trace_attr.AttrAsyncFlag.Int(1), // 标记为异步 )) deferospan.End() newCtx=trace.ContextWithSpan(newCtx, ospan) returnctxClone(newCtx, preCtx, keys...) } // CloneWithoutSpan 功能同 Clone,但不会创建 trace span,建议在大批数据 for 循环之前使用,避免 span 链路过长。funcCloneWithoutSpan(preCtxcontext.Context, keys...interface{}) context.Context { tid :=trace_in.GetOtelTraceId(preCtx) iftid=="" { tid=trace_in.FakeTraceId() } newCtx :=context.WithValue(context.Background(), ictx.CtxKeyFakeTraceId, tid) returnctxClone(newCtx, preCtx, keys...) } funcctxClone(baseCtx, preCtxcontext.Context, keys...interface{}) context.Context { for_, key :=range_ctxKeys { ifv :=preCtx.Value(key); v!=nil { baseCtx=context.WithValue(baseCtx, key, v) } } keys=append(keys, _strKeys...) for_, key :=rangekeys { ifv :=preCtx.Value(key); v!=nil { baseCtx=context.WithValue(baseCtx, key, v) //nolint } } returnbaseCtx}
- 实际使用
funcTestTask(t*testing.T) { nexCtx :=ctxkit.Clone(ctx) gotask(newCtx) log.Print("req done") } functask() { // 模拟耗时任务time.Sleep(time.Second) log.Print("task done") }