Spring非反应式事务实现原理
Spring基于注解和AOP的声明式事务(@Transactional)已经是业务开发的常用工具,默认是采用同步的方式基于ThreadLocal(保存连接信息和会话信息等)实现,在具体数据库操作时就使用同一个数据库连接,并手动提交事务,保证数据正确性。
基于反应式的Spring事务有何不同
Spring的反应式实现是基于Reactor框架,该框架对异步编程做了高度的抽象化,主动的线程切换只能通过publishOn/subscribeOn跟换线程池,导致在同步场景表现出色的ThreadLocal无法满足全异步化的事务信息存储需求。Reactor 3提供了一种叫做Context的数据结构,用来替代Threadlocal。
Context的传播机制
整体上Context类非常类似一个不可变的Map\<Object, Object>,采用CopyOnWrite策略,绑定在每一个订阅者上。但是,context传播具体是怎么实现的呢?有一个简单的例子:
Flux.just(1, 2, 3)
.flatMap(x -> Mono.subscriberContext()
.map(context -> String.format("%s%d", context.get("msg"), x))
).subscriberContext(context -> context.put("msg", "no."))
.subscribe(System.out::println);
从代码可以看到是通过subscriberContext方法直接put数据,但是这个Context对象是什么时候创建的呢?查看subscriberContext方法的源码发现方法会创建一个FluxContextStart对象,该对象是InternalFluxOperator的子类,实现了subscribeOrReturn(被订阅时调用),在其中将上下文的操作应用到订阅者已有的上下文,而大多数订阅者初始上下文都是Context.empty()。
final class FluxContextStart<T> extends InternalFluxOperator<T, T> implements Fuseable {
....
@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
Context c = doOnContext.apply(actual.currentContext());
return new ContextStartSubscriber<>(actual, c);
}
....
}
从FluxContextStart的实现可见,Context是由CoreSubscriber的实例所持有,因此Context的传播实际是订阅者实例的传播。而为了避免因为不同操作导致的并发问题,对订阅者的操作都是采用装饰者模式包装一个新的实例,类似Spark RDD的形式。
public final void subscribe(Subscriber<? super T> actual) {
CorePublisher publisher = Operators.onLastAssembly(this);
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
try {
...
publisher.subscribe(subscriber);
}
catch (Throwable e) {
Operators.reportThrowInSubscribe(subscriber, e);
return;
}
}
基于Flux的subscribe方法可见,每次订阅时都会向上游发布者传递订阅者实例,因此Context是自底向上传播。
Spring R2DBC的事务实现
基于对常规声明式事务的认识,找到TransactionAspectSupport#invokeWithinTransaction方法,这个方法定义了声明的事务具体要路由到哪个事务管理器执行。自Spring 5.2 M2之后,Spring开始支持反应式事务,在invokeWithinTransaction方法内可以看到如下代码:
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
// 从缓存中获取已经加载的反应式事务管理器
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
throw new TransactionUsageException(
"Unsupported annotated transaction on suspending function detected: " + method +
". Use TransactionalOperator.transactional extensions instead.");
}
// 根据返回值类型获取适配器
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
if (adapter == null) {
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
method.getReturnType());
}
return new ReactiveTransactionSupport(adapter);
});
// 执行事务
return txSupport.invokeWithinTransaction(
method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}
再继续跟进到反应式的txSupport.invokeWithinTransaction里面,会将相关的事务管理器和事务设置等信息放入上文说到的Context中。具体的对于R2DBC的数据库反应式事务而言,其主要调用的是TransactionalOperatorImpl#transactional方法:
@Override
public <T> Mono<T> transactional(Mono<T> mono) {
return TransactionContextManager.currentContext().flatMap(context -> {
Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
// through usingWhen.
return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono,
this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::commit)
.onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
})
.subscriberContext(TransactionContextManager.getOrCreateContext())
.subscriberContext(TransactionContextManager.getOrCreateContextHolder());
}
从代码可以看见,这里也是首先从上下文获取事务信息,保证整个反应式处理的各个操作符都会用到同样的数据库连接,并最终实现声明式事务功能。
总结
Spring实现反应式事务本质上基于Reactor的Context传播机制,结合原有事务机制改造出来的,所以总结下来就两个核心点:1、Reactor的Context是一种类似不可变Map,绑定在每个订阅者自底向上传播;2、Spring反应式事务通过Reactor的Context在不同线程池共享。