skywalking09 - 异步线程链路续接(下)

本文涉及的产品
应用实时监控服务-用户体验监控,每月100OCU免费额度
应用实时监控服务-应用监控,每月50GB免费额度
简介: skywalking09 - 异步线程链路续接(下)

skywalking09 - 异步线程链路续接(下)–源码分析

在上篇,我们提到了,多线程可能会导致链路断开,而可以通过三种方式将其接上。那你有没有好奇,为什么它会断开,它又是怎么接上的呢?

链路为何断开

要知道链路为何断开,我们就需要知道,正常情况下的链路是如何工作的,几个Span之间是如何接在一起的。我们可以通过第四篇提到的@Trace注解进行入手,这个注解会增加一个Span。

正常情况下@Trace添加Span

对skywalking源码有一定了解的你一定知道,其对类做修改增强的时候,会定义一个该类全类名的字符串,以及会用来增强该类的增强类的全类名,所以我们找到了TraceAnnotationActivation:

/**
 * {@link TraceAnnotationActivation} enhance all method that annotated with <code>org.apache.skywalking.apm.toolkit.trace.annotation.Trace</code>
 * by <code>TraceAnnotationMethodInterceptor</code>.
 */
public class TraceAnnotationActivation extends ClassInstanceMethodsEnhancePluginDefine {
   // 用来增强的类
    public static final String TRACE_ANNOTATION_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.toolkit.activation.trace.TraceAnnotationMethodInterceptor";
    // 被增强处理的注解
    public static final String TRACE_ANNOTATION = "org.apache.skywalking.apm.toolkit.trace.Trace";
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[] {
            new DeclaredInstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return isAnnotatedWith(named(TRACE_ANNOTATION));
                }
                @Override
                public String getMethodsInterceptor() {
                    return TRACE_ANNOTATION_METHOD_INTERCEPTOR;
                }
                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            }
        };
    }
    @Override
    protected ClassMatch enhanceClass() {
        return MethodAnnotationMatch.byMethodAnnotationMatch(TRACE_ANNOTATION);
    }
}

然后我们去翻查TraceAnnotationMethodInterceptor:

/**
 * {@link TraceAnnotationMethodInterceptor} create a local span and set the operation name which fetch from
 * <code>org.apache.skywalking.apm.toolkit.trace.annotation.Trace.operationName</code>. if the fetch value is blank
 * string, and the operation name will be the method name.
 */
public class TraceAnnotationMethodInterceptor implements InstanceMethodsAroundInterceptor {
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        Trace trace = method.getAnnotation(Trace.class);
        final AbstractSpan localSpan = ContextManager.createLocalSpan(operationName);
    }
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {
        ContextManager.stopSpan();
        return ret;
    }
}

代码做了一定精简,我们可以看到,在beforeMethod()方法中,其使用ContextManager创建了一个LocalSpan,在afterMethod()方法中,使用了ContextManager停止了Span。而ContextManager是skywalking链路管理的一个核心的类,那么也一定是它在创建Span的时候没有续接上导致的。

ContextManager创建Span的核心

/**
 * {@link ContextManager} controls the whole context of {@link TraceSegment}. Any {@link TraceSegment} relates to
 * single-thread, so this context use {@link ThreadLocal} to maintain the context, and make sure, since a {@link
 * TraceSegment} starts, all ChildOf spans are in the same context. <p> What is 'ChildOf'?
 * https://github.com/opentracing/specification/blob/master/specification.md#references-between-spans
 *
 * <p> Also, {@link ContextManager} delegates to all {@link AbstractTracerContext}'s major methods.
 */
