原理篇:Seata TCC如何实现分布式事务提交或回滚

简介: 原理篇:Seata TCC如何实现分布式事务提交或回滚

前言

在之前的文章手把手教你Spring Cloud集成Seata TCC模式中教大家学会了如何使用TCC模式,原理篇:Seata TCC模式是如何调用资源预留逻辑的中解答了TCC模式是怎样调用资源预留逻辑的,那么接下来的疑问就是TCC模式是如何实现分布式事务的提交或回滚的呢?

如何触发分布式事务的提交或回滚

我们知道,AT模式是由TM根据各分支事务提交的情况来决议发起分布式事务的提交还是回滚,其实TCC模式也是一样,TCC模式所属的TM也是需要被@GlobalTransactional注解的,所以TCC模式也会构建出GlobalTransactionalInterceptor拦截器。

GlobalTransactionScanner.wrapIfNecessary():

interceptor = null;
                // 这是专门针对TCC Action拦截的
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    // init tcc fence clean task if enable useTccFence
                    TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)interceptor);
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                  // 这是专门针对TM拦截的,需要被@GlobalTransactional或@GlobalLock注解
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    if (globalTransactionalInterceptor == null) {
                      // 被@GlobalTransactional或@GlobalLock注解才会创建GlobalTransactionalInterceptor对象
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }
复制代码

1.GlobalTransactionScanner.wrapIfNecessary()做了两件事情,一个就是处理TCC Action,一个就是创建TM拦截器GlobalTransactionalInterceptor

2.GlobalTransactionalInterceptor不仅仅是给AT模式使用的,TCC模式也是需要使用的,它的职责就是肩负起TM的功能,实现分布式事务的开启、提交或回滚;

GlobalTransactionalInterceptor.handleGlobalTransaction()------>TransactionalTemplate.execute():

try {
                // 分布式事务开启
                beginTransaction(txInfo, tx);
                Object rs;
                try {
                    // 执行业务逻辑
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 决议回滚
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }
                // 4. 决议提交
                commitTransaction(tx);
                // 返回结果
                return rs;
            } finally {
                //5. clear
                resumeGlobalLockConfig(previousConfig);
                triggerAfterCompletion();
                cleanUp();
            }
复制代码

也就是说,当TM执行的业务逻辑中出现异常,都会触发completeTransactionAfterThrowing();如若没有任何异常,说明业务逻辑平稳执行,那么触发commitTransaction()

在TM调用的business.execute()方法中,是会逐个调用各微服务提供的业务服务,在Seata框架中,被视为多个RM服务,只有当所有RM都调用正常通过后,才能够让TM执行到commitTransaction()方法,任意一个RM出异常,都会触发TM执行到completeTransactionAfterThrowing()上;

所以说,RM分支事务是否执行成功是触发分布式事务提交或回滚的成因;TM是根据RM分支事务的执行结果来做出分布式事务提交或回滚的决议;

如何触发各分支事务提交或回滚

在TM决议提交或回滚后,会向TC服务发起分布式事务的提交或回滚请求,TC服务作为中间转发层,会依次逐个向RM发起分支事务提交或回滚请求,可查看RM的请求处理代码:

public abstract class AbstractRMHandler extends AbstractExceptionHandler
    implements RMInboundHandler, TransactionMessageHandler {
  // 处理TC发起的分支事务提交请求
  @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        // 模版模式
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response)
                throws TransactionException {
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }
  // 处理TC发起的分支事务回滚请求
    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        BranchRollbackResponse response = new BranchRollbackResponse();
      // 模版模式
        exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
            @Override
            public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
                throws TransactionException {
                doBranchRollback(request, response);
            }
        }, request, response);
        return response;
    }
}
复制代码

触发RM分支事务提交或回滚,是由TM发起的决议,中间由TC服务依次进行转发给对应的RM;

RM如何实现提交或回滚

最终,在接收到TC服务的请求后,会调用到TCCResourceManager.branchCommit()TCCResourceManager.branchRollback():

  • 提交逻辑
@Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        // 从缓存中获取TCCResource
        TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
        if (tccResource == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
        }
        // 拿出目标对象和指定的commit方法
        Object targetTCCBean = tccResource.getTargetBean();
        Method commitMethod = tccResource.getCommitMethod();
        if (targetTCCBean == null || commitMethod == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
        }
        try {
            //BusinessActionContext
            BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
                applicationData);
            // 需要传递的参数
            Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
            Object ret;
            boolean result;
            // 如果需要Seata完成幂等性、资源悬挂、空回滚等处理,就调用TCCFenceHandler.commitFence()
            if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
                try {
                    result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
                } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                    throw e.getCause();
                }
            } else {
                // 直接调用目标方法
                ret = commitMethod.invoke(targetTCCBean, args);
                if (ret != null) {
                    if (ret instanceof TwoPhaseResult) {
                        result = ((TwoPhaseResult)ret).isSuccess();
                    } else {
                        result = (boolean)ret;
                    }
                } else {
                    result = true;
                }
            }
            LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
            // 返回提交结果
            return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
        } catch (Throwable t) {
            String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
            LOGGER.error(msg, t);
            return BranchStatus.PhaseTwo_CommitFailed_Retryable;
        }
    }
复制代码

