原理篇:Seata TCC模式是如何调用资源预留逻辑的

简介: 原理篇:Seata TCC模式是如何调用资源预留逻辑的

前言

在前面的博客手把手教你Spring Cloud集成Seata TCC模式中,我们讲述了如何把TCC模式的分布式事务解决方案集成到Spring Cloud项目中。在该案例中,我们通过@LocalTCC@TwoPhaseBusinessAction两个注解配合,就实现了一个TCC Action,能够供我们的Service调用并完成分布式事务。那么,我们的疑问就是,Seata TCC是如何处理我们自定义的TCC Action的呢?

初始化创建TccActionInterceptor

和AT模式一样,TCC也是通过代理模式完成的,在GlobalTransactionScanner中有这么一段代码:

interceptor = null;
// 检查是否是TCC Action
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
    // 当需要Seata处理TCC幂等性、资源悬挂、空回滚等问题时,会初始化一个定时任务,用于清除一些已经完成分布式事务的数据
    TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
    //创建TccActionInterceptor拦截器
    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
    // 添加监听器,时刻监听用户是否关闭全局事务
    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            (ConfigurationChangeListener)interceptor);
}
复制代码

1.在GlobalTransactionScannerwrapIfNecessary()方法作为创建代理对象的入口,会针对所有的TCC Action进行包装;

2.TCCBeanParserUtils.isTccAutoProxy()会检查TCC Action是否被@LocalTCC@TwoPhaseBusinessAction注解了,有这两个注解,那么就符合TCC Action标准,就会被拦截;

3.TCCBeanParserUtils.initTccFenceCleanTask()是用来处理TCC模式中存在的三个问题:幂等性资源悬挂空回滚

4.TccActionInterceptor才是我们当前这篇文章的主要讲解对象,它会针对TCC Action做一个拦截,并对TCC Action进行相应的增强处理;

5.ConfigurationCache.addConfigListener()是为了监听是否触发了分布式事务的关闭。开启了client.tm.degradeCheck=true才会触发TM的降级处理;

拦截TCC Action

TccActionInterceptor中,我们主要看invoke()方法,因为它实现了MethodInterceptor,这是实现代理的常用手段:

@Override
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) {
            // 先排除不在TCC分布式事务中的可能性
            return invocation.proceed();
        }
        // 找到目前被调用的方法
        Method method = getActionInterfaceMethod(invocation);
        // 从方法上找到TwoPhaseBusinessAction注解
        TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
        // 注解不为null,说明我们要对这个方法进行拦截
        if (businessAction != null) {
            // 获取TM保存在上下文中的XID
            String xid = RootContext.getXID();
            // 如果有嵌套的分支事务,先挂起,然后把当前要做的分支事务类型放进去(常规操作)
            BranchType previousBranchType = RootContext.getBranchType();
            // 如果上层嵌套的分支事务不是TCC类型,就把当前要处理的分支类型设置为TCC类型
            if (BranchType.TCC != previousBranchType) {
                RootContext.bindBranchType(BranchType.TCC);
            }
            try {
                // 真正的开始进行拦截工作
                return actionInterceptorHandler.proceed(method, invocation.getArguments(), xid, businessAction,
                        invocation::proceed);
            } finally {
                // 下面都是恢复现场
                if (BranchType.TCC != previousBranchType) {
                    RootContext.unbindBranchType();
                }
                //MDC remove branchId
                MDC.remove(RootContext.MDC_KEY_BRANCH_ID);
            }
        }
        //如果businessAction为null,说明不需要拦截
        return invocation.proceed();
    }
复制代码

1.invoke()中,第一步先排除不在TCC分布式事务内的可能性,这样下面的代码就必然是在TCC分布式事务中可能需要调用的代码;

2.被拦截的方法必须要是@TwoPhaseBusinessAction注解的方法;

3.代码中还考虑到了事务嵌套的情况,需要把上层事务的BranchType取出来,把当前需要处理的BranchType设置为BranchType.TCC,调用结束后,又把上层事务的BranchType恢复回去;

开启分支事务,执行资源预留逻辑

