原理篇:Seata TCC如何实现分布式事务提交或回滚

简介: 原理篇:Seata TCC如何实现分布式事务提交或回滚

前言

在之前的文章手把手教你Spring Cloud集成Seata TCC模式中教大家学会了如何使用TCC模式,原理篇:Seata TCC模式是如何调用资源预留逻辑的中解答了TCC模式是怎样调用资源预留逻辑的,那么接下来的疑问就是TCC模式是如何实现分布式事务的提交或回滚的呢?

如何触发分布式事务的提交或回滚

我们知道,AT模式是由TM根据各分支事务提交的情况来决议发起分布式事务的提交还是回滚,其实TCC模式也是一样,TCC模式所属的TM也是需要被@GlobalTransactional注解的,所以TCC模式也会构建出GlobalTransactionalInterceptor拦截器。

GlobalTransactionScanner.wrapIfNecessary():

interceptor = null;
                // 这是专门针对TCC Action拦截的
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    // init tcc fence clean task if enable useTccFence
                    TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)interceptor);
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                  // 这是专门针对TM拦截的,需要被@GlobalTransactional或@GlobalLock注解
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    if (globalTransactionalInterceptor == null) {
                      // 被@GlobalTransactional或@GlobalLock注解才会创建GlobalTransactionalInterceptor对象
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }
复制代码

1.GlobalTransactionScanner.wrapIfNecessary()做了两件事情,一个就是处理TCC Action,一个就是创建TM拦截器GlobalTransactionalInterceptor

2.GlobalTransactionalInterceptor不仅仅是给AT模式使用的,TCC模式也是需要使用的,它的职责就是肩负起TM的功能,实现分布式事务的开启、提交或回滚;

GlobalTransactionalInterceptor.handleGlobalTransaction()------>TransactionalTemplate.execute():

try {
                // 分布式事务开启
                beginTransaction(txInfo, tx);
                Object rs;
                try {
                    // 执行业务逻辑
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 决议回滚
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }
                // 4. 决议提交
                commitTransaction(tx);
                // 返回结果
                return rs;
            } finally {
                //5. clear
                resumeGlobalLockConfig(previousConfig);
                triggerAfterCompletion();
                cleanUp();
            }
复制代码

也就是说,当TM执行的业务逻辑中出现异常,都会触发completeTransactionAfterThrowing();如若没有任何异常,说明业务逻辑平稳执行,那么触发commitTransaction()

在TM调用的business.execute()方法中,是会逐个调用各微服务提供的业务服务,在Seata框架中,被视为多个RM服务,只有当所有RM都调用正常通过后,才能够让TM执行到commitTransaction()方法,任意一个RM出异常,都会触发TM执行到completeTransactionAfterThrowing()上;

所以说,RM分支事务是否执行成功是触发分布式事务提交或回滚的成因;TM是根据RM分支事务的执行结果来做出分布式事务提交或回滚的决议;

如何触发各分支事务提交或回滚

在TM决议提交或回滚后,会向TC服务发起分布式事务的提交或回滚请求,TC服务作为中间转发层,会依次逐个向RM发起分支事务提交或回滚请求,可查看RM的请求处理代码:

public abstract class AbstractRMHandler extends AbstractExceptionHandler
    implements RMInboundHandler, TransactionMessageHandler {
  // 处理TC发起的分支事务提交请求
  @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        // 模版模式
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response)
                throws TransactionException {
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }
  // 处理TC发起的分支事务回滚请求
    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        BranchRollbackResponse response = new BranchRollbackResponse();
      // 模版模式
        exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
            @Override
            public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
                throws TransactionException {
                doBranchRollback(request, response);
            }
        }, request, response);
        return response;
    }
}
复制代码

触发RM分支事务提交或回滚,是由TM发起的决议,中间由TC服务依次进行转发给对应的RM;

RM如何实现提交或回滚

最终,在接收到TC服务的请求后,会调用到TCCResourceManager.branchCommit()TCCResourceManager.branchRollback():

  • 提交逻辑
@Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        // 从缓存中获取TCCResource
        TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
        if (tccResource == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
        }
        // 拿出目标对象和指定的commit方法
        Object targetTCCBean = tccResource.getTargetBean();
        Method commitMethod = tccResource.getCommitMethod();
        if (targetTCCBean == null || commitMethod == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
        }
        try {
            //BusinessActionContext
            BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
                applicationData);
            // 需要传递的参数
            Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
            Object ret;
            boolean result;
            // 如果需要Seata完成幂等性、资源悬挂、空回滚等处理,就调用TCCFenceHandler.commitFence()
            if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
                try {
                    result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
                } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                    throw e.getCause();
                }
            } else {
                // 直接调用目标方法
                ret = commitMethod.invoke(targetTCCBean, args);
                if (ret != null) {
                    if (ret instanceof TwoPhaseResult) {
                        result = ((TwoPhaseResult)ret).isSuccess();
                    } else {
                        result = (boolean)ret;
                    }
                } else {
                    result = true;
                }
            }
            LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
            // 返回提交结果
            return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
        } catch (Throwable t) {
            String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
            LOGGER.error(msg, t);
            return BranchStatus.PhaseTwo_CommitFailed_Retryable;
        }
    }
