了解Spring R2DBC的声明式事务实现机制

简介: # Spring非反应式事务实现原理 Spring基于注解和AOP的声明式事务(@Transactional)已经是业务开发的常用工具,默认是采用同步的方式基于ThreadLocal(保存连接信息和会话信息等)实现,在具体数据库操作时就使用同一个数据库连接,并手动提交事务,保证数据正确性。 # 基于反应式的Spring事务有何不同 Spring的反应式实现是基于Reactor框架,该框架

Spring非反应式事务实现原理

Spring基于注解和AOP的声明式事务(@Transactional)已经是业务开发的常用工具,默认是采用同步的方式基于ThreadLocal(保存连接信息和会话信息等)实现,在具体数据库操作时就使用同一个数据库连接,并手动提交事务,保证数据正确性。

基于反应式的Spring事务有何不同

Spring的反应式实现是基于Reactor框架,该框架对异步编程做了高度的抽象化,主动的线程切换只能通过publishOn/subscribeOn跟换线程池,导致在同步场景表现出色的ThreadLocal无法满足全异步化的事务信息存储需求。Reactor 3提供了一种叫做Context的数据结构,用来替代Threadlocal。

Context的传播机制

整体上Context类非常类似一个不可变的Map\<Object, Object>,采用CopyOnWrite策略,绑定在每一个订阅者上。但是,context传播具体是怎么实现的呢?有一个简单的例子:

Flux.just(1, 2, 3)
    .flatMap(x -> Mono.subscriberContext()
        .map(context -> String.format("%s%d", context.get("msg"), x))
    ).subscriberContext(context -> context.put("msg", "no."))
    .subscribe(System.out::println);

从代码可以看到是通过subscriberContext方法直接put数据,但是这个Context对象是什么时候创建的呢?查看subscriberContext方法的源码发现方法会创建一个FluxContextStart对象,该对象是InternalFluxOperator的子类,实现了subscribeOrReturn(被订阅时调用),在其中将上下文的操作应用到订阅者已有的上下文,而大多数订阅者初始上下文都是Context.empty()。

final class FluxContextStart<T> extends InternalFluxOperator<T, T> implements Fuseable {
....
    @Override
    public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
        Context c = doOnContext.apply(actual.currentContext());
        return new ContextStartSubscriber<>(actual, c);
    }
....
}

从FluxContextStart的实现可见,Context是由CoreSubscriber的实例所持有,因此Context的传播实际是订阅者实例的传播。而为了避免因为不同操作导致的并发问题,对订阅者的操作都是采用装饰者模式包装一个新的实例,类似Spark RDD的形式。

public final void subscribe(Subscriber<? super T> actual) {
    CorePublisher publisher = Operators.onLastAssembly(this);
    CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

    try {
        ...
        publisher.subscribe(subscriber);
    }
    catch (Throwable e) {
        Operators.reportThrowInSubscribe(subscriber, e);
        return;
    }
}

基于Flux的subscribe方法可见,每次订阅时都会向上游发布者传递订阅者实例,因此Context是自底向上传播。

Spring R2DBC的事务实现

基于对常规声明式事务的认识,找到TransactionAspectSupport#invokeWithinTransaction方法,这个方法定义了声明的事务具体要路由到哪个事务管理器执行。自Spring 5.2 M2之后,Spring开始支持反应式事务,在invokeWithinTransaction方法内可以看到如下代码:

if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
    // 从缓存中获取已经加载的反应式事务管理器
    ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
        if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
            throw new TransactionUsageException(
                    "Unsupported annotated transaction on suspending function detected: " + method +
                    ". Use TransactionalOperator.transactional extensions instead.");
        }
        // 根据返回值类型获取适配器
        ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
        if (adapter == null) {
            throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                    method.getReturnType());
        }
        return new ReactiveTransactionSupport(adapter);
    });
    // 执行事务
    return txSupport.invokeWithinTransaction(
            method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}

再继续跟进到反应式的txSupport.invokeWithinTransaction里面,会将相关的事务管理器和事务设置等信息放入上文说到的Context中。具体的对于R2DBC的数据库反应式事务而言,其主要调用的是TransactionalOperatorImpl#transactional方法:

@Override
public <T> Mono<T> transactional(Mono<T> mono) {
    return TransactionContextManager.currentContext().flatMap(context -> {
        Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        // Need re-wrapping of ReactiveTransaction until we get hold of the exception
        // through usingWhen.
        return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono,
                this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::commit)
                .onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
    })
    .subscriberContext(TransactionContextManager.getOrCreateContext())
    .subscriberContext(TransactionContextManager.getOrCreateContextHolder());
}    

