了解Spring R2DBC的声明式事务实现机制-阿里云开发者社区

开发者社区> 开发与运维> 正文

了解Spring R2DBC的声明式事务实现机制

简介: # Spring非反应式事务实现原理 Spring基于注解和AOP的声明式事务(@Transactional)已经是业务开发的常用工具,默认是采用同步的方式基于ThreadLocal(保存连接信息和会话信息等)实现,在具体数据库操作时就使用同一个数据库连接,并手动提交事务,保证数据正确性。 # 基于反应式的Spring事务有何不同 Spring的反应式实现是基于Reactor框架,该框架

Spring非反应式事务实现原理

Spring基于注解和AOP的声明式事务(@Transactional)已经是业务开发的常用工具,默认是采用同步的方式基于ThreadLocal(保存连接信息和会话信息等)实现,在具体数据库操作时就使用同一个数据库连接,并手动提交事务,保证数据正确性。

基于反应式的Spring事务有何不同

Spring的反应式实现是基于Reactor框架,该框架对异步编程做了高度的抽象化,主动的线程切换只能通过publishOn/subscribeOn跟换线程池,导致在同步场景表现出色的ThreadLocal无法满足全异步化的事务信息存储需求。Reactor 3提供了一种叫做Context的数据结构,用来替代Threadlocal。

Context的传播机制

整体上Context类非常类似一个不可变的Map<Object, Object>,采用CopyOnWrite策略,绑定在每一个订阅者上。但是,context传播具体是怎么实现的呢?有一个简单的例子:

Flux.just(1, 2, 3)
    .flatMap(x -> Mono.subscriberContext()
        .map(context -> String.format("%s%d", context.get("msg"), x))
    ).subscriberContext(context -> context.put("msg", "no."))
    .subscribe(System.out::println);

从代码可以看到是通过subscriberContext方法直接put数据,但是这个Context对象是什么时候创建的呢?查看subscriberContext方法的源码发现方法会创建一个FluxContextStart对象,该对象是InternalFluxOperator的子类,实现了subscribeOrReturn(被订阅时调用),在其中将上下文的操作应用到订阅者已有的上下文,而大多数订阅者初始上下文都是Context.empty()。

final class FluxContextStart<T> extends InternalFluxOperator<T, T> implements Fuseable {
....
    @Override
    public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
        Context c = doOnContext.apply(actual.currentContext());
        return new ContextStartSubscriber<>(actual, c);
    }
....
}

从FluxContextStart的实现可见,Context是由CoreSubscriber的实例所持有,因此Context的传播实际是订阅者实例的传播。而为了避免因为不同操作导致的并发问题,对订阅者的操作都是采用装饰者模式包装一个新的实例,类似Spark RDD的形式。

public final void subscribe(Subscriber<? super T> actual) {
    CorePublisher publisher = Operators.onLastAssembly(this);
    CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

    try {
        ...
        publisher.subscribe(subscriber);
    }
    catch (Throwable e) {
        Operators.reportThrowInSubscribe(subscriber, e);
        return;
    }
}

基于Flux的subscribe方法可见,每次订阅时都会向上游发布者传递订阅者实例,因此Context是自底向上传播。

Spring R2DBC的事务实现

基于对常规声明式事务的认识,找到TransactionAspectSupport#invokeWithinTransaction方法,这个方法定义了声明的事务具体要路由到哪个事务管理器执行。自Spring 5.2 M2之后,Spring开始支持反应式事务,在invokeWithinTransaction方法内可以看到如下代码:

if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
    // 从缓存中获取已经加载的反应式事务管理器
    ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
        if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
            throw new TransactionUsageException(
                    "Unsupported annotated transaction on suspending function detected: " + method +
                    ". Use TransactionalOperator.transactional extensions instead.");
        }
        // 根据返回值类型获取适配器
        ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
        if (adapter == null) {
            throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                    method.getReturnType());
        }
        return new ReactiveTransactionSupport(adapter);
    });
    // 执行事务
    return txSupport.invokeWithinTransaction(
            method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}

再继续跟进到反应式的txSupport.invokeWithinTransaction里面,会将相关的事务管理器和事务设置等信息放入上文说到的Context中。具体的对于R2DBC的数据库反应式事务而言,其主要调用的是TransactionalOperatorImpl#transactional方法:

@Override
public <T> Mono<T> transactional(Mono<T> mono) {
    return TransactionContextManager.currentContext().flatMap(context -> {
        Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        // Need re-wrapping of ReactiveTransaction until we get hold of the exception
        // through usingWhen.
        return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono,
                this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::commit)
                .onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
    })
    .subscriberContext(TransactionContextManager.getOrCreateContext())
    .subscriberContext(TransactionContextManager.getOrCreateContextHolder());
}    

从代码可以看见,这里也是首先从上下文获取事务信息,保证整个反应式处理的各个操作符都会用到同样的数据库连接,并最终实现声明式事务功能。

总结

Spring实现反应式事务本质上基于Reactor的Context传播机制,结合原有事务机制改造出来的,所以总结下来就两个核心点:1、Reactor的Context是一种类似不可变Map,绑定在每个订阅者自底向上传播;2、Spring反应式事务通过Reactor的Context在不同线程池共享。

参考资料

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
开发与运维
使用钉钉扫一扫加入圈子
+ 订阅

集结各类场景实战经验,助你开发运维畅行无忧

其他文章