【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务

@[TOC]

一、前言

至此,seata系列的内容包括:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么【云原生】
  8. 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
  9. 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信

本文接着Seata使用@GlobalTransactional是如何开启全局事务的?

PS:前文中搭建的Seata案例,seata的版本为1.3.0,而本文开始的源码分析将基于当前(2022年8月)最新的版本1.5.2进行源码解析。

二、@GlobalTransactional

我们知道可以将@GlobalTransactional注解标注在类或方法上 开启全局事务,下面来看一下@GlobalTransactional是如何开启的全局事务?

【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么一文中,我们知道了SpringBoot启动过程中会自动装配GlobalTransactionScanner类;

1、GlobalTransactionScanner类(BPP)

先看GlobalTransactionScanner类的继承关系:

在这里插入图片描述

GlobalTransactionScanner类继承了AbstractAutoProxyCreatorAbstractAutoProxyCreator类又实现了BeanPostProcessor接口;因此GlobalTransactionScanner类也是BPP(BeanPostProcessor)

下面简单看一下AbstractAutoProxyCreator类;

1)AbstractAutoProxyCreator(自动创建动态代理)

在这里插入图片描述

AbstractAutoProxyCreator是Spring AOP中的一个抽象类,其主要功能是自动创建动态代理;因为其实现了BeanPostProcessor接口,所以在类加载到Spring容器之前,会进入到其wrapIfNecessary()方法对Bean进行代理包装,后续调用Bean之将委托给指定的拦截器。

另外其getAdvicesAndAdvisorsForBean()方法用于给子类实现,由子类决定一个Bean是否需要被代理(是否存在切面);并且它还可以返回只应用于特定Bean实例的附加拦截器

2)BeanPostProcessor(对Bean进行修改的入口)

BeanPostProcessor是Bean的后置处理器,可以通过实现其 并 覆写其postProcessBeforeInitialization()postProcessAfterInitialization() 方法在Bean初始化前后对其进行修改;

AbstractAutoProxyCreator正是通过覆写BeanPostProcessorpostProcessAfterInitialization() 创建并返回Bean的代理对象;

在这里插入图片描述

下面从SpringBoot启动流程来看针对标注了@GlobalTransactional的类 或 类中包含标注了@GlobalTransactional方法的类 创建动态代理的入口。

3)从SpringBoot启动流程来看入口

TradeService类为例:

package com.saint.trade.service;

import com.saint.trade.feign.OrderFeignClient;
import com.saint.trade.feign.StockFeignClient;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

/**
 * @author Saint
 */
@Service
@RequiredArgsConstructor
public class TradeService {

    private final StockFeignClient stockFeignClient;
    private final OrderFeignClient orderFeignClient;

    /**
     * 减库存,下订单
     *
     * @param userId
     * @param commodityCode
     * @param orderCount
     */
    @GlobalTransactional
    public void purchase(String userId, String commodityCode, int orderCount) {
        stockFeignClient.deduct(commodityCode, orderCount);

        orderFeignClient.create(userId, commodityCode, orderCount);
    }
    
    public void test() {
        System.out.println("hhahaha");
    }
}

TradeService类被@Component衍生注解@Service标注,TradeService类又在SpringBoot扫描路径中,因此SpringBoot启动时会扫描到TradeService类;

TradeService类中包含两个方法:purchase()test(),其中purchase()方法被@GlobalTransactional注解标注。

下面来看创建Bean时设计到BeanPostProcessor的代码片段:

AbstractBeanFactory抽象Bean工厂的实现类AbstractAutowireCapableBeanFactoryinitializeBean()方法是初始化Bean的入口:

在这里插入图片描述
在Bean初始化之后会调用BeanPostProcessorpostProcessAfterInitialization() 创建并返回Bean的代理对象;整体线程栈帧信息如下:

在这里插入图片描述

在这里插入图片描述

最终进入到GlobalTransactionScanner覆写AbstractAutoProxyCreator抽象类的wrapIfNecessary()方法创建并返回代理对象(如果需要创建动态代理的话)。

4)是否 / 如何生成动态代理对象

从上面我们知道了GlobalTransactionScanner类的wrapIfNecessary()是创建动态代理的入口;这里接着来看wrapIfNecessary()方法如何判断是否Bean是否需要生成动态代理?

