原理篇:Seata TCC模式是如何解决幂等性、资源悬挂、空回滚问题的

简介: 原理篇:Seata TCC模式是如何解决幂等性、资源悬挂、空回滚问题的

前言

在之前的博客中,我们讲到TCC模式中,如果在@TwoPhaseBusinessAction注解中设置useTCCFence=true,那么Seata会帮助开发人员处理幂等性、资源悬挂、空回滚等问题,那么我们这篇文章来看看Seata TCC是如何解决这三个问题的。

什么时候触发TCCFence?

  • 一阶段

ActionInterceptorHandler.proceed():

// 如果@TwoPhaseBusinessAction注解设置了useTCCFence=true
if (businessAction.useTCCFence()) {
                try {
                    // 调用TCCFenceHandler.prepareFence()
                    return TCCFenceHandler.prepareFence(xid, Long.valueOf(branchId), actionName, targetCallback);
                } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                    Throwable originException = e.getCause();
                    if (originException instanceof FrameworkException) {
                        LOGGER.error("[{}] prepare TCC fence error: {}", xid, originException.getMessage());
                    }
                    throw originException;
                }
            }
复制代码

在Seata准备执行一阶段资源预留前,会判断是否设置了useTCCFence=true,如果设置了useTCCFence=true,那么需要交给TCCFenceHandler.prepareFence()来处理;

  • 二阶段

TCCResourceManager.branchCommit():

// 判断是否useTCCFence=true
if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
                try {
                    // 调用TCCFenceHandler.commitFence()执行提交
                    result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
                } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                    throw e.getCause();
                }
            }
复制代码

TCCResourceManager.branchRollback():

// 判断是否useTCCFence=true
if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
                try {
                    // 调用TCCFenceHandler.rollbackFence()执行回滚
                    result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,
                            args, tccResource.getActionName());
                } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                    throw e.getCause();
                }
            }
复制代码

在RM调用分支事务提交或回滚前,都会检查是否设置了useTCCFence=true,如果useTCCFence=true,那么就要交给TCCFenceHandler来增强提交或回滚逻辑;

TCCFenceHandler如何实现幂等性

  • 一阶段

TCCFenceHandler.prepareFence():

Connection conn = DataSourceUtils.getConnection(dataSource);
// 数据库添加一条有状态的记录,状态为`TCCFenceConstant.STATUS_TRIED`;
boolean result = insertTCCFenceLog(conn, xid, branchId, actionName, TCCFenceConstant.STATUS_TRIED);
LOGGER.info("TCC fence prepare result: {}. xid: {}, branchId: {}", result, xid, branchId);
if (result) {
    // 调用资源预留逻辑
    return targetCallback.execute();
} else {
    throw new TCCFenceException(String.format("Insert tcc fence record error, prepare fence failed. xid= %s, branchId= %s", xid, branchId),
                            FrameworkErrorCode.InsertRecordError);
}
复制代码
  • 二阶段

TCCFenceHandler.commitFence():

public static boolean commitFence(Method commitMethod, Object targetTCCBean,
                                      String xid, Long branchId, Object[] args) {
        // 模版模式
        return transactionTemplate.execute(status -> {
            try {
                Connection conn = DataSourceUtils.getConnection(dataSource);
                // 查询一阶段提交的记录
                TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
                // 如果没查到一阶段的记录,说明还不能调用提交逻辑
                if (tccFenceDO == null) {
                    throw new TCCFenceException(String.format("TCC fence record not exists, commit fence method failed. xid= %s, branchId= %s", xid, branchId),
                            FrameworkErrorCode.RecordAlreadyExists);
                }
                // 如果记录的状态为`TCCFenceConstant.STATUS_COMMITTED`,说明已经提交过了,直接返回true;这就满足了幂等性;
                if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
                    LOGGER.info("Branch transaction has already committed before. idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
                    return true;
                }
                // 如果状态是`TCCFenceConstant.STATUS_ROLLBACKED`,说明已经调用了回滚逻辑,那就不能执行提交;如果是`TCCFenceConstant.STATUS_SUSPENDED`,那就说明该分布式事务暂时挂起了,还不能执行提交;
                if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
                    if (LOGGER.isWarnEnabled()) {
                        LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
                    }
                    return false;
                }
                // 这里就可以执行提交逻辑,并修改状态为`TCCFenceConstant.STATUS_COMMITTED`;
                return updateStatusAndInvokeTargetMethod(conn, commitMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_COMMITTED, status, args);
            } catch (Throwable t) {
                status.setRollbackOnly();
                throw new SkipCallbackWrapperException(t);
            }
        });
    }
