阿里 Seata 新版本终于解决了 TCC 模式的幂等、悬挂和空回滚问题

简介: 阿里 Seata 新版本终于解决了 TCC 模式的幂等、悬挂和空回滚问题

大家好,我是君哥。

今天来聊一聊阿里巴巴 Seata 新版本(1.5.1)是怎么解决 TCC 模式下的幂等、悬挂和空回滚问题的。

1 TCC 回顾

TCC 模式是最经典的分布式事务解决方案,它将分布式事务分为两个阶段来执行,try 阶段对每个分支事务进行预留资源,如果所有分支事务都预留资源成功,则进入 commit 阶段提交全局事务,如果有一个节点预留资源失败则进入 cancel 阶段回滚全局事务。

以传统的订单、库存、账户服务为例,在 try 阶段尝试预留资源,插入订单、扣减库存、扣减金额,这三个服务都是要提交本地事务的,这里可以把资源转入中间表。在 commit 阶段,再把 try 阶段预留的资源转入最终表。而在 cancel 阶段,把 try 阶段预留的资源进行释放,比如把账户金额返回给客户的账户。

注意:try 阶段必须是要提交本地事务的,比如扣减订单金额,必须把钱从客户账户扣掉,如果不扣掉,在 commit 阶段客户账户钱不够了,就会出问题。

1.1 try-commit

try 阶段首先进行预留资源,然后在 commit 阶段扣除资源。如下图:

微信图片_20221213114310.png

1.2 try-cancel

try 阶段首先进行预留资源,预留资源时扣减库存失败导致全局事务回滚,在 cancel 阶段释放资源。如下图:

微信图片_20221213114336.png

2 TCC 优势

TCC 模式最大的优势是效率高。TCC 模式在 try 阶段的锁定资源并不是真正意义上的锁定,而是真实提交了本地事务,将资源预留到中间态,并不需要阻塞等待,因此效率比其他模式要高。

同时 TCC 模式还可以进行如下优化:

2.1 异步提交

try 阶段成功后,不立即进入 confirm/cancel 阶段,而是认为全局事务已经结束了,启动定时任务来异步执行 confirm/cancel,扣减或释放资源,这样会有很大的性能提升。

2.2 同库模式

TCC 模式中有三个角色:

  • TM:管理全局事务,包括开启全局事务,提交/回滚全局事务;
  • RM:管理分支事务;
  • TC: 管理全局事务和分支事务的状态。

下图来自 Seata 官网:

微信图片_20221213114436.png

TM 开启全局事务时,RM 需要向 TC 发送注册消息,TC 保存分支事务的状态。TM 请求提交或回滚时,TC 需要向 RM 发送提交或回滚消息。这样包含两个个分支事务的分布式事务中,TC 和 RM 之间有四次 RPC。

优化后的流程如下图:

微信图片_20221213114501.png

TC 保存全局事务的状态。TM 开启全局事务时,RM 不再需要向 TC 发送注册消息,而是把分支事务状态保存在了本地。TM 向 TC 发送提交或回滚消息后,RM 异步线程首先查出本地保存的未提交分支事务,然后向 TC 发送消息获取(本地分支事务所在的)全局事务状态,以决定是提交还是回滚本地事务。

这样优化后,RPC 次数减少了 50%,性能大幅提升。

3 RM 代码示例

以库存服务为例,RM 库存服务接口代码如下:

@LocalTCC
public interface StorageService {
    /**
     * 扣减库存
     * @param xid 全局xid
     * @param productId 产品id
     * @param count 数量
     * @return
     */
    @TwoPhaseBusinessAction(name = "storageApi", commitMethod = "commit", rollbackMethod = "rollback", useTCCFence = true)
    boolean decrease(String xid, Long productId, Integer count);
    /**
     * 提交事务
     * @param actionContext
     * @return
     */
    boolean commit(BusinessActionContext actionContext);
    /**
     * 回滚事务
     * @param actionContext
     * @return
     */
    boolean rollback(BusinessActionContext actionContext);
}

