原理篇:Seata TCC模式是如何调用资源预留逻辑的

简介: 原理篇:Seata TCC模式是如何调用资源预留逻辑的

前言

在前面的博客手把手教你Spring Cloud集成Seata TCC模式中,我们讲述了如何把TCC模式的分布式事务解决方案集成到Spring Cloud项目中。在该案例中,我们通过@LocalTCC@TwoPhaseBusinessAction两个注解配合,就实现了一个TCC Action,能够供我们的Service调用并完成分布式事务。那么,我们的疑问就是,Seata TCC是如何处理我们自定义的TCC Action的呢?

初始化创建TccActionInterceptor

和AT模式一样,TCC也是通过代理模式完成的,在GlobalTransactionScanner中有这么一段代码:

interceptor = null;
// 检查是否是TCC Action
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
    // 当需要Seata处理TCC幂等性、资源悬挂、空回滚等问题时,会初始化一个定时任务,用于清除一些已经完成分布式事务的数据
    TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
    //创建TccActionInterceptor拦截器
    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
    // 添加监听器,时刻监听用户是否关闭全局事务
    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            (ConfigurationChangeListener)interceptor);
}
复制代码

1.在GlobalTransactionScannerwrapIfNecessary()方法作为创建代理对象的入口,会针对所有的TCC Action进行包装;

2.TCCBeanParserUtils.isTccAutoProxy()会检查TCC Action是否被@LocalTCC@TwoPhaseBusinessAction注解了,有这两个注解,那么就符合TCC Action标准,就会被拦截;

3.TCCBeanParserUtils.initTccFenceCleanTask()是用来处理TCC模式中存在的三个问题:幂等性资源悬挂空回滚

4.TccActionInterceptor才是我们当前这篇文章的主要讲解对象,它会针对TCC Action做一个拦截,并对TCC Action进行相应的增强处理;

5.ConfigurationCache.addConfigListener()是为了监听是否触发了分布式事务的关闭。开启了client.tm.degradeCheck=true才会触发TM的降级处理;

拦截TCC Action

TccActionInterceptor中,我们主要看invoke()方法,因为它实现了MethodInterceptor,这是实现代理的常用手段:

@Override
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) {
            // 先排除不在TCC分布式事务中的可能性
            return invocation.proceed();
        }
        // 找到目前被调用的方法
        Method method = getActionInterfaceMethod(invocation);
        // 从方法上找到TwoPhaseBusinessAction注解
        TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
        // 注解不为null,说明我们要对这个方法进行拦截
        if (businessAction != null) {
            // 获取TM保存在上下文中的XID
            String xid = RootContext.getXID();
            // 如果有嵌套的分支事务,先挂起,然后把当前要做的分支事务类型放进去(常规操作)
            BranchType previousBranchType = RootContext.getBranchType();
            // 如果上层嵌套的分支事务不是TCC类型,就把当前要处理的分支类型设置为TCC类型
            if (BranchType.TCC != previousBranchType) {
                RootContext.bindBranchType(BranchType.TCC);
            }
            try {
                // 真正的开始进行拦截工作
                return actionInterceptorHandler.proceed(method, invocation.getArguments(), xid, businessAction,
                        invocation::proceed);
            } finally {
                // 下面都是恢复现场
                if (BranchType.TCC != previousBranchType) {
                    RootContext.unbindBranchType();
                }
                //MDC remove branchId
                MDC.remove(RootContext.MDC_KEY_BRANCH_ID);
            }
        }
        //如果businessAction为null,说明不需要拦截
        return invocation.proceed();
    }
复制代码

1.invoke()中,第一步先排除不在TCC分布式事务内的可能性,这样下面的代码就必然是在TCC分布式事务中可能需要调用的代码;

2.被拦截的方法必须要是@TwoPhaseBusinessAction注解的方法;

3.代码中还考虑到了事务嵌套的情况,需要把上层事务的BranchType取出来,把当前需要处理的BranchType设置为BranchType.TCC,调用结束后,又把上层事务的BranchType恢复回去;

开启分支事务,执行资源预留逻辑