从代码可以看见,这里也是首先从上下文获取事务信息,保证整个反应式处理的各个操作符都会用到同样的数据库连接,并最终实现声明式事务功能。

总结

Spring实现反应式事务本质上基于Reactor的Context传播机制,结合原有事务机制改造出来的,所以总结下来就两个核心点:1、Reactor的Context是一种类似不可变Map,绑定在每个订阅者自底向上传播;2、Spring反应式事务通过Reactor的Context在不同线程池共享。

参考资料

相关文章
|
3月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
124 2
|
8天前
|
XML Java Maven
Spring 手动实现Spring底层机制
Spring 第六节 手动实现Spring底层机制 万字详解!
62 31
|
1月前
|
SQL Java 关系型数据库
【SpringFramework】Spring事务
本文简述Spring中数据库及事务相关衍伸知识点。
48 9
|
2月前
|
Java 开发者 Spring
理解和解决Spring框架中的事务自调用问题
事务自调用问题是由于 Spring AOP 代理机制引起的,当方法在同一个类内部自调用时,事务注解将失效。通过使用代理对象调用、将事务逻辑分离到不同类中或使用 AspectJ 模式,可以有效解决这一问题。理解和解决这一问题,对于保证 Spring 应用中的事务管理正确性至关重要。掌握这些技巧,可以提高开发效率和代码的健壮性。
108 13
|
2月前
|
NoSQL Java Redis
Spring Boot 自动配置机制:从原理到自定义
Spring Boot 的自动配置机制通过 `spring.factories` 文件和 `@EnableAutoConfiguration` 注解,根据类路径中的依赖和条件注解自动配置所需的 Bean,大大简化了开发过程。本文深入探讨了自动配置的原理、条件化配置、自定义自动配置以及实际应用案例,帮助开发者更好地理解和利用这一强大特性。
127 14
|
2月前
|
缓存 安全 Java
Spring高手之路26——全方位掌握事务监听器
本文深入探讨了Spring事务监听器的设计与实现,包括通过TransactionSynchronization接口和@TransactionalEventListener注解实现事务监听器的方法,并通过实例详细展示了如何在事务生命周期的不同阶段执行自定义逻辑,提供了实际应用场景中的最佳实践。
62 2
Spring高手之路26——全方位掌握事务监听器
|
2月前
|
Java 关系型数据库 数据库
京东面试:聊聊Spring事务?Spring事务的10种失效场景?加入型传播和嵌套型传播有什么区别?
45岁老架构师尼恩分享了Spring事务的核心知识点,包括事务的两种管理方式(编程式和声明式)、@Transactional注解的五大属性(transactionManager、propagation、isolation、timeout、readOnly、rollbackFor)、事务的七种传播行为、事务隔离级别及其与数据库隔离级别的关系,以及Spring事务的10种失效场景。尼恩还强调了面试中如何给出高质量答案,推荐阅读《尼恩Java面试宝典PDF》以提升面试表现。更多技术资料可在公众号【技术自由圈】获取。
|
2月前
|
缓存 Java 数据库连接
深入探讨:Spring与MyBatis中的连接池与缓存机制
Spring 与 MyBatis 提供了强大的连接池和缓存机制,通过合理配置和使用这些机制,可以显著提升应用的性能和可扩展性。连接池通过复用数据库连接减少了连接创建和销毁的开销,而 MyBatis 的一级缓存和二级缓存则通过缓存查询结果减少了数据库访问次数。在实际应用中,结合具体的业务需求和系统架构,优化连接池和缓存的配置,是提升系统性能的重要手段。
128 4
|
3月前
|
Java 开发者 Spring
Spring高手之路24——事务类型及传播行为实战指南
本篇文章深入探讨了Spring中的事务管理,特别是事务传播行为(如REQUIRES_NEW和NESTED)的应用与区别。通过详实的示例和优化的时序图,全面解析如何在实际项目中使用这些高级事务控制技巧,以提升开发者的Spring事务管理能力。
80 1
Spring高手之路24——事务类型及传播行为实战指南
|
3月前
|
Java 开发者 Spring
深入解析:Spring AOP的底层实现机制
在现代软件开发中,Spring框架的AOP(面向切面编程)功能因其能够有效分离横切关注点(如日志记录、事务管理等)而备受青睐。本文将深入探讨Spring AOP的底层原理,揭示其如何通过动态代理技术实现方法的增强。
101 8