通过 @LocalTCC 这个注解,RM 初始化的时候会向 TC 注册一个分支事务。在 try 阶段的方法(decrease方法)上有一个 @TwoPhaseBusinessAction 注解,这里定义了分支事务的 resourceId,commit 方法和 cancel 方法,useTCCFence 这个属性下一节再讲。

4 TCC 存在问题

TCC 模式中存在的三大问题是幂等、悬挂和空回滚。在 Seata1.5.1 版本中,增加了一张事务控制表,表名是 tcc_fence_log 来解决这个问题。而在上一节 @TwoPhaseBusinessAction 注解中提到的属性 useTCCFence 就是来指定是否开启这个机制,这个属性值默认是 false。

tcc_fence_log 建表语句如下(MySQL 语法):

CREATE TABLE IF NOT EXISTS `tcc_fence_log`
(
    `xid`           VARCHAR(128)  NOT NULL COMMENT 'global id',
    `branch_id`     BIGINT        NOT NULL COMMENT 'branch id',
    `action_name`   VARCHAR(64)   NOT NULL COMMENT 'action name',
    `status`        TINYINT       NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)',
    `gmt_create`    DATETIME(3)   NOT NULL COMMENT 'create time',
    `gmt_modified`  DATETIME(3)   NOT NULL COMMENT 'update time',
    PRIMARY KEY (`xid`, `branch_id`),
    KEY `idx_gmt_modified` (`gmt_modified`),
    KEY `idx_status` (`status`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

4.1 幂等

在 commit/cancel 阶段,因为 TC 没有收到分支事务的响应,需要进行重试,这就要分支事务支持幂等。

我们看一下新版本是怎么解决的。下面的代码在 TCCResourceManager 类:

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
         String applicationData) throws TransactionException {
 TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
 //省略判断
 Object targetTCCBean = tccResource.getTargetBean();
 Method commitMethod = tccResource.getCommitMethod();
 //省略判断
 try {
  //BusinessActionContext
  BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
   applicationData);
  Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
  Object ret;
  boolean result;
  //注解 useTCCFence 属性是否设置为 true
  if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
   try {
    result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
   } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
    throw e.getCause();
   }
  } else {
   //省略逻辑
  }
  LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
  return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
 } catch (Throwable t) {
  //省略
  return BranchStatus.PhaseTwo_CommitFailed_Retryable;
 }
}

上面的代码可以看到,执行分支事务提交方法时,首先判断 useTCCFence 属性是否为 true,如果为 true,则走 TCCFenceHandler 类中的 commitFence 逻辑,否则走普通提交逻辑。

TCCFenceHandler 类中的 commitFence 方法调用了 TCCFenceHandler 类的 commitFence 方法,代码如下:

public static boolean commitFence(Method commitMethod, Object targetTCCBean,
          String xid, Long branchId, Object[] args) {
 return transactionTemplate.execute(status -> {
  try {
   Connection conn = DataSourceUtils.getConnection(dataSource);
   TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
   if (tccFenceDO == null) {
    throw new TCCFenceException(String.format("TCC fence record not exists, commit fence method failed. xid= %s, branchId= %s", xid, branchId),
      FrameworkErrorCode.RecordAlreadyExists);
   }
   if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
    LOGGER.info("Branch transaction has already committed before. idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
    return true;
   }
   if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
    if (LOGGER.isWarnEnabled()) {
     LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
    }
    return false;
   }
   return updateStatusAndInvokeTargetMethod(conn, commitMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_COMMITTED, status, args);
  } catch (Throwable t) {
   status.setRollbackOnly();
   throw new SkipCallbackWrapperException(t);
  }
 });
}

从代码中可以看到,提交事务时首先会判断 tcc_fence_log 表中是否已经有记录,如果有记录,则判断事务执行状态并返回。这样如果判断到事务的状态已经是 STATUS_COMMITTED,就不会再次提交,保证了幂等。如果 tcc_fence_log 表中没有记录,则插入一条记录,供后面重试时判断。

Rollback 的逻辑跟 commit 类似,逻辑在类 TCCFenceHandler 的 rollbackFence 方法。

