手把手教你Spring Cloud集成Seata TCC模式(上)

简介: 手把手教你Spring Cloud集成Seata TCC模式

前言

在博客Spring Cloud集成分布式事务框架Seata 1.5.2中,我们已经集成了Seata AT模式,虽然AT模式可以覆盖大部分分布式事务需求,但是针对于一些追求高性能的业务场景,我们还是需要选择TCC模式;

因为TCC的资源预留概念降低了锁的粒度,在分布式事务未完成前并不会阻塞同业务下的其他分布式事务的执行;但是有一点不好的就是:TCC模式对于业务侵入性比较大,整个分布式事务的资源准备提交回滚逻辑全部需要开发人员自己完成,开发工作量比AT模式大出两三倍;

为了在将来的工作中能够顺利地使用TCC模式来作为高性能业务的分布式事务解决方案,我们下面就开始手把手教大家如何在Spring Cloud中集成Seata TCC模式。

业务场景

ce1f9b3582394c5990e9baff91a00970_tplv-k3u1fbpfcp-zoom-in-crop-mark_4536_0_0_0.png

同样还是购物车下单的业务场景:

1.用户通过业务入口提交下单数据;

2.先计算订单总金额并调用RPC进行预扣款;

  • 2.1:预扣款成功才能创建订单;
  • 2.2:预扣款失败,TM发起回滚,Account服务调用回滚逻辑,分布式事务回滚结束;

3.预扣款成功,那么再发起RPC创建订单;

  • 3.1:Order服务接收到创建订单的请求,先RPC执行预扣库存的操作;
  • 3.2:Storage服务预扣库存成功,Order服务执行预创建订单操作;
  • 3.3:Storage服务预扣库存失败,导致订单创建失败,下单业务抛出异常,TM发起回滚,Account服务与Storage服务调用回滚逻辑,分布式事务回滚结束;
  • 3.4:Order服务执行预创建订单成功,下单业务执行完毕,TM发起提交,所有RM调用提交逻辑,分布式事务成功;
  • 3.5:Order服务执行预创建订单失败,下单业务抛出异常,TM发起回滚,所有RM调用回滚逻辑,分布式事务回滚结束;

数据表创建

注意:TCC模式不需要创建undolog表

账户表:wallet_tcc_tbl