// 对于扫描到的@GlobalTransactional注解, bean和beanName
// 判断类 或 类的某一个方法是否被@GlobalTransactional注解标注,进而决定当前Class是否需要创建动态代理;存在则创建。
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    // do checkers,做一些检查,不用花过多精力关注
    if (!doCheckers(bean, beanName)) {
        return bean;
    }

    try {
        synchronized (PROXYED_SET) {
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            interceptor = null;
            //check TCC proxy TCC的动态代理
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                // init tcc fence clean task if enable useTccFence
                TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
                //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener) interceptor);
            } else {
                // 先获取目标Class的接口
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                // existsAnnotation()表示类或类方法是否有被@GlobalTransactional注解标注,进而决定类是否需要被动态代理
                if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }

                if (globalTransactionalInterceptor == null) {
                    // 构建一个全局拦截器
                    globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener) globalTransactionalInterceptor);
                }
                interceptor = globalTransactionalInterceptor;
            }

            LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
            // 如果当前Bean没有被AOP代理
            if (!AopUtils.isAopProxy(bean)) {
                // 基于Spring AOP的AutoProxyCreator对当前Class创建全局事务动态动态代理类
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                int pos;
                for (Advisor avr : advisor) {
                    // Find the position based on the advisor's order, and add to advisors by pos
                    // 找到seata切面的位置
                    pos = findAddSeataAdvisorPosition(advised, avr);
                    advised.addAdvisor(pos, avr);
                }
            }
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}

第一步

1> 首先doCheckers()方法对Bean做一些检查,包括:Bean是否已经生成了代理类、Bean不允许生成代理类.....

在这里插入图片描述

第二步

2> 对已经创建了动态代理的Bean的Set集合PROXYED_SET加锁做同步操作,如果PROXYED_SET中存在当前Bean的代理对象,则直接返回。

在这里插入图片描述

第三步

3> 根据@TwoPhaseBusinessAction注解判断是否是TCC模式下的动态代理(默认是AT模式,即不是TCC模式);

在这里插入图片描述

第四步

4> 获取Bean的目标Class,再通过existsAnnotation()方法查看 类或类方法是否有被@GlobalTransactional注解标注,进而决定类是否需要被动态代理;

在这里插入图片描述

existsAnnotation()方法用于判断类是否需要被动态代理

在这里插入图片描述

existsAnnotation()方法判断类是否需要被动态代理时:

  1. 首先判断类上是否标注了@GlobalTransactional注解,如果标注了,则直接返回true,表示类需要被动态代理;
  2. 否者,接着利用反射获取类的所有public方法,只要存在一个方法被@GlobalTransactional@GlobalLock 注解标注,则表示当前类需要被动态代理;

PS:聪明的你肯定发现,当一个类中有多个方法并且类没有被@GlobalTransactional注解标注,但只有一个方法被@GlobalTransactional注解标注时,这里针对整个类生成了动态代理对象,那么调用没加@GlobalTransactional注解的方法也会进入到代理对象,会不会有问题呢? 继续往后看,拦截器GlobalTransactionalInterceptor中会对其进行处理。

当目标Class需要被动态代理时,则会初始化一个拦截器GlobalTransactionalInterceptor,用于拦截后面对目标Class的调用;

在这里插入图片描述

那么GlobalTransactionalInterceptor是如何被应用于目标Class上做拦截的?

第五步

5> 如果针对当前Bean的代理是JDK 或 CGLIB动态代理,则根据GlobalTransactionalInterceptor创建切面,并应用到Bean上;

在这里插入图片描述

在第四步时,我们对GlobalTransactionalInterceptor如何被应用于目标Class上做拦截持有疑问,在前面介绍AbstractAutoProxyCreator我们提到过:

AbstractAutoProxyCreator的 getAdvicesAndAdvisorsForBean()方法用于给子类实现,由子类决定一个Bean是否需要被代理(是否存在切面);并且它还可以返回 只应用于特定Bean实例的附加拦截器;>>

在这里插入图片描述

GlobalTransactionScanner覆写了getAdvicesAndAdvisorsForBean()方法,将上面初始化后的GlobalTransactionalInterceptor作为切面返回给AbstractAutoProxyCreator,供其创建动态代理类时使用;

在这里插入图片描述

创建完代理对象之后,将代理对象放入到GlobalTransactionScanner的动态代理Bean的Set集合PROXYED_SET,以快去获取Bean的代理对象 并 防止Bean代理对象的重复创建。最后将代理对象返回,创建Bean流程结束。

==至此,我们知道了所谓的@GlobalTransactional注解开启全局事务,实际就是针对类 或 类的方法上标注了@GlobalTransactional注解的类创建动态代理对象==

