手把手教你Spring Cloud集成Seata XA模式

简介: 手把手教你Spring Cloud集成Seata XA模式

前言

我们在前面的文章中已经教大家分别集成了Seata AT模式以及Seata TCC模式,这篇文章就教大家如何在自己的Spring Cloud项目中集成Seata XA模式。

同样我们还是以购物车下单的业务场景作为本次案例,当前Seata版本为1.5.2:


image.png

1.用户请求从Business业务入口进来,在业务入口中,我们会根据业务需求,先执行RPC扣款操作,然后再调用订单创建的功能;

2.在订单创建过程中,会先RPC执行扣减库存,成功后才会在订单表中插入订单数据;

3.在上述所有逻辑处理成功后,购物车下单逻辑就完成了,中途任何RM服务出现异常,将触发分布式事务回滚,导致下单失败;

注意:Seata XA模式不能与AT模式共存,两种模式在同一个服务中只能存在其一

创建数据表

在Seata XA模式中,除业务表外,不需要创建其他额外的数据表,该分布式事务完全依赖与XA协议;

  • Account服务:
-- ----------------------------
-- Table structure for wallet_xa_tbl
-- ----------------------------
DROP TABLE IF EXISTS `wallet_xa_tbl`;
CREATE TABLE `wallet_xa_tbl` (
  `id` int NOT NULL COMMENT '主键',
  `user_id` varchar(255) COLLATE utf8mb4_bin NOT NULL,
  `money` int NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Records of wallet_xa_tbl
-- ----------------------------
BEGIN;
INSERT INTO `wallet_xa_tbl` (`id`, `user_id`, `money`) VALUES (1, '123456', 100000);
COMMIT;
复制代码
  • Order服务
-- ----------------------------
-- Table structure for order_xa_tbl
-- ----------------------------
DROP TABLE IF EXISTS `order_xa_tbl`;
CREATE TABLE `order_xa_tbl` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `user_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID',
  `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码',
  `count` int NOT NULL COMMENT '数量',
  `unit_price` int NOT NULL COMMENT '单价',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
复制代码
  • Storage服务
-- ----------------------------
-- Table structure for stock_xa_tbl
-- ----------------------------
DROP TABLE IF EXISTS `stock_xa_tbl`;
CREATE TABLE `stock_xa_tbl` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码',
  `count` int NOT NULL COMMENT '库存数',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Records of stock_xa_tbl
-- ----------------------------
BEGIN;
INSERT INTO `stock_xa_tbl` (`id`, `commodity_code`, `count`) VALUES (1, 'CC-54321', 500);
COMMIT;
复制代码

创建RM服务

注意:XA模式在RM服务对应的配置文件中一定要单独指定,否则Seata模式是AT模式

# 分布式事务配置
seata:
  #  开启seata
  enabled: true
  # 一定要单独指定,默认是AT模式
  data-source-proxy-mode: XA
  #  注册中心找TC服务
  registry:
    type: nacos
    nacos:
      cluster: "default"
      username: 用户名
      password: 密码
      server-addr: "ip:端口"
      group: SEATA_GROUP
      namespace: seata-server
      application: seata-server
  application-id: ${spring.application.name}
  #  事务分组
  tx-service-group: shanghai
  service:
    vgroup-mapping:
      # 该分组对应的TC集群名称
      shanghai: default
复制代码
  • Account服务:
public interface IWalletService {
  /**
   * 扣钱
   *
   * @param userId
   * @param amount
   * @return
   */
  Boolean deductMoney4XA(String userId, long amount);
}
import com.example.awesomeaccount.service.IWalletService;
import com.example.awesomeaccount.dao.mapper.WalletXAEnhanceMapper;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
 * @author zouwei
 * @className WalletServiceImpl
 * @date: 2022/9/27 21:20
 * @description:
 */
@Service
public class WalletServiceImpl implements IWalletService {
  @Resource
  private WalletXAEnhanceMapper walletXAEnhanceMapper;
  /**
   * 扣款(XA模式)
   *
   * @param userId
   * @param amount
   * @return
   */
  @Transactional
  @Override
  public Boolean deductMoney4XA(String userId, long amount) {
    // 相关sql:update wallet_xa_tbl set money = money - #{amount,jdbcType=INTEGER} where user_id = #{userId,jdbcType=VARCHAR} and money <![CDATA[ >= ]]>#{amount,jdbcType=INTEGER}
    return walletXAEnhanceMapper.deductMoney(userId, amount) > 0;
  }
}
// 对TM暴露接口
import com.example.accountapi.model.AmountInfo;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
 * 供生产者和消费者共同使用
 *
 * @author zouwei
 * @className AccountApi
 * @date: 2022/9/18 14:31
 * @description:
 */
public interface WalletApi {
  /**
   * 扣款
   *
   * @param amountInfo
   * @return
   */
  @PostMapping("/deductMoney4XA")
  Boolean deductMoney4XA(@RequestBody AmountInfo amountInfo);
}
import com.example.accountapi.api.WalletApi;
import com.example.accountapi.model.AmountInfo;
import com.example.awesomeaccount.service.IWalletService;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * WalletApi接口在account-api模块中
 *
 * @author zouwei
 * @className WalletController
 * @date: 2022/9/18 14:58
 * @description:
 */
@Slf4j
@RestController
@RequestMapping("/wallet")
public class WalletController implements WalletApi {
  @Autowired
  private IWalletService walletService;
  @Override
  public Boolean deductMoney4XA(AmountInfo amountInfo) {
    String userId = amountInfo.getUserId();
    long amount = amountInfo.getAmount();
    // 打印分布式事务XID
    log.warn("XID:{}", RootContext.getXID());
    // 扣款
    return walletService.deductMoney4XA(userId, amount);
  }
}
复制代码

1.Account服务的扣款逻辑和平常的crud并没有任何区别,依然是调用mybatis的Mapper进行sql操作;

2.Account服务对外暴露给TM一个restful的扣款接口;

  • Storage服务:

Storage服务同样需要保证配置文件中的Seata.data-source-proxy-mode=XA,否则Seata将以AT模式启动服务;

public interface IStockService {
  // 扣减库存
  Boolean deductStock4XA(String commodityCode, int count);
}
import com.example.awesomestorage.dao.mapper.StockXAEnhanceMapper;
import com.example.awesomestorage.service.IStockService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
 * @author zouwei
 * @className StockServiceImpl
 * @date: 2022/9/28 00:06
 * @description:
 */
@Service
public class StockServiceImpl implements IStockService {
  @Resource
  private StockXAEnhanceMapper stockXAEnhanceMapper;
  /**
   * 扣减库存(XA模式)
   *
   * @param commodityCode
   * @param count
   * @return
   */
  @Transactional
  @Override
  public Boolean deductStock4XA(String commodityCode, int count) {
    // 扣减库存,判断影响行数是否大于0
    // sql如下: update stock_tbl set count = count - #{count,jdbcType=INTEGER} where commodity_code = #{commodityCode,jdbcType=VARCHAR} and count <![CDATA[ >= ]]> #{count,jdbcType=INTEGER}
    return stockXAEnhanceMapper.deductStock(commodityCode, count) > 0;
  }
}
// 对外暴露接口
import com.example.storageapi.model.OrderInfo;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
 * @author zouwei
 * @className StockApi
 * @date: 2022/9/28 00:26
 * @description:
 */
public interface StockApi {
  @PostMapping("/deduct4XA")
  Boolean deductStock4XA(@RequestBody OrderInfo orderInfo);
}
import com.example.awesomestorage.service.IStockService;
import com.example.storageapi.api.StockApi;
import com.example.storageapi.model.OrderInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author zouwei
 * @className StockController
 * @date: 2022/9/28 00:23
 * @description:
 */
@RestController
@RequestMapping("/stock")
public class StockController implements StockApi {
  @Autowired
  private IStockService stockService;
  @Override
  public Boolean deductStock4XA(OrderInfo orderInfo) {
    String commodityCode = orderInfo.getCommodityCode();
    int count = orderInfo.getCount();
    return stockService.deductStock4XA(commodityCode, count);
  }
}
复制代码

同样的,我们的Storage服务也和Account服务一样,暴露给TM一个扣减库存的接口;

  • Order服务

和上面两个服务一样,我们先配置Seata.data-source-proxy-mode=XA,保证我们能够正确地开启XA模式

public interface IOrderService {
  Boolean createOrder4XA(String userId, String commodityCode, int count, long unitPrice);
}
import com.example.awesomeorder.api.StockApiClient;
import com.example.awesomeorder.dao.entity.Order;
import com.example.awesomeorder.dao.mapper.OrderMapper;
import com.example.awesomeorder.service.IOrderService;
import com.example.awesomeorder.tcc.IOrderTccAction;
import com.example.storageapi.model.OrderInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
/**
 * @author zouwei
 * @className OrderServiceImpl
 * @date: 2022/9/27 22:47
 * @description:
 */
@Service
public class OrderServiceImpl implements IOrderService {
  @Resource
  private OrderXAMapper orderXAMapper;
  @Resource
  private StockApiClient stockApiClient;
  /**
   * 创建订单(XA模式)
   *
   * @param userId
   * @param commodityCode
   * @param count
   * @param unitPrice
   * @return
   */
  @Transactional
  @Override
  public Boolean createOrder4XA(String userId, String commodityCode, int count, long unitPrice) {
    // 构建待扣减的库存信息
    OrderInfo orderInfo = new OrderInfo();
    // 设置商品编码
    orderInfo.setCommodityCode(commodityCode);
    // 设置需要扣减的数量
    orderInfo.setCount(count);
    // 先构建库存
    if (stockApiClient.deductStock4XA(orderInfo)) {
      // 扣减库存成功后,准备创建订单
      OrderXA order = new OrderXA();
      // 创建时间
      order.setCreateTime(LocalDateTime.now());
      // 用户ID
      order.setUserId(userId);
      // 数量
      order.setCount(count);
      // 商品编码
      order.setCommodityCode(commodityCode);
      // 单价
      order.setUnitPrice(unitPrice);
      // 创建订单
      return orderXAMapper.insert(order) > 0;
    }
    // 扣减库存失败,订单创建失败
    return Boolean.FALSE;
  }
}
// 对外暴露接口
import com.example.orderapi.model.OrderInfo;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
 * @author zouwei
 * @className OrderApi
 * @date: 2022/9/27 22:57
 * @description:
 */
public interface OrderApi {
  @PutMapping("/create4At")
  Boolean createOrder4XA(@RequestBody OrderInfo orderInfo);
}
import com.example.awesomeorder.service.IOrderService;
import com.example.orderapi.api.OrderApi;
import com.example.orderapi.model.OrderInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author zouwei
 * @className OrderController
 * @date: 2022/9/27 22:55
 * @description:
 */
@RestController
@RequestMapping("/order")
public class OrderController implements OrderApi {
  @Autowired
  private IOrderService orderService;
  @Override
  public Boolean createOrder4XA(OrderInfo orderInfo) {
    String commodityCode = orderInfo.getCommodityCode();
    int count = orderInfo.getCount();
    long unitPrice = orderInfo.getUnitPrice();
    String userId = orderInfo.getUserId();
    return orderService.createOrder4XA(userId, commodityCode, count, unitPrice);
  }
}
复制代码

至此,我们所有的RM服务全部创建完毕,下面我们开始准备构建TM;

创建TM服务

在Seata TM服务中,我们不需要配置数据源,因为TM不需要跟数据源打交道,我们只需要与TC服务进行通讯,在业务调用上,通过openFeign与RM服务进行通讯;

在TM服务中,就不需要额外指定seata.data-source-proxy-mode=XA,因为TM不和数据源打交道:

seata:
  #  开启seata
  enabled: true
  #  注册中心找TC服务
  registry:
    type: nacos
    nacos:
      cluster: "default"
      username: "用户名"
      password: "密码"
      server-addr: "ip:端口"
      group: SEATA_GROUP
      namespace: seata-server
      application: seata-server
  application-id: ${spring.application.name}
  #  事务分组
  tx-service-group: shanghai
  service:
    vgroup-mapping:
      # 该分组对应的TC集群名称
      shanghai: default
复制代码

另外的话,还需要配置上OpenFeign。有疑问的小伙伴可查看文章OpenFeign的集成与优化

import io.seata.core.exception.GlobalTransactionException;
/**
 * @author zouwei
 * @className ShoppingCartService
 * @date: 2022/9/18 14:01
 * @description:
 */
public interface ShoppingCartService {
  // 下单
  String placeOrder4XA() throws GlobalTransactionException;
}
import com.example.accountapi.model.AmountInfo;
import com.example.awesomebusiness.api.OrderApiClient;
import com.example.awesomebusiness.api.WalletApiClient;
import com.example.awesomebusiness.service.ShoppingCartService;
import com.example.orderapi.model.OrderInfo;
import io.seata.core.exception.GlobalTransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
 * @author zouwei
 * @className ShoppingCartServiceImpl
 * @date: 2022/9/18 14:01
 * @description:
 */
@Service
public class ShoppingCartServiceImpl implements ShoppingCartService {
  // 钱包服务
  @Resource
  private WalletApiClient walletApiClient;
  // 订单服务
  @Resource
  private OrderApiClient orderApiClient;
  // 别忘记了这个注解,这是开启分布式事务的标记
  @GlobalTransactional
  @Override
  public String placeOrder4XA() throws GlobalTransactionException {
    // 模拟用户ID 123456,对应数据库初始化的用户ID
    String userId = "123456";
    // 构建订单数据
    OrderInfo orderInfo = new OrderInfo();
    // 数量15个
    orderInfo.setCount(15);
    // 商品编码,对应库存数据表的初始化数据
    orderInfo.setCommodityCode("CC-54321");
    // 单价299,默认是long类型,单位分;避免double精度丢失
    orderInfo.setUnitPrice(299);
    // 订单归属
    orderInfo.setUserId(userId);
    // 计算扣款金额,数量*单价
    long amount = orderInfo.getCount() * orderInfo.getUnitPrice();
    // 构建扣款数据
    AmountInfo amountInfo = new AmountInfo();
    // 设置扣款金额
    amountInfo.setAmount(amount);
    // 设置扣款主体
    amountInfo.setUserId(userId);
    // 先扣款,扣款成功就创建订单,扣减库存在创建订单的逻辑里面
    if (walletApiClient.deductMoney4XA(amountInfo) && orderApiClient.createOrder4XA(orderInfo)) {
      return "下单成功!";
    }
    // 1.扣款失败,抛异常,分布式事务回滚
    // 2.创建订单失败,抛异常,分布式事务回滚
    throw new GlobalTransactionException("下单失败!");
  }
}
import com.example.awesomebusiness.service.ShoppingCartService;
import io.seata.core.exception.GlobalTransactionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author zouwei
 * @className ShoppingCartController
 * @date: 2022/9/18 13:55
 * @description:
 */
@RestController
@RequestMapping(value = "/shoppingCart")
public class ShoppingCartController {
  @Autowired
  private ShoppingCartService shoppingCartService;
  @PostMapping("/placeOrder")
  public String placeOrder() throws GlobalTransactionException {
    return shoppingCartService.placeOrder4XA();
  }
}
复制代码

以上代码便是Spring Cloud集成Seata XA模式的核心代码,感兴趣的小伙伴可以访问awesome-seats下载源码。


相关文章
|
8天前
|
缓存 供应链 物联网
如何将 Salesforce IoT Cloud 与其他系统集成
Salesforce IoT Cloud 可通过其开放的 API 和集成云平台轻松与外部系统集成,实现数据交换和流程自动化,支持多种协议和标准,帮助企业构建智能物联网应用。
|
1月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
1月前
|
IDE API 开发工具
沉浸式集成阿里云 OpenAPI|Alibaba Cloud API Toolkit for VS Code
Alibaba Cloud API Toolkit for VSCode 是集成了 OpenAPI 开发者门户多项功能的 VSCode 插件,开发者可以通过这个插件方便地查找API文档、进行API调试、插入SDK代码,并配置基础环境设置。我们的目标是缩短开发者在门户和IDE之间的频繁切换,实现API信息和开发流程的无缝结合,让开发者的工作变得更加高效和紧密。
沉浸式集成阿里云 OpenAPI|Alibaba Cloud API Toolkit for VS Code
|
1月前
|
Java 调度 开发者
spring的@Scheduled()有几种定时模式?
【10月更文挑战第12天】spring的@Scheduled()有几种定时模式?
74 1
|
2月前
|
设计模式 Java Spring
spring源码设计模式分析(五)-策略模式
spring源码设计模式分析(五)-策略模式
|
2月前
|
消息中间件 设计模式 缓存
spring源码设计模式分析(四)-观察者模式
spring源码设计模式分析(四)-观察者模式
|
2月前
|
设计模式 Java Spring
spring源码设计模式分析(六)-模板方法模式
spring源码设计模式分析(六)-模板方法模式
|
2月前
|
设计模式 Java Spring
spring源码设计模式分析(七)-委派模式
spring源码设计模式分析(七)-委派模式
|
2月前
|
设计模式 Java 数据库
spring源码设计模式分析(八)-访问者模式
spring源码设计模式分析(八)-访问者模式
|
2月前
|
设计模式 搜索推荐 Java
spring源码设计模式分析(三)
spring源码设计模式分析(三)

热门文章

最新文章

下一篇
无影云桌面