public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
                                       Callback<Object> targetCallback) throws Throwable {
        // 获取BusinessActionContext,获取不到就自己创建
        BusinessActionContext actionContext = getOrCreateActionContextAndResetToArguments(method.getParameterTypes(), arguments);
        // 把相关的信息都设置进BusinessActionContext中
        actionContext.setXid(xid);
        //Set the action name
        String actionName = businessAction.name();
        actionContext.setActionName(actionName);
        //Set the delay report
        actionContext.setDelayReport(businessAction.isDelayReport());
        // 把方法上的所有参数都转换成json,并向TC注册分支事务,返回分支事务ID--->branchId
        String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
        actionContext.setBranchId(branchId);
        //MDC put branchId
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, branchId);
        // 取出上层事务的BusinessActionContext
        BusinessActionContext previousActionContext = BusinessActionContextUtil.getContext();
        try {
            // 设置当前事务的BusinessActionContext
            BusinessActionContextUtil.setContext(actionContext);
            // 如果需要Seata框架处理幂等性、资源悬挂、空回滚等问题,就调用TCCFenceHandler.prepareFence()
            if (businessAction.useTCCFence()) {
                try {
                    // Use TCC Fence, and return the business result
                    return TCCFenceHandler.prepareFence(xid, Long.valueOf(branchId), actionName, targetCallback);
                } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                    Throwable originException = e.getCause();
                    if (originException instanceof FrameworkException) {
                        LOGGER.error("[{}] prepare TCC fence error: {}", xid, originException.getMessage());
                    }
                    throw originException;
                }
            } else {
                // 如果是自己处理幂等性、资源悬挂、空回滚等问题,直接调用targetCallback.execute()
                return targetCallback.execute();
            }
        } finally {
            try {
                //上报分支事务处理结果
                BusinessActionContextUtil.reportContext(actionContext);
            } finally {
                // 恢复现场
                if (previousActionContext != null) {
                    // recovery the previous action context
                    BusinessActionContextUtil.setContext(previousActionContext);
                } else {
                    // clear the action context
                    BusinessActionContextUtil.clear();
                }
            }
        }
    }
复制代码

1.在拦截过程中,框架会根据用户在真实调用时是否传递了BusinessActionContext参数来判断是否需要Seata重新new一个BusinessActionContext对象。我们在awesome-seata案例中,直接传的就是null,那么针对这种情况,Seata会自己创建BusinessActionContext对象;

2.在doTccActionLogStore()逻辑中,主要功能就是解析被@BusinessActionContextParameter注解的参数,放进BusinessActionContext中,并向TC服务注册分支事务;

3.在业务逻辑被真正调用前,会根据用户在@TwoPhaseBusinessAction中配置的useTCCFence属性来判断是否需要Seata处理幂等性、资源悬挂、空回滚等问题;如果不需要处理的话,就直接调用资源预留的逻辑;否则使用TCCFenceHandler来处理幂等性、资源悬挂、空回滚等逻辑;

小结

综上所述,我们可以归纳出如下几点:

1.TCC模式也要在TM上标注@GlobalTransactional,这样才能实现分布式事务的开启、提交或回滚;

2.创建TCC Action必须要使用@LocalTCC@TwoPhaseBusinessAction两个注解,@LocalTCC注解放在TCC Action接口上面,@TwoPhaseBusinessAction注解放在资源预留的方法上面;

3.TCC Action被调用时,参数BusinessActionContext对象可以传null,也可以自己new BusinessActionContext(),Seata会根据开发人员传递的参数自行处理;

4.如果开发人员需要Seata帮助解决幂等性、资源悬挂、空回滚等问题,那么需要在@TwoPhaseBusinessAction中设置useTCCFence=true,并创建相应的local_tcc_log数据表;


相关文章
|
1月前
|
消息中间件 数据库
Seata框架的工作原理
你还可以进一步深入研究 Seata 框架的技术细节和具体实现,以更好地理解其工作原理和优势。同时,结合实际应用场景进行实践和优化,也是提高分布式事务处理能力的重要途径。
43 15
|
22天前
|
数据库 微服务
SEATA模式
Seata 是一款开源的分布式事务解决方案,支持多种事务模式以适应不同的应用场景。其主要模式包括:AT(TCC)模式,事务分三阶段执行;TCC 模式,提供更灵活的事务控制;SAGA 模式,基于状态机实现跨服务的事务一致性;XA 模式,采用传统两阶段提交协议确保数据一致性。
39 5
|
28天前
Seata框架在AT模式下是如何保证数据一致性的?
通过以上这些机制的协同作用,Seata 在 AT 模式下能够有效地保证数据的一致性,确保分布式事务的可靠执行。你还可以进一步深入研究 Seata 的具体实现细节,以更好地理解其数据一致性保障的原理。
39 3
|
2月前
|
SQL JavaScript 数据库连接
Seata的工作原理
【10月更文挑战第30天】
40 3
|
6月前
|
Apache 开发者
Apache Seata 如何解决 TCC 模式的幂等、悬挂和空回滚问题
【6月更文挑战第8天】Apache Seata 是一款分布式事务框架,解决TCC模式下的幂等、悬挂和空回滚问题。通过记录事务状态处理幂等,设置超时机制避免悬挂,明确标记Try操作成功来处理空回滚。Seata 提供丰富配置和管理功能,确保分布式事务的可靠性和效率,支持复杂事务处理场景,为企业业务发展提供支持。
240 7
|
7月前
|
NoSQL Java 数据库
Seata常见问题之xa模式下插入一条数据再更新这条数据会报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
207 2
|
7月前
|
SpringCloudAlibaba Java 数据库
SpringCloud Alibaba微服务 -- Seata的原理和使用
SpringCloud Alibaba微服务 -- Seata的原理和使用
|
1月前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
18天前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
132 7
|
1月前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
51 6