三、全局事务的执行(前戏)

上面我们知道了所谓的@GlobalTransactional注解开启全局事务,其实就是类 或 类的方法上标注了@GlobalTransactional注解的类创建动态代理对象。但是动态代理对象是针对类的;

当一个类中有多个方法并且类没有被@GlobalTransactional注解标注,但只有一个方法被@GlobalTransactional注解标注时,这里针对整个类生成了动态代理对象,当调用Bean时,拦截器GlobalTransactionalInterceptor会做进一步处理,保证只有加了@GlobalTransactional注解的方法才会开启全局事务。

先看GlobalTransactionalInterceptor类的继承图:

在这里插入图片描述

GlobalTransactionalInterceptor实现了MethodInterceptor接口,所以当每次执行添加了 GlobalTransactionalInterceptor拦截器的Bean的方法时,都会进入到GlobalTransactionalInterceptor类覆写MethodInterceptor接口的invoke()方法:

@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    // method invocation是一次方法调用,一定是针对某个对象的方法调用;
    // methodInvocation.getThis()就是拿到当前方法所属的对象;
    // AopUtils.getTargetClass()获取到当前实例对象所对应的Class
    Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;

    // 通过反射获取到被调用目标Class的method方法
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);

    // 如果目标method不为空,并且方法的DeclaringClass不是Object
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        // 通过BridgeMethodResolver寻找method的桥接方法
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        // 获取目标方法的@GlobalTransactional注解
        final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
        // 如果目标方法被@GlobalLock注解标注,获取到@GlobalLock注解内容
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        // 如果禁用了全局事务 或 开启了事务降级检查并且降级检查次数大于等于降级检查允许的次数
        // 则localDisable等价于全局事务被禁用了
        boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);

        // 如果全局事务没有被禁用
        if (!localDisable) {
            // 全局事务注解不为空 或者 AOP切面全局事务核心配置不为空
            if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                AspectTransactional transactional;
                if (globalTransactionalAnnotation != null) {
                    // 构建一个AOP切面全局事务核心配置,配置的数据从全局事务注解中取
                    transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                            globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                            globalTransactionalAnnotation.rollbackForClassName(),
                            globalTransactionalAnnotation.noRollbackFor(),
                            globalTransactionalAnnotation.noRollbackForClassName(),
                            globalTransactionalAnnotation.propagation(),
                            globalTransactionalAnnotation.lockRetryInterval(),
                            globalTransactionalAnnotation.lockRetryTimes());
                } else {
                    transactional = this.aspectTransactional;
                }
                // 真正处理全局事务的入口
                return handleGlobalTransaction(methodInvocation, transactional);
            } else if (globalLockAnnotation != null) {
                // 获取事务锁
                return handleGlobalLock(methodInvocation, globalLockAnnotation);
            }
        }
    }
    // 直接运行目标方法
    return methodInvocation.proceed();
}

假如我们调用TradeService类中没有标注@GlobalTransactional注解的test()方法;

在这里插入图片描述

invoke()方法中会再次判断 当前调用的bean的方法 或 方法所处的类上是否标注了@GlobalTransactional注解,如果没有标注,则执行运行目标方法;否则才会以全局事务的方式执行方法。

四、总结

所谓的@GlobalTransactional注解开启全局事务,实际就是针对类 或 类的方法上标注了@GlobalTransactional注解的类创建动态代理对象。

在调用相应Bean的时候,会进入到动态代理对象的拦截器GlobalTransactionalInterceptor

相关文章
|
10天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
10天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
10天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
11天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
2月前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
1天前
|
Java 关系型数据库 数据库
微服务SpringCloud分布式事务之Seata
SpringCloud+SpringCloudAlibaba的Seata实现分布式事务,步骤超详细,附带视频教程
14 1
|
26天前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
182 7
|
2月前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
85 6
|
2月前
|
数据库
如何在Seata框架中配置分布式事务的隔离级别?
总的来说,配置分布式事务的隔离级别是实现分布式事务管理的重要环节之一,需要认真对待和仔细调整,以满足业务的需求和性能要求。你还可以进一步深入研究和实践 Seata 框架的配置和使用,以更好地应对各种分布式事务场景的挑战。
43 6
|
2月前
|
消息中间件 运维 数据库
Seata框架和其他分布式事务框架有什么区别
Seata框架和其他分布式事务框架有什么区别
35 1

推荐镜像

更多