复制代码

1.在提交前会先查询出一阶段插入的记录,如果没有查询到记录,说明一阶段还没有完成,那么就不能执行提交逻辑;

2.如果查询到记录状态是TCCFenceConstant.STATUS_COMMITTED,说明已经执行过提交逻辑了,那么就不会再次执行提交逻辑,也就保证了提交的幂等性;

3.如果是其他状态,比如已经回滚了,或者被挂起了,那么就不能执行提交逻辑;

4.所有不可提交的状态都排除后,开始正式执行提交逻辑,并更新状态为TCCFenceConstant.STATUS_COMMITTED,表示已经提交;

TCCFenceHandler.rollbackFence():

public static boolean rollbackFence(Method rollbackMethod, Object targetTCCBean,
                                        String xid, Long branchId, Object[] args, String actionName) {
        return transactionTemplate.execute(status -> {
            try {
                Connection conn = DataSourceUtils.getConnection(dataSource);
                // 查询一阶段插入的记录
                TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
                // 如果没查询到记录,说明一阶段还没执行
                if (tccFenceDO == null) {
                    // 如果一阶段没执行,就来执行回滚逻辑,说明出现空回滚情况,那么就插入一条记录,防止一阶段在回滚后再执行;
                    boolean result = insertTCCFenceLog(conn, xid, branchId, actionName, TCCFenceConstant.STATUS_SUSPENDED);
                    LOGGER.info("Insert tcc fence record result: {}. xid: {}, branchId: {}", result, xid, branchId);
                    // 如果插入失败,说明一阶段刚刚执行成功了,那么抛出异常,等待下次重试;
                    if (!result) {
                        throw new TCCFenceException(String.format("Insert tcc fence record error, rollback fence method failed. xid= %s, branchId= %s", xid, branchId),
                                FrameworkErrorCode.InsertRecordError);
                    }
                    // 如果插入成功,直接返回,一阶段也不会执行成功;
                    return true;
                } else {
                    // 如果是`TCCFenceConstant.STATUS_ROLLBACKED`状态,说明已经回滚过了;如果是`TCCFenceConstant.STATUS_SUSPENDED`,也不允许执行回滚逻辑;
                    if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
                        LOGGER.info("Branch transaction had already rollbacked before, idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
                        return true;
                    }
                    // 如果是`TCCFenceConstant.STATUS_COMMITTED`状态,说明已经提交了,那么也不能回滚;
                    if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
                        if (LOGGER.isWarnEnabled()) {
                            LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
                        }
                        return false;
                    }
                }
                // 执行回滚逻辑,并修改状态为`TCCFenceConstant.STATUS_ROLLBACKED`;
                return updateStatusAndInvokeTargetMethod(conn, rollbackMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_ROLLBACKED, status, args);
            } catch (Throwable t) {
                status.setRollbackOnly();
                throw new SkipCallbackWrapperException(t);
            }
        });
    }
复制代码

1.回滚前会查询一阶段保存的记录,如果没有查询到一阶段保存的记录,那么说明出现了空回滚的情况;

2.出现空回滚情况就立马插入一条记录,防止一阶段插入记录成功,也就预防了资源悬挂;

3.如果空回滚能够插入记录,那么一阶段必然执行失败,不会出现资源悬挂问题,也就说明什么都没做,回滚成功;如果空回滚插入记录失败,说明一阶段刚刚执行成功,那么我们直接抛异常,回滚失败,等待下次重试;

