skywalking09 - 异步线程链路续接(下)

本文涉及的产品
应用实时监控服务-应用监控,每月50GB免费额度
简介: skywalking09 - 异步线程链路续接(下)

skywalking09 - 异步线程链路续接(下)–源码分析

在上篇,我们提到了,多线程可能会导致链路断开,而可以通过三种方式将其接上。那你有没有好奇,为什么它会断开,它又是怎么接上的呢?

链路为何断开

要知道链路为何断开,我们就需要知道,正常情况下的链路是如何工作的,几个Span之间是如何接在一起的。我们可以通过第四篇提到的@Trace注解进行入手,这个注解会增加一个Span。

正常情况下@Trace添加Span

对skywalking源码有一定了解的你一定知道,其对类做修改增强的时候,会定义一个该类全类名的字符串,以及会用来增强该类的增强类的全类名,所以我们找到了TraceAnnotationActivation:

/**
 * {@link TraceAnnotationActivation} enhance all method that annotated with <code>org.apache.skywalking.apm.toolkit.trace.annotation.Trace</code>
 * by <code>TraceAnnotationMethodInterceptor</code>.
 */
public class TraceAnnotationActivation extends ClassInstanceMethodsEnhancePluginDefine {
   // 用来增强的类
    public static final String TRACE_ANNOTATION_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.toolkit.activation.trace.TraceAnnotationMethodInterceptor";
    // 被增强处理的注解
    public static final String TRACE_ANNOTATION = "org.apache.skywalking.apm.toolkit.trace.Trace";
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[] {
            new DeclaredInstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return isAnnotatedWith(named(TRACE_ANNOTATION));
                }
                @Override
                public String getMethodsInterceptor() {
                    return TRACE_ANNOTATION_METHOD_INTERCEPTOR;
                }
                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            }
        };
    }
    @Override
    protected ClassMatch enhanceClass() {
        return MethodAnnotationMatch.byMethodAnnotationMatch(TRACE_ANNOTATION);
    }
}

然后我们去翻查TraceAnnotationMethodInterceptor:

/**
 * {@link TraceAnnotationMethodInterceptor} create a local span and set the operation name which fetch from
 * <code>org.apache.skywalking.apm.toolkit.trace.annotation.Trace.operationName</code>. if the fetch value is blank
 * string, and the operation name will be the method name.
 */
public class TraceAnnotationMethodInterceptor implements InstanceMethodsAroundInterceptor {
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        Trace trace = method.getAnnotation(Trace.class);
        final AbstractSpan localSpan = ContextManager.createLocalSpan(operationName);
    }
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {
        ContextManager.stopSpan();
        return ret;
    }
}

代码做了一定精简,我们可以看到,在beforeMethod()方法中,其使用ContextManager创建了一个LocalSpan,在afterMethod()方法中,使用了ContextManager停止了Span。而ContextManager是skywalking链路管理的一个核心的类,那么也一定是它在创建Span的时候没有续接上导致的。

ContextManager创建Span的核心

/**
 * {@link ContextManager} controls the whole context of {@link TraceSegment}. Any {@link TraceSegment} relates to
 * single-thread, so this context use {@link ThreadLocal} to maintain the context, and make sure, since a {@link
 * TraceSegment} starts, all ChildOf spans are in the same context. <p> What is 'ChildOf'?
 * https://github.com/opentracing/specification/blob/master/specification.md#references-between-spans
 *
 * <p> Also, {@link ContextManager} delegates to all {@link AbstractTracerContext}'s major methods.
 */