复制代码

1.RM最终会执行到TCCResourceManager中,并调用branchCommit()完成分支提交的逻辑调用;

2.在提交前会从tccResourceCache中取出一阶段缓存进去的TCCResource,里面包含了目标对象、目标方法以及对应的参数;

3.RM会根据use_tcc_fence配置来决定是直接调用commitMethod.invoke()还是通过TCCFenceHandler.commitFence()来完成目标方法的调用;

  • 回滚逻辑
@Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        // 先从缓存中获取TCCResource
        TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
        if (tccResource == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
        }
        // 拿出目标对象和回滚方法
        Object targetTCCBean = tccResource.getTargetBean();
        Method rollbackMethod = tccResource.getRollbackMethod();
        if (targetTCCBean == null || rollbackMethod == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
        }
        try {
            //BusinessActionContext
            BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
                applicationData);
            // 取出需要传递的参数
            Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);
            Object ret;
            boolean result;
            // 是否设置了use_tcc_fence=true,需要Seata完成幂等性、资源悬挂、空回滚等处理;
            if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
                try {
                    result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,
                            args, tccResource.getActionName());
                } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                    throw e.getCause();
                }
            } else {
                // 开发人员自行处理幂等性、资源悬挂、空回滚,直接调用目标方法
                ret = rollbackMethod.invoke(targetTCCBean, args);
                if (ret != null) {
                    if (ret instanceof TwoPhaseResult) {
                        result = ((TwoPhaseResult)ret).isSuccess();
                    } else {
                        result = (boolean)ret;
                    }
                } else {
                    result = true;
                }
            }
            LOGGER.info("TCC resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
            // 返回回滚结果
            return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        } catch (Throwable t) {
            String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
            LOGGER.error(msg, t);
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
复制代码

1.回滚逻辑和提交逻辑一样,首先都是需要取出缓存中的TCCResource,这里面包含了目标对象和回滚方法以及需要传递的参数;

2.RM会根据use_tcc_fence来判断是否需要进一步增强处理,帮助开发人员处理幂等性、资源悬挂、空回滚等问题;

小结

通过上述源码分析,我们可以简单总结出以下几点:

1.TCC模式的全局事务提交或回滚的流程是由TM发起的,中间有TC服务进行转发给各RM服务;该流程与AT模式类似;

2.在RM执行提交或回滚时,先从缓存中取出一阶段存储的TCCResource,并取出目标对象、对应的提交或回滚方法以及需要传递的参数;

3.RM在真正执行提交或回滚前,还要根据use_tcc_fence来决定是否进一步增强,以便处理幂等性、资源悬挂、空回滚等问题;


相关文章
|
1月前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
18天前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
132 7
|
1月前
|
存储 Dubbo Java
分布式 RPC 底层原理详解,看这篇就够了!
本文详解分布式RPC的底层原理与系统设计,大厂面试高频,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
分布式 RPC 底层原理详解,看这篇就够了!
|
1月前
|
消息中间件 数据库
Seata框架的工作原理
你还可以进一步深入研究 Seata 框架的技术细节和具体实现,以更好地理解其工作原理和优势。同时,结合实际应用场景进行实践和优化,也是提高分布式事务处理能力的重要途径。
43 15
|
22天前
|
机器学习/深度学习 存储 运维
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
55 4
|
1月前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
51 6
|
1月前
|
数据库
如何在Seata框架中配置分布式事务的隔离级别?
总的来说,配置分布式事务的隔离级别是实现分布式事务管理的重要环节之一,需要认真对待和仔细调整,以满足业务的需求和性能要求。你还可以进一步深入研究和实践 Seata 框架的配置和使用,以更好地应对各种分布式事务场景的挑战。
29 6
|
28天前
|
消息中间件 运维 数据库
Seata框架和其他分布式事务框架有什么区别
Seata框架和其他分布式事务框架有什么区别
25 1
|
2月前
|
SQL JavaScript 数据库连接
Seata的工作原理
【10月更文挑战第30天】
40 3
|
2月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?

热门文章

最新文章