4.如果能够查到一阶段的记录,并且状态是TCCFenceConstant.STATUS_ROLLBACKEDTCCFenceConstant.STATUS_SUSPENDED,说明已经执行过回滚动作了,那么就不能再执行了,直接返回,满足幂等性;

5.如果一阶段记录的状态是TCCFenceConstant.STATUS_COMMITTED,说明已经执行过提交逻辑了,那么就不能再回滚了;

6.如果满足执行回滚的条件,那么调用回滚逻辑,并修改状态为TCCFenceConstant.STATUS_ROLLBACKED,完成回滚动作;

小结

通过以上源码分析,我们可以大概总结为以下几点:

1.开发人员需要在使用TCC模式的时候,在@TwoPhaseBusinessAction注解中设置useTCCFence=true才能使用Seata提供的功能处理幂等性、资源悬挂、空回滚等问题;

2.Seata解决这三个问题的方式是建立一张记录表,通过记录一阶段、二阶段处理的状态让各处理逻辑感知事务所处的状态,从而达到接口幂等性、避免资源悬挂、空回滚等问题;

3.开发人员需要额外再补充一张记录表tcc_fence_log,该数据表建表语句如下:

-- ----------------------------
-- Table structure for tcc_fence_log
-- ----------------------------
DROP TABLE IF EXISTS `tcc_fence_log`;
CREATE TABLE `tcc_fence_log` (
  `branch_id` bigint NOT NULL COMMENT 'branch transaction id',
  `xid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `action_name` varchar(128) NOT NULL COMMENT 'TCC Action name',
  `status` int NOT NULL COMMENT '1:STATUS_TRIED,2:STATUS_COMMITTED,3:STATUS_ROLLBACKED,4:STATUS_SUSPENDED',
  `gmt_create` datetime(6) NOT NULL COMMENT 'create datetime',
  `gmt_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
  UNIQUE KEY `ux_tcc_fence_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='TCC transaction mode tcc fence log table';


相关文章
|
6月前
|
Kubernetes Cloud Native Java
Seata常见问题之回滚一直在重试如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
30天前
|
SQL JavaScript 数据库连接
Seata的工作原理
【10月更文挑战第30天】
28 3
|
5月前
|
Apache 开发者
Apache Seata 如何解决 TCC 模式的幂等、悬挂和空回滚问题
【6月更文挑战第8天】Apache Seata 是一款分布式事务框架,解决TCC模式下的幂等、悬挂和空回滚问题。通过记录事务状态处理幂等,设置超时机制避免悬挂,明确标记Try操作成功来处理空回滚。Seata 提供丰富配置和管理功能,确保分布式事务的可靠性和效率,支持复杂事务处理场景,为企业业务发展提供支持。
219 7
|
6月前
|
存储 Java Nacos
Seata常见问题之xa模式出现错误xid is not valid如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
212 4
|
6月前
|
NoSQL Java 数据库
Seata常见问题之xa模式下插入一条数据再更新这条数据会报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
195 2
|
6月前
|
SpringCloudAlibaba Java 数据库
SpringCloud Alibaba微服务 -- Seata的原理和使用
SpringCloud Alibaba微服务 -- Seata的原理和使用
|
6月前
|
Java 关系型数据库 微服务
Seata常见问题之项目一直启动不成功如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
545 0
|
2月前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
6月前
|
存储 关系型数据库 MySQL
基于Seata实现分布式事务
通过以上步骤,你可以使用 Seata 实现分布式事务,确保在微服务架构中的事务一致性。Seata 支持多种语言和框架,能够满足不同业务场景的需求。欢迎关注威哥爱编程,一起学习成长。
165 1
|
3月前
|
关系型数据库 MySQL 数据库
SpringCloud2023中使用Seata解决分布式事务
对于分布式系统而言,需要保证分布式系统中的数据一致性,保证数据在子系统中始终保持一致,避免业务出现问题。分布式系统中对数据的操作要么一起成功,要么一起失败,必须是一个整体性的事务。Seata简化了这个使用过程。
90 2
下一篇
无影云桌面