前言
我们在前面的文章中已经教大家分别集成了Seata AT模式以及Seata TCC模式,这篇文章就教大家如何在自己的Spring Cloud项目中集成Seata XA模式。
同样我们还是以购物车下单的业务场景作为本次案例,当前Seata版本为1.5.2:
1.用户请求从Business业务入口进来,在业务入口中,我们会根据业务需求,先执行RPC扣款操作,然后再调用订单创建的功能;
2.在订单创建过程中,会先RPC执行扣减库存,成功后才会在订单表中插入订单数据;
3.在上述所有逻辑处理成功后,购物车下单逻辑就完成了,中途任何RM服务出现异常,将触发分布式事务回滚,导致下单失败;
注意:Seata XA模式不能与AT模式共存,两种模式在同一个服务中只能存在其一
创建数据表
在Seata XA模式中,除业务表外,不需要创建其他额外的数据表,该分布式事务完全依赖与XA协议;
Account服务
:
-- ---------------------------- -- Table structure for wallet_xa_tbl -- ---------------------------- DROP TABLE IF EXISTS `wallet_xa_tbl`; CREATE TABLE `wallet_xa_tbl` ( `id` int NOT NULL COMMENT '主键', `user_id` varchar(255) COLLATE utf8mb4_bin NOT NULL, `money` int NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -- ---------------------------- -- Records of wallet_xa_tbl -- ---------------------------- BEGIN; INSERT INTO `wallet_xa_tbl` (`id`, `user_id`, `money`) VALUES (1, '123456', 100000); COMMIT; 复制代码
Order服务
:
-- ---------------------------- -- Table structure for order_xa_tbl -- ---------------------------- DROP TABLE IF EXISTS `order_xa_tbl`; CREATE TABLE `order_xa_tbl` ( `id` int NOT NULL AUTO_INCREMENT COMMENT '主键', `user_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID', `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码', `count` int NOT NULL COMMENT '数量', `unit_price` int NOT NULL COMMENT '单价', `create_time` datetime NOT NULL COMMENT '创建时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; 复制代码
Storage服务
:
-- ---------------------------- -- Table structure for stock_xa_tbl -- ---------------------------- DROP TABLE IF EXISTS `stock_xa_tbl`; CREATE TABLE `stock_xa_tbl` ( `id` int NOT NULL AUTO_INCREMENT COMMENT '主键', `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码', `count` int NOT NULL COMMENT '库存数', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -- ---------------------------- -- Records of stock_xa_tbl -- ---------------------------- BEGIN; INSERT INTO `stock_xa_tbl` (`id`, `commodity_code`, `count`) VALUES (1, 'CC-54321', 500); COMMIT; 复制代码
创建RM服务
注意:XA模式在RM服务对应的配置文件中一定要单独指定,否则Seata模式是AT模式
# 分布式事务配置 seata: # 开启seata enabled: true # 一定要单独指定,默认是AT模式 data-source-proxy-mode: XA # 注册中心找TC服务 registry: type: nacos nacos: cluster: "default" username: 用户名 password: 密码 server-addr: "ip:端口" group: SEATA_GROUP namespace: seata-server application: seata-server application-id: ${spring.application.name} # 事务分组 tx-service-group: shanghai service: vgroup-mapping: # 该分组对应的TC集群名称 shanghai: default 复制代码
Account服务
:
public interface IWalletService { /** * 扣钱 * * @param userId * @param amount * @return */ Boolean deductMoney4XA(String userId, long amount); } import com.example.awesomeaccount.service.IWalletService; import com.example.awesomeaccount.dao.mapper.WalletXAEnhanceMapper; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; /** * @author zouwei * @className WalletServiceImpl * @date: 2022/9/27 21:20 * @description: */ @Service public class WalletServiceImpl implements IWalletService { @Resource private WalletXAEnhanceMapper walletXAEnhanceMapper; /** * 扣款(XA模式) * * @param userId * @param amount * @return */ @Transactional @Override public Boolean deductMoney4XA(String userId, long amount) { // 相关sql:update wallet_xa_tbl set money = money - #{amount,jdbcType=INTEGER} where user_id = #{userId,jdbcType=VARCHAR} and money <![CDATA[ >= ]]>#{amount,jdbcType=INTEGER} return walletXAEnhanceMapper.deductMoney(userId, amount) > 0; } } // 对TM暴露接口 import com.example.accountapi.model.AmountInfo; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; /** * 供生产者和消费者共同使用 * * @author zouwei * @className AccountApi * @date: 2022/9/18 14:31 * @description: */ public interface WalletApi { /** * 扣款 * * @param amountInfo * @return */ @PostMapping("/deductMoney4XA") Boolean deductMoney4XA(@RequestBody AmountInfo amountInfo); } import com.example.accountapi.api.WalletApi; import com.example.accountapi.model.AmountInfo; import com.example.awesomeaccount.service.IWalletService; import io.seata.core.context.RootContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * WalletApi接口在account-api模块中 * * @author zouwei * @className WalletController * @date: 2022/9/18 14:58 * @description: */ @Slf4j @RestController @RequestMapping("/wallet") public class WalletController implements WalletApi { @Autowired private IWalletService walletService; @Override public Boolean deductMoney4XA(AmountInfo amountInfo) { String userId = amountInfo.getUserId(); long amount = amountInfo.getAmount(); // 打印分布式事务XID log.warn("XID:{}", RootContext.getXID()); // 扣款 return walletService.deductMoney4XA(userId, amount); } } 复制代码
1.
Account
服务的扣款逻辑和平常的crud并没有任何区别,依然是调用mybatis的Mapper进行sql操作;2.
Account
服务对外暴露给TM一个restful的扣款接口;
Storage服务
:
Storage
服务同样需要保证配置文件中的Seata.data-source-proxy-mode=XA
,否则Seata将以AT模式
启动服务;
public interface IStockService { // 扣减库存 Boolean deductStock4XA(String commodityCode, int count); } import com.example.awesomestorage.dao.mapper.StockXAEnhanceMapper; import com.example.awesomestorage.service.IStockService; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; /** * @author zouwei * @className StockServiceImpl * @date: 2022/9/28 00:06 * @description: */ @Service public class StockServiceImpl implements IStockService { @Resource private StockXAEnhanceMapper stockXAEnhanceMapper; /** * 扣减库存(XA模式) * * @param commodityCode * @param count * @return */ @Transactional @Override public Boolean deductStock4XA(String commodityCode, int count) { // 扣减库存,判断影响行数是否大于0 // sql如下: update stock_tbl set count = count - #{count,jdbcType=INTEGER} where commodity_code = #{commodityCode,jdbcType=VARCHAR} and count <![CDATA[ >= ]]> #{count,jdbcType=INTEGER} return stockXAEnhanceMapper.deductStock(commodityCode, count) > 0; } } // 对外暴露接口 import com.example.storageapi.model.OrderInfo; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; /** * @author zouwei * @className StockApi * @date: 2022/9/28 00:26 * @description: */ public interface StockApi { @PostMapping("/deduct4XA") Boolean deductStock4XA(@RequestBody OrderInfo orderInfo); } import com.example.awesomestorage.service.IStockService; import com.example.storageapi.api.StockApi; import com.example.storageapi.model.OrderInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author zouwei * @className StockController * @date: 2022/9/28 00:23 * @description: */ @RestController @RequestMapping("/stock") public class StockController implements StockApi { @Autowired private IStockService stockService; @Override public Boolean deductStock4XA(OrderInfo orderInfo) { String commodityCode = orderInfo.getCommodityCode(); int count = orderInfo.getCount(); return stockService.deductStock4XA(commodityCode, count); } } 复制代码
同样的,我们的Storage
服务也和Account
服务一样,暴露给TM一个扣减库存的接口;
Order服务
:
和上面两个服务一样,我们先配置Seata.data-source-proxy-mode=XA
,保证我们能够正确地开启XA模式
;
public interface IOrderService { Boolean createOrder4XA(String userId, String commodityCode, int count, long unitPrice); } import com.example.awesomeorder.api.StockApiClient; import com.example.awesomeorder.dao.entity.Order; import com.example.awesomeorder.dao.mapper.OrderMapper; import com.example.awesomeorder.service.IOrderService; import com.example.awesomeorder.tcc.IOrderTccAction; import com.example.storageapi.model.OrderInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.time.LocalDateTime; /** * @author zouwei * @className OrderServiceImpl * @date: 2022/9/27 22:47 * @description: */ @Service public class OrderServiceImpl implements IOrderService { @Resource private OrderXAMapper orderXAMapper; @Resource private StockApiClient stockApiClient; /** * 创建订单(XA模式) * * @param userId * @param commodityCode * @param count * @param unitPrice * @return */ @Transactional @Override public Boolean createOrder4XA(String userId, String commodityCode, int count, long unitPrice) { // 构建待扣减的库存信息 OrderInfo orderInfo = new OrderInfo(); // 设置商品编码 orderInfo.setCommodityCode(commodityCode); // 设置需要扣减的数量 orderInfo.setCount(count); // 先构建库存 if (stockApiClient.deductStock4XA(orderInfo)) { // 扣减库存成功后,准备创建订单 OrderXA order = new OrderXA(); // 创建时间 order.setCreateTime(LocalDateTime.now()); // 用户ID order.setUserId(userId); // 数量 order.setCount(count); // 商品编码 order.setCommodityCode(commodityCode); // 单价 order.setUnitPrice(unitPrice); // 创建订单 return orderXAMapper.insert(order) > 0; } // 扣减库存失败,订单创建失败 return Boolean.FALSE; } } // 对外暴露接口 import com.example.orderapi.model.OrderInfo; import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; /** * @author zouwei * @className OrderApi * @date: 2022/9/27 22:57 * @description: */ public interface OrderApi { @PutMapping("/create4At") Boolean createOrder4XA(@RequestBody OrderInfo orderInfo); } import com.example.awesomeorder.service.IOrderService; import com.example.orderapi.api.OrderApi; import com.example.orderapi.model.OrderInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author zouwei * @className OrderController * @date: 2022/9/27 22:55 * @description: */ @RestController @RequestMapping("/order") public class OrderController implements OrderApi { @Autowired private IOrderService orderService; @Override public Boolean createOrder4XA(OrderInfo orderInfo) { String commodityCode = orderInfo.getCommodityCode(); int count = orderInfo.getCount(); long unitPrice = orderInfo.getUnitPrice(); String userId = orderInfo.getUserId(); return orderService.createOrder4XA(userId, commodityCode, count, unitPrice); } } 复制代码
至此,我们所有的RM服务全部创建完毕,下面我们开始准备构建TM;
创建TM服务
在Seata TM服务中,我们不需要配置数据源,因为TM不需要跟数据源打交道,我们只需要与TC服务进行通讯,在业务调用上,通过openFeign与RM服务进行通讯;
在TM服务中,就不需要额外指定seata.data-source-proxy-mode=XA
,因为TM不和数据源打交道:
seata: # 开启seata enabled: true # 注册中心找TC服务 registry: type: nacos nacos: cluster: "default" username: "用户名" password: "密码" server-addr: "ip:端口" group: SEATA_GROUP namespace: seata-server application: seata-server application-id: ${spring.application.name} # 事务分组 tx-service-group: shanghai service: vgroup-mapping: # 该分组对应的TC集群名称 shanghai: default 复制代码
另外的话,还需要配置上OpenFeign。有疑问的小伙伴可查看文章OpenFeign的集成与优化;
import io.seata.core.exception.GlobalTransactionException; /** * @author zouwei * @className ShoppingCartService * @date: 2022/9/18 14:01 * @description: */ public interface ShoppingCartService { // 下单 String placeOrder4XA() throws GlobalTransactionException; } import com.example.accountapi.model.AmountInfo; import com.example.awesomebusiness.api.OrderApiClient; import com.example.awesomebusiness.api.WalletApiClient; import com.example.awesomebusiness.service.ShoppingCartService; import com.example.orderapi.model.OrderInfo; import io.seata.core.exception.GlobalTransactionException; import io.seata.spring.annotation.GlobalTransactional; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * @author zouwei * @className ShoppingCartServiceImpl * @date: 2022/9/18 14:01 * @description: */ @Service public class ShoppingCartServiceImpl implements ShoppingCartService { // 钱包服务 @Resource private WalletApiClient walletApiClient; // 订单服务 @Resource private OrderApiClient orderApiClient; // 别忘记了这个注解,这是开启分布式事务的标记 @GlobalTransactional @Override public String placeOrder4XA() throws GlobalTransactionException { // 模拟用户ID 123456,对应数据库初始化的用户ID String userId = "123456"; // 构建订单数据 OrderInfo orderInfo = new OrderInfo(); // 数量15个 orderInfo.setCount(15); // 商品编码,对应库存数据表的初始化数据 orderInfo.setCommodityCode("CC-54321"); // 单价299,默认是long类型,单位分;避免double精度丢失 orderInfo.setUnitPrice(299); // 订单归属 orderInfo.setUserId(userId); // 计算扣款金额,数量*单价 long amount = orderInfo.getCount() * orderInfo.getUnitPrice(); // 构建扣款数据 AmountInfo amountInfo = new AmountInfo(); // 设置扣款金额 amountInfo.setAmount(amount); // 设置扣款主体 amountInfo.setUserId(userId); // 先扣款,扣款成功就创建订单,扣减库存在创建订单的逻辑里面 if (walletApiClient.deductMoney4XA(amountInfo) && orderApiClient.createOrder4XA(orderInfo)) { return "下单成功!"; } // 1.扣款失败,抛异常,分布式事务回滚 // 2.创建订单失败,抛异常,分布式事务回滚 throw new GlobalTransactionException("下单失败!"); } } import com.example.awesomebusiness.service.ShoppingCartService; import io.seata.core.exception.GlobalTransactionException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author zouwei * @className ShoppingCartController * @date: 2022/9/18 13:55 * @description: */ @RestController @RequestMapping(value = "/shoppingCart") public class ShoppingCartController { @Autowired private ShoppingCartService shoppingCartService; @PostMapping("/placeOrder") public String placeOrder() throws GlobalTransactionException { return shoppingCartService.placeOrder4XA(); } } 复制代码
以上代码便是Spring Cloud集成Seata XA模式的核心代码,感兴趣的小伙伴可以访问awesome-seats下载源码。