了解Spring R2DBC的声明式事务实现机制

简介: # Spring非反应式事务实现原理 Spring基于注解和AOP的声明式事务(@Transactional)已经是业务开发的常用工具,默认是采用同步的方式基于ThreadLocal(保存连接信息和会话信息等)实现,在具体数据库操作时就使用同一个数据库连接,并手动提交事务,保证数据正确性。 # 基于反应式的Spring事务有何不同 Spring的反应式实现是基于Reactor框架,该框架

Spring非反应式事务实现原理

Spring基于注解和AOP的声明式事务(@Transactional)已经是业务开发的常用工具,默认是采用同步的方式基于ThreadLocal(保存连接信息和会话信息等)实现,在具体数据库操作时就使用同一个数据库连接,并手动提交事务,保证数据正确性。

基于反应式的Spring事务有何不同

Spring的反应式实现是基于Reactor框架,该框架对异步编程做了高度的抽象化,主动的线程切换只能通过publishOn/subscribeOn跟换线程池,导致在同步场景表现出色的ThreadLocal无法满足全异步化的事务信息存储需求。Reactor 3提供了一种叫做Context的数据结构,用来替代Threadlocal。

Context的传播机制

整体上Context类非常类似一个不可变的Map\<Object, Object>,采用CopyOnWrite策略,绑定在每一个订阅者上。但是,context传播具体是怎么实现的呢?有一个简单的例子:

Flux.just(1, 2, 3)
    .flatMap(x -> Mono.subscriberContext()
        .map(context -> String.format("%s%d", context.get("msg"), x))
    ).subscriberContext(context -> context.put("msg", "no."))
    .subscribe(System.out::println);

从代码可以看到是通过subscriberContext方法直接put数据,但是这个Context对象是什么时候创建的呢?查看subscriberContext方法的源码发现方法会创建一个FluxContextStart对象,该对象是InternalFluxOperator的子类,实现了subscribeOrReturn(被订阅时调用),在其中将上下文的操作应用到订阅者已有的上下文,而大多数订阅者初始上下文都是Context.empty()。

final class FluxContextStart<T> extends InternalFluxOperator<T, T> implements Fuseable {
....
    @Override
    public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
        Context c = doOnContext.apply(actual.currentContext());
        return new ContextStartSubscriber<>(actual, c);
    }
....
}

从FluxContextStart的实现可见,Context是由CoreSubscriber的实例所持有,因此Context的传播实际是订阅者实例的传播。而为了避免因为不同操作导致的并发问题,对订阅者的操作都是采用装饰者模式包装一个新的实例,类似Spark RDD的形式。

public final void subscribe(Subscriber<? super T> actual) {
    CorePublisher publisher = Operators.onLastAssembly(this);
    CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

    try {
        ...
        publisher.subscribe(subscriber);
    }
    catch (Throwable e) {
        Operators.reportThrowInSubscribe(subscriber, e);
        return;
    }
}

基于Flux的subscribe方法可见,每次订阅时都会向上游发布者传递订阅者实例,因此Context是自底向上传播。

Spring R2DBC的事务实现

基于对常规声明式事务的认识,找到TransactionAspectSupport#invokeWithinTransaction方法,这个方法定义了声明的事务具体要路由到哪个事务管理器执行。自Spring 5.2 M2之后,Spring开始支持反应式事务,在invokeWithinTransaction方法内可以看到如下代码:

if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
    // 从缓存中获取已经加载的反应式事务管理器
    ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
        if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
            throw new TransactionUsageException(
                    "Unsupported annotated transaction on suspending function detected: " + method +
                    ". Use TransactionalOperator.transactional extensions instead.");
        }
        // 根据返回值类型获取适配器
        ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
        if (adapter == null) {
            throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                    method.getReturnType());
        }
        return new ReactiveTransactionSupport(adapter);
    });
    // 执行事务
    return txSupport.invokeWithinTransaction(
            method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}

再继续跟进到反应式的txSupport.invokeWithinTransaction里面,会将相关的事务管理器和事务设置等信息放入上文说到的Context中。具体的对于R2DBC的数据库反应式事务而言,其主要调用的是TransactionalOperatorImpl#transactional方法:

@Override
public <T> Mono<T> transactional(Mono<T> mono) {
    return TransactionContextManager.currentContext().flatMap(context -> {
        Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        // Need re-wrapping of ReactiveTransaction until we get hold of the exception
        // through usingWhen.
        return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono,
                this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::commit)
                .onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
    })
    .subscriberContext(TransactionContextManager.getOrCreateContext())
    .subscriberContext(TransactionContextManager.getOrCreateContextHolder());
}    