public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
                                       Callback<Object> targetCallback) throws Throwable {
        // 获取BusinessActionContext,获取不到就自己创建
        BusinessActionContext actionContext = getOrCreateActionContextAndResetToArguments(method.getParameterTypes(), arguments);
        // 把相关的信息都设置进BusinessActionContext中
        actionContext.setXid(xid);
        //Set the action name
        String actionName = businessAction.name();
        actionContext.setActionName(actionName);
        //Set the delay report
        actionContext.setDelayReport(businessAction.isDelayReport());
        // 把方法上的所有参数都转换成json,并向TC注册分支事务,返回分支事务ID--->branchId
        String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
        actionContext.setBranchId(branchId);
        //MDC put branchId
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, branchId);
        // 取出上层事务的BusinessActionContext
        BusinessActionContext previousActionContext = BusinessActionContextUtil.getContext();
        try {
            // 设置当前事务的BusinessActionContext
            BusinessActionContextUtil.setContext(actionContext);
            // 如果需要Seata框架处理幂等性、资源悬挂、空回滚等问题,就调用TCCFenceHandler.prepareFence()
            if (businessAction.useTCCFence()) {
                try {
                    // Use TCC Fence, and return the business result
                    return TCCFenceHandler.prepareFence(xid, Long.valueOf(branchId), actionName, targetCallback);
                } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                    Throwable originException = e.getCause();
                    if (originException instanceof FrameworkException) {
                        LOGGER.error("[{}] prepare TCC fence error: {}", xid, originException.getMessage());
                    }
                    throw originException;
                }
            } else {
                // 如果是自己处理幂等性、资源悬挂、空回滚等问题,直接调用targetCallback.execute()
                return targetCallback.execute();
            }
        } finally {
            try {
                //上报分支事务处理结果
                BusinessActionContextUtil.reportContext(actionContext);
            } finally {
                // 恢复现场
                if (previousActionContext != null) {
                    // recovery the previous action context
                    BusinessActionContextUtil.setContext(previousActionContext);
                } else {
                    // clear the action context
                    BusinessActionContextUtil.clear();
                }
            }
        }
    }
复制代码

1.在拦截过程中,框架会根据用户在真实调用时是否传递了BusinessActionContext参数来判断是否需要Seata重新new一个BusinessActionContext对象。我们在awesome-seata案例中,直接传的就是null,那么针对这种情况,Seata会自己创建BusinessActionContext对象;

2.在doTccActionLogStore()逻辑中,主要功能就是解析被@BusinessActionContextParameter注解的参数,放进BusinessActionContext中,并向TC服务注册分支事务;

3.在业务逻辑被真正调用前,会根据用户在@TwoPhaseBusinessAction中配置的useTCCFence属性来判断是否需要Seata处理幂等性、资源悬挂、空回滚等问题;如果不需要处理的话,就直接调用资源预留的逻辑;否则使用TCCFenceHandler来处理幂等性、资源悬挂、空回滚等逻辑;

小结

综上所述,我们可以归纳出如下几点:

1.TCC模式也要在TM上标注@GlobalTransactional,这样才能实现分布式事务的开启、提交或回滚;

2.创建TCC Action必须要使用@LocalTCC@TwoPhaseBusinessAction两个注解,@LocalTCC注解放在TCC Action接口上面,@TwoPhaseBusinessAction注解放在资源预留的方法上面;

3.TCC Action被调用时,参数BusinessActionContext对象可以传null,也可以自己new BusinessActionContext(),Seata会根据开发人员传递的参数自行处理;

4.如果开发人员需要Seata帮助解决幂等性、资源悬挂、空回滚等问题,那么需要在@TwoPhaseBusinessAction中设置useTCCFence=true,并创建相应的local_tcc_log数据表;


相关文章
|
4天前
|
自然语言处理 监控 Dubbo
Seata常见问题之使用tcc模式配置yml如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
83 4
|
4天前
|
数据库
|
4天前
|
NoSQL Java 数据库
Seata常见问题之xa模式下插入一条数据再更新这条数据会报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
111 2
|
4天前
|
Java 关系型数据库 微服务
Seata常见问题之项目一直启动不成功如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
93 0
|
4天前
|
存储 Java Nacos
Seata常见问题之xa模式出现错误xid is not valid如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
77 4
|
4天前
|
Nacos 数据库
分布式事务解决方案Seata
分布式事务解决方案Seata
29 1
|
4天前
|
SQL 关系型数据库 数据库
学习分布式事务Seata看这一篇就够了,建议收藏
学习分布式事务Seata看这一篇就够了,建议收藏
|
4天前
|
关系型数据库 MySQL 数据库
分布式事务Seata
分布式事务Seata
|
4天前
|
存储 关系型数据库 MySQL
基于Seata实现分布式事务
通过以上步骤,你可以使用 Seata 实现分布式事务,确保在微服务架构中的事务一致性。Seata 支持多种语言和框架,能够满足不同业务场景的需求。欢迎关注威哥爱编程,一起学习成长。
|
4天前
|
Java 数据库连接 API
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
65 0

热门文章

最新文章