【学习Seata1.6源码#01】全局事务注解@GlobalTransactional的识别

简介: 【学习Seata1.6源码#01】全局事务注解@GlobalTransactional的识别

一、声明式全局事务

Seata示例工程中,能看到@GlobalTransactional,如下方法示例:

@GlobalTransactional
public boolean purchase(long accountId, long stockId, long quantity) {
    String xid = RootContext.getXID();
    LOGGER.info("New Transaction Begins: " + xid);
    boolean stockResult = reduceAccount(accountId,stockId, quantity);
    if (!stockResult) {
        throw new RuntimeException("账号服务调用失败,事务回滚!");
    }
    Long orderId = createOrder(accountId, stockId, quantity);
    if (orderId == null || orderId <= 0) {
        throw new RuntimeException("订单服务调用失败,事务回滚!");
    }
    return true;
}
复制代码

purchase方法上加上此注解,即表示此方法内的reduceAccountcreateOrder两个微服务调用也将加入到分布式事务中,即扣除账户余额与创建订单将具有分布式事务的数据一致性保障能力。

了解 Spring 注解事务实现的话,应该也能推测出,Seata 的事务能力也可能是基于 Spring 的 AOP 机制,给标注了@GlobalTransactional 的方法做 AOP 增加,织入额外的逻辑以完成分布式事务的能力,伪代码大致如下:

GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
    tx.begin(xxx);
    ...
    purchase(xxx)//给purchase增加全局事务处理能力
    ...
    tx.commit();
} catch (Exception exx) {
    tx.rollback();
    throw exx;
}
复制代码

二、@GlobalTransactional 注解如何被识别?

2.1 运行环境

1)引入seata-spring-boot-starter模块

<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>${seata.version}</version>
</dependency>
复制代码

spring.factories 中有自动装配类SeataAutoConfiguration

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
...
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration
...
复制代码

此类负责处理全局事务扫描及设置,其中就有@GlobalTransactional

2)条件要求:

@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@AutoConfigureAfter({SeataCoreAutoConfiguration.class})
public class SeataAutoConfiguration
复制代码

从类的自动装备条件,可看出要满足 2 个条件:

  1. @ConditionalOnProperty,表明若要生效须具备以下配置条件:
seata.enabled = true
复制代码
  1. @AutoConfigureAfter,表示从 bean 的加载顺序来看,要求是在SeataCoreAutoConfiguration之后,SeataCoreAutoConfiguration又是什么呢?
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@Configuration(proxyBeanMethods = false)
public class SeataCoreAutoConfiguration {
    @Bean(BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER)
    @ConditionalOnMissingBean(name = {BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})
    public SpringApplicationContextProvider springApplicationContextProvider() {
        return new SpringApplicationContextProvider();
    }
}
复制代码

从源码可知,也很清晰,大概有几个功能点:

  1. 同样要求seata.enabled = true
  2. 另外会扫描"io.seata.spring.boot.autoconfigure.properties"
  3. 会提供一个SpringApplicationContextProvider,基于 Spring 的程序中,很常见这种方式,用于提供 ApplicationContext,通常用于方便获取 bean 和添加 bean 等。
public class SpringApplicationContextProvider implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        System.setProperty("file.listener.enabled", "false");
        ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT, applicationContext);
    }
}
复制代码

2.2 功能-注入事务执行失败处理器FailureHandler

注入一个FailureHandler,其默认实现DefaultFailureHandlerImpl中只会打印错误日志,建议重写,异常发生时及时告知使用者。

2.3 功能-构建全局事务扫描器GlobalTransactionScanner

构建全局事务扫描器GlobalTransactionScanner,注入到容器中,其内部做 2 件事情

  • 会初始化 TMRM 客户端
  • 扫描Bean,对添加了全局事务注解的类(@GlobalTransactional@GlobalLock@TwoPhaseBusinessAction)生成代理对象,做AOP增强,添加对应的拦截器(拦截器内补充分布式事务的能力)
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
复制代码

Spring 中 Bean 的关键初始化过程:

实例化 -> 属性注入 -> postProcessBeforeInitialization -> afterPropertiesSet/init 方法 -> postProcessAfterInitialization

从以下堆栈可以看出,AbstractAutoProxyCreator在 bean 初始化完成之后创建它的代理 AOP  代理,通过wrapIfNecessary判断是否该 bean 是否存在全局事务注解,如果有则需要增强,添加相应的拦截器。

wrapIfNecessary:269, GlobalTransactionScanner (io.seata.spring.annotation)
postProcessAfterInitialization:293, AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy)
applyBeanPostProcessorsAfterInitialization:455, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
initializeBean:1808, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
doCreateBean:620, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
复制代码

先看一个关键方法existsAnnotation,当扫描到 bean 之后,获取 bean 的原始类型,然后通过此方法原始类型的类或者方法中是否有@GlobalTransactional@GlobalLock注解

