1.在预创建订单服务中,我们使用了一个status
字段来表示订单的中间状态,还未生效的订单状态为预创建
;
2.当收到提交请求时,我们将订单状态置为
成功
;3.当收到回滚请求时,我们删除之前
预创建
的订单即可;
Storage服务
:
import io.seata.rm.tcc.api.BusinessActionContext; import io.seata.rm.tcc.api.BusinessActionContextParameter; import io.seata.rm.tcc.api.LocalTCC; import io.seata.rm.tcc.api.TwoPhaseBusinessAction; /** * @author zouwei * @className IStorageTccAction * @date: 2022/10/11 13:57 * @description: */ @LocalTCC public interface IStorageTccAction { /** * 预扣库存 * * @param businessActionContext * @param commodityCode * @param count * @return */ @TwoPhaseBusinessAction(name = "prepareDeductStock", commitMethod = "commitDeductStock", rollbackMethod = "rollbackDeductStock") boolean prepareDeductStock(BusinessActionContext businessActionContext, @BusinessActionContextParameter("commodityCode") String commodityCode, @BusinessActionContextParameter("count") int count); /** * 提交被扣库存 * * @param businessActionContext * @return */ boolean commitDeductStock(BusinessActionContext businessActionContext); /** * 回滚被扣库存 * * @param businessActionContext * @return */ boolean rollbackDeductStock(BusinessActionContext businessActionContext); } 复制代码
import com.example.awesomestorage.dao.mapper.StockTccEnhanceMapper; import com.example.awesomestorage.tcc.IStorageTccAction; import com.example.awesomestorage.tcc.TccActionResultWrap; import io.seata.rm.tcc.api.BusinessActionContext; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.util.Map; /** * @author zouwei * @className StorageTccActionImpl * @date: 2022/10/11 14:00 * @description: */ @Component public class StorageTccActionImpl implements IStorageTccAction { @Resource private StockTccEnhanceMapper stockTccEnhanceMapper; @Override public boolean prepareDeductStock(BusinessActionContext businessActionContext, String commodityCode, int count) { String xid = businessActionContext.getXid(); // 幂等性判断 if (TccActionResultWrap.hasPrepareResult(xid)) { return true; } // 避免空悬挂,已经执行过回滚了就不能再预留资源 if (TccActionResultWrap.hasRollbackResult(xid) || TccActionResultWrap.hasCommitResult(xid)) { return false; } // 预扣库存 // 相关sql: update stock_tcc_tbl set count = count - #{count,jdbcType=INTEGER}, pre_deduct_count = pre_deduct_count + #{count,jdbcType=INTEGER} where commodity_code = #{commodityCode,jdbcType=VARCHAR} and count <![CDATA[ >= ]]> #{count,jdbcType=INTEGER} boolean result = stockTccEnhanceMapper.prepareDeductStock(commodityCode, count) > 0; // 记录执行结果:xid:result // 以便回滚时判断是否是空回滚 TccActionResultWrap.prepareSuccess(xid); return result; } @Transactional @Override public boolean commitDeductStock(BusinessActionContext businessActionContext) { String xid = businessActionContext.getXid(); // 幂等性判断 if (TccActionResultWrap.hasCommitResult(xid)) { return true; } Map<String, Object> actionContext = businessActionContext.getActionContext(); String commodityCode = (String) actionContext.get("commodityCode"); int count = (int) actionContext.get("count"); // 执行提交操作,扣除库存 // 相关sql:update stock_tcc_tbl set pre_deduct_count = pre_deduct_count - #{count,jdbcType=INTEGER} where commodity_code = #{commodityCode,jdbcType=VARCHAR} and pre_deduct_count <![CDATA[ >= ]]> #{count,jdbcType=INTEGER} boolean result = stockTccEnhanceMapper.commitDeductStock(commodityCode, count) > 0; // 清除预留结果 TccActionResultWrap.removePrepareResult(xid); // 设置提交结果 TccActionResultWrap.commitSuccess(xid); return result; } @Transactional @Override public boolean rollbackDeductStock(BusinessActionContext businessActionContext) { String xid = businessActionContext.getXid(); // 幂等性判断 if (TccActionResultWrap.hasRollbackResult(xid)) { return true; } // 没有预留资源结果,回滚不做任何处理; if (!TccActionResultWrap.hasPrepareResult(xid)) { // 设置回滚结果,防止空悬挂 TccActionResultWrap.rollbackResult(xid); return true; } // 执行回滚 Map<String, Object> actionContext = businessActionContext.getActionContext(); String commodityCode = (String) actionContext.get("commodityCode"); int count = (int) actionContext.get("count"); // 相关sql: update stock_tcc_tbl set count = count + #{count,jdbcType=INTEGER}, pre_deduct_count = pre_deduct_count - #{count,jdbcType=INTEGER} where commodity_code = #{commodityCode,jdbcType=VARCHAR} and pre_deduct_count <![CDATA[ >= ]]> #{count,jdbcType=INTEGER} boolean result = stockTccEnhanceMapper.rollbackDeductStock(commodityCode, count) > 0; // 清除预留结果 TccActionResultWrap.removePrepareResult(xid); // 设置回滚结果 TccActionResultWrap.rollbackResult(xid); return result; } } 复制代码
1.预扣减库存的原理和预扣款原理一致,我们需要额外给出一个字段
pre_deduct_count
来放预扣的库存数量;通过减少总库存数count
,把被扣库存预留到pre_deduct_count
中达到预扣的效果;2.当需要提交时,我们才真正地从
pre_deduct_count
中扣除我们需要的库存数;3.当接收到回滚请求时,我们把已经预扣的数量还给总库存
count
;
再补充一个TccActionResultWrap
,这个类用来记录TCC Action的状态,用来临时解决幂等性
、空回滚
和空悬挂
问题。
import org.apache.commons.lang3.StringUtils; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @author zouwei * @className TccActionResultWrap * @date: 2022/10/11 13:00 * @description: */ public class TccActionResultWrap { private static final Map<String, String> prepareResult = new ConcurrentHashMap<>(); private static final Map<String, String> commitResult = new ConcurrentHashMap<>(); private static final Map<String, String> rollbackResult = new ConcurrentHashMap<>(); public static void prepareSuccess(String xid) { prepareResult.put(xid, "success"); } public static void commitSuccess(String xid) { commitResult.put(xid, "success"); } public static void rollbackSuccess(String xid) { rollbackResult.put(xid, "success"); } public static void removePrepareResult(String xid) { prepareResult.remove(xid); } public static void removeCommitResult(String xid) { commitResult.remove(xid); } public static void removeRollbackResult(String xid) { rollbackResult.remove(xid); } public static String prepareResult(String xid) { return prepareResult.get(xid); } public static String commitResult(String xid) { return commitResult.get(xid); } public static String rollbackResult(String xid) { return rollbackResult.get(xid); } public static boolean hasPrepareResult(String xid) { return StringUtils.isNotBlank(prepareResult(xid)); } public static boolean hasCommitResult(String xid) { return StringUtils.isNotBlank(commitResult.get(xid)); } public static boolean hasRollbackResult(String xid) { return StringUtils.isNotBlank(rollbackResult.get(xid)); } } 复制代码
以上我们就把所有服务的TCC Action全部处理完毕,我们下面来看看如何使用这些TCC Action:
Service如何使用TCC Action
Account服务
:
import com.example.awesomeaccount.service.IWalletService; import com.example.awesomeaccount.tcc.IWalletTccAction; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; /** * @author zouwei * @className WalletServiceImpl * @date: 2022/9/27 21:20 * @description: */ @Service public class WalletServiceImpl implements IWalletService { @Autowired private IWalletTccAction walletTccAction; @Transactional @Override public Boolean deductMoney(String userId, long amount) { return walletTccAction.prepareDeductMoney(null, userId, amount); } } 复制代码
我们只要在service中直接调用IWalletTccAction
接口中的prepareDeductMoney
方法执行预扣款即可,这个prepareDeductMoney
方法其实就代表一个真正扣款的功能,因为在所有的调用链都调用成功后,Seata会自动调用TCC Action中定义的提交逻辑,以达到一个最终的扣款目的。
注意:参数中的BusinessActionContext不需要开发人员自己传递,直接给null即可,Seata会自动处理。
Order服务
:
import com.example.awesomeorder.api.StockApiClient; import com.example.awesomeorder.service.IOrderService; import com.example.awesomeorder.tcc.IOrderTccAction; import com.example.storageapi.model.OrderInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; /** * @author zouwei * @className OrderServiceImpl * @date: 2022/9/27 22:47 * @description: */ @Service public class OrderServiceImpl implements IOrderService { @Autowired private IOrderTccAction orderTccAction; @Resource private StockApiClient stockApiClient; /** * 创建订单 * * @param userId * @param commodityCode * @param count * @param unitPrice * @return */ @Transactional @Override public Boolean createOrder(String userId, String commodityCode, int count, long unitPrice) { // 构建待扣减的库存信息 OrderInfo orderInfo = new OrderInfo(); // 设置商品编码 orderInfo.setCommodityCode(commodityCode); // 设置需要扣减的数量 orderInfo.setCount(count); // 先预扣减库存 if (stockApiClient.deductStock(orderInfo)) { // 预创建订单 return orderTccAction.prepareOrder(null, userId, commodityCode, count, unitPrice); } // 扣减库存失败,订单创建失败 return Boolean.FALSE; } } 复制代码
Order服务的Service实现和Account服务差不多,TCC Action也可以和Spring Bean一样使用;
Storage服务
:
import com.example.awesomestorage.service.IStockService; import com.example.awesomestorage.tcc.IStorageTccAction; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; /** * @author zouwei * @className StockServiceImpl * @date: 2022/9/28 00:06 * @description: */ @Service public class StockServiceImpl implements IStockService { @Autowired private IStorageTccAction storageTccAction; @Transactional @Override public Boolean deductStock(String commodityCode, int count) { return storageTccAction.prepareDeductStock(null, commodityCode, count); } } 复制代码
Business下单逻辑
import com.example.accountapi.model.AmountInfo; import com.example.awesomebusiness.api.OrderApiClient; import com.example.awesomebusiness.api.WalletApiClient; import com.example.awesomebusiness.service.ShoppingCartService; import com.example.orderapi.model.OrderInfo; import io.seata.core.exception.GlobalTransactionException; import io.seata.spring.annotation.GlobalTransactional; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * @author zouwei * @className ShoppingCartServiceImpl * @date: 2022/9/18 14:01 * @description: */ @Service public class ShoppingCartServiceImpl implements ShoppingCartService { // 钱包服务 @Resource private WalletApiClient walletApiClient; // 订单服务 @Resource private OrderApiClient orderApiClient; // 别忘记了这个注解,这是开启分布式事务的标记,前提是当前业务逻辑不处于任何的分布式事务当中才能开启新的分布式事务 @GlobalTransactional public String placeOrder() throws GlobalTransactionException { // 模拟用户ID 123456,对应数据库初始化的用户ID String userId = "123456"; // 构建订单数据 OrderInfo orderInfo = new OrderInfo(); // 数量15个 orderInfo.setCount(15); // 商品编码,对应库存数据表的初始化数据 orderInfo.setCommodityCode("CC-54321"); // 单价299,默认是long类型,单位分;避免double精度丢失 orderInfo.setUnitPrice(299); // 订单归属 orderInfo.setUserId(userId); // 计算扣款金额,数量*单价 long amount = orderInfo.getCount() * orderInfo.getUnitPrice(); // 构建扣款数据 AmountInfo amountInfo = new AmountInfo(); // 设置扣款金额 amountInfo.setAmount(amount); // 设置扣款主体 amountInfo.setUserId(userId); // 先扣款,扣款成功就创建订单,扣减库存在创建订单的逻辑里面 if (walletApiClient.deductMoney(amountInfo) && orderApiClient.createOrder(orderInfo)) { return "下单成功!"; } // 1.扣款失败,抛异常,分布式事务回滚 // 2.创建订单失败,抛异常,分布式事务回滚 throw new GlobalTransactionException("下单失败!"); } } 复制代码
- 在Business下单逻辑中,和之前AT模式完全没有变化,所有的RPC依然还是使用的FeignClient;
- @GlobalTransactional还是放在方法上面,代表这是TM角色,负责分布式事务的发起、提交和回滚;
- XID的传递依然是RPC来传递;
至此,我们已经把所有的思路及相关核心代码编写完毕,有兴趣的小伙伴可以在github上下载该项目:awesome-seata