-- ----------------------------
-- Table structure for wallet_tcc_tbl
-- ----------------------------
DROP TABLE IF EXISTS `wallet_tcc_tbl`;
CREATE TABLE `wallet_tcc_tbl` (
  `id` int NOT NULL COMMENT '主键ID',
  `user_id` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID',
  `money` int NOT NULL COMMENT '账户金额',
  `pre_money` int NOT NULL COMMENT '预扣金额',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
INSERT INTO `wallet_tcc_tbl` (`id`, `user_id`, `money`, `pre_money`) VALUES (1, '123456', 1000000, 0);
复制代码

订单表:order_tcc_tbl

-- ----------------------------
-- Table structure for order_tcc_tbl
-- ----------------------------
DROP TABLE IF EXISTS `order_tcc_tbl`;
CREATE TABLE `order_tcc_tbl` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `user_id` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID',
  `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码',
  `count` int NOT NULL COMMENT '商品数量',
  `unit_price` int NOT NULL COMMENT '商品单价',
  `status` varchar(8) COLLATE utf8mb4_bin NOT NULL COMMENT '订单状态',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
复制代码

库存表:stock_tcc_tbl

-- ----------------------------
-- Table structure for stock_tcc_tbl
-- ----------------------------
DROP TABLE IF EXISTS `stock_tcc_tbl`;
CREATE TABLE `stock_tcc_tbl` (
  `id` int NOT NULL COMMENT '主键ID',
  `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码',
  `count` int NOT NULL COMMENT '商品总数',
  `pre_deduct_count` int NOT NULL COMMENT '预扣数量',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
INSERT INTO `stock_tcc_tbl` (`id`, `commodity_code`, `count`, `pre_deduct_count`) VALUES (1, 'CC-54321', 10000, 0);
复制代码

创建TCC Action

在各服务表创建完毕后,我们开始编写核心的TCC Action,用以完成我们的TryCommitCancal逻辑。

Account服务

  • 先构建接口
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
/**
 * @author zouwei
 * @className IWalletTccAction
 * @date: 2022/10/10 20:50
 * @description:
 */
@LocalTCC
public interface IWalletTccAction {
  /**
   * 预扣款
   *
   * @param businessActionContext
   * @param userId
   * @param amount
   * @return
   */
  @TwoPhaseBusinessAction(name = "prepareDeductMoney", commitMethod = "commitDeductMoney", rollbackMethod = "rollbackDeductMoney")
  boolean prepareDeductMoney(BusinessActionContext businessActionContext,
                             @BusinessActionContextParameter(paramName = "userId") String userId,
                             @BusinessActionContextParameter(paramName = "amount") Long amount);
  /**
   * 提交扣款
   *
   * @param businessActionContext
   * @return
   */
  boolean commitDeductMoney(BusinessActionContext businessActionContext);
  /**
   * 回滚扣款
   *
   * @param businessActionContext
   * @return
   */
  boolean rollbackDeductMoney(BusinessActionContext businessActionContext);
}
复制代码

1.@LocalTCC注解表示该接口是一个TCC Action接口,需要Seata处理;

2.@TwoPhaseBusinessAction注解标注了提交和回滚方法,以便Seata根据预处理结果来决定时调用提交方法还是回滚方法

  • 实现扣款服务逻辑
import com.example.awesomeaccount.dao.mapper.WalletTccEnhanceMapper;
import com.example.awesomeaccount.tcc.IWalletTccAction;
import com.example.awesomeaccount.tcc.TccActionResultWrap;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Map;
/**
 * @author zouwei
 * @className WalletTccActionImpl
 * @date: 2022/10/10 21:05
 * @description:
 */
@Component
public class WalletTccActionImpl implements IWalletTccAction {
  // 用来调用数据库
  @Resource
  private WalletTccEnhanceMapper walletTccEnhanceMapper;
  @Override
  public boolean prepareDeductMoney(BusinessActionContext businessActionContext, String userId, Long amount) {
    String xid = businessActionContext.getXid();
    // 幂等性判断
    if (TccActionResultWrap.hasPrepareResult(xid)) {
      return true;
    }
    // 避免空悬挂,已经执行过回滚了就不能再预留资源
    if (TccActionResultWrap.hasRollbackResult(xid) || TccActionResultWrap.hasCommitResult(xid)) {
      return false;
    }
    // 预留资源
    // 相关sql: update wallet_tcc_tbl set money = money - #{amount,jdbcType=INTEGER}, pre_money = pre_money + #{amount,jdbcType=INTEGER} where user_id = #{userId,jdbcType=VARCHAR} and money <![CDATA[ >= ]]>#{amount,jdbcType=INTEGER}
    boolean result = walletTccEnhanceMapper.prepareDeductMoney(userId, amount) > 0;
    // 记录执行结果:xid:result
    // 以便回滚时判断是否是空回滚
    TccActionResultWrap.prepareSuccess(xid);
    return result;
  }
  // 保证提交逻辑的原子性
  @Transactional
  @Override
  public boolean commitDeductMoney(BusinessActionContext businessActionContext) {
    String xid = businessActionContext.getXid();
    // 幂等性判断
    if (TccActionResultWrap.hasCommitResult(xid)) {
      return true;
    }
    Map<String, Object> actionContext = businessActionContext.getActionContext();
    String userId = (String) actionContext.get("userId");
    long amount = (Integer) actionContext.get("amount");
    // 执行提交操作,扣除预留款
    // 相关sql: update wallet_tcc_tbl set pre_money = pre_money - #{amount,jdbcType=INTEGER} where user_id = #{userId,jdbcType=VARCHAR} and pre_money <![CDATA[ >= ]]>#{amount,jdbcType=INTEGER}
    boolean result = walletTccEnhanceMapper.commitDeductMoney(userId, amount) > 0;
    // 清除预留结果
    TccActionResultWrap.removePrepareResult(xid);
    // 设置提交结果
    TccActionResultWrap.commitSuccess(xid);
    return result;
  }
  @Transactional
  @Override
  public boolean rollbackDeductMoney(BusinessActionContext businessActionContext) {
    String xid = businessActionContext.getXid();
    // 幂等性判断
    if (TccActionResultWrap.hasRollbackResult(xid)) {
      return true;
    }
    // 没有预留资源结果,回滚不做任何处理;
    if (!TccActionResultWrap.hasPrepareResult(xid)) {
      // 设置回滚结果,防止空悬挂
      TccActionResultWrap.rollbackResult(xid);
      return true;
    }
    // 执行回滚
    Map<String, Object> actionContext = businessActionContext.getActionContext();
    String userId = (String) actionContext.get("userId");
    long amount = (Integer) actionContext.get("amount");
    // 相关sql: update wallet_tcc_tbl set money = money + #{amount,jdbcType=INTEGER}, pre_money = pre_money - #{amount,jdbcType=INTEGER} where user_id = #{userId,jdbcType=VARCHAR} and pre_money <![CDATA[ >= ]]>#{amount,jdbcType=INTEGER}
    boolean result = walletTccEnhanceMapper.rollbackDeductMoney(userId, amount) > 0;
    // 清除预留结果
    TccActionResultWrap.removePrepareResult(xid);
    // 设置回滚结果
    TccActionResultWrap.rollbackResult(xid);
    return result;
  }
}
复制代码

1.预扣款逻辑就是先把订单总金额从money字段下挪到pre_money下,前提是当前账户中有足够的金额可以扣除;

2.当账户有足够的金额时,那么意味着预扣款成功,这笔钱是其他事务不能操作的,只能静静地等待TM发起分布式事务提交或回滚指令;

3.一旦接收到提交指令,那么Seata就会调用commitDeductMoney方法把已经扣除的金额从pre_money扣掉,这就意味着这笔钱真正地被花掉了;

4.如果接收到回滚指令,说明下单失败,我们就需要把预扣款的钱还给客户,此时Seata就会调用rollbackDeductMoney方法,把之前挪到pre_money下的金额加回到总金额money中,用户账户里的钱就回来了。

Order服务

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
/**
 * @author zouwei
 * @className IOrderTccAction
 * @date: 2022/10/11 12:58
 * @description:
 */
@LocalTCC
public interface IOrderTccAction {
  /**
   * 预创建订单
   *
   * @param businessActionContext
   * @param userId
   * @param commodityCode
   * @param count
   * @param unitPrice
   * @return
   */
  @TwoPhaseBusinessAction(name = "prepareOrder", commitMethod = "commitOrder", rollbackMethod = "rollbackOrder")
  boolean prepareOrder(BusinessActionContext businessActionContext,
                       @BusinessActionContextParameter(paramName = "userId") String userId,
                       @BusinessActionContextParameter(paramName = "userId") String commodityCode,
                       @BusinessActionContextParameter(paramName = "userId") int count,
                       @BusinessActionContextParameter(paramName = "userId") long unitPrice);
  /**
   * 订单生效
   *
   * @param businessActionContext
   * @return
   */
  boolean commitOrder(BusinessActionContext businessActionContext);
  /**
   * 回滚预创建订单
   *
   * @param businessActionContext
   * @return
   */
  boolean rollbackOrder(BusinessActionContext businessActionContext);
}
复制代码
import com.example.awesomeorder.dao.entity.OrderTcc;
import com.example.awesomeorder.dao.mapper.OrderTccMapper;
import com.example.awesomeorder.tcc.IOrderTccAction;
import com.example.awesomeorder.tcc.TccActionResultWrap;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
/**
 * @author zouwei
 * @className OrderTccActionImpl
 * @date: 2022/10/11 13:05
 * @description:
 */
@Component
public class OrderTccActionImpl implements IOrderTccAction {
  @Resource
  private OrderTccMapper orderTccMapper;
  @Override
  public boolean prepareOrder(BusinessActionContext businessActionContext, String userId, String commodityCode, int count, long unitPrice) {
    String xid = businessActionContext.getXid();
    // 幂等性判断
    if (TccActionResultWrap.hasPrepareResult(xid)) {
      return true;
    }
    // 避免空悬挂,已经执行过回滚了就不能再预留资源
    if (TccActionResultWrap.hasRollbackResult(xid) || TccActionResultWrap.hasCommitResult(xid)) {
      return false;
    }
    // 准备预创建订单
    OrderTcc order = new OrderTcc();
    // 创建时间
    order.setCreateTime(LocalDateTime.now());
    // 用户ID
    order.setUserId(userId);
    // 数量
    order.setCount(count);
    // 商品编码
    order.setCommodityCode(commodityCode);
    // 单价
    order.setUnitPrice(unitPrice);
    // 设置状态
    order.setStatus("预创建");
    // 创建订单
    boolean result = orderTccMapper.insert(order) > 0;
    // 记录主键ID,为了传递给提交或回滚
    businessActionContext.addActionContext("id", order.getId());
    // 记录执行结果:xid:result
    // 以便回滚时判断是否是空回滚
    TccActionResultWrap.prepareSuccess(xid);
    return result;
  }
  @Transactional
  @Override
  public boolean commitOrder(BusinessActionContext businessActionContext) {
    String xid = businessActionContext.getXid();
    // 幂等性判断
    if (TccActionResultWrap.hasCommitResult(xid)) {
      return true;
    }
    // 修改预留资源状态
    Integer id = (Integer) businessActionContext.getActionContext("id");
    OrderTcc row = new OrderTcc();
    row.setId(id);
    row.setStatus("成功");
    boolean result = orderTccMapper.updateByPrimaryKeySelective(row) > 0;
    // 清除预留结果
    TccActionResultWrap.removePrepareResult(xid);
    // 设置提交结果
    TccActionResultWrap.commitSuccess(xid);
    return result;
  }
  @Transactional
  @Override
  public boolean rollbackOrder(BusinessActionContext businessActionContext) {
    String xid = businessActionContext.getXid();
    // 幂等性判断
    if (TccActionResultWrap.hasRollbackResult(xid)) {
      return true;
    }
    // 没有预留资源结果,回滚不做任何处理;
    if (!TccActionResultWrap.hasPrepareResult(xid)) {
      // 设置回滚结果,防止空悬挂
      TccActionResultWrap.rollbackResult(xid);
      return true;
    }
    // 执行回滚,删除预创建订单
    Integer id = (Integer) businessActionContext.getActionContext("id");
    boolean result = orderTccMapper.deleteByPrimaryKey(id) > 0;
    // 清除预留结果
    TccActionResultWrap.removePrepareResult(xid);
    // 设置回滚结果
    TccActionResultWrap.rollbackResult(xid);
    return result;
  }
}



相关文章
|
2天前
|
人工智能 自然语言处理 Java
Spring 集成 DeepSeek 的 3大方法(史上最全)
DeepSeek 的 API 接口和 OpenAI 是兼容的。我们可以自定义 http client,按照 OpenAI 的rest 接口格式,去访问 DeepSeek。自定义 Client 集成DeepSeek ,可以通过以下步骤实现。步骤 1:准备工作访问 DeepSeek 的开发者平台,注册并获取 API 密钥。DeepSeek 提供了与 OpenAI 兼容的 API 端点(例如),确保你已获取正确的 API 地址。
Spring 集成 DeepSeek 的 3大方法(史上最全)
|
1月前
|
监控 Java Nacos
使用Spring Boot集成Nacos
通过上述步骤,Spring Boot应用可以成功集成Nacos,利用Nacos的服务发现和配置管理功能来提升微服务架构的灵活性和可维护性。通过这种集成,开发者可以更高效地管理和部署微服务。
208 17
|
1月前
|
人工智能 安全 Dubbo
Spring AI 智能体通过 MCP 集成本地文件数据
MCP 作为一款开放协议,直接规范了应用程序如何向 LLM 提供上下文。MCP 就像是面向 AI 应用程序的 USB-C 端口,正如 USB-C 提供了一种将设备连接到各种外围设备和配件的标准化方式一样,MCP 提供了一个将 AI 模型连接到不同数据源和工具的标准化方法。
|
1月前
|
缓存 安全 Java
Spring Boot 3 集成 Spring Security + JWT
本文详细介绍了如何使用Spring Boot 3和Spring Security集成JWT,实现前后端分离的安全认证概述了从入门到引入数据库,再到使用JWT的完整流程。列举了项目中用到的关键依赖,如MyBatis-Plus、Hutool等。简要提及了系统配置表、部门表、字典表等表结构。使用Hutool-jwt工具类进行JWT校验。配置忽略路径、禁用CSRF、添加JWT校验过滤器等。实现登录接口,返回token等信息。
369 12
|
1月前
|
存储 安全 Java
Spring Boot 3 集成Spring AOP实现系统日志记录
本文介绍了如何在Spring Boot 3中集成Spring AOP实现系统日志记录功能。通过定义`SysLog`注解和配置相应的AOP切面,可以在方法执行前后自动记录日志信息,包括操作的开始时间、结束时间、请求参数、返回结果、异常信息等,并将这些信息保存到数据库中。此外,还使用了`ThreadLocal`变量来存储每个线程独立的日志数据,确保线程安全。文中还展示了项目实战中的部分代码片段,以及基于Spring Boot 3 + Vue 3构建的快速开发框架的简介与内置功能列表。此框架结合了当前主流技术栈,提供了用户管理、权限控制、接口文档自动生成等多项实用特性。
84 8
|
1月前
|
Java 关系型数据库 数据库
微服务SpringCloud分布式事务之Seata
SpringCloud+SpringCloudAlibaba的Seata实现分布式事务,步骤超详细,附带视频教程
77 1
|
2月前
|
数据库 微服务
SEATA模式
Seata 是一款开源的分布式事务解决方案,支持多种事务模式以适应不同的应用场景。其主要模式包括:AT(TCC)模式,事务分三阶段执行;TCC 模式,提供更灵活的事务控制;SAGA 模式,基于状态机实现跨服务的事务一致性;XA 模式,采用传统两阶段提交协议确保数据一致性。
57 5
|
2月前
|
XML Java API
Spring Boot集成MinIO
本文介绍了如何在Spring Boot项目中集成MinIO,一个高性能的分布式对象存储服务。主要步骤包括:引入MinIO依赖、配置MinIO属性、创建MinIO配置类和服务类、使用服务类实现文件上传和下载功能,以及运行应用进行测试。通过这些步骤,可以轻松地在项目中使用MinIO的对象存储功能。
149 5
|
2月前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
385 7
|
3月前
|
消息中间件 运维 数据库
Seata框架和其他分布式事务框架有什么区别
Seata框架和其他分布式事务框架有什么区别
54 1

热门文章

最新文章