前言
在博客Spring Cloud集成分布式事务框架Seata 1.5.2中,我们已经集成了Seata AT模式,虽然AT模式可以覆盖大部分分布式事务需求,但是针对于一些追求高性能的业务场景,我们还是需要选择TCC模式;
因为TCC的资源预留
概念降低了锁的粒度,在分布式事务未完成前并不会阻塞同业务下的其他分布式事务的执行;但是有一点不好的就是:TCC模式对于业务侵入性比较大,整个分布式事务的资源准备
、提交
、回滚
逻辑全部需要开发人员自己完成,开发工作量比AT模式大出两三倍;
为了在将来的工作中能够顺利地使用TCC模式来作为高性能业务的分布式事务解决方案,我们下面就开始手把手教大家如何在Spring Cloud中集成Seata TCC模式。
业务场景
同样还是购物车下单的业务场景:
1.用户通过业务入口提交下单数据;
2.先计算订单总金额并调用RPC进行预扣款;
- 2.1:预扣款成功才能创建订单;
- 2.2:预扣款失败,TM发起回滚,Account服务调用回滚逻辑,分布式事务回滚结束;
3.预扣款成功,那么再发起RPC创建订单;
- 3.1:Order服务接收到创建订单的请求,先RPC执行预扣库存的操作;
- 3.2:Storage服务预扣库存成功,Order服务执行预创建订单操作;
- 3.3:Storage服务预扣库存失败,导致订单创建失败,下单业务抛出异常,TM发起回滚,Account服务与Storage服务调用回滚逻辑,分布式事务回滚结束;
- 3.4:Order服务执行预创建订单成功,下单业务执行完毕,TM发起提交,所有RM调用提交逻辑,分布式事务成功;
- 3.5:Order服务执行预创建订单失败,下单业务抛出异常,TM发起回滚,所有RM调用回滚逻辑,分布式事务回滚结束;
数据表创建
注意:TCC模式不需要创建undolog表
账户表:wallet_tcc_tbl
-- ---------------------------- -- Table structure for wallet_tcc_tbl -- ---------------------------- DROP TABLE IF EXISTS `wallet_tcc_tbl`; CREATE TABLE `wallet_tcc_tbl` ( `id` int NOT NULL COMMENT '主键ID', `user_id` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID', `money` int NOT NULL COMMENT '账户金额', `pre_money` int NOT NULL COMMENT '预扣金额', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; INSERT INTO `wallet_tcc_tbl` (`id`, `user_id`, `money`, `pre_money`) VALUES (1, '123456', 1000000, 0); 复制代码
订单表:order_tcc_tbl
-- ---------------------------- -- Table structure for order_tcc_tbl -- ---------------------------- DROP TABLE IF EXISTS `order_tcc_tbl`; CREATE TABLE `order_tcc_tbl` ( `id` int NOT NULL AUTO_INCREMENT COMMENT '主键ID', `user_id` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID', `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码', `count` int NOT NULL COMMENT '商品数量', `unit_price` int NOT NULL COMMENT '商品单价', `status` varchar(8) COLLATE utf8mb4_bin NOT NULL COMMENT '订单状态', `create_time` datetime NOT NULL COMMENT '创建时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; 复制代码
库存表:stock_tcc_tbl
-- ---------------------------- -- Table structure for stock_tcc_tbl -- ---------------------------- DROP TABLE IF EXISTS `stock_tcc_tbl`; CREATE TABLE `stock_tcc_tbl` ( `id` int NOT NULL COMMENT '主键ID', `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码', `count` int NOT NULL COMMENT '商品总数', `pre_deduct_count` int NOT NULL COMMENT '预扣数量', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; INSERT INTO `stock_tcc_tbl` (`id`, `commodity_code`, `count`, `pre_deduct_count`) VALUES (1, 'CC-54321', 10000, 0); 复制代码
创建TCC Action
在各服务表创建完毕后,我们开始编写核心的TCC Action,用以完成我们的Try
、Commit
、Cancal
逻辑。
Account服务
:
- 先构建接口
import io.seata.rm.tcc.api.BusinessActionContext; import io.seata.rm.tcc.api.BusinessActionContextParameter; import io.seata.rm.tcc.api.LocalTCC; import io.seata.rm.tcc.api.TwoPhaseBusinessAction; /** * @author zouwei * @className IWalletTccAction * @date: 2022/10/10 20:50 * @description: */ @LocalTCC public interface IWalletTccAction { /** * 预扣款 * * @param businessActionContext * @param userId * @param amount * @return */ @TwoPhaseBusinessAction(name = "prepareDeductMoney", commitMethod = "commitDeductMoney", rollbackMethod = "rollbackDeductMoney") boolean prepareDeductMoney(BusinessActionContext businessActionContext, @BusinessActionContextParameter(paramName = "userId") String userId, @BusinessActionContextParameter(paramName = "amount") Long amount); /** * 提交扣款 * * @param businessActionContext * @return */ boolean commitDeductMoney(BusinessActionContext businessActionContext); /** * 回滚扣款 * * @param businessActionContext * @return */ boolean rollbackDeductMoney(BusinessActionContext businessActionContext); } 复制代码
1.
@LocalTCC
注解表示该接口是一个TCC Action接口,需要Seata处理;2.@TwoPhaseBusinessAction注解标注了提交和回滚方法,以便Seata根据预处理结果来决定时调用提交方法还是回滚方法
- 实现扣款服务逻辑
import com.example.awesomeaccount.dao.mapper.WalletTccEnhanceMapper; import com.example.awesomeaccount.tcc.IWalletTccAction; import com.example.awesomeaccount.tcc.TccActionResultWrap; import io.seata.rm.tcc.api.BusinessActionContext; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.util.Map; /** * @author zouwei * @className WalletTccActionImpl * @date: 2022/10/10 21:05 * @description: */ @Component public class WalletTccActionImpl implements IWalletTccAction { // 用来调用数据库 @Resource private WalletTccEnhanceMapper walletTccEnhanceMapper; @Override public boolean prepareDeductMoney(BusinessActionContext businessActionContext, String userId, Long amount) { String xid = businessActionContext.getXid(); // 幂等性判断 if (TccActionResultWrap.hasPrepareResult(xid)) { return true; } // 避免空悬挂,已经执行过回滚了就不能再预留资源 if (TccActionResultWrap.hasRollbackResult(xid) || TccActionResultWrap.hasCommitResult(xid)) { return false; } // 预留资源 // 相关sql: update wallet_tcc_tbl set money = money - #{amount,jdbcType=INTEGER}, pre_money = pre_money + #{amount,jdbcType=INTEGER} where user_id = #{userId,jdbcType=VARCHAR} and money <![CDATA[ >= ]]>#{amount,jdbcType=INTEGER} boolean result = walletTccEnhanceMapper.prepareDeductMoney(userId, amount) > 0; // 记录执行结果:xid:result // 以便回滚时判断是否是空回滚 TccActionResultWrap.prepareSuccess(xid); return result; } // 保证提交逻辑的原子性 @Transactional @Override public boolean commitDeductMoney(BusinessActionContext businessActionContext) { String xid = businessActionContext.getXid(); // 幂等性判断 if (TccActionResultWrap.hasCommitResult(xid)) { return true; } Map<String, Object> actionContext = businessActionContext.getActionContext(); String userId = (String) actionContext.get("userId"); long amount = (Integer) actionContext.get("amount"); // 执行提交操作,扣除预留款 // 相关sql: update wallet_tcc_tbl set pre_money = pre_money - #{amount,jdbcType=INTEGER} where user_id = #{userId,jdbcType=VARCHAR} and pre_money <![CDATA[ >= ]]>#{amount,jdbcType=INTEGER} boolean result = walletTccEnhanceMapper.commitDeductMoney(userId, amount) > 0; // 清除预留结果 TccActionResultWrap.removePrepareResult(xid); // 设置提交结果 TccActionResultWrap.commitSuccess(xid); return result; } @Transactional @Override public boolean rollbackDeductMoney(BusinessActionContext businessActionContext) { String xid = businessActionContext.getXid(); // 幂等性判断 if (TccActionResultWrap.hasRollbackResult(xid)) { return true; } // 没有预留资源结果,回滚不做任何处理; if (!TccActionResultWrap.hasPrepareResult(xid)) { // 设置回滚结果,防止空悬挂 TccActionResultWrap.rollbackResult(xid); return true; } // 执行回滚 Map<String, Object> actionContext = businessActionContext.getActionContext(); String userId = (String) actionContext.get("userId"); long amount = (Integer) actionContext.get("amount"); // 相关sql: update wallet_tcc_tbl set money = money + #{amount,jdbcType=INTEGER}, pre_money = pre_money - #{amount,jdbcType=INTEGER} where user_id = #{userId,jdbcType=VARCHAR} and pre_money <![CDATA[ >= ]]>#{amount,jdbcType=INTEGER} boolean result = walletTccEnhanceMapper.rollbackDeductMoney(userId, amount) > 0; // 清除预留结果 TccActionResultWrap.removePrepareResult(xid); // 设置回滚结果 TccActionResultWrap.rollbackResult(xid); return result; } } 复制代码
1.预扣款逻辑就是先把订单总金额从
money
字段下挪到pre_money
下,前提是当前账户中有足够的金额可以扣除;2.当账户有足够的金额时,那么意味着预扣款成功,这笔钱是其他事务不能操作的,只能静静地等待TM发起分布式事务提交或回滚指令;
3.一旦接收到提交指令,那么Seata就会调用
commitDeductMoney
方法把已经扣除的金额从pre_money
扣掉,这就意味着这笔钱真正地被花掉了;4.如果接收到回滚指令,说明下单失败,我们就需要把预扣款的钱还给客户,此时Seata就会调用
rollbackDeductMoney
方法,把之前挪到pre_money
下的金额加回到总金额money
中,用户账户里的钱就回来了。
Order服务
:
import io.seata.rm.tcc.api.BusinessActionContext; import io.seata.rm.tcc.api.BusinessActionContextParameter; import io.seata.rm.tcc.api.LocalTCC; import io.seata.rm.tcc.api.TwoPhaseBusinessAction; /** * @author zouwei * @className IOrderTccAction * @date: 2022/10/11 12:58 * @description: */ @LocalTCC public interface IOrderTccAction { /** * 预创建订单 * * @param businessActionContext * @param userId * @param commodityCode * @param count * @param unitPrice * @return */ @TwoPhaseBusinessAction(name = "prepareOrder", commitMethod = "commitOrder", rollbackMethod = "rollbackOrder") boolean prepareOrder(BusinessActionContext businessActionContext, @BusinessActionContextParameter(paramName = "userId") String userId, @BusinessActionContextParameter(paramName = "userId") String commodityCode, @BusinessActionContextParameter(paramName = "userId") int count, @BusinessActionContextParameter(paramName = "userId") long unitPrice); /** * 订单生效 * * @param businessActionContext * @return */ boolean commitOrder(BusinessActionContext businessActionContext); /** * 回滚预创建订单 * * @param businessActionContext * @return */ boolean rollbackOrder(BusinessActionContext businessActionContext); } 复制代码
import com.example.awesomeorder.dao.entity.OrderTcc; import com.example.awesomeorder.dao.mapper.OrderTccMapper; import com.example.awesomeorder.tcc.IOrderTccAction; import com.example.awesomeorder.tcc.TccActionResultWrap; import io.seata.rm.tcc.api.BusinessActionContext; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.time.LocalDateTime; /** * @author zouwei * @className OrderTccActionImpl * @date: 2022/10/11 13:05 * @description: */ @Component public class OrderTccActionImpl implements IOrderTccAction { @Resource private OrderTccMapper orderTccMapper; @Override public boolean prepareOrder(BusinessActionContext businessActionContext, String userId, String commodityCode, int count, long unitPrice) { String xid = businessActionContext.getXid(); // 幂等性判断 if (TccActionResultWrap.hasPrepareResult(xid)) { return true; } // 避免空悬挂,已经执行过回滚了就不能再预留资源 if (TccActionResultWrap.hasRollbackResult(xid) || TccActionResultWrap.hasCommitResult(xid)) { return false; } // 准备预创建订单 OrderTcc order = new OrderTcc(); // 创建时间 order.setCreateTime(LocalDateTime.now()); // 用户ID order.setUserId(userId); // 数量 order.setCount(count); // 商品编码 order.setCommodityCode(commodityCode); // 单价 order.setUnitPrice(unitPrice); // 设置状态 order.setStatus("预创建"); // 创建订单 boolean result = orderTccMapper.insert(order) > 0; // 记录主键ID,为了传递给提交或回滚 businessActionContext.addActionContext("id", order.getId()); // 记录执行结果:xid:result // 以便回滚时判断是否是空回滚 TccActionResultWrap.prepareSuccess(xid); return result; } @Transactional @Override public boolean commitOrder(BusinessActionContext businessActionContext) { String xid = businessActionContext.getXid(); // 幂等性判断 if (TccActionResultWrap.hasCommitResult(xid)) { return true; } // 修改预留资源状态 Integer id = (Integer) businessActionContext.getActionContext("id"); OrderTcc row = new OrderTcc(); row.setId(id); row.setStatus("成功"); boolean result = orderTccMapper.updateByPrimaryKeySelective(row) > 0; // 清除预留结果 TccActionResultWrap.removePrepareResult(xid); // 设置提交结果 TccActionResultWrap.commitSuccess(xid); return result; } @Transactional @Override public boolean rollbackOrder(BusinessActionContext businessActionContext) { String xid = businessActionContext.getXid(); // 幂等性判断 if (TccActionResultWrap.hasRollbackResult(xid)) { return true; } // 没有预留资源结果,回滚不做任何处理; if (!TccActionResultWrap.hasPrepareResult(xid)) { // 设置回滚结果,防止空悬挂 TccActionResultWrap.rollbackResult(xid); return true; } // 执行回滚,删除预创建订单 Integer id = (Integer) businessActionContext.getActionContext("id"); boolean result = orderTccMapper.deleteByPrimaryKey(id) > 0; // 清除预留结果 TccActionResultWrap.removePrepareResult(xid); // 设置回滚结果 TccActionResultWrap.rollbackResult(xid); return result; } }