public class ContextManager implements BootService {
    private static final String EMPTY_TRACE_CONTEXT_ID = "N/A";
    private static final ILog LOGGER = LogManager.getLogger(ContextManager.class);
    private static ThreadLocal<AbstractTracerContext> CONTEXT = new ThreadLocal<AbstractTracerContext>();
    private static ThreadLocal<RuntimeContext> RUNTIME_CONTEXT = new ThreadLocal<RuntimeContext>();
    private static ContextManagerExtendService EXTEND_SERVICE;
    private static AbstractTracerContext getOrCreate(String operationName, boolean forceSampling) {
        AbstractTracerContext context = CONTEXT.get();
        if (context == null) {
            if (StringUtil.isEmpty(operationName)) {
                if (LOGGER.isDebugEnable()) {
                    LOGGER.debug("No operation name, ignore this trace.");
                }
                context = new IgnoredTracerContext();
            } else {
                if (EXTEND_SERVICE == null) {
                    EXTEND_SERVICE = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class);
                }
                context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling);
            }
            CONTEXT.set(context);
        }
        return context;
    }
    private static AbstractTracerContext get() {
        return CONTEXT.get();
    }
        public static AbstractSpan createLocalSpan(String operationName) {
        operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
        AbstractTracerContext context = getOrCreate(operationName, false);
        return context.createLocalSpan(operationName);
    }
}
  • 看到人家写的注释没,“Any TraceSegment relates to single-thread, so this context use ThreadLocal to maintain the context, and make sure, since a TraceSegment starts, all ChildOf spans are in the same context.” 一条链路就是一个单线程的,所以用了ThreadLocal来保存,让我们自己来保证,子Span是同一个上下文中的。
  • ThreadLocal<AbstractTracerContext> CONTEXT 这一个变量,用来存Span,那难怪了,新的线程中,它就是断开的。

链路如何续接

我们搞清楚了,断开是因为CONTEXT是存在ThreadLocal中的,导致新的线程中没有上下文,那么我们只要将父线程的上下文传入进去,就可以完成续接。那让我们来看看skywalking是怎么做的。我们以@TraceCrossThread为例,其他方式大体思路是一致的。

/**
 * {@link CallableOrRunnableActivation} presents that skywalking intercepts all Class with annotation
 * "org.skywalking.apm.toolkit.trace.TraceCrossThread" and method named "call" or "run".
 */
public class CallableOrRunnableActivation extends ClassInstanceMethodsEnhancePluginDefine {
    public static final String ANNOTATION_NAME = "org.apache.skywalking.apm.toolkit.trace.TraceCrossThread";
    private static final String INIT_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.toolkit.activation.trace.CallableOrRunnableConstructInterceptor";
    private static final String CALL_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.toolkit.activation.trace.CallableOrRunnableInvokeInterceptor";
    private static final String CALL_METHOD_NAME = "call";
    private static final String RUN_METHOD_NAME = "run";
    private static final String GET_METHOD_NAME = "get";
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[] {
            new ConstructorInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getConstructorMatcher() {
                    return any();
                }
                @Override
                public String getConstructorInterceptor() {
                    return INIT_METHOD_INTERCEPTOR;
                }
            }
        };
    }
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[] {
            new InstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named(CALL_METHOD_NAME)
                        .and(takesArguments(0))
                        .or(named(RUN_METHOD_NAME).and(takesArguments(0)))
                        .or(named(GET_METHOD_NAME).and(takesArguments(0)));
                }
                @Override
                public String getMethodsInterceptor() {
                    return CALL_METHOD_INTERCEPTOR;
                }
                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            }
        };
    }
    @Override
    protected ClassMatch enhanceClass() {
        return byClassAnnotationMatch(new String[] {ANNOTATION_NAME});
    }
}

通过全局搜索,我们找到CallableOrRunnableActivation,它完成了对"org.skywalking.apm.toolkit.trace.TraceCrossThread" and method named “call” or "run"的增强。增强方式分为构造时增强、以及对方法的增强。

CallableOrRunnableConstructInterceptor

public class CallableOrRunnableConstructInterceptor implements InstanceConstructorInterceptor {
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        if (ContextManager.isActive()) {
            objInst.setSkyWalkingDynamicField(ContextManager.capture());
        }
    }
}

在构造的时候,ContextManager对当前的上下文做了一次快照,并存到skyWalkingDynamicField这个动态属性中,共子线程来取。

CallableOrRunnableInvokeInterceptor

public class CallableOrRunnableInvokeInterceptor implements InstanceMethodsAroundInterceptor {
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable {
        ContextManager.createLocalSpan("Thread/" + objInst.getClass().getName() + "/" + method.getName());
        ContextSnapshot cachedObjects = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
        if (cachedObjects != null) {
            ContextManager.continued(cachedObjects);
        }
    }
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        Object ret) throws Throwable {
        ContextManager.stopSpan();
        // clear ContextSnapshot
        objInst.setSkyWalkingDynamicField(null);
        return ret;
    }
}

