Basic Usage of the OpenTelemetry SDK

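Overview

In the basic OpenTelemetry tracing architecture, both the client and the server start a TracerProvider, whose main job is to ship trace data to a backend (called the registry in this article), such as Jaeger or OpenCensus. The client and the server are tied into a single trace through the context.

The TracerProvider pushes data to the backend periodically, every 5 s by default: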

func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
   ...
   o := BatchSpanProcessorOptions{
      BatchTimeout:       time.Duration(env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond,
      ExportTimeout:      time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond,
      MaxQueueSize:       maxQueueSize,
      MaxExportBatchSize: maxExportBatchSize,
   }
   ...
}
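
The defaults in this constructor can be overridden through environment variables. The sketch below is a standard-library-only illustration of that kind of lookup, not the SDK's own env package: it reads OTEL_BSP_SCHEDULE_DELAY (in milliseconds) and falls back to the 5000 ms default when the variable is unset or not a valid integer. The helper name scheduleDelay and its lookup parameter are hypothetical.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// defaultScheduleDelayMillis mirrors the 5 s default batch interval.
const defaultScheduleDelayMillis = 5000

// scheduleDelay is a hypothetical helper. It returns the batch interval taken
// from OTEL_BSP_SCHEDULE_DELAY (milliseconds), or the default when the
// variable is unset or cannot be parsed as an integer. The lookup function is
// injected so the logic can be exercised without touching the real environment.
func scheduleDelay(lookup func(string) (string, bool)) time.Duration {
	raw, ok := lookup("OTEL_BSP_SCHEDULE_DELAY")
	if !ok {
		return defaultScheduleDelayMillis * time.Millisecond
	}
	ms, err := strconv.Atoi(raw)
	if err != nil {
		return defaultScheduleDelayMillis * time.Millisecond
	}
	return time.Duration(ms) * time.Millisecond
}

func main() {
	fmt.Println("batch interval:", scheduleDelay(os.LookupEnv))
}
```

Running the program with OTEL_BSP_SCHEDULE_DELAY=1000 set in the environment would report a one-second interval under these assumptions.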

The package below is the official SDK. It implements the OpenTelemetry API and is the basic library for working with OpenTelemetry:

tracesdk "go.opentelemetry.io/otel/sdk/trace"

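Creating a TracerProvider

To use tracing, first create a TracerProvider and define its exporter and related attributes.

Using the global TracerProvider

The argument is the name of the application or package: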

var tracer = otel.Tracer("app_or_package_name")

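Creating a custom TracerProvider

The following shows a TracerProvider that uses Jaeger as its exporter. Two concepts are involved: the exporter and the resource. The exporter is the destination for telemetry data, such as Jaeger, Zipkin, or OpenCensus. The resource typically carries non-ephemeral, low-level metadata such as the host name or instance ID. Note that the go.opentelemetry.io/otel/exporters/jaeger package used here has since been deprecated upstream in favor of the OTLP exporters.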

// tracerProvider returns an OpenTelemetry TracerProvider configured to use
// the Jaeger exporter that will send spans to the provided url. The returned
// TracerProvider will also use a Resource configured with all the information
// about the application.
func tracerProvider(url string) (*tracesdk.TracerProvider, error) {
  // Create the Jaeger exporter
  exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
  if err != nil {
    return nil, err
  }
  tp := tracesdk.NewTracerProvider(
    // Always be sure to batch in production.
    tracesdk.WithBatcher(exp),
    // Record information about this application in a Resource.
    // (service, environment and id are package-level constants defined by the application.)
    tracesdk.WithResource(resource.NewWithAttributes(
      semconv.SchemaURL,
      semconv.ServiceNameKey.String(service),
      attribute.String("environment", environment),
      attribute.Int64("ID", id),
    )),
  )
  return tp, nil
}

A resource can be created as follows. The semconv package provides standardized names for resource attributes.

// newResource returns a resource describing this application.
func newResource() *resource.Resource {
  r, _ := resource.Merge(
    resource.Default(),
    resource.NewWithAttributes(
      semconv.SchemaURL,
      semconv.ServiceNameKey.String("fib"),
      semconv.ServiceVersionKey.String("v0.1.0"),
      attribute.String("environment", "demo"),
    ),
  )
  return r
}

Registering the TracerProvider

If you use a custom TracerProvider, register it as the global TracerProvider:

tp, err := tracerProvider("http://localhost:14268/api/traces")
if err != nil {
  log.Fatal(err)
}
// Register our TracerProvider as the global so any imported
// instrumentation in the future will default to using it.
otel.SetTracerProvider(tp)

Starting a span with the TracerProvider

tr := tp.Tracer("component-main")
ctx, span := tr.Start(ctx, "foo")
defer span.End()

Shutting down the TracerProvider

Before the program exits, shut down the TracerProvider so that it can flush buffered data and clean up:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Cleanly shutdown and flush telemetry when the application exits.
defer func(ctx context.Context) {
  // Do not make the application hang when it is shutdown.
  ctx, cancel = context.WithTimeout(ctx, time.Second*5)
  defer cancel()
  if err := tp.Shutdown(ctx); err != nil {
    log.Fatal(err)
  }
}(ctx)
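
The timeout matters because a flush can block when the backend is unreachable. As a standard-library-only sketch of the same pattern, the deadline carried by the context bounds how long the caller waits. The shutdownWithTimeout helper and the two simulated flush functions are hypothetical and are not part of the SDK.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// shutdownWithTimeout runs a shutdown function under a deadline so that a
// stuck flush cannot hang the process. The shutdown function is expected to
// return once its context is cancelled.
func shutdownWithTimeout(shutdown func(context.Context) error, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return shutdown(ctx)
}

// stuckFlush simulates an exporter that never finishes on its own and only
// returns when the context is cancelled or its deadline passes.
func stuckFlush(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// quickFlush simulates a flush that completes immediately.
func quickFlush(_ context.Context) error {
	return nil
}

func main() {
	fmt.Println("stuck flush:", shutdownWithTimeout(stuckFlush, 50*time.Millisecond))
	fmt.Println("quick flush:", shutdownWithTimeout(quickFlush, 50*time.Millisecond))
}
```

In this sketch the stuck flush returns the context's deadline error after roughly 50 ms instead of blocking forever, while the quick flush returns nil.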

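Basic span usage

A tracer creates spans, and creating a span requires a context.Context instance. That context usually comes from a request object or from an existing parent span. Go's context carries the active span: starting a span returns a handle to the new span together with a derived context that contains it. Once a span has ended, it becomes immutable.

The following starts a span from the context of an incoming request: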

func httpHandler(w http.ResponseWriter, r *http.Request) {
  ctx, span := tracer.Start(r.Context(), "hello-span")
  defer span.End()
  // do some work to track with hello-span
}

span.End() marks the span as finished and records its end timestamp.

Getting the current span

// This context needs to contain the active span you plan to extract.
ctx := context.TODO()
span := trace.SpanFromContext(ctx)
// Do something with the current span, optionally calling `span.End()` if you want it to end

Creating nested spans

The following nests childSpan inside parentSpan, which represents sequential execution:

func parentFunction(ctx context.Context) {
  ctx, parentSpan := tracer.Start(ctx, "parent")
  defer parentSpan.End()
  // call the child function and start a nested span in there
  childFunction(ctx)
  // do more work - when this function ends, parentSpan will complete.
}
func childFunction(ctx context.Context) {
  // Create a span to track `childFunction()` - this is a nested span whose parent is `parentSpan`
  ctx, childSpan := tracer.Start(ctx, "child")
  defer childSpan.End()
  // do work here, when this function returns, childSpan will complete.
}
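
The nesting works because starting a span stores it in the returned context, and the next start call reads it back as the parent. The following is a standard-library-only sketch of that mechanism, not the SDK's implementation. The span type, spanKey, startSpan, path, and demo are all hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// span is a toy stand-in for a real span: it only remembers its name and parent.
type span struct {
	name   string
	parent *span
}

// spanKey is the private context key under which the active span is stored.
type spanKey struct{}

// startSpan creates a span whose parent is the span currently stored in ctx
// (if any) and returns a derived context in which the new span is active.
func startSpan(ctx context.Context, name string) (context.Context, *span) {
	parent, _ := ctx.Value(spanKey{}).(*span)
	s := &span{name: name, parent: parent}
	return context.WithValue(ctx, spanKey{}, s), s
}

// path renders the chain from the root span down to s.
func path(s *span) string {
	if s.parent == nil {
		return s.name
	}
	return path(s.parent) + " > " + s.name
}

// demo mirrors parentFunction and childFunction and reports the span paths.
func demo() (parentPath, childPath string) {
	ctx, parent := startSpan(context.Background(), "parent")
	_, child := startSpan(ctx, "child")
	return path(parent), path(child)
}

func main() {
	parentPath, childPath := demo()
	fmt.Println(parentPath)
	fmt.Println(childPath)
}
```

Passing the original context instead of the derived one to the second call would produce a second root span rather than a child, which is why the derived context has to be handed down.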
Setting span information

Adding attributes

Attributes are key/value metadata used to aggregate, filter, and group traces.

// setting attributes at creation...
ctx, span = tracer.Start(ctx, "attributesAtCreation", trace.WithAttributes(attribute.String("hello", "world")))
// ... and after creation
span.SetAttributes(attribute.Bool("isTrue", true), attribute.String("stringAttr", "hi!"))

Attribute keys can also be defined in advance and then added to a span:

var myKey = attribute.Key("myCoolAttribute")
span.SetAttributes(myKey.String("a value"))

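Note: trace attribute names are not arbitrary. They are subject to specific conventions; see the official semantic conventions and the constraints summarized by Uptrace.

Adding events

An event is a human-readable message marking that "something happened" during a span's lifetime. For example, when a function must acquire a lock to access a mutually exclusive resource, events can be created at two points: when it tries to access the resource and when it has acquired the lock: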

span.AddEvent("Acquiring lock")
mutex.Lock()
span.AddEvent("Got lock, doing work...")
// do stuff
span.AddEvent("Unlocking")
mutex.Unlock()

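Each event records the time at which it occurred. A useful consequence is that backends typically display event timestamps as offsets from the start of the span, which makes the time elapsed between events easy to see.

Events can also carry attributes:

span.AddEvent("Cancelled wait due to external signal", trace.WithAttributes(attribute.Int("pid", 4328), attribute.String("signal", "SIGHUP")))

Setting the span status

The status usually indicates whether an operation failed. The default status is Unset. It can be set to Ok manually, but that is usually unnecessary.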

result, err := operationThatCouldFail()
if err != nil {
  span.SetStatus(codes.Error, "operationThatCouldFail failed")
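}

Recording errors

RecordError records an error on the span, such as an error message or stack trace. It does not change the span status, so it is strongly recommended to also set the status to Error with SetStatus whenever you call RecordError: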

result, err := operationThatCouldFail()
if err != nil {
  span.SetStatus(codes.Error, "operationThatCouldFail failed")
  span.RecordError(err)
}

Complete code

The following generates trace data for a local function, bar:

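package main

import (
  "context"
  "log"
  "time"

  "go.opentelemetry.io/otel"
  "go.opentelemetry.io/otel/attribute"
  "go.opentelemetry.io/otel/exporters/jaeger"
  "go.opentelemetry.io/otel/sdk/resource"
  tracesdk "go.opentelemetry.io/otel/sdk/trace"
  semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
)

// Illustrative values: the listing references these names without defining them.
const (
  service     = "trace-demo"
  environment = "production"
  id          = 1
)

func tracerProvider(url string) (*tracesdk.TracerProvider, error) {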
  // Create the Jaeger exporter
  exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
  if err != nil {
    return nil, err
  }
  tp := tracesdk.NewTracerProvider(
    // Always be sure to batch in production.
    tracesdk.WithBatcher(exp),
    // Record information about this application in a Resource.
    tracesdk.WithResource(resource.NewWithAttributes(
      semconv.SchemaURL,
      semconv.ServiceNameKey.String(service),
      attribute.String("environment", environment),
      attribute.Int64("ID", id),
    )),
  )
  return tp, nil
}
func main() {
  tp, err := tracerProvider("http://localhost:14268/api/traces")
  if err != nil {
    log.Fatal(err)
  }
  // Register our TracerProvider as the global so any imported
  // instrumentation in the future will default to using it.
  otel.SetTracerProvider(tp)
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()
  // Cleanly shutdown and flush telemetry when the application exits.
  defer func(ctx context.Context) {
    // Do not make the application hang when it is shutdown.
    ctx, cancel = context.WithTimeout(ctx, time.Second*5)
    defer cancel()
    if err := tp.Shutdown(ctx); err != nil {
      log.Fatal(err)
    }
  }(ctx)
  tr := tp.Tracer("component-main")
  ctx, span := tr.Start(ctx, "foo")
  defer span.End()
  bar(ctx)
}
func bar(ctx context.Context) {
  // Use the global TracerProvider.
  tr := otel.Tracer("component-bar")
  _, span := tr.Start(ctx, "bar")
  span.SetAttributes(attribute.Key("testset").String("value"))
  defer span.End()
  // Do bar...
}

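Propagating trace context across services

To propagate trace context across services, register a propagator, usually right after creating and registering the TracerProvider.

Note that span events and attributes are not propagated across services.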

func initTracer() (*sdktrace.TracerProvider, error) {
  // Create stdout exporter to be able to retrieve
  // the collected spans.
  exporter, err := stdout.New(stdout.WithPrettyPrint())
  if err != nil {
    return nil, err
  }
  // For the demonstration, use sdktrace.AlwaysSample sampler to sample all traces.
  // In a production application, use sdktrace.TraceIDRatioBased with a desired probability.
  tp := sdktrace.NewTracerProvider(
    sdktrace.WithSampler(sdktrace.AlwaysSample()),
    sdktrace.WithBatcher(exporter),
  )
  otel.SetTracerProvider(tp)
  otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
  return tp, nil
}

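The code above registers two propagators, TraceContext and Baggage, so context can be propagated in either format.

TraceContext

The following test is based on gorilla/mux server-side code. The handler calls trace.SpanFromContext(r.Context()) to retrieve the span from the request context. Alternatively, a new span can be started with tracer.Start(c.Context(), "getUser", oteltrace.WithAttributes(attribute.String("id", id))):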

func TestPropagationWithCustomPropagators(t *testing.T) {
  prop := propagation.TraceContext{}
  r := httptest.NewRequest("GET", "/user/123", nil)
  w := httptest.NewRecorder()
  // sc is a trace.SpanContext defined elsewhere in the test file.
  ctx := trace.ContextWithRemoteSpanContext(context.Background(), sc)
  prop.Inject(ctx, propagation.HeaderCarrier(r.Header))
  var called bool
  router := mux.NewRouter()
  router.Use(Middleware("foobar", WithPropagators(prop)))
  router.HandleFunc("/user/{id}", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    called = true
    span := trace.SpanFromContext(r.Context())
    defer span.End()
    assert.Equal(t, sc, span.SpanContext())
    w.WriteHeader(http.StatusOK)
  }))
  router.ServeHTTP(w, r)
  assert.True(t, called, "failed to run test")
}
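
For reference, the TraceContext propagator carries the span context in the W3C traceparent HTTP header, whose version-00 form is version, trace ID, parent span ID, and flags joined by hyphens. The following standard-library-only sketch parses and re-serializes such a value. It is a simplified illustration, not the propagator's implementation: the parsed type and both function names are hypothetical, and it does not reject uppercase hex digits or all-zero IDs as a strict parser would.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// parsed holds the fields carried by a version-00 traceparent header.
type parsed struct {
	TraceID string // 32 hex characters
	SpanID  string // 16 hex characters
	Sampled bool   // least significant bit of the flags byte
}

// parseTraceparent splits and validates a version-00 traceparent value.
func parseTraceparent(h string) (parsed, error) {
	parts := strings.Split(h, "-")
	if len(parts) != 4 || parts[0] != "00" {
		return parsed{}, errors.New("unsupported traceparent")
	}
	if len(parts[1]) != 32 || len(parts[2]) != 16 || len(parts[3]) != 2 {
		return parsed{}, errors.New("malformed traceparent")
	}
	for _, p := range parts[1:] {
		if _, err := hex.DecodeString(p); err != nil {
			return parsed{}, errors.New("malformed traceparent")
		}
	}
	flags, _ := hex.DecodeString(parts[3]) // already validated above
	return parsed{TraceID: parts[1], SpanID: parts[2], Sampled: flags[0]&0x01 == 1}, nil
}

// formatTraceparent serializes the fields back into header form.
func formatTraceparent(p parsed) string {
	flags := "00"
	if p.Sampled {
		flags = "01"
	}
	return "00-" + p.TraceID + "-" + p.SpanID + "-" + flags
}

func main() {
	p, err := parseTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", p)
	fmt.Println(formatTraceparent(p))
}
```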

Baggage

The following client and server code uses baggage. Note that the client must use otelhttp.

Client code:

package main
import (
  "context"
  "flag"
  "fmt"
  "io/ioutil"
  "log"
  "net/http"
  "time"
  "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
  "go.opentelemetry.io/otel"
  "go.opentelemetry.io/otel/baggage"
  stdout "go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
  "go.opentelemetry.io/otel/propagation"
  sdktrace "go.opentelemetry.io/otel/sdk/trace"
  semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
  "go.opentelemetry.io/otel/trace"
)
func initTracer() (*sdktrace.TracerProvider, error) {
  // Create stdout exporter to be able to retrieve
  // the collected spans.
  exporter, err := stdout.New(stdout.WithPrettyPrint())
  if err != nil {
    return nil, err
  }
  // For the demonstration, use sdktrace.AlwaysSample sampler to sample all traces.
  // In a production application, use sdktrace.TraceIDRatioBased with a desired probability.
  tp := sdktrace.NewTracerProvider(
    sdktrace.WithSampler(sdktrace.AlwaysSample()),
    sdktrace.WithBatcher(exporter),
  )
  otel.SetTracerProvider(tp)
  otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
  return tp, nil
}
func main() {
  tp, err := initTracer()
  if err != nil {
    log.Fatal(err)
  }
  defer func() {
    if err := tp.Shutdown(context.Background()); err != nil {
      log.Printf("Error shutting down tracer provider: %v", err)
    }
  }()
  url := flag.String("server", "http://localhost:7777/hello", "server url")
  flag.Parse()
  client := http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)}
  bag, _ := baggage.Parse("username=donuts")
  ctx := baggage.ContextWithBaggage(context.Background(), bag)
  var body []byte
  tr := otel.Tracer("example/client")
  err = func(ctx context.Context) error {
    ctx, span := tr.Start(ctx, "say hello", trace.WithAttributes(semconv.PeerServiceKey.String("ExampleService")))
    defer span.End()
    req, _ := http.NewRequestWithContext(ctx, "GET", *url, nil)
    fmt.Printf("Sending request...\n")
    res, err := client.Do(req)
    if err != nil {
      panic(err)
    }
    body, err = ioutil.ReadAll(res.Body)
    _ = res.Body.Close()
    return err
  }(ctx)
  if err != nil {
    log.Fatal(err)
  }
  fmt.Printf("Response Received: %s\n\n\n", body)
  fmt.Printf("Waiting for few seconds to export spans ...\n\n")
  time.Sleep(10 * time.Second)
  fmt.Printf("Inspect traces on stdout\n")
}

Server code:

package main
import (
  "context"
  "io"
  "log"
  "net/http"
  "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
  "go.opentelemetry.io/otel"
  "go.opentelemetry.io/otel/attribute"
  "go.opentelemetry.io/otel/baggage"
  stdout "go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
  "go.opentelemetry.io/otel/propagation"
  "go.opentelemetry.io/otel/sdk/resource"
  sdktrace "go.opentelemetry.io/otel/sdk/trace"
  semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
  "go.opentelemetry.io/otel/trace"
)
func initTracer() (*sdktrace.TracerProvider, error) {
  // Create stdout exporter to be able to retrieve
  // the collected spans.
  exporter, err := stdout.New(stdout.WithPrettyPrint())
  if err != nil {
    return nil, err
  }
  // For the demonstration, use sdktrace.AlwaysSample sampler to sample all traces.
  // In a production application, use sdktrace.TraceIDRatioBased with a desired probability.
  tp := sdktrace.NewTracerProvider(
    sdktrace.WithSampler(sdktrace.AlwaysSample()),
    sdktrace.WithBatcher(exporter),
    sdktrace.WithResource(resource.NewWithAttributes(semconv.SchemaURL, semconv.ServiceNameKey.String("ExampleService"))),
  )
  otel.SetTracerProvider(tp)
  otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
  return tp, nil
}
func main() {
  tp, err := initTracer()
  if err != nil {
    log.Fatal(err)
  }
  defer func() {
    if err := tp.Shutdown(context.Background()); err != nil {
      log.Printf("Error shutting down tracer provider: %v", err)
    }
  }()
  uk := attribute.Key("username")
  helloHandler := func(w http.ResponseWriter, req *http.Request) {
    ctx := req.Context()
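    // This is the "Hello" span created by otelhttp. otelhttp ends it itself,
    // so the explicit End below is redundant but harmless.
    span := trace.SpanFromContext(ctx)
    defer span.End()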
    bag := baggage.FromContext(ctx)
    span.AddEvent("handling this...", trace.WithAttributes(uk.String(bag.Member("username").Value())))
    _, _ = io.WriteString(w, "Hello, world!\n")
  }
  // otelhttp.NewHandler creates a span named "Hello" while handling each request.
  otelHandler := otelhttp.NewHandler(http.HandlerFunc(helloHandler), "Hello")
  http.Handle("/hello", otelHandler)
  err = http.ListenAndServe(":7777", nil)
  if err != nil {
    log.Fatal(err)
  }
}

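In the trace produced by the code above, the client's HTTP GET span calls the server's Hello span. The server's Hello span is created while the request is being handled. The example uses otelhttp, and other instrumentation libraries work in a similar way.

The following code starts two independent sibling spans, which can represent two parallel tasks: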

// tracer and uk are assumed to be defined as in the earlier examples.
helloHandler := func(w http.ResponseWriter, req *http.Request) {
  ctx := req.Context()
  ctx, span1 := tracer.Start(ctx, "span1 process", trace.WithLinks())
  defer span1.End()
  bag := baggage.FromContext(req.Context())
  span1.SetAttributes(attribute.String("span1", "test1"))
  span1.AddEvent("span1 handling this...", trace.WithAttributes(uk.String(bag.Member("username").Value())))
  ctx, span2 := tracer.Start(req.Context(), "span2 process", trace.WithLinks())
  defer span2.End()
  span2.SetAttributes(attribute.String("span2", "test2"))
  span2.AddEvent("span2 handling this...", trace.WithAttributes(uk.String(bag.Member("username").Value())))
  _, _ = io.WriteString(w, "Hello, world!\n")
}

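Baggage can also be built programmatically, for example with baggage.NewKeyValueProperty("key", "value").

If the server does not need to read the baggage sent by the client and does not need to pass the context on, the handler does not have to touch the span at all. Wrapping the handler as follows is enough: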

otelHandler := otelhttp.NewHandler(http.HandlerFunc(helloHandler), "Hello")

Note: baggage must follow the W3C Baggage specification.
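
On the wire, that specification encodes baggage as a comma-separated list of key=value members, each optionally followed by semicolon-separated properties, with values percent-encoded. The sketch below is a simplified, standard-library-only parser for such a header value. It is an illustration under those assumptions rather than the otel baggage package: parseBaggage is a hypothetical name, and member properties are discarded instead of being preserved.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parseBaggage is a simplified, hypothetical parser for a baggage header
// value. It keeps only key/value pairs and ignores member properties.
func parseBaggage(header string) (map[string]string, error) {
	members := make(map[string]string)
	for _, item := range strings.Split(header, ",") {
		item = strings.TrimSpace(item)
		if item == "" {
			continue
		}
		// Drop any optional ";property" suffixes.
		kv, _, _ := strings.Cut(item, ";")
		key, rawValue, ok := strings.Cut(kv, "=")
		key = strings.TrimSpace(key)
		if !ok || key == "" {
			return nil, fmt.Errorf("invalid baggage member %q", item)
		}
		// Values are percent-encoded on the wire.
		value, err := url.PathUnescape(strings.TrimSpace(rawValue))
		if err != nil {
			return nil, fmt.Errorf("invalid baggage value in %q: %w", item, err)
		}
		members[key] = value
	}
	return members, nil
}

func main() {
	bag, err := parseBaggage("username=donuts, region=eu%20west;ttl=30")
	if err != nil {
		panic(err)
	}
	fmt.Println(bag["username"], "/", bag["region"])
}
```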

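Tools that support OpenTelemetry

The official registry lists many instrumentation libraries, such as Gorilla Mux, GORM, Gin-gonic, and gRPC. See the official repository for more.

Sampling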

provider := sdktrace.NewTracerProvider(
  sdktrace.WithSampler(sdktrace.AlwaysSample()),
)
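  • AlwaysSample: samples every trace.
  • NeverSample: samples nothing.
  • TraceIDRatioBased: samples a fraction of traces. For example, a value of 0.5 samples about half of all traces.
  • ParentBased: follows the incoming sampling decision. If the span's parent was sampled, the span is sampled too; otherwise it is not. If the span is a root span, the decision is delegated to the wrapped sampler, so with sdktrace.ParentBased(sdktrace.AlwaysSample()) root spans are always sampled.

In production, consider using TraceIDRatioBased together with ParentBased.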
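
To make the two production samplers concrete, here is a standard-library-only sketch of the decision logic. It is a simplification, not the SDK's code: ratioSampled and parentBased are hypothetical names, the ratio check assumes the decision is derived from the low 8 bytes of the trace ID, and the parent-based check models only the default behavior of following the parent.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// ratioSampled makes a deterministic decision from the trace ID alone, so
// every service that sees the same trace ID and uses the same ratio agrees.
func ratioSampled(traceID [16]byte, ratio float64) bool {
	if ratio >= 1 {
		return true
	}
	if ratio <= 0 {
		return false
	}
	// Treat the low 8 bytes as a 63-bit number and compare it to a threshold
	// that covers the requested fraction of the value space.
	x := binary.BigEndian.Uint64(traceID[8:16]) >> 1
	upperBound := uint64(ratio * (1 << 63))
	return x < upperBound
}

// parentBased follows the parent's decision when a parent exists and
// delegates to the root sampler for root spans.
func parentBased(hasParent, parentSampled bool, root func() bool) bool {
	if hasParent {
		return parentSampled
	}
	return root()
}

func main() {
	var low, high [16]byte // low: all zero bytes
	for i := 8; i < 16; i++ {
		high[i] = 0xff // high: largest possible low 8 bytes
	}
	fmt.Println("low ID at 0.5:", ratioSampled(low, 0.5))
	fmt.Println("high ID at 0.5:", ratioSampled(high, 0.5))

	root := func() bool { return ratioSampled(low, 0.5) }
	fmt.Println("child of unsampled parent:", parentBased(true, false, root))
	fmt.Println("root span:", parentBased(false, false, root))
}
```

Combining the two this way keeps a trace either fully sampled or fully dropped: only the root span consults the ratio, and every descendant inherits that decision.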

Tips

  • When setting custom attributes, note that attribute keys should be lowercase.
