前言
在前面的博客手把手教你Spring Cloud集成Seata TCC模式中,我们讲述了如何把TCC模式的分布式事务解决方案集成到Spring Cloud项目中。在该案例中,我们通过@LocalTCC
和@TwoPhaseBusinessAction
两个注解配合,就实现了一个TCC Action
,能够供我们的Service
调用并完成分布式事务。那么,我们的疑问就是,Seata TCC是如何处理我们自定义的TCC Action
的呢?
初始化创建TccActionInterceptor
和AT模式一样,TCC也是通过代理模式完成的,在GlobalTransactionScanner
中有这么一段代码:
interceptor = null; // 检查是否是TCC Action if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { // 当需要Seata处理TCC幂等性、资源悬挂、空回滚等问题时,会初始化一个定时任务,用于清除一些已经完成分布式事务的数据 TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext); //创建TccActionInterceptor拦截器 interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); // 添加监听器,时刻监听用户是否关闭全局事务 ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor); } 复制代码
1.在
GlobalTransactionScanner
的wrapIfNecessary()
方法作为创建代理对象的入口,会针对所有的TCC Action
进行包装;2.
TCCBeanParserUtils.isTccAutoProxy()
会检查TCC Action
是否被@LocalTCC
和@TwoPhaseBusinessAction
注解了,有这两个注解,那么就符合TCC Action
标准,就会被拦截;3.
TCCBeanParserUtils.initTccFenceCleanTask()
是用来处理TCC
模式中存在的三个问题:幂等性
、资源悬挂
、空回滚
;4.
TccActionInterceptor
才是我们当前这篇文章的主要讲解对象,它会针对TCC Action
做一个拦截,并对TCC Action
进行相应的增强处理;5.
ConfigurationCache.addConfigListener()
是为了监听是否触发了分布式事务的关闭。开启了client.tm.degradeCheck=true
才会触发TM的降级处理;
拦截TCC Action
在TccActionInterceptor
中,我们主要看invoke()
方法,因为它实现了MethodInterceptor
,这是实现代理的常用手段:
@Override public Object invoke(final MethodInvocation invocation) throws Throwable { if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) { // 先排除不在TCC分布式事务中的可能性 return invocation.proceed(); } // 找到目前被调用的方法 Method method = getActionInterfaceMethod(invocation); // 从方法上找到TwoPhaseBusinessAction注解 TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class); // 注解不为null,说明我们要对这个方法进行拦截 if (businessAction != null) { // 获取TM保存在上下文中的XID String xid = RootContext.getXID(); // 如果有嵌套的分支事务,先挂起,然后把当前要做的分支事务类型放进去(常规操作) BranchType previousBranchType = RootContext.getBranchType(); // 如果上层嵌套的分支事务不是TCC类型,就把当前要处理的分支类型设置为TCC类型 if (BranchType.TCC != previousBranchType) { RootContext.bindBranchType(BranchType.TCC); } try { // 真正的开始进行拦截工作 return actionInterceptorHandler.proceed(method, invocation.getArguments(), xid, businessAction, invocation::proceed); } finally { // 下面都是恢复现场 if (BranchType.TCC != previousBranchType) { RootContext.unbindBranchType(); } //MDC remove branchId MDC.remove(RootContext.MDC_KEY_BRANCH_ID); } } //如果businessAction为null,说明不需要拦截 return invocation.proceed(); } 复制代码
1.
invoke()
中,第一步先排除不在TCC分布式事务内的可能性,这样下面的代码就必然是在TCC分布式事务中可能需要调用的代码;2.被拦截的方法必须要是
@TwoPhaseBusinessAction
注解的方法;3.代码中还考虑到了事务嵌套的情况,需要把上层事务的
BranchType
取出来,把当前需要处理的BranchType
设置为BranchType.TCC
,调用结束后,又把上层事务的BranchType
恢复回去;
开启分支事务,执行资源预留逻辑
public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction, Callback<Object> targetCallback) throws Throwable { // 获取BusinessActionContext,获取不到就自己创建 BusinessActionContext actionContext = getOrCreateActionContextAndResetToArguments(method.getParameterTypes(), arguments); // 把相关的信息都设置进BusinessActionContext中 actionContext.setXid(xid); //Set the action name String actionName = businessAction.name(); actionContext.setActionName(actionName); //Set the delay report actionContext.setDelayReport(businessAction.isDelayReport()); // 把方法上的所有参数都转换成json,并向TC注册分支事务,返回分支事务ID--->branchId String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext); actionContext.setBranchId(branchId); //MDC put branchId MDC.put(RootContext.MDC_KEY_BRANCH_ID, branchId); // 取出上层事务的BusinessActionContext BusinessActionContext previousActionContext = BusinessActionContextUtil.getContext(); try { // 设置当前事务的BusinessActionContext BusinessActionContextUtil.setContext(actionContext); // 如果需要Seata框架处理幂等性、资源悬挂、空回滚等问题,就调用TCCFenceHandler.prepareFence() if (businessAction.useTCCFence()) { try { // Use TCC Fence, and return the business result return TCCFenceHandler.prepareFence(xid, Long.valueOf(branchId), actionName, targetCallback); } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) { Throwable originException = e.getCause(); if (originException instanceof FrameworkException) { LOGGER.error("[{}] prepare TCC fence error: {}", xid, originException.getMessage()); } throw originException; } } else { // 如果是自己处理幂等性、资源悬挂、空回滚等问题,直接调用targetCallback.execute() return targetCallback.execute(); } } finally { try { //上报分支事务处理结果 BusinessActionContextUtil.reportContext(actionContext); } finally { // 恢复现场 if (previousActionContext != null) { // recovery the previous action context BusinessActionContextUtil.setContext(previousActionContext); } else { // clear the action context BusinessActionContextUtil.clear(); } } } } 复制代码
1.在拦截过程中,框架会根据用户在真实调用时是否传递了
BusinessActionContext
参数来判断是否需要Seata重新new一个BusinessActionContext
对象。我们在awesome-seata案例中,直接传的就是null,那么针对这种情况,Seata会自己创建BusinessActionContext
对象;2.在
doTccActionLogStore()
逻辑中,主要功能就是解析被@BusinessActionContextParameter
注解的参数,放进BusinessActionContext
中,并向TC服务注册分支事务;3.在业务逻辑被真正调用前,会根据用户在
@TwoPhaseBusinessAction
中配置的useTCCFence
属性来判断是否需要Seata处理幂等性、资源悬挂、空回滚等问题;如果不需要处理的话,就直接调用资源预留的逻辑;否则使用TCCFenceHandler
来处理幂等性、资源悬挂、空回滚等逻辑;
小结
综上所述,我们可以归纳出如下几点:
1.TCC模式也要在TM上标注
@GlobalTransactional
,这样才能实现分布式事务的开启、提交或回滚;2.创建
TCC Action
必须要使用@LocalTCC
和@TwoPhaseBusinessAction
两个注解,@LocalTCC
注解放在TCC Action
接口上面,@TwoPhaseBusinessAction
注解放在资源预留的方法上面;3.
TCC Action
被调用时,参数BusinessActionContext
对象可以传null,也可以自己new BusinessActionContext()
,Seata会根据开发人员传递的参数自行处理;4.如果开发人员需要Seata帮助解决幂等性、资源悬挂、空回滚等问题,那么需要在
@TwoPhaseBusinessAction
中设置useTCCFence=true
,并创建相应的local_tcc_log
数据表;