Jaeger链路跟踪埋点和上报代码分析

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Jaeger链路跟踪埋点原理Jaeger埋点数据如何上报?

[https://www.jaegertracing.io/docs/1.17/architecture/](https://www.jaegertracing.io/docs/1.17/architecture/)

##知识点

span:每一小步操作的过程,代表了操作的开始、经过、结束。

trace:一个完整的调用链路,由多个span构成,是整个流程的执行过程。比如从用户发起请求到返回结果这一整个流程,可能经历了数据库查询span,redis查询span,跨服务调用span等。

> 同一个调用链路的span的traceId相同。


>span与span之间的关系:

child_of:涉及操作:同步请求

follows_from:涉及操作:异步请求,springboot错误处理


![image.png](https://upload-images.jianshu.io/upload_images/7220517-4e9215fbfcaa476a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

###jaeger的整体结构:

直接存储db模式

![Illustration of direct-to-storage architecture](https://upload-images.jianshu.io/upload_images/7220517-6fcc7db76c8bf3bf.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

通过Kafka作为过渡存储模式

![Illustration of architecture with Kafka as intermediate buffer](https://upload-images.jianshu.io/upload_images/7220517-d4b0ad157120ba6a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

##主要内容

无论哪一种结构,都要经历样本采集问题,下面我们主要研究埋点代码部分方面的逻辑,即下图红框框住部分

![image.png](https://upload-images.jianshu.io/upload_images/7220517-1d5d817edf1f9fbb.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)


##那么jaeger是怎么在方法调用的时候埋点记录的?

1. 拦截器?动态代理?每一次调用记录节点?怎么确定span之间的关系?

2. 这些节点信息在同一个服务是怎么传递的?比如Threadlocal。如果是多线程应该怎么传递?

3. 在不同服务之间是怎么传递?隐式传参?放在header里面?dubbo,http,springcloud?


###举一个完整的调用链过程

![一个完整的调用链](https://upload-images.jianshu.io/upload_images/7220517-a64cd81912a937f1.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

那么jaeger是在哪些节点埋点的?经分析,jaeger分别在如下地方埋点。

![一个完整的代码埋点调用链](https://upload-images.jianshu.io/upload_images/7220517-04a1ea3637b9a99c.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)


###那么这些中间节点的实现逻辑又是怎样的?

![每个代码埋点的实现逻辑](https://upload-images.jianshu.io/upload_images/7220517-1ede2808f210d1c9.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)


> 下面是详细的代码分析,代码量太多,也可直接跳到各章总结。

###一、span的记录

####1. Servlet请求:

从java-web-servlet开始

![image.png](https://upload-images.jianshu.io/upload_images/7220517-53ecb9cbe2559eeb.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

![image.png](https://upload-images.jianshu.io/upload_images/7220517-75ef9d302f04c1c3.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)



从ServerTracingAutoConfiguration类开始(为什么从这里开始?详情见[springboot如何自动加载](https://www.jianshu.com/p/e1e996265983)

该类初始化了FilterRegistrationBean

> 这个类为springboot的拦截器注册bean,看代码逻辑即加载了TracingFilter拦截器。



```

@Bean

   @ConditionalOnMissingBean(TracingFilter.class)

   public FilterRegistrationBean tracingFilter(Tracer tracer, WebTracingProperties tracingConfiguration) {

       log.info(format("Creating %s bean with %s mapped to %s, skip pattern is \"%s\"",

               FilterRegistrationBean.class.getSimpleName(), TracingFilter.class.getSimpleName(),

               tracingConfiguration.getUrlPatterns().toString(),

               tracingConfiguration.getSkipPattern()));


       List<ServletFilterSpanDecorator> decorators = servletFilterSpanDecorator.getIfAvailable();

       //初始化装饰器(添加一下tag属性)

       if (CollectionUtils.isEmpty(decorators)) {

           decorators = Collections.singletonList(ServletFilterSpanDecorator.STANDARD_TAGS);

       }

       //初始化TracingFilter拦截器

       TracingFilter tracingFilter = new TracingFilter(tracer, decorators, tracingConfiguration.getSkipPattern());


       FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean(tracingFilter);

       filterRegistrationBean.setUrlPatterns(tracingConfiguration.getUrlPatterns());

       filterRegistrationBean.setOrder(tracingConfiguration.getOrder());

       filterRegistrationBean.setAsyncSupported(true);

       return filterRegistrationBean;

   }

```

下面再来看看TracingFilter类,看看他是如何拦截请求的?

io.opentracing.contrib.web.servlet.filter.TracingFilter:

```

@Override

   public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain chain)

           throws IOException, ServletException {

       HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;

       HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;

      //判断该url是否需要跟踪,如果不需要跟踪则跳过。配置项为skipPattern

       if (!isTraced(httpRequest, httpResponse)) {

           chain.doFilter(httpRequest, httpResponse);

           return;

       }


       /**

        * If request is traced then do not start new span.

        */

      //判断请求的head里是否有span上下文,如果有则跳过,因为上一步已经记录了,避免重复记录

       if (servletRequest.getAttribute(SERVER_SPAN_CONTEXT) != null) {

           chain.doFilter(servletRequest, servletResponse);

       } else {

           //重点部分1:创建context

           SpanContext extractedContext = tracer.extract(Format.Builtin.HTTP_HEADERS,

                   new HttpServletRequestExtractAdapter(httpRequest));

          //重点部分2:创建span

           final Scope scope = tracer.buildSpan(httpRequest.getMethod())

                   .asChildOf(extractedContext)

                   .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)

                   .startActive(false);


           httpRequest.setAttribute(SERVER_SPAN_CONTEXT, scope.span().context());


           for (ServletFilterSpanDecorator spanDecorator: spanDecorators) {

               spanDecorator.onRequest(httpRequest, scope.span());

           }


           try {

               chain.doFilter(servletRequest, servletResponse);

…………下面代码忽略……

```

下面再来对上文所提到的重点部分进行分析


##### context的创建

```

SpanContext extractedContext = tracer.extract(Format.Builtin.HTTP_HEADERS,

                   new HttpServletRequestExtractAdapter(httpRequest));

```

> 其中 tracer.extract方法中,会对http header的key进行抽取。详情请看io.jaegertracing.internal.propagation.TextMapCodec


> 1. uber-trace-id(span上下文context)

> 2. uberctx- (baggage items的前缀)

> 3. jaeger-debug-id (debug跟踪id)


> 题外:

如果你要对某一个请求进行调试。可以在请求头中加入

>```

>curl -H "jaeger-debug-id: custom_debug_id12343434" http://localhost:2434

>```

>此时span的tag中会有jaeger-debug-id,可以在ui中通过这个jaeger-debug-id标签进行跟踪

baggage:你希望带上的自定义属性值叫做baggage。在属于同一个context的span中传递。(应避免使用,baggage不能过大)


##### span的创建


scope(个人理解:该scope是对span和上一个span的包装)

```

final Scope scope = tracer.buildSpan(httpRequest.getMethod())

                   .asChildOf(extractedContext) //赋值references

                   .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)//赋值tags

                   .startActive(false);


```

io.jaegertracing.internal.JaegerTracer.SpanBuilder#startActive:

```

@Override

   public Scope startActive(boolean finishSpanOnClose) {

     return scopeManager.activate(start(), finishSpanOnClose);

   }


```

>根据代码跟踪,jaeger用到了ThreadLocalScopeManager类,来管理scope,该类通过ThreadLocal的特性,记录不同线程的最新scope


io.opentracing.util.ThreadLocalScopeManager#activate(io.opentracing.Span, boolean)

```

@Override

   public Scope activate(Span span, boolean finishOnClose) {

       return new ThreadLocalScope(this, span, finishOnClose);

   }

```

>ThreadLocalScope属性介绍:

toRestore:该线程上一个scope

wrapped: 该线程的最新span


>ThreadLocalScope的close方法(恢复上一个scope状态):如果finishOnClose的true,则调用该方法时,会将触发最新span的close方法。最后将当前线程的scope设置为该scope保存的上一个scope。

详情请看io.opentracing.util.ThreadLocalScope#close


下面再来看看最新span的创建过程

io.jaegertracing.internal.JaegerTracer.SpanBuilder#start


```

@Override

   public JaegerSpan start() {

     JaegerSpanContext context;

……上部分代码忽略……

    if (references.isEmpty() || !references.get(0).getSpanContext().hasTrace()) {

     //如果没有reference则创建context(创建traceId,spanId与traceId相同,parentId=0)

       context = createNewContext();

     } else {

       //有reference,创建子context(设置traceId为上一个context的traceId,父parentId为上一个context的spanId,生成自己的spanId)

       context = createChildContext();

     }


     …………


   //创建span

     JaegerSpan jaegerSpan = getObjectFactory().createSpan(

             JaegerTracer.this,

             operationName,

             context,

             startTimeMicroseconds,

             startTimeNanoTicks,

             computeDurationViaNanoTicks,

             tags,

             references);

     if (context.isSampled()) {

       metrics.spansStartedSampled.inc(1);

     } else {

       metrics.spansStartedNotSampled.inc(1);

     }

     return jaegerSpan;

   }

```


下面再来看看span的结束过程

即TracingFilter的doFilter方法的最后过程,先对span触发finish方法,再对scope进行关闭操作

```

scope.span().finish();


```

```

scope.close();

```


###总结1

1. 对于Servlet请求,都会走filter过滤器。jaeger会对请求分发前生成span,该请求后续所做的子操作(如数据库查询、redis查询、跨服务调用)都是该span的子span

2. 应用threadlocal特性,不同线程通过ThreadlocalManager管理。ThreadLocalManager存储的是每个线程的最新scope

3. scope的概念:存储当前的span和上一个scope的内容。

4. scope的作用:可以方便设置父子关系,并恢复到上一个父节点,继续记录与其他子节点的关系


####2. 如果要对其他组件进行监控,使用静态代理模式对原有bean进行封装。如redis、jdbc、mongo

从redis简单分析:

io.opentracing.contrib.redis.spring.data.connection.TracingRedisConnection:

> 该类实现了RedisConnection接口,成员变量为系统中原来的redisConnection

```

   public Object execute(String command, byte[]... args) {

       return this.helper.doInScope(command, () -> {

           return this.connection.execute(command, args);

       });

   }


```

在执行comand前,先创建scope

io.opentracing.contrib.redis.common.TracingHelper#doInScope(java.lang.String, java.util.function.Supplier<T>):

```

public <T> T doInScope(String command, Supplier<T> supplier) {

   Span span = buildSpan(command);

   return activateAndCloseSpan(span, supplier);

 }


```

创建span

io.opentracing.contrib.redis.common.TracingHelper#buildSpan(java.lang.String):

```

public Span buildSpan(String operationName) {

   if (traceWithActiveSpanOnly && tracer.activeSpan() == null) {

     return NoopSpan.INSTANCE;

   } else {

     return builder(operationName).start();

   }

 }

```

将创建的span设置到最新的scope中,执行完后关闭span和scope

io.opentracing.contrib.redis.common.TracingHelper#activateAndCloseSpan(io.opentracing.Span, java.util.function.Supplier<T>)

```

private <T> T activateAndCloseSpan(Span span, Supplier<T> supplier) {

   //try(scope),程序执行完会自动调用scope.close()方法

   try (Scope ignored = tracer.activateSpan(span)) {

     return supplier.get();

   } catch (Exception e) {

     TracingHelper.onError(e, span);

     throw e;

   } finally {

     span.finish();

   }

 }


```

###总结2

1. redis、jdbc、mongo等组件通过静态代理模式对原有组件进行封装实现监控目的

2. 执行前->创建span->设置scope->执行->span触发finish方法->scope触发close方法



#### 3. 不同服务之间的传递(feign)

feign.opentracing.TracingClient#execute

->feign.opentracing.TracingClient#inject //将span注入header (traceId:spanId: parentId:flag)

->delegate.execute(request, options)


### 总结3

对于下一个服务来说,这个是一个servlet请求,所以会走上文提到的servlet过滤器逻辑。


##二、span的上报

![span上报流程](https://upload-images.jianshu.io/upload_images/7220517-08900eadfc58753d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

问题1:怎么确定需要上报?

JaegerAutoConfiguration初始化bean:Sampler,该样本采样器决定span是否需要上报。

ConstSampler:常量采样器(true or false),默认所有都采集

ProbabilisticSampler:概率采样器(samplingRate:0.001,比如0.1%的span会被收集)

RateLimitingSampler:令牌桶采样器(maxTracesPerSecond,一秒多少个Trace)

RemoteControlledSampler:远程控制采样器


问题2:如何上报?

JaegerAutoConfiguration初始化bean:Reporter

```

@ConditionalOnMissingBean

 @Bean

 public Reporter reporter(JaegerConfigurationProperties properties,

     Metrics metrics,

     @Autowired(required = false) ReporterAppender reporterAppender) {


   List<Reporter> reporters = new LinkedList<>();

   RemoteReporter remoteReporter = properties.getRemoteReporter();


   JaegerConfigurationProperties.HttpSender httpSender = properties.getHttpSender();

   if (!StringUtils.isEmpty(httpSender.getUrl())) {

     reporters.add(getHttpReporter(metrics, remoteReporter, httpSender));

   } else {

     reporters.add(getUdpReporter(metrics, remoteReporter, properties.getUdpSender()));

   }


   if (properties.isLogSpans()) {

     reporters.add(new LoggingReporter());

   }


   if (reporterAppender != null) {

     reporterAppender.append(reporters);

   }


   return new CompositeReporter(reporters.toArray(new Reporter[reporters.size()]));

 }

```

由代码可知,实际为CompositeReporter类,即组合所有reporter,报告时将所有reporter遍历执行一遍。

在开发中,可以实现自己的自定义reporter,只需实现ReporterAppender即可。

```

@Component

public class CustomReporter implements ReporterAppender {


   @Override

   public void append(Collection<Reporter> reporters) {

           reporters.add(new CustomLogReporter());

   }

}

```

```

public class CustomLogReporter implements io.jaegertracing.spi.Reporter {


   private Logger logger = LoggerFactory.getLogger(CustomLogReporter.class);

   @Override

   public void report(JaegerSpan span) {

       io.jaegertracing.thriftjava.Span thriftSpan = JaegerThriftSpanConverter.convertSpan(span);

       logger.info("Span reported: {}", JSON.toJSONString(thriftSpan));

   }


   @Override

   public void close() {


   }

}

```


再来看看默认的udp报告器

io.opentracing.contrib.java.spring.jaeger.starter.JaegerAutoConfiguration#createReporter:

```

private Reporter createReporter(Metrics metrics,

     RemoteReporter remoteReporter, Sender udpSender) {

   io.jaegertracing.internal.reporters.RemoteReporter.Builder builder =

       new io.jaegertracing.internal.reporters.RemoteReporter.Builder()

           .withSender(udpSender)

           .withMetrics(metrics);


   if (remoteReporter.getFlushInterval() != null) {

     builder.withFlushInterval(remoteReporter.getFlushInterval());

   }

   if (remoteReporter.getMaxQueueSize() != null) {

     builder.withMaxQueueSize(remoteReporter.getMaxQueueSize());

   }


   return builder.build();

 }

```

```

public RemoteReporter build() {

     if (sender == null) {

       sender = SenderResolver.resolve();

     }

     if (metrics == null) {

       metrics = new Metrics(new InMemoryMetricsFactory());

     }

     return new RemoteReporter(sender, flushInterval, maxQueueSize, closeEnqueTimeout, metrics);

   }

```

可知,http和udp的报告器实际都是RemoteReporter类

```

private RemoteReporter(Sender sender, int flushInterval, int maxQueueSize, int closeEnqueueTimeout,

     Metrics metrics) {

   this.sender = sender;

   this.metrics = metrics;

   this.closeEnqueueTimeout = closeEnqueueTimeout;

   //命令队列(最大size默认100)

   commandQueue = new ArrayBlockingQueue<Command>(maxQueueSize);


   //开启一个队列处理线程去把comand取出来

   queueProcessor = new QueueProcessor();

   queueProcessorThread = new Thread(queueProcessor, "jaeger.RemoteReporter-QueueProcessor");

   queueProcessorThread.setDaemon(true);

   queueProcessorThread.start();

   //FlushTimer定时任务

   flushTimer = new Timer("jaeger.RemoteReporter-FlushTimer", true /* isDaemon */);

   flushTimer.schedule(

       new TimerTask() {

         @Override

         public void run() {

           flush();

         }

       },

       flushInterval,

       flushInterval);

 }

```

队列处理任务:将command从队列取出并执行

```

@ToString

 class QueueProcessor implements Runnable {

   private boolean open = true;


   @Override

   public void run() {

     while (open) {

       try {

         RemoteReporter.Command command = commandQueue.take();

         try {

           command.execute();

         } catch (SenderException e) {

           metrics.reporterFailure.inc(e.getDroppedSpanCount());

         }

       } catch (Exception e) {

         log.error("QueueProcessor error:", e);

         // Do nothing, and try again on next span.

       }

     }

   }

```

FlushTimer定时任务具体逻辑(默认每秒执行一次):往command队列中添加FlushCommand

```

void flush() {

   // to reduce the number of updateGauge stats, we only emit queue length on flush

   metrics.reporterQueueLength.update(commandQueue.size());


   // We can safely drop FlushCommand when the queue is full - sender should take care of flushing

   // in such case

   commandQueue.offer(new FlushCommand());

 }

```

再来看看span的finish方法:

```

@Override

 public void finish(long finishMicros) {

   finishWithDuration(finishMicros - startTimeMicroseconds);

 }


 private void finishWithDuration(long durationMicros) {

   synchronized (this) {

     if (finished) {

       log.warn("Span has already been finished; will not be reported again.");

       return;

     }

     finished = true;


     this.durationMicroseconds = durationMicros;

   }


   if (context.isSampled()) {

     tracer.reportSpan(this);

   }

 }

```

io.jaegertracing.internal.reporters.RemoteReporter#report:

```

@Override

 public void report(JaegerSpan span) {

   // Its better to drop spans, than to block here

   boolean added = commandQueue.offer(new AppendCommand(span));


   if (!added) {

     metrics.reporterDropped.inc(1);

   }

 }

```

> 可知当触发span的finish方法时,实际是向报告器上报span,但是并不是马上发送远程服务器,只是在reporter的command队列中添加AppendCommand


最后再来看看FlushCommand和AppendCommand

```

 class FlushCommand implements Command {

   @Override

   public void execute() throws SenderException {

     int n = sender.flush();

     metrics.reporterSuccess.inc(n);

   }

 }


```

```

class AppendCommand implements Command {

   private final JaegerSpan span;


   public AppendCommand(JaegerSpan span) {

     this.span = span;

   }


   @Override

   public void execute() throws SenderException {

     int n = sender.append(span);

     if (n > 0) {

       metrics.reporterSuccess.inc(n);

     }

   }

 }

```

可知分别用到了sender的append方法和flush方法。

UdpSender:

![image.png](https://upload-images.jianshu.io/upload_images/7220517-ddc8e1f94af71a94.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)


> 有兴趣的可细看这两个方法

>1. append方法:实质是sender.append会追加到该bean的成员变量:spanBuffer。下一步判断spanBuffer是否超过maxBytes(默认65000),如果超过则会触发sender的flush方法。

>2. flush方法:将spanBuffer发送到远程服务器


### 总结4

1. 只有一个线程真正执行向远程服务器上报的逻辑

2. span完成后不是马上上报,而是积累到一定的大小或到达一定的时间才会触发上报功能。

3. 当队列最大长度设置过短,线程处理不过来时,span容易丢失。设置过大,又有可能导致内存溢出。

4. 默认每秒会向队列中插入一个发送命令,但该命令不一定一秒后执行,因为有一个线程会处理这个命令队列,一秒后不一定能处理到该发送命令。


自己画的代码分析流程图

https://www.processon.com/view/link/5eb911ab5653bb6f2a00da9b

https://www.processon.com/view/link/5eb913930791290fe0571c8c

https://www.processon.com/view/link/5e9d116c5653bb6efc5bb761
































相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
2月前
|
消息中间件 Java 中间件
链路跟踪-SkyWalking系列(三)
链路跟踪-SkyWalking系列(三)
|
2月前
|
监控 Java Shell
链路跟踪-SkyWalking系列(一)
链路跟踪-SkyWalking系列(一)
|
2月前
|
存储 缓存 数据可视化
链路跟踪-SkyWalking系列(二)
链路跟踪-SkyWalking系列(二)
|
6月前
|
Java 微服务
skywalking链路追踪时忽略指定异常
skywalking链路追踪时忽略指定异常
|
存储 机器学习/深度学习 运维
基础篇丨链路追踪(Tracing)其实很简单(3)
基础篇丨链路追踪(Tracing)其实很简单
186 0
基础篇丨链路追踪(Tracing)其实很简单(3)
|
存储 运维 监控
基础篇丨链路追踪(Tracing)其实很简单(2)
基础篇丨链路追踪(Tracing)其实很简单
160 0
基础篇丨链路追踪(Tracing)其实很简单(2)
|
12月前
|
存储 NoSQL Java
链路跟踪Jaeger使用总结
链路跟踪Jaeger使用总结
152 0
|
数据采集 调度 数据库
基础篇丨链路追踪(Tracing)其实很简单(1)
基础篇丨链路追踪(Tracing)其实很简单
151 0
|
SQL 缓存 运维
使用篇丨链路追踪(Tracing)很简单:链路实时分析、监控与告警
使用篇丨链路追踪(Tracing)很简单:链路实时分析、监控与告警
6535 10
使用篇丨链路追踪(Tracing)很简单:链路实时分析、监控与告警
|
JSON 运维 监控
链路追踪Skywalking应用实战 2
链路追踪Skywalking应用实战
243 0