本系列是 我TM人傻了 系列第五期[捂脸],往期精彩回顾:
升级到Spring 5.3.x之后,GC次数急剧增加,我TM人傻了
这个大表走索引字段查询的 SQL 怎么就成全扫描了,我TM人傻了
spring-data-redis 连接泄漏,我 TM 人傻了
本篇文章涉及底层设计以及原理,以及问题定位和可能的问题点,非常深入,篇幅较长,所以拆分成上中下三篇:
- 上:问题简单描述以及 Spring Cloud Gateway 基本结构和流程以及底层原理
- 中:Spring Cloud Sleuth 如何在 Spring Cloud Gateway 加入的链路追踪以及为何会出现这个问题
- 下:现有 Spring Cloud Sleuth 的非侵入设计带来的性能问题,其他可能的问题点,以及如何解决
Spring Cloud Gateway 其他的可能丢失链路信息的点
经过前面的分析,我们可以看出,不止这里,还有其他地方会导致 Spring Cloud Sleuth 的链路追踪信息消失,这里举几个大家常见的例子:
1.在 GatewayFilter 中指定了异步执行某些任务,由于线程切换了,并且这时候可能 Span 已经结束了,所以没有链路信息,例如:
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { return chain.filter(exchange).publishOn(Schedulers.parallel()).doOnSuccess(o -> { //这里就没有链路信息了 log.info("success"); }); }
2.将 GatewayFilter 中继续链路的chain.filter(exchange)
放到了异步任务中执行,上面的 AdaptCachedBodyGlobalFilter 就属于这种情况,这样会导致之后的 GatewayFilter 都没有链路信息,例如:
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { return Mono.delay(Duration.ofSeconds(1)).then(chain.filter(exchange)); }
Java 并发编程模型与 Project Reactor 编程模型的冲突思考
Java 中的很多框架,都用到了 ThreadLocal,或者通过 Thread 来标识唯一性。例如:
- 日志框架中的 MDC,一般都是 ThreadLocal 实现。
- 所有的锁、基于 AQS 的数据结构,都是通过 Thread 的属性来唯一标识谁获取到了锁的。
- 分布式锁等数据结构,也是通过 Thread 的属性来唯一标识谁获取到了锁的,例如 Redisson 中分布式 Redis 锁的实现。
但是放到 Project Reactor 编程模型,这就显得格格不入了,因为 Project Reactor 异步响应式编程就是不固定线程,没法保证提交任务和回调能在同一个线程,所以 ThreadLocal 的语义在这里很难成立。Project Reactor 虽然提供了对标 ThreadLocal 的 Context,但是主流框架还没有兼容这个 Context,所以给 Spring Cloud Sleuth 粘合这些链路追踪带来了很大困难,因为 MDC 是一个 ThreadLocal 的 Map 实现,而不是基于 Context 的 Map。这就需要 Spring Cloud Sleuth 在订阅一开始,就需要将链路信息放入 MDC,同时还需要保证运行时不切换线程。
运行不切换线程,这样其实限制了 Project Reactor 的灵活调度,是有一些性能损失的。我们其实想尽量就算加入了链路追踪信息,也不用强制运行不切换线程。但是 Spring Cloud Sleuth 是非侵入式设计,很难实现这一点。但是对于我们自己业务的使用,我们可以定制一些编程规范,来保证大家写的代码不丢失链路信息。
改进我们的编程规范
首先,我们自定义 Mono 和 Flux 的工厂
公共 Subscriber 封装,将 reactor Subscriber 的所有关键接口,都检查当前上下文是否有链路信息,即 Span,如果没有就包裹上,如果有则直接执行即可。
public class TracedCoreSubscriber<T> implements Subscriber<T>{ private final Subscriber<T> delegate; private final Tracer tracer; private final CurrentTraceContext currentTraceContext; private final Span span; TracedCoreSubscriber(Subscriber<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) { this.delegate = delegate; this.tracer = tracer; this.currentTraceContext = currentTraceContext; this.span = span; } @Override public void onSubscribe(Subscription s) { executeWithinScope(() -> { delegate.onSubscribe(s); }); } @Override public void onError(Throwable t) { executeWithinScope(() -> { delegate.onError(t); }); } @Override public void onComplete() { executeWithinScope(() -> { delegate.onComplete(); }); } @Override public void onNext(T o) { executeWithinScope(() -> { delegate.onNext(o); }); } private void executeWithinScope(Runnable runnable) { //如果当前没有链路信息,强制包裹 if (tracer.currentSpan() == null) { try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(this.span.context())) { runnable.run(); } } else { //如果当前已有链路信息,则直接执行 runnable.run(); } } }
之后分别定义所有 Flux 的代理 TracedFlux,和所有 Mono 的代理 TracedMono,其实就是在 subscribe 的时候,用 TracedCoreSubscriber 包装传入的 CoreSubscriber:
public class TracedFlux<T> extends Flux<T> { private final Flux<T> delegate; private final Tracer tracer; private final CurrentTraceContext currentTraceContext; private final Span span; TracedFlux(Flux<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) { this.delegate = delegate; this.tracer = tracer; this.currentTraceContext = currentTraceContext; this.span = span; } @Override public void subscribe(CoreSubscriber<? super T> actual) { delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span)); } } public class TracedMono<T> extends Mono<T> { private final Mono<T> delegate; private final Tracer tracer; private final CurrentTraceContext currentTraceContext; private final Span span; TracedMono(Mono<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) { this.delegate = delegate; this.tracer = tracer; this.currentTraceContext = currentTraceContext; this.span = span; } @Override public void subscribe(CoreSubscriber<? super T> actual) { delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span)); } }
定义工厂类,使用请求 ServerWebExchange 和原始 Flux 创建 TracedFlux,以及使用请求 ServerWebExchange 和原始 Mono 创建 TracedMono,并且 Span 是通过 Attributes 获取的,根据前文的源码分析我们知道,这个 Attribute 是通过 TraceWebFilter 放入 Attributes 的。由于我们只在 GatewayFilter 中使用,一定在 TraceWebFilter 之后 所以这个 Attribute 一定存在。
@Component public class TracedPublisherFactory { protected static final String TRACE_REQUEST_ATTR = Span.class.getName(); @Autowired private Tracer tracer; @Autowired private CurrentTraceContext currentTraceContext; public <T> Flux<T> getTracedFlux(Flux<T> publisher, ServerWebExchange exchange) { return new TracedFlux<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR)); } public <T> Mono<T> getTracedMono(Mono<T> publisher, ServerWebExchange exchange) { return new TracedMono<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR)); } }
然后,我们规定:1. 所有的 GatewayFilter,需要继承我们自定义的抽象类,这个抽象类仅仅是把 filter 的结果用 TracedPublisherFactory 的 getTracedMono 给封装了一层 TracedMono,以 GlobalFilter 为例子:
public abstract class AbstractTracedFilter implements GlobalFilter { @Autowired protected TracedPublisherFactory tracedPublisherFactory; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { return tracedPublisherFactory.getTracedMono(traced(exchange, chain), exchange); } protected abstract Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain); }
2. GatewayFilter 中新生成的 Flux 或者 Mono,统一使用 TracedPublisherFactory 再封装一层。
3. 对于 AdaptCachedBodyGlobalFilter 读取 Request Body 导致的链路丢失,我向社区提了一个 Pull Request:fix #2004 Span is not terminated properly in Spring Cloud Gateway,大家可以参考。也可以在这个 Filter 之前自己将 Request Body 使用 TracedPublisherFactory 进行封装解决。