private boolean existsAnnotation(Class<?>[] classes) {
    if (CollectionUtils.isNotEmpty(classes)) {
        for (Class<?> clazz : classes) {
            if (clazz == null) {
                continue;
            }
            //判断类上是否有注解@GlobalTransactional
            GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);
            if (trxAnno != null) {
                return true;
            }
            Method[] methods = clazz.getMethods();
            for (Method method : methods) {
                //判断方法上是否有注解@GlobalTransactional
                trxAnno = method.getAnnotation(GlobalTransactional.class);
                if (trxAnno != null) {
                    return true;
                }
                 //判断方法上是否有注解@GlobalLock
                GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
                if (lockAnno != null) {
                    return true;
                }
            }
        }
    }
    return false;
}
复制代码

wrapIfNecessary方法中实现了事务注解识别的核心逻辑:

/**
 * The following will be scanned, and added corresponding interceptor:
 *
 * TM:
 * @see io.seata.spring.annotation.GlobalTransactional // TM annotation
 * Corresponding interceptor:
 * @see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction(MethodInvocation, AspectTransactional) // TM handler
 *
 * GlobalLock:
 * @see io.seata.spring.annotation.GlobalLock // GlobalLock annotation
 * Corresponding interceptor:
 * @see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalLock(MethodInvocation, GlobalLock)  // GlobalLock handler
 *
 * TCC mode:
 * @see io.seata.rm.tcc.api.LocalTCC // TCC annotation on interface
 * @see io.seata.rm.tcc.api.TwoPhaseBusinessAction // TCC annotation on try method
 * @see io.seata.rm.tcc.remoting.RemotingParser // Remote TCC service parser
 * Corresponding interceptor:
 * @see io.seata.spring.tcc.TccActionInterceptor // the interceptor of TCC mode
 */
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    // do checkers
    // seata提供的有扩展逻辑用于辅助判断是否需要增强
    if (!doCheckers(bean, beanName)) {
        return bean;
    }
    try {
        synchronized (PROXYED_SET) {
            //如果已被代理,则跳过该Bean ,PROXYED_SET是一个Set<String> 集合
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            interceptor = null;
            //check TCC proxy
            //判断是否TCC模式,如果是TCC,则添加TCC 拦截器
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                // init tcc fence clean task if enable useTccFence
                TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
                //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)interceptor);
            } else {
                //非TCC模式
                // 查询Bean的 Class 类型
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                //判断是否有GlobalTransactional或者GlobalLock注解,如果没有就不会代理,直接返回bean
                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }
                // 初始化GlobalTransactionalInterceptor
                if (globalTransactionalInterceptor == null) {
                    globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    //添加 禁用全局事务的监听器
                    ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)globalTransactionalInterceptor);
                }
                interceptor = globalTransactionalInterceptor;
            }
            LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
            // 判断是否已经是AOP代理类,如果不是,则执行父类的wrapIfNecessary
            if (!AopUtils.isAopProxy(bean)) {
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                // 给代理对象添加拦截器
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                int pos;
                for (Advisor avr : advisor) {
                    // Find the position based on the advisor's order, and add to advisors by pos
                    pos = findAddSeataAdvisorPosition(advised, avr);
                    advised.addAdvisor(pos, avr);
                }
            }
            //标识该bean已经代理过,本方法入口处有判断
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}
复制代码

Seata 提供的有扩展逻辑用于辅助判断是否需要增强,这里的扩展点有好几个,这些扩展是用于安全保障,使用者可以按需采用。

2Zmh5D.gif

三、小结:

本篇梳理了引入seata-spring-boot-starter模块后,其内部会通过的自动装配机制会在SeataAutoConfiguration类中,扫描具有@GlobalTransactional全局事务注解的类和方法的 bean,并对这类 bean 添加GlobalTransactionalInterceptor,进行 AOP 增强,加入分布式事务的能力,增强后的功能复杂,下篇继续。

四、最后说一句

我是石页兄,如果这篇文章对您有帮助,或者有所启发的话,欢迎关注笔者的微信公众号【 架构染色 】进行交流和学习。您的支持是我坚持写作最大的动力。

欢迎点击链接扫马儿关注、交流。


相关文章
|
7月前
|
SQL 关系型数据库 数据库
学习分布式事务Seata看这一篇就够了,建议收藏
学习分布式事务Seata看这一篇就够了,建议收藏
|
2月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
7月前
|
监控 数据库
在Seata中一张表使用了联合主键,在事务回滚时报异常,改为单个主键,就没有这个异常,如何解决?
在Seata中一张表使用了联合主键,在事务回滚时报异常,改为单个主键,就没有这个异常,如何解决?
|
7月前
|
Java 数据库连接 API
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
146 0
|
7月前
|
存储 Oracle 关系型数据库
分布式事物【Seata实现、下载启动Seata服务、搭建聚合父工程构建】(四)-全面详解(学习总结---从入门到深化)
分布式事物【Seata实现、下载启动Seata服务、搭建聚合父工程构建】(四)-全面详解(学习总结---从入门到深化)
97 0
|
7月前
|
Dubbo 关系型数据库 MySQL
Seata常见问题之serviceA方法无法注册分支事务到Seata如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
7月前
|
存储 NoSQL 关系型数据库
Seata常见问题之使用了@GlobalTransactional出现空指针的错误如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
698 0
|
7月前
|
SQL 监控 Java
Seata常见问题之报找不到全局事务可能已经完成如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
1208 0
|
1月前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
20天前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
133 7