4.2 空回滚

如下图,账户服务是两个节点的集群,在 try 阶段账户服务 1 这个节点发生了故障,try 阶段在不考虑重试的情况下,全局事务必须要走向结束状态,这样就需要在账户服务上执行一次 cancel 操作,这样就空跑了一次回滚操作。

微信图片_20221213114534.png

Seata 的解决方案是在 try 阶段 往 tcc_fence_log  表插入一条记录,status 字段值是 STATUS_TRIED,在 Rollback 阶段判断记录是否存在,如果不存在,则不执行回滚操作。代码如下:

//TCCFenceHandler 类
public static Object prepareFence(String xid, Long branchId, String actionName, Callback<Object> targetCallback) {
 return transactionTemplate.execute(status -> {
  try {
   Connection conn = DataSourceUtils.getConnection(dataSource);
   boolean result = insertTCCFenceLog(conn, xid, branchId, actionName, TCCFenceConstant.STATUS_TRIED);
   LOGGER.info("TCC fence prepare result: {}. xid: {}, branchId: {}", result, xid, branchId);
   if (result) {
    return targetCallback.execute();
   } else {
    throw new TCCFenceException(String.format("Insert tcc fence record error, prepare fence failed. xid= %s, branchId= %s", xid, branchId),
      FrameworkErrorCode.InsertRecordError);
   }
  } catch (TCCFenceException e) {
   //省略
  } catch (Throwable t) {
   //省略
  }
 });
}

在 Rollback 阶段的处理逻辑如下:

//TCCFenceHandler 类
public static boolean rollbackFence(Method rollbackMethod, Object targetTCCBean,
         String xid, Long branchId, Object[] args, String actionName) {
 return transactionTemplate.execute(status -> {
  try {
   Connection conn = DataSourceUtils.getConnection(dataSource);
   TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
   // non_rollback
   if (tccFenceDO == null) {
    //不执行回滚逻辑
    return true;
   } else {
    if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
     LOGGER.info("Branch transaction had already rollbacked before, idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
     return true;
    }
    if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
     if (LOGGER.isWarnEnabled()) {
      LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
     }
     return false;
    }
   }
   return updateStatusAndInvokeTargetMethod(conn, rollbackMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_ROLLBACKED, status, args);
  } catch (Throwable t) {
   status.setRollbackOnly();
   throw new SkipCallbackWrapperException(t);
  }
 });
}

updateStatusAndInvokeTargetMethod 方法执行的 sql 如下:

update tcc_fence_log set status = ?, gmt_modified = ?
    where xid = ? and  branch_id = ? and status = ? ;

可见就是把 tcc_fence_log 表记录的  status  字段值从 STATUS_TRIED 改为 STATUS_ROLLBACKED,如果更新成功,就执行回滚逻辑。

4.3 悬挂

悬挂是指因为网络问题,RM 开始没有收到 try 指令,但是执行了 Rollback 后 RM 又收到了 try 指令并且预留资源成功,这时全局事务已经结束,最终导致预留的资源不能释放。如下图:

微信图片_20221213114602.png

Seata 解决这个问题的方法是执行 Rollback 方法时先判断 tcc_fence_log 是否存在当前 xid 的记录,如果没有则向 tcc_fence_log 表插入一条记录,状态是 STATUS_SUSPENDED,并且不再执行回滚操作。代码如下:

public static boolean rollbackFence(Method rollbackMethod, Object targetTCCBean,
         String xid, Long branchId, Object[] args, String actionName) {
 return transactionTemplate.execute(status -> {
  try {
   Connection conn = DataSourceUtils.getConnection(dataSource);
   TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
   // non_rollback
   if (tccFenceDO == null) {
       //插入防悬挂记录
    boolean result = insertTCCFenceLog(conn, xid, branchId, actionName, TCCFenceConstant.STATUS_SUSPENDED);
    //省略逻辑
    return true;
   } else {
    //省略逻辑
   }
   return updateStatusAndInvokeTargetMethod(conn, rollbackMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_ROLLBACKED, status, args);
  } catch (Throwable t) {
   //省略逻辑
  }
 });
}