public class ContextManager implements BootService {
    private static final String EMPTY_TRACE_CONTEXT_ID = "N/A";
    private static final ILog LOGGER = LogManager.getLogger(ContextManager.class);
    private static ThreadLocal<AbstractTracerContext> CONTEXT = new ThreadLocal<AbstractTracerContext>();
    private static ThreadLocal<RuntimeContext> RUNTIME_CONTEXT = new ThreadLocal<RuntimeContext>();
    private static ContextManagerExtendService EXTEND_SERVICE;
    private static AbstractTracerContext getOrCreate(String operationName, boolean forceSampling) {
        AbstractTracerContext context = CONTEXT.get();
        if (context == null) {
            if (StringUtil.isEmpty(operationName)) {
                if (LOGGER.isDebugEnable()) {
                    LOGGER.debug("No operation name, ignore this trace.");
                }
                context = new IgnoredTracerContext();
            } else {
                if (EXTEND_SERVICE == null) {
                    EXTEND_SERVICE = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class);
                }
                context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling);
            }
            CONTEXT.set(context);
        }
        return context;
    }
    private static AbstractTracerContext get() {
        return CONTEXT.get();
    }
        public static AbstractSpan createLocalSpan(String operationName) {
        operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
        AbstractTracerContext context = getOrCreate(operationName, false);
        return context.createLocalSpan(operationName);
    }
}
  • 看到人家写的注释没,“Any TraceSegment relates to single-thread, so this context use ThreadLocal to maintain the context, and make sure, since a TraceSegment starts, all ChildOf spans are in the same context.” 一条链路就是一个单线程的,所以用了ThreadLocal来保存,让我们自己来保证,子Span是同一个上下文中的。
  • ThreadLocal<AbstractTracerContext> CONTEXT 这一个变量,用来存Span,那难怪了,新的线程中,它就是断开的。

链路如何续接

我们搞清楚了,断开是因为CONTEXT是存在ThreadLocal中的,导致新的线程中没有上下文,那么我们只要将父线程的上下文传入进去,就可以完成续接。那让我们来看看skywalking是怎么做的。我们以@TraceCrossThread为例,其他方式大体思路是一致的。

/**
 * {@link CallableOrRunnableActivation} presents that skywalking intercepts all Class with annotation
 * "org.skywalking.apm.toolkit.trace.TraceCrossThread" and method named "call" or "run".
 */
public class CallableOrRunnableActivation extends ClassInstanceMethodsEnhancePluginDefine {
    public static final String ANNOTATION_NAME = "org.apache.skywalking.apm.toolkit.trace.TraceCrossThread";
    private static final String INIT_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.toolkit.activation.trace.CallableOrRunnableConstructInterceptor";
    private static final String CALL_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.toolkit.activation.trace.CallableOrRunnableInvokeInterceptor";
    private static final String CALL_METHOD_NAME = "call";
    private static final String RUN_METHOD_NAME = "run";
    private static final String GET_METHOD_NAME = "get";
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[] {
            new ConstructorInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getConstructorMatcher() {
                    return any();
                }
                @Override
                public String getConstructorInterceptor() {
                    return INIT_METHOD_INTERCEPTOR;
                }
            }
        };
    }
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[] {
            new InstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named(CALL_METHOD_NAME)
                        .and(takesArguments(0))
                        .or(named(RUN_METHOD_NAME).and(takesArguments(0)))
                        .or(named(GET_METHOD_NAME).and(takesArguments(0)));
                }
                @Override
                public String getMethodsInterceptor() {
                    return CALL_METHOD_INTERCEPTOR;
                }
                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            }
        };
    }
    @Override
    protected ClassMatch enhanceClass() {
        return byClassAnnotationMatch(new String[] {ANNOTATION_NAME});
    }
}

通过全局搜索,我们找到CallableOrRunnableActivation,它完成了对"org.skywalking.apm.toolkit.trace.TraceCrossThread" and method named “call” or "run"的增强。增强方式分为构造时增强、以及对方法的增强。

CallableOrRunnableConstructInterceptor

public class CallableOrRunnableConstructInterceptor implements InstanceConstructorInterceptor {
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        if (ContextManager.isActive()) {
            objInst.setSkyWalkingDynamicField(ContextManager.capture());
        }
    }
}

在构造的时候,ContextManager对当前的上下文做了一次快照,并存到skyWalkingDynamicField这个动态属性中,共子线程来取。

CallableOrRunnableInvokeInterceptor

public class CallableOrRunnableInvokeInterceptor implements InstanceMethodsAroundInterceptor {
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable {
        ContextManager.createLocalSpan("Thread/" + objInst.getClass().getName() + "/" + method.getName());
        ContextSnapshot cachedObjects = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
        if (cachedObjects != null) {
            ContextManager.continued(cachedObjects);
        }
    }
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        Object ret) throws Throwable {
        ContextManager.stopSpan();
        // clear ContextSnapshot
        objInst.setSkyWalkingDynamicField(null);
        return ret;
    }
}

