一、异步链路追踪的概述
SkyWalking 的中构建 Trace 信息时会借助 ThreadLocal来存储一些上下文信息,当遇到跨线程的时候,如果 Trace 的上下文信息没有传递到新线程的ThreadLocal 中,那么链路就断开了。
SkyWalking提供了跨线程构建Trace的能力,通过对 Callable
、Runnable
、Supplier
这3种接口的实现者进行增强拦截,将 Trace 的上下文信息传递到子线程中,实现了异步链路追踪。有非常多的方式来实现Callable
,Runnable
,Supplier
这3种接口,那么增强就面临以下问题:
- 增强所有的实现类显然不可能,必须基于有限的约定
- 不能让使用者大量修改代码,尽可能的基于现有的实现
可能基于以上问题的考虑,SkyWalking提供了一种既通用又快捷的方式来规范这一现象:
- 只拦截增强带有
@TraceCrossThread
注解的类: - 通过装饰的方式包装任务,避免大刀阔斧的修改
原始类 | 提供的包装类 | 拦截方法 | 使用技巧 |
Callable | CallableWrapper | call | CallableWrapper.of(xxxCallable) |
Runnable | RunnableWrapper | run | RunnableWrapper.of(xxxRunable) |
Supplier | SupplierWrapper | get | SupplierWrapper.of(xxxSupplier) |
包装类 都有注解 @TraceCrossThread
,skywalking内部的拦截匹配逻辑是,标注了@TraceCrossThread
的类,拦截 其名称为call
或run
或 get
,且没有入参的方法;对使用者来说大致分为2种方式:
- 自定义类,实现接口
Callable
、Runnable
、Supplier
,加@TraceCrossThread
注解。当需要有更多的自定义属性时,考虑这种方式;参考CallableWrapper
、RunnableWrapper
、SupplierWrapper
的实现方式。 - 通过xxxWrapper.of 装饰的方式,即
CallableWrapper.of(xxxCallable)
、RunnableWrapper.of(xxxRunable)
、SupplierWrapper.of(xxxSupplier)
。大多情况下,通过这种包装模式即可。
二、异步链路追踪的使用
2.1 pom依赖:
<dependency> <groupId>org.apache.skywalking</groupId> <artifactId>apm-toolkit-trace</artifactId> <version>xxx</version> </dependency> 复制代码
2.2 CallableWrapper
Skywalking 通过CallableWrapper
包装Callable
1) thread+callable
private String async_thread_callable(String way,long time11,long time22 ) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask<String>(CallableWrapper.of(()->{ ActiveSpan.debug("async_Thread_Callable"); String str1 = service.sendMessage(way, time11, time22); return str1; })); new Thread(futureTask).start(); return futureTask.get(); } 复制代码
2)threadPool+callable
private String async_executorService_callable(String way,long time11,long time22 ) throws ExecutionException, InterruptedException { Future<String> callableResult = executorService.submit(CallableWrapper.of(() -> { String str1 = service.sendMessage(way, time11, time22); return str1; })); return (String) callableResult.get(); } 复制代码
2.3 RunnableWrapper
Skywalking 通过RunnableWrapper
包装Runnable
1)thread+runnable
private String async_thread_runnable(String way,long time11,long time22 ) throws ExecutionException, InterruptedException { //忽略返回值 FutureTask futureTask = new FutureTask(RunnableWrapper.of(() -> { String str1 = service.sendMessage(way, time11, time22); }), "mockRunnableResult"); new Thread(futureTask).start(); return (String) futureTask.get(); } 复制代码
2)threadPool+runnable
private String async_executorService_runnable(String way,long time11,long time22 ) throws ExecutionException, InterruptedException { //忽略真实返回值,mock固定返回值 Future<String> mockRunnableResult = executorService.submit(RunnableWrapper.of(() -> { String str1 = service.sendMessage(way, time11, time22); }), "mockRunnableResult"); return (String) mockRunnableResult.get(); } 复制代码
3)completableFuture + runAsync
通过RunnableWrapper.of(xxx)包装rannable即可。
2.4 SupplierWrapper
Skywalking 通过SupplierWrapper<V>
包装Supplier<V>
1) completableFuture + supplyAsync
private String async_completableFuture_supplyAsync(String way,long time11,long time22 ) throws ExecutionException, InterruptedException { CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(SupplierWrapper.of(() -> { String str1 = service.sendMessage(way, time11, time22); return str1; })); return stringCompletableFuture.get(); } 复制代码
三、异步链路追踪的内部原理
Trace 相关的信息需要由能跨越线程的那些对象来搭载,比如 线程A 调用 线程B 的场景:
- 线程A
1. 调用ContextManager.capture()
将 Trace 的上下文信息保存到一个ContextSnapshot
的实例并返回。
2. ContextSnapshot则被附加到任务对象的特定属性中,那么当线程B接触到任务对象时,便能感知到ContextSnapshot。 - 线程B
1. 线程B中,在任务对象的任务方法被执行前,从任务对象的特定属性中获取ContextSnapshot对象,并将其作为入参调用ContextManager.continued(contextSnapshot)
。
2. ContextManager.continued(contextSnapshot)方法中解析出 Trace 的信息后,存储到线程B的线程上下文中。
四、专用属性的机理
SkyWalking Agent 会给被增强的类中扩展一个专用属性的机制是这样的:这个类会被修改,实现了接口EnhancedInstance
,此接口中提供了2个方法来读写这个扩展属性
public interface EnhancedInstance { Object getSkyWalkingDynamicField(); void setSkyWalkingDynamicField(Object value); } 复制代码
这个扩展属性就是一个普通的 Object ,在宿主应用这边感知不到它的存在,因为它不是在宿主应用中定义的;但是在 Agent 的上下文中可将其作为数据载体,在如下这些场景使用:
- 拦截目标类的构造方法,在构造房中new 一个自定义对象,通过
setSkyWalkingDynamicField
赋值给这个专用属性 - 在其他方法中,捕获到不同的数据,暂存到这个专用属性里;在构建 Span 的时候,将专用属性中暂存的数据读取出来,填充至 Span 的相关属性
这种通过在对象中扩展专用属性来在上下文中传递一些信息的方式,我个人的使用感受是比 ThreadLocal 要更舒服,既能实现信息传递,也能解决跨线程的问题。
五、总结
本篇介绍了在SkyWalking构建异步链路 Trace 的多种方法,并描述了异步链路Trace 信息的传递原理,最后还介绍了 SkyWakling 内特殊的增强机制,是如何增加了专用属性以应对上下文之间传递 Trace 信息的需求。
最后说一句(请关注,莫错过)
如果这篇文章对您有帮助,或者有所启发的话,欢迎关注公众号【 架构染色 】进行交流和学习。您的支持是我坚持写作最大的动力。