而后面执行 try 阶段方法时首先会向 tcc_fence_log 表插入一条当前 xid 的记录,这样就造成了主键冲突。代码如下:

//TCCFenceHandler 类
public static Object prepareFence(String xid, Long branchId, String actionName, Callback<Object> targetCallback) {
 return transactionTemplate.execute(status -> {
  try {
   Connection conn = DataSourceUtils.getConnection(dataSource);
   boolean result = insertTCCFenceLog(conn, xid, branchId, actionName, TCCFenceConstant.STATUS_TRIED);
   //省略逻辑
  } catch (TCCFenceException e) {
   if (e.getErrcode() == FrameworkErrorCode.DuplicateKeyException) {
    LOGGER.error("Branch transaction has already rollbacked before,prepare fence failed. xid= {},branchId = {}", xid, branchId);
    addToLogCleanQueue(xid, branchId);
   }
   status.setRollbackOnly();
   throw new SkipCallbackWrapperException(e);
  } catch (Throwable t) {
   //省略
  }
 });
}

注意:queryTCCFenceDO 方法 sql 中使用了 for update,这样就不用担心 Rollback 方法中获取不到 tcc_fence_log 表记录而无法判断 try 阶段本地事务的执行结果了。

5 总结

TCC 模式是分布式事务使用最多的模式,但是幂等、悬挂和空回滚一直是 TCC 模式需要考虑的问题,Seata 框架在 1.5.1 版本完美解决了这些问题。

对 tcc_fence_log 表的操作也需要考虑事务的控制,Seata 使用了代理数据源,使 tcc_fence_log 表操作和 RM 业务操作在同一个本地事务中执行,这样就能保证本地操作和对 tcc_fence_log 的操作同时成功或失败。

相关文章
|
1月前
|
数据库 微服务
SEATA模式
Seata 是一款开源的分布式事务解决方案,支持多种事务模式以适应不同的应用场景。其主要模式包括:AT(TCC)模式,事务分三阶段执行;TCC 模式,提供更灵活的事务控制;SAGA 模式,基于状态机实现跨服务的事务一致性;XA 模式,采用传统两阶段提交协议确保数据一致性。
45 5
|
2月前
Seata框架在AT模式下是如何保证数据一致性的?
通过以上这些机制的协同作用,Seata 在 AT 模式下能够有效地保证数据的一致性,确保分布式事务的可靠执行。你还可以进一步深入研究 Seata 的具体实现细节,以更好地理解其数据一致性保障的原理。
44 3
|
7月前
|
Apache 开发者
Apache Seata 如何解决 TCC 模式的幂等、悬挂和空回滚问题
【6月更文挑战第8天】Apache Seata 是一款分布式事务框架,解决TCC模式下的幂等、悬挂和空回滚问题。通过记录事务状态处理幂等,设置超时机制避免悬挂,明确标记Try操作成功来处理空回滚。Seata 提供丰富配置和管理功能,确保分布式事务的可靠性和效率,支持复杂事务处理场景,为企业业务发展提供支持。
255 7
|
2月前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
6天前
|
Java 关系型数据库 数据库
微服务SpringCloud分布式事务之Seata
SpringCloud+SpringCloudAlibaba的Seata实现分布式事务,步骤超详细,附带视频教程
23 1
|
1月前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
210 7
|
2月前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
101 6
|
2月前
|
数据库
如何在Seata框架中配置分布式事务的隔离级别?
总的来说,配置分布式事务的隔离级别是实现分布式事务管理的重要环节之一,需要认真对待和仔细调整,以满足业务的需求和性能要求。你还可以进一步深入研究和实践 Seata 框架的配置和使用,以更好地应对各种分布式事务场景的挑战。
50 6
|
2月前
|
消息中间件 运维 数据库
Seata框架和其他分布式事务框架有什么区别
Seata框架和其他分布式事务框架有什么区别
36 1
|
4月前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata