手把手教你Spring Cloud集成Seata XA模式

简介: 手把手教你Spring Cloud集成Seata XA模式

前言

我们在前面的文章中已经教大家分别集成了Seata AT模式以及Seata TCC模式,这篇文章就教大家如何在自己的Spring Cloud项目中集成Seata XA模式。

同样我们还是以购物车下单的业务场景作为本次案例,当前Seata版本为1.5.2:


image.png

1.用户请求从Business业务入口进来,在业务入口中,我们会根据业务需求,先执行RPC扣款操作,然后再调用订单创建的功能;

2.在订单创建过程中,会先RPC执行扣减库存,成功后才会在订单表中插入订单数据;

3.在上述所有逻辑处理成功后,购物车下单逻辑就完成了,中途任何RM服务出现异常,将触发分布式事务回滚,导致下单失败;

注意:Seata XA模式不能与AT模式共存,两种模式在同一个服务中只能存在其一

创建数据表

在Seata XA模式中,除业务表外,不需要创建其他额外的数据表,该分布式事务完全依赖与XA协议;

  • Account服务:
-- ----------------------------
-- Table structure for wallet_xa_tbl
-- ----------------------------
DROP TABLE IF EXISTS `wallet_xa_tbl`;
CREATE TABLE `wallet_xa_tbl` (
  `id` int NOT NULL COMMENT '主键',
  `user_id` varchar(255) COLLATE utf8mb4_bin NOT NULL,
  `money` int NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Records of wallet_xa_tbl
-- ----------------------------
BEGIN;
INSERT INTO `wallet_xa_tbl` (`id`, `user_id`, `money`) VALUES (1, '123456', 100000);
COMMIT;
复制代码
  • Order服务
-- ----------------------------
-- Table structure for order_xa_tbl
-- ----------------------------
DROP TABLE IF EXISTS `order_xa_tbl`;
CREATE TABLE `order_xa_tbl` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `user_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID',
  `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码',
  `count` int NOT NULL COMMENT '数量',
  `unit_price` int NOT NULL COMMENT '单价',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
复制代码
  • Storage服务
-- ----------------------------
-- Table structure for stock_xa_tbl
-- ----------------------------
DROP TABLE IF EXISTS `stock_xa_tbl`;
CREATE TABLE `stock_xa_tbl` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码',
  `count` int NOT NULL COMMENT '库存数',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Records of stock_xa_tbl
-- ----------------------------
BEGIN;
INSERT INTO `stock_xa_tbl` (`id`, `commodity_code`, `count`) VALUES (1, 'CC-54321', 500);
COMMIT;
复制代码

创建RM服务

注意:XA模式在RM服务对应的配置文件中一定要单独指定,否则Seata模式是AT模式

# 分布式事务配置
seata:
  #  开启seata
  enabled: true
  # 一定要单独指定,默认是AT模式
  data-source-proxy-mode: XA
  #  注册中心找TC服务
  registry:
    type: nacos
    nacos:
      cluster: "default"
      username: 用户名
      password: 密码
      server-addr: "ip:端口"
      group: SEATA_GROUP
      namespace: seata-server
      application: seata-server
  application-id: ${spring.application.name}
  #  事务分组
  tx-service-group: shanghai
  service:
    vgroup-mapping:
      # 该分组对应的TC集群名称
      shanghai: default
复制代码
  • Account服务:
public interface IWalletService {
  /**
   * 扣钱
   *
   * @param userId
   * @param amount
   * @return
   */
  Boolean deductMoney4XA(String userId, long amount);
}
import com.example.awesomeaccount.service.IWalletService;
import com.example.awesomeaccount.dao.mapper.WalletXAEnhanceMapper;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
 * @author zouwei
 * @className WalletServiceImpl
 * @date: 2022/9/27 21:20
 * @description:
 */
@Service
public class WalletServiceImpl implements IWalletService {
  @Resource
  private WalletXAEnhanceMapper walletXAEnhanceMapper;
  /**
   * 扣款(XA模式)
   *
   * @param userId
   * @param amount
   * @return
   */
  @Transactional
  @Override
  public Boolean deductMoney4XA(String userId, long amount) {
    // 相关sql:update wallet_xa_tbl set money = money - #{amount,jdbcType=INTEGER} where user_id = #{userId,jdbcType=VARCHAR} and money <![CDATA[ >= ]]>#{amount,jdbcType=INTEGER}
    return walletXAEnhanceMapper.deductMoney(userId, amount) > 0;
  }
}
// 对TM暴露接口
import com.example.accountapi.model.AmountInfo;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
 * 供生产者和消费者共同使用
 *
 * @author zouwei
 * @className AccountApi
 * @date: 2022/9/18 14:31
 * @description:
 */
public interface WalletApi {
  /**
   * 扣款
   *
   * @param amountInfo
   * @return
   */
  @PostMapping("/deductMoney4XA")
  Boolean deductMoney4XA(@RequestBody AmountInfo amountInfo);
}
import com.example.accountapi.api.WalletApi;
import com.example.accountapi.model.AmountInfo;
import com.example.awesomeaccount.service.IWalletService;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * WalletApi接口在account-api模块中
 *
 * @author zouwei
 * @className WalletController
 * @date: 2022/9/18 14:58
 * @description:
 */
@Slf4j
@RestController
@RequestMapping("/wallet")
public class WalletController implements WalletApi {
  @Autowired
  private IWalletService walletService;
  @Override
  public Boolean deductMoney4XA(AmountInfo amountInfo) {
    String userId = amountInfo.getUserId();
    long amount = amountInfo.getAmount();
    // 打印分布式事务XID
    log.warn("XID:{}", RootContext.getXID());
    // 扣款
    return walletService.deductMoney4XA(userId, amount);
  }
}
复制代码

1.Account服务的扣款逻辑和平常的crud并没有任何区别,依然是调用mybatis的Mapper进行sql操作;

2.Account服务对外暴露给TM一个restful的扣款接口;

  • Storage服务:

Storage服务同样需要保证配置文件中的Seata.data-source-proxy-mode=XA,否则Seata将以AT模式启动服务;

public interface IStockService {
  // 扣减库存
  Boolean deductStock4XA(String commodityCode, int count);
}
import com.example.awesomestorage.dao.mapper.StockXAEnhanceMapper;
import com.example.awesomestorage.service.IStockService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
 * @author zouwei
 * @className StockServiceImpl
 * @date: 2022/9/28 00:06
 * @description:
 */
@Service
public class StockServiceImpl implements IStockService {
  @Resource
  private StockXAEnhanceMapper stockXAEnhanceMapper;
  /**
   * 扣减库存(XA模式)
   *
   * @param commodityCode
   * @param count
   * @return
   */
  @Transactional
  @Override
  public Boolean deductStock4XA(String commodityCode, int count) {
    // 扣减库存,判断影响行数是否大于0
    // sql如下: update stock_tbl set count = count - #{count,jdbcType=INTEGER} where commodity_code = #{commodityCode,jdbcType=VARCHAR} and count <![CDATA[ >= ]]> #{count,jdbcType=INTEGER}
    return stockXAEnhanceMapper.deductStock(commodityCode, count) > 0;
  }
}
// 对外暴露接口
import com.example.storageapi.model.OrderInfo;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
 * @author zouwei
 * @className StockApi
 * @date: 2022/9/28 00:26
 * @description:
 */
public interface StockApi {
  @PostMapping("/deduct4XA")
  Boolean deductStock4XA(@RequestBody OrderInfo orderInfo);
}
import com.example.awesomestorage.service.IStockService;
import com.example.storageapi.api.StockApi;
import com.example.storageapi.model.OrderInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author zouwei
 * @className StockController
 * @date: 2022/9/28 00:23
 * @description:
 */
@RestController
@RequestMapping("/stock")
public class StockController implements StockApi {
  @Autowired
  private IStockService stockService;
  @Override
  public Boolean deductStock4XA(OrderInfo orderInfo) {
    String commodityCode = orderInfo.getCommodityCode();
    int count = orderInfo.getCount();
    return stockService.deductStock4XA(commodityCode, count);
  }
}
复制代码

同样的,我们的Storage服务也和Account服务一样,暴露给TM一个扣减库存的接口;

  • Order服务

和上面两个服务一样,我们先配置Seata.data-source-proxy-mode=XA,保证我们能够正确地开启XA模式

public interface IOrderService {
  Boolean createOrder4XA(String userId, String commodityCode, int count, long unitPrice);
}
import com.example.awesomeorder.api.StockApiClient;
import com.example.awesomeorder.dao.entity.Order;
import com.example.awesomeorder.dao.mapper.OrderMapper;
import com.example.awesomeorder.service.IOrderService;
import com.example.awesomeorder.tcc.IOrderTccAction;
import com.example.storageapi.model.OrderInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
/**
 * @author zouwei
 * @className OrderServiceImpl
 * @date: 2022/9/27 22:47
 * @description:
 */
@Service
public class OrderServiceImpl implements IOrderService {
  @Resource
  private OrderXAMapper orderXAMapper;
  @Resource
  private StockApiClient stockApiClient;
  /**
   * 创建订单(XA模式)
   *
   * @param userId
   * @param commodityCode
   * @param count
   * @param unitPrice
   * @return
   */
  @Transactional
  @Override
  public Boolean createOrder4XA(String userId, String commodityCode, int count, long unitPrice) {
    // 构建待扣减的库存信息
    OrderInfo orderInfo = new OrderInfo();
    // 设置商品编码
    orderInfo.setCommodityCode(commodityCode);
    // 设置需要扣减的数量
    orderInfo.setCount(count);
    // 先构建库存
    if (stockApiClient.deductStock4XA(orderInfo)) {
      // 扣减库存成功后,准备创建订单
      OrderXA order = new OrderXA();
      // 创建时间
      order.setCreateTime(LocalDateTime.now());
      // 用户ID
      order.setUserId(userId);
      // 数量
      order.setCount(count);
      // 商品编码
      order.setCommodityCode(commodityCode);
      // 单价
      order.setUnitPrice(unitPrice);
      // 创建订单
      return orderXAMapper.insert(order) > 0;
    }
    // 扣减库存失败,订单创建失败
    return Boolean.FALSE;
  }
}
// 对外暴露接口
import com.example.orderapi.model.OrderInfo;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
 * @author zouwei
 * @className OrderApi
 * @date: 2022/9/27 22:57
 * @description:
 */
public interface OrderApi {
  @PutMapping("/create4At")
  Boolean createOrder4XA(@RequestBody OrderInfo orderInfo);
}
import com.example.awesomeorder.service.IOrderService;
import com.example.orderapi.api.OrderApi;
import com.example.orderapi.model.OrderInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author zouwei
 * @className OrderController
 * @date: 2022/9/27 22:55
 * @description:
 */
@RestController
@RequestMapping("/order")
public class OrderController implements OrderApi {
  @Autowired
  private IOrderService orderService;
  @Override
  public Boolean createOrder4XA(OrderInfo orderInfo) {
    String commodityCode = orderInfo.getCommodityCode();
    int count = orderInfo.getCount();
    long unitPrice = orderInfo.getUnitPrice();
    String userId = orderInfo.getUserId();
    return orderService.createOrder4XA(userId, commodityCode, count, unitPrice);
  }
}
复制代码

至此,我们所有的RM服务全部创建完毕,下面我们开始准备构建TM;

创建TM服务

在Seata TM服务中,我们不需要配置数据源,因为TM不需要跟数据源打交道,我们只需要与TC服务进行通讯,在业务调用上,通过openFeign与RM服务进行通讯;

在TM服务中,就不需要额外指定seata.data-source-proxy-mode=XA,因为TM不和数据源打交道:

seata:
  #  开启seata
  enabled: true
  #  注册中心找TC服务
  registry:
    type: nacos
    nacos:
      cluster: "default"
      username: "用户名"
      password: "密码"
      server-addr: "ip:端口"
      group: SEATA_GROUP
      namespace: seata-server
      application: seata-server
  application-id: ${spring.application.name}
  #  事务分组
  tx-service-group: shanghai
  service:
    vgroup-mapping:
      # 该分组对应的TC集群名称
      shanghai: default
复制代码

另外的话,还需要配置上OpenFeign。有疑问的小伙伴可查看文章OpenFeign的集成与优化

import io.seata.core.exception.GlobalTransactionException;
/**
 * @author zouwei
 * @className ShoppingCartService
 * @date: 2022/9/18 14:01
 * @description:
 */
public interface ShoppingCartService {
  // 下单
  String placeOrder4XA() throws GlobalTransactionException;
}
import com.example.accountapi.model.AmountInfo;
import com.example.awesomebusiness.api.OrderApiClient;
import com.example.awesomebusiness.api.WalletApiClient;
import com.example.awesomebusiness.service.ShoppingCartService;
import com.example.orderapi.model.OrderInfo;
import io.seata.core.exception.GlobalTransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
 * @author zouwei
 * @className ShoppingCartServiceImpl
 * @date: 2022/9/18 14:01
 * @description:
 */
@Service
public class ShoppingCartServiceImpl implements ShoppingCartService {
  // 钱包服务
  @Resource
  private WalletApiClient walletApiClient;
  // 订单服务
  @Resource
  private OrderApiClient orderApiClient;
  // 别忘记了这个注解,这是开启分布式事务的标记
  @GlobalTransactional
  @Override
  public String placeOrder4XA() throws GlobalTransactionException {
    // 模拟用户ID 123456,对应数据库初始化的用户ID
    String userId = "123456";
    // 构建订单数据
    OrderInfo orderInfo = new OrderInfo();
    // 数量15个
    orderInfo.setCount(15);
    // 商品编码,对应库存数据表的初始化数据
    orderInfo.setCommodityCode("CC-54321");
    // 单价299,默认是long类型,单位分;避免double精度丢失
    orderInfo.setUnitPrice(299);
    // 订单归属
    orderInfo.setUserId(userId);
    // 计算扣款金额,数量*单价
    long amount = orderInfo.getCount() * orderInfo.getUnitPrice();
    // 构建扣款数据
    AmountInfo amountInfo = new AmountInfo();
    // 设置扣款金额
    amountInfo.setAmount(amount);
    // 设置扣款主体
    amountInfo.setUserId(userId);
    // 先扣款,扣款成功就创建订单,扣减库存在创建订单的逻辑里面
    if (walletApiClient.deductMoney4XA(amountInfo) && orderApiClient.createOrder4XA(orderInfo)) {
      return "下单成功!";
    }
    // 1.扣款失败,抛异常,分布式事务回滚
    // 2.创建订单失败,抛异常,分布式事务回滚
    throw new GlobalTransactionException("下单失败!");
  }
}
import com.example.awesomebusiness.service.ShoppingCartService;
import io.seata.core.exception.GlobalTransactionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author zouwei
 * @className ShoppingCartController
 * @date: 2022/9/18 13:55
 * @description:
 */
@RestController
@RequestMapping(value = "/shoppingCart")
public class ShoppingCartController {
  @Autowired
  private ShoppingCartService shoppingCartService;
  @PostMapping("/placeOrder")
  public String placeOrder() throws GlobalTransactionException {
    return shoppingCartService.placeOrder4XA();
  }
}
复制代码

以上代码便是Spring Cloud集成Seata XA模式的核心代码,感兴趣的小伙伴可以访问awesome-seats下载源码。


相关文章
|
22天前
|
数据库 微服务
SEATA模式
Seata 是一款开源的分布式事务解决方案,支持多种事务模式以适应不同的应用场景。其主要模式包括:AT(TCC)模式,事务分三阶段执行;TCC 模式,提供更灵活的事务控制;SAGA 模式,基于状态机实现跨服务的事务一致性;XA 模式,采用传统两阶段提交协议确保数据一致性。
41 5
|
5月前
|
NoSQL Java Nacos
SpringCloud集成Seata并使用Nacos做注册中心与配置中心
SpringCloud集成Seata并使用Nacos做注册中心与配置中心
183 3
|
26天前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
31 6
|
26天前
|
Java 关系型数据库 MySQL
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
45 5
|
26天前
|
缓存 监控 Java
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
35 5
|
1月前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
54 6
|
29天前
Seata框架在AT模式下是如何保证数据一致性的?
通过以上这些机制的协同作用,Seata 在 AT 模式下能够有效地保证数据的一致性,确保分布式事务的可靠执行。你还可以进一步深入研究 Seata 的具体实现细节,以更好地理解其数据一致性保障的原理。
40 3
|
3月前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
5月前
|
负载均衡 Java Spring
Spring cloud gateway 如何在路由时进行负载均衡
Spring cloud gateway 如何在路由时进行负载均衡
600 15
|
4月前
|
关系型数据库 MySQL 数据库
SpringCloud2023中使用Seata解决分布式事务
对于分布式系统而言,需要保证分布式系统中的数据一致性,保证数据在子系统中始终保持一致,避免业务出现问题。分布式系统中对数据的操作要么一起成功,要么一起失败,必须是一个整体性的事务。Seata简化了这个使用过程。
100 2