这个类,将skyWalkingDynamicField这个动态属性中的内容取出,并通过“ContextManager.continued(cachedObjects);”完成了续接。最后,在afterMethod()中,也完成了对Span的关闭。

TracingContext#continued

/**
     * Continue the context from the given snapshot of parent thread.
     *
     * @param snapshot from {@link #capture()} in the parent thread. Ref to {@link AbstractTracerContext#continued(ContextSnapshot)}
     */
    @Override
    public void continued(ContextSnapshot snapshot) {
        if (snapshot.isValid()) {
            TraceSegmentRef segmentRef = new TraceSegmentRef(snapshot);
            this.segment.ref(segmentRef);
            this.activeSpan().ref(segmentRef);
            this.segment.relatedGlobalTraces(snapshot.getTraceId());
            this.correlationContext.continued(snapshot);
            this.extensionContext.continued(snapshot);
            this.extensionContext.handle(this.activeSpan());
        }
    }

这个注释也很明白的说明了,这个上下文会将父线程的快照进行续接。

总结

第一步,通过对对象的构造方法进行增强,将链路上下文快照作为动态属性赋值给子线程;第二步,子线程的异步方法在开始前,将快照续接上并创建新的Span,方法结束后将Span关闭。

相关实践学习
通过轻量消息队列(原MNS)主题HTTP订阅+ARMS实现自定义数据多渠道告警
本场景将自定义告警信息同时分发至多个通知渠道的需求,例如短信、电子邮件及钉钉群组等。通过采用轻量消息队列(原 MNS)的主题模型的HTTP订阅方式,并结合应用实时监控服务提供的自定义集成能力,使得您能够以简便的配置方式实现上述多渠道同步通知的功能。
目录
相关文章
|
4月前
|
数据采集 存储 JSON
Python爬取知乎评论:多线程与异步爬虫的性能优化
Python爬取知乎评论:多线程与异步爬虫的性能优化
|
4月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
编解码 数据安全/隐私保护 计算机视觉
Opencv学习笔记(十):同步和异步(多线程)操作打开海康摄像头
如何使用OpenCV进行同步和异步操作来打开海康摄像头,并提供了相关的代码示例。
770 1
Opencv学习笔记(十):同步和异步(多线程)操作打开海康摄像头
|
9月前
|
缓存 安全 Java
面试中的难题:线程异步执行后如何共享数据?
本文通过一个面试故事,详细讲解了Java中线程内部开启异步操作后如何安全地共享数据。介绍了异步操作的基本概念及常见实现方式(如CompletableFuture、ExecutorService),并重点探讨了volatile关键字、CountDownLatch和CompletableFuture等工具在线程间数据共享中的应用,帮助读者理解线程安全和内存可见性问题。通过这些方法,可以有效解决多线程环境下的数据共享挑战,提升编程效率和代码健壮性。
311 6
|
10月前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
379 17
|
Java 数据库
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
这篇文章通过一个电商商品详情页的实战案例,展示了如何使用`CompletableFuture`进行异步编排,以解决在不同数据库表中查询商品信息的问题,并提供了详细的代码实现和遇到问题(如图片未显示)的解决方案。
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
|
设计模式 缓存 Java
谷粒商城笔记+踩坑(14)——异步和线程池
初始化线程的4种方式、线程池详解、异步编排 CompletableFuture
谷粒商城笔记+踩坑(14)——异步和线程池
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
336 0
|
网络协议 安全 Java
难懂,误点!将多线程技术应用于Python的异步事件循环
难懂,误点!将多线程技术应用于Python的异步事件循环
异步&线程池 CompletableFuture 异步编排 【下篇】
这篇文章深入探讨了Java中的`CompletableFuture`类,解释了如何创建异步操作、使用计算完成时的回调方法、异常处理、串行化方法、任务组合以及多任务组合的使用方式,并通过代码示例展示了各种场景下的应用。
异步&线程池 CompletableFuture 异步编排 【下篇】

热门文章

最新文章