Seata 分支事务

简介: 前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文先来介绍 Seata 中分支事务的整体实现思想。

引言

前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文先来介绍 Seata 中分支事务的整体实现思想,其他 Seata 相关文章均收录于 <Seata系列文章>中。

Branch Type

我们已经知道在 Seata 中, 分支事务分 AT 模式和 TCC 模式, 那么, Seata 是怎么区分出 AT 模式和 TCC 模式的呢? 这也借助了 Spring 的 AOP 特性, 我们在 TM 中介绍的 GlobalTransactionalInterceptor 实际上只负责 AT 模式, TCC 模式是另一套拦截器实现, 而这两种拦截器的注入, 全都是在 GlobalTransactionScanner 中进行的, 也就是说, 我们的 Spring 项目要将 GlobalTransactionScanner 注册为 Bean, 因为其继承自 AbstractAutoProxyCreator, Spring 在处理 AOP 过程时, 就会自动将 GlobalTransactionScanner 执行。接下来, 我们看看整个 Seata 的入口 GlobalTransactionScanner 的核心代码:

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements InitializingBean, ApplicationContextAware, DisposableBean, BeanPostProcessor {
    // 只保留核心代码...

    // 判断是否需要嵌入 Seata 的 AOP 代码
    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        if (disableGlobalTransaction) {
            return bean;
        }
        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                //check TCC proxy, 实际上是根据 TCC 注解 TwoPhaseBusinessAction + RPC 协议类型进行判断, TCC 目前只支持一些特定的协议
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    // 识别出 TCC 模式, 使用 TCC 拦截器
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    // 判断是不是 AT 模式, 通过注解 GlobalTransactional GlobalLock
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                     // 识别出AT 模式, 使用前面提到的 GlobalTransactionalInterceptor
                    if (interceptor == null) {
                        interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    }
                }

                LOGGER.info(
                    "Bean[" + bean.getClass().getName() + "] with name [" + beanName + "] would use interceptor ["
                        + interceptor.getClass().getName() + "]");
                if (!AopUtils.isAopProxy(bean)) {
                    // 如果该类不由 Spring 管控, 则无能为力
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    // 如果是由 Spring 管控, 将 TCC 拦截器 或者 AT 拦截器注入
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, new Object[]{interceptor});
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }
    // 判断是不是 AT 模式, 通过注解 GlobalTransactional GlobalLock
    private boolean existsAnnotation(Class<?>[] classes) {
        if (classes != null && classes.length > 0) {
            for (Class clazz : classes) {
                if (clazz == null) {
                    continue;
                }
                Method[] methods = clazz.getMethods();
                for (Method method : methods) {
                    GlobalTransactional trxAnno = method.getAnnotation(GlobalTransactional.class);
                    if (trxAnno != null) {
                        return true;
                    }

                    GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
                    if (lockAnno != null) {
                        return true;
                    }
                }
            }
        }
        return false;
    }
    // 替换默认的数据库连接源, 改为 AT 模式的数据源代理
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Auto proxy of  [" + beanName + "]");
            }
            DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean);
            return Enhancer.create(bean.getClass(), (org.springframework.cglib.proxy.MethodInterceptor) (o, method, args, methodProxy) -> {
                Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
                if (null != m) {
                    return m.invoke(dataSourceProxy, args);
                } else {
                    return method.invoke(bean, args);
                }
            });
        }
        return bean;
    }
}

整个项目是从这个 GlobalTransactionScanner 作为起点接入 Seata 的, 一般来说 SpringBoot 会用 @Configuration 类将其注册为 @Bean, 原生 Spring 项目需要在在 XML 中将其注册为 Bean,这一点我们从官方的 Sample 中就能发现。

// SpringBoot
@Configuration
public class SeataAutoConfig {
    /**
     * init global transaction scanner
     *
     * @Return: GlobalTransactionScanner
     */
    @Bean
    public GlobalTransactionScanner globalTransactionScanner(){
        return new GlobalTransactionScanner("account-gts-seata-example", "my_test_tx_group");
    }
}

Spring 配置如下所示:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    <!--省略其他内容-->
    <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
        <constructor-arg value="dubbo-demo-app"/>
        <constructor-arg value="my_test_tx_group"/>
    </bean>

</beans>

好了, 想必大家应该已经清楚 Seata 中是如何发现 TCC 模式和 AT 模式的了, 我们总结一下:

  1. 扫描 Spring 代理的类
  2. 如果发现带有 TCC 的注解, 并且 RPC 协议满足条件, 那么走 TCC 拦截器
  3. 如果发现带有 AT 的注解, 那么走 AT 拦截器
  4. 否则, 什么都不干

接下来我们, 分别看一下这两种模式各自都是如何运作起来的。

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1] fescar锁设计和隔离级别的理解
[2] 分布式事务中间件 Fescar - RM 模块源码解读
[3] Fescar分布式事务实现原理解析探秘
[4] Seata TCC 分布式事务源码分析
[5] 深度剖析一站式分布式事务方案 Seata-Server
[6] 分布式事务 Seata Saga 模式首秀以及三种模式详解
[7] 蚂蚁金服大规模分布式事务实践和开源详解
[8] 分布式事务 Seata TCC 模式深度解析
[9] Fescar (Seata)0.4.0 中文文档教程
[10] Seata Github Wiki
[11] 深度剖析一站式分布式事务方案Seata(Fescar)-Server

相关文章
|
8月前
|
开发者
seata事务问题之不回滚客户端如何解决
Seata是一款开源的分布式事务解决方案,旨在提供高效且无缝的分布式事务服务;在集成和使用Seata过程中,开发者可能会遇到不同的异常问题,本合集针对Seata常见异常进行系统整理,为开发者提供详细的问题分析和解决方案,助力高效解决分布式事务中的难题。
448 14
|
3月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
8月前
|
监控 数据库
在Seata中一张表使用了联合主键,在事务回滚时报异常,改为单个主键,就没有这个异常,如何解决?
在Seata中一张表使用了联合主键,在事务回滚时报异常,改为单个主键,就没有这个异常,如何解决?
|
8月前
|
Java 数据库连接 API
Seata异常捕获问题之回滚事务如何解决
Seata是一款开源的分布式事务解决方案,旨在提供高效且无缝的分布式事务服务;在集成和使用Seata过程中,开发者可能会遇到不同的异常问题,本合集针对Seata常见异常进行系统整理,为开发者提供详细的问题分析和解决方案,助力高效解决分布式事务中的难题。
602 14
|
8月前
|
Dubbo 关系型数据库 MySQL
Seata常见问题之serviceA方法无法注册分支事务到Seata如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
8月前
|
SQL 监控 Java
Seata常见问题之报找不到全局事务可能已经完成如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
1225 0
|
存储 SQL 中间件
微服务之分布式事务服务端搭建(seata 1.0以下版本)
微服务之分布式事务服务端搭建(seata 1.0以下版本)
63 1
微服务之分布式事务服务端搭建(seata 1.0以下版本)
|
Dubbo 关系型数据库 应用服务中间件
快速玩转Dubbo生态之一致性事务篇(Seata)
本场景将对Dubbo接入一致性事务的方式进行介绍,带您体验快速上手Dubbo接入Seata生态。
|
存储 SQL Java
|
安全 容灾 Nacos
Seata中的四种不同的事务模式之一SAGA模式
Saga 模式是 Seata 即将开源的长事务解决方案,由蚂蚁金服主要贡献
272 0