前言
在之前的文章手把手教你Spring Cloud集成Seata TCC模式中教大家学会了如何使用TCC模式,原理篇:Seata TCC模式是如何调用资源预留逻辑的中解答了TCC模式是怎样调用资源预留逻辑的,那么接下来的疑问就是TCC模式是如何实现分布式事务的提交或回滚的呢?
如何触发分布式事务的提交或回滚
我们知道,AT模式是由TM根据各分支事务提交的情况来决议发起分布式事务的提交还是回滚,其实TCC模式也是一样,TCC模式所属的TM
也是需要被@GlobalTransactional
注解的,所以TCC模式也会构建出GlobalTransactionalInterceptor
拦截器。
GlobalTransactionScanner.wrapIfNecessary()
:
interceptor = null; // 这是专门针对TCC Action拦截的 if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { // init tcc fence clean task if enable useTccFence TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext); //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor); } else { Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean); Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean); // 这是专门针对TM拦截的,需要被@GlobalTransactional或@GlobalLock注解 if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) { return bean; } if (globalTransactionalInterceptor == null) { // 被@GlobalTransactional或@GlobalLock注解才会创建GlobalTransactionalInterceptor对象 globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook); ConfigurationCache.addConfigListener( ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor); } interceptor = globalTransactionalInterceptor; } 复制代码
1.
GlobalTransactionScanner.wrapIfNecessary()
做了两件事情,一个就是处理TCC Action
,一个就是创建TM
拦截器GlobalTransactionalInterceptor
;2.
GlobalTransactionalInterceptor
不仅仅是给AT模式使用的,TCC模式也是需要使用的,它的职责就是肩负起TM
的功能,实现分布式事务的开启、提交或回滚;
GlobalTransactionalInterceptor.handleGlobalTransaction()
------>TransactionalTemplate.execute()
:
try { // 分布式事务开启 beginTransaction(txInfo, tx); Object rs; try { // 执行业务逻辑 rs = business.execute(); } catch (Throwable ex) { // 决议回滚 completeTransactionAfterThrowing(txInfo, tx, ex); throw ex; } // 4. 决议提交 commitTransaction(tx); // 返回结果 return rs; } finally { //5. clear resumeGlobalLockConfig(previousConfig); triggerAfterCompletion(); cleanUp(); } 复制代码
也就是说,当TM执行的业务逻辑中出现异常,都会触发completeTransactionAfterThrowing()
;如若没有任何异常,说明业务逻辑平稳执行,那么触发commitTransaction()
;
在TM调用的business.execute()
方法中,是会逐个调用各微服务提供的业务服务,在Seata框架中,被视为多个RM
服务,只有当所有RM
都调用正常通过后,才能够让TM执行到commitTransaction()
方法,任意一个RM
出异常,都会触发TM执行到completeTransactionAfterThrowing()
上;
所以说,RM分支事务是否执行成功是触发分布式事务提交或回滚的成因;TM是根据RM分支事务的执行结果来做出分布式事务提交或回滚的决议;
如何触发各分支事务提交或回滚
在TM决议提交或回滚后,会向TC服务发起分布式事务的提交或回滚请求,TC服务作为中间转发层,会依次逐个向RM发起分支事务提交或回滚请求,可查看RM的请求处理代码:
public abstract class AbstractRMHandler extends AbstractExceptionHandler implements RMInboundHandler, TransactionMessageHandler { // 处理TC发起的分支事务提交请求 @Override public BranchCommitResponse handle(BranchCommitRequest request) { BranchCommitResponse response = new BranchCommitResponse(); // 模版模式 exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() { @Override public void execute(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException { doBranchCommit(request, response); } }, request, response); return response; } // 处理TC发起的分支事务回滚请求 @Override public BranchRollbackResponse handle(BranchRollbackRequest request) { BranchRollbackResponse response = new BranchRollbackResponse(); // 模版模式 exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() { @Override public void execute(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException { doBranchRollback(request, response); } }, request, response); return response; } } 复制代码
触发RM分支事务提交或回滚,是由TM发起的决议,中间由TC服务依次进行转发给对应的RM;
RM如何实现提交或回滚
最终,在接收到TC服务的请求后,会调用到TCCResourceManager.branchCommit()
和TCCResourceManager.branchRollback()
:
- 提交逻辑
@Override public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { // 从缓存中获取TCCResource TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId); if (tccResource == null) { throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId)); } // 拿出目标对象和指定的commit方法 Object targetTCCBean = tccResource.getTargetBean(); Method commitMethod = tccResource.getCommitMethod(); if (targetTCCBean == null || commitMethod == null) { throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId)); } try { //BusinessActionContext BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId, applicationData); // 需要传递的参数 Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext); Object ret; boolean result; // 如果需要Seata完成幂等性、资源悬挂、空回滚等处理,就调用TCCFenceHandler.commitFence() if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) { try { result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args); } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) { throw e.getCause(); } } else { // 直接调用目标方法 ret = commitMethod.invoke(targetTCCBean, args); if (ret != null) { if (ret instanceof TwoPhaseResult) { result = ((TwoPhaseResult)ret).isSuccess(); } else { result = (boolean)ret; } } else { result = true; } } LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId); // 返回提交结果 return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable; } catch (Throwable t) { String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid); LOGGER.error(msg, t); return BranchStatus.PhaseTwo_CommitFailed_Retryable; } } 复制代码
1.RM最终会执行到
TCCResourceManager
中,并调用branchCommit()
完成分支提交的逻辑调用;2.在提交前会从
tccResourceCache
中取出一阶段缓存进去的TCCResource
,里面包含了目标对象、目标方法以及对应的参数;3.RM会根据
use_tcc_fence
配置来决定是直接调用commitMethod.invoke()
还是通过TCCFenceHandler.commitFence()
来完成目标方法的调用;
- 回滚逻辑
@Override public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { // 先从缓存中获取TCCResource TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId); if (tccResource == null) { throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId)); } // 拿出目标对象和回滚方法 Object targetTCCBean = tccResource.getTargetBean(); Method rollbackMethod = tccResource.getRollbackMethod(); if (targetTCCBean == null || rollbackMethod == null) { throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId)); } try { //BusinessActionContext BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId, applicationData); // 取出需要传递的参数 Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext); Object ret; boolean result; // 是否设置了use_tcc_fence=true,需要Seata完成幂等性、资源悬挂、空回滚等处理; if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) { try { result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId, args, tccResource.getActionName()); } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) { throw e.getCause(); } } else { // 开发人员自行处理幂等性、资源悬挂、空回滚,直接调用目标方法 ret = rollbackMethod.invoke(targetTCCBean, args); if (ret != null) { if (ret instanceof TwoPhaseResult) { result = ((TwoPhaseResult)ret).isSuccess(); } else { result = (boolean)ret; } } else { result = true; } } LOGGER.info("TCC resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId); // 返回回滚结果 return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable; } catch (Throwable t) { String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid); LOGGER.error(msg, t); return BranchStatus.PhaseTwo_RollbackFailed_Retryable; } } 复制代码
1.回滚逻辑和提交逻辑一样,首先都是需要取出缓存中的
TCCResource
,这里面包含了目标对象和回滚方法以及需要传递的参数;2.RM会根据
use_tcc_fence
来判断是否需要进一步增强处理,帮助开发人员处理幂等性、资源悬挂、空回滚等问题;
小结
通过上述源码分析,我们可以简单总结出以下几点:
1.TCC模式的全局事务提交或回滚的流程是由TM发起的,中间有TC服务进行转发给各RM服务;该流程与AT模式类似;
2.在RM执行提交或回滚时,先从缓存中取出一阶段存储的
TCCResource
,并取出目标对象、对应的提交或回滚方法以及需要传递的参数;3.RM在真正执行提交或回滚前,还要根据
use_tcc_fence
来决定是否进一步增强,以便处理幂等性、资源悬挂、空回滚等问题;