这个类,将skyWalkingDynamicField这个动态属性中的内容取出,并通过“ContextManager.continued(cachedObjects);”完成了续接。最后,在afterMethod()中,也完成了对Span的关闭。

TracingContext#continued

/**
     * Continue the context from the given snapshot of parent thread.
     *
     * @param snapshot from {@link #capture()} in the parent thread. Ref to {@link AbstractTracerContext#continued(ContextSnapshot)}
     */
    @Override
    public void continued(ContextSnapshot snapshot) {
        if (snapshot.isValid()) {
            TraceSegmentRef segmentRef = new TraceSegmentRef(snapshot);
            this.segment.ref(segmentRef);
            this.activeSpan().ref(segmentRef);
            this.segment.relatedGlobalTraces(snapshot.getTraceId());
            this.correlationContext.continued(snapshot);
            this.extensionContext.continued(snapshot);
            this.extensionContext.handle(this.activeSpan());
        }
    }

这个注释也很明白的说明了,这个上下文会将父线程的快照进行续接。

总结

第一步,通过对对象的构造方法进行增强,将链路上下文快照作为动态属性赋值给子线程;第二步,子线程的异步方法在开始前,将快照续接上并创建新的Span,方法结束后将Span关闭。

相关实践学习
通过云拨测对指定服务器进行Ping/DNS监测
本实验将通过云拨测对指定服务器进行Ping/DNS监测,评估网站服务质量和用户体验。
目录
相关文章
|
3月前
|
编解码 数据安全/隐私保护 计算机视觉
Opencv学习笔记(十):同步和异步(多线程)操作打开海康摄像头
如何使用OpenCV进行同步和异步操作来打开海康摄像头,并提供了相关的代码示例。
130 1
Opencv学习笔记(十):同步和异步(多线程)操作打开海康摄像头
|
3月前
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
|
3月前
|
网络协议 安全 Java
难懂,误点!将多线程技术应用于Python的异步事件循环
难懂,误点!将多线程技术应用于Python的异步事件循环
98 0
|
5月前
|
缓存 Java
异步&线程池 线程池的七大参数 初始化线程的4种方式 【上篇】
这篇文章详细介绍了Java中线程的四种初始化方式,包括继承Thread类、实现Runnable接口、实现Callable接口与FutureTask结合使用,以及使用线程池。同时,还深入探讨了线程池的七大参数及其作用,解释了线程池的运行流程,并列举了四种常见的线程池类型。最后,阐述了在开发中使用线程池的原因,如降低资源消耗、提高响应速度和增强线程的可管理性。
异步&线程池 线程池的七大参数 初始化线程的4种方式 【上篇】
|
5月前
|
Java 数据库
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
这篇文章通过一个电商商品详情页的实战案例,展示了如何使用`CompletableFuture`进行异步编排,以解决在不同数据库表中查询商品信息的问题,并提供了详细的代码实现和遇到问题(如图片未显示)的解决方案。
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
|
4月前
|
设计模式 缓存 Java
谷粒商城笔记+踩坑(14)——异步和线程池
初始化线程的4种方式、线程池详解、异步编排 CompletableFuture
|
5月前
|
Java
异步&线程池 CompletableFuture 异步编排 【下篇】
这篇文章深入探讨了Java中的`CompletableFuture`类,解释了如何创建异步操作、使用计算完成时的回调方法、异常处理、串行化方法、任务组合以及多任务组合的使用方式,并通过代码示例展示了各种场景下的应用。
异步&线程池 CompletableFuture 异步编排 【下篇】
|
5月前
|
数据采集 Python
多线程和异步
【8月更文挑战第12天】
46 3
|
6月前
|
Java Spring 容器
Spring boot 自定义ThreadPoolTaskExecutor 线程池并进行异步操作
Spring boot 自定义ThreadPoolTaskExecutor 线程池并进行异步操作
293 3
|
5月前
|
Dart API C语言
Dart ffi 使用问题之想在C/C++中创建异步线程来调用Dart方法,如何操作
Dart ffi 使用问题之想在C/C++中创建异步线程来调用Dart方法,如何操作