从代码可以看见,这里也是首先从上下文获取事务信息,保证整个反应式处理的各个操作符都会用到同样的数据库连接,并最终实现声明式事务功能。

总结

Spring实现反应式事务本质上基于Reactor的Context传播机制,结合原有事务机制改造出来的,所以总结下来就两个核心点:1、Reactor的Context是一种类似不可变Map,绑定在每个订阅者自底向上传播;2、Spring反应式事务通过Reactor的Context在不同线程池共享。

参考资料

相关文章
|
11天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
39 2
|
19天前
|
Java 开发者 Spring
Spring高手之路24——事务类型及传播行为实战指南
本篇文章深入探讨了Spring中的事务管理,特别是事务传播行为(如REQUIRES_NEW和NESTED)的应用与区别。通过详实的示例和优化的时序图,全面解析如何在实际项目中使用这些高级事务控制技巧,以提升开发者的Spring事务管理能力。
33 1
Spring高手之路24——事务类型及传播行为实战指南
|
13天前
|
XML Java 数据库连接
Spring中的事务是如何实现的
Spring中的事务管理机制通过一系列强大的功能和灵活的配置选项,为开发者提供了高效且可靠的事务处理手段。无论是通过注解还是AOP配置,Spring都能轻松实现复杂的事务管理需求。掌握这些工具和最佳实践,能
20 3
|
2月前
|
Java 数据库连接 数据库
spring复习05,spring整合mybatis,声明式事务
这篇文章详细介绍了如何在Spring框架中整合MyBatis以及如何配置声明式事务。主要内容包括:在Maven项目中添加依赖、创建实体类和Mapper接口、配置MyBatis核心配置文件和映射文件、配置数据源、创建sqlSessionFactory和sqlSessionTemplate、实现Mapper接口、配置声明式事务以及测试使用。此外,还解释了声明式事务的传播行为、隔离级别、只读提示和事务超时期间等概念。
spring复习05,spring整合mybatis,声明式事务
|
2月前
|
Java 测试技术 数据库
Spring事务传播机制(最全示例)
在使用Spring框架进行开发时,`service`层的方法通常带有事务。本文详细探讨了Spring事务在多个方法间的传播机制,主要包括7种传播类型:`REQUIRED`、`SUPPORTS`、`MANDATORY`、`REQUIRES_NEW`、`NOT_SUPPORTED`、`NEVER` 和 `NESTED`。通过示例代码和数据库插入测试,逐一展示了每种类型的运作方式。例如,`REQUIRED`表示如果当前存在事务则加入该事务,否则创建新事务;`SUPPORTS`表示如果当前存在事务则加入,否则以非事务方式执行;`MANDATORY`表示必须在现有事务中运行,否则抛出异常;
151 4
Spring事务传播机制(最全示例)
|
1月前
|
Java 关系型数据库 MySQL
Spring事务失效,我总结了这7个主要原因
本文详细探讨了Spring事务在日常开发中常见的七个失效原因,包括数据库不支持事务、类不受Spring管理、事务方法非public、异常被捕获、`rollbackFor`属性配置错误、方法内部调用事务方法及事务传播属性使用不当。通过具体示例和源码分析,帮助开发者更好地理解和应用Spring事务机制,避免线上事故。适合所有使用Spring进行业务开发的工程师参考。
32 2
|
1月前
|
Java 程序员 Spring
Spring事务的1道面试题
每次聊起Spring事务,好像很熟悉,又好像很陌生。本篇通过一道面试题和一些实践,来拆解几个Spring事务的常见坑点。
Spring事务的1道面试题
|
2月前
|
Java Spring
Spring 事务传播机制是什么?
Spring 事务传播机制是什么?
23 4
|
1月前
|
监控 Java 数据库
Spring事务中的@Transactional注解剖析
通过上述分析,可以看到 `@Transactional`注解在Spring框架中扮演着关键角色,它简化了事务管理的复杂度,让开发者能够更加专注于业务逻辑本身。合理运用并理解其背后的机制,对于构建稳定、高效的Java企业应用至关重要。
58 0
|
3月前
|
XML Java 数据库
Spring5入门到实战------15、事务操作---概念--场景---声明式事务管理---事务参数--注解方式---xml方式
这篇文章是Spring5框架的实战教程,详细介绍了事务的概念、ACID特性、事务操作的场景,并通过实际的银行转账示例,演示了Spring框架中声明式事务管理的实现,包括使用注解和XML配置两种方式,以及如何配置事务参数来控制事务的行为。
Spring5入门到实战------15、事务操作---概念--场景---声明式事务管理---事务参数--注解方式---xml方式
下一篇
无影云桌面