1.RM最终会执行到TCCResourceManager中,并调用branchCommit()完成分支提交的逻辑调用;

2.在提交前会从tccResourceCache中取出一阶段缓存进去的TCCResource,里面包含了目标对象、目标方法以及对应的参数;

3.RM会根据use_tcc_fence配置来决定是直接调用commitMethod.invoke()还是通过TCCFenceHandler.commitFence()来完成目标方法的调用;

  • 回滚逻辑
@Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        // 先从缓存中获取TCCResource
        TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
        if (tccResource == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
        }
        // 拿出目标对象和回滚方法
        Object targetTCCBean = tccResource.getTargetBean();
        Method rollbackMethod = tccResource.getRollbackMethod();
        if (targetTCCBean == null || rollbackMethod == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
        }
        try {
            //BusinessActionContext
            BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
                applicationData);
            // 取出需要传递的参数
            Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);
            Object ret;
            boolean result;
            // 是否设置了use_tcc_fence=true,需要Seata完成幂等性、资源悬挂、空回滚等处理;
            if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
                try {
                    result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,
                            args, tccResource.getActionName());
                } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                    throw e.getCause();
                }
            } else {
                // 开发人员自行处理幂等性、资源悬挂、空回滚,直接调用目标方法
                ret = rollbackMethod.invoke(targetTCCBean, args);
                if (ret != null) {
                    if (ret instanceof TwoPhaseResult) {
                        result = ((TwoPhaseResult)ret).isSuccess();
                    } else {
                        result = (boolean)ret;
                    }
                } else {
                    result = true;
                }
            }
            LOGGER.info("TCC resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
            // 返回回滚结果
            return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        } catch (Throwable t) {
            String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
            LOGGER.error(msg, t);
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
复制代码

1.回滚逻辑和提交逻辑一样,首先都是需要取出缓存中的TCCResource,这里面包含了目标对象和回滚方法以及需要传递的参数;

2.RM会根据use_tcc_fence来判断是否需要进一步增强处理,帮助开发人员处理幂等性、资源悬挂、空回滚等问题;

小结

通过上述源码分析,我们可以简单总结出以下几点:

1.TCC模式的全局事务提交或回滚的流程是由TM发起的,中间有TC服务进行转发给各RM服务;该流程与AT模式类似;

2.在RM执行提交或回滚时,先从缓存中取出一阶段存储的TCCResource,并取出目标对象、对应的提交或回滚方法以及需要传递的参数;

3.RM在真正执行提交或回滚前,还要根据use_tcc_fence来决定是否进一步增强,以便处理幂等性、资源悬挂、空回滚等问题;


相关文章
|
15天前
|
SQL JavaScript 数据库连接
Seata的工作原理
【10月更文挑战第30天】
20 3
|
1月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
28 1
|
1月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
40 1
|
2月前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
25天前
|
存储 缓存 数据处理
深度解析:Hologres分布式存储引擎设计原理及其优化策略
【10月更文挑战第9天】在大数据时代,数据的规模和复杂性不断增加,这对数据库系统提出了更高的要求。传统的单机数据库难以应对海量数据处理的需求,而分布式数据库通过水平扩展提供了更好的解决方案。阿里云推出的Hologres是一个实时交互式分析服务,它结合了OLAP(在线分析处理)与OLTP(在线事务处理)的优势,能够在大规模数据集上提供低延迟的数据查询能力。本文将深入探讨Hologres分布式存储引擎的设计原理,并介绍一些关键的优化策略。
79 0
|
2月前
|
网络协议 安全 Java
分布式(基础)-RMI的原理
分布式(基础)-RMI的原理
|
3月前
|
关系型数据库 MySQL 数据库
SpringCloud2023中使用Seata解决分布式事务
对于分布式系统而言,需要保证分布式系统中的数据一致性,保证数据在子系统中始终保持一致,避免业务出现问题。分布式系统中对数据的操作要么一起成功,要么一起失败,必须是一个整体性的事务。Seata简化了这个使用过程。
83 2
|
3月前
|
Java 关系型数据库 MySQL
(二十七)舞动手指速写一个Seata-XA框架解决棘手的分布式事务问题
相信大家对于事务问题都不陌生,在之前《MySQL事务篇》中曾详解过MySQL的事务机制,在传统的单库环境下开发,咱们可依赖于MySQL所提供的事务机制,来确保单个事务内的一组操作,要么全部执行成功,要么全部执行失败。
|
4月前
|
NoSQL Redis 数据库
|
3月前
|
Java Nacos Docker
"揭秘!Docker部署Seata遇上Nacos,注册成功却报错?这些坑你不得不防!一网打尽解决秘籍,让你的分布式事务稳如老狗!"
【8月更文挑战第15天】在微服务架构中,Nacos搭配Seata确保数据一致性时,Docker部署Seata后可能出现客户端连接错误,如“can not connect to services-server”。此问题多由网络配置不当、配置文件错误或版本不兼容引起。解决策略包括:调整Docker网络设置确保可达性;检查并修正`file.conf`和`registry.conf`中的Nacos地址和端口;验证Seata与Nacos版本兼容性;修改配置后重启服务;参考官方文档和最佳实践进行配置。通过这些步骤,能有效排除故障,保障服务稳定运行。
231 0

热门文章

最新文章