手把手教你Spring Cloud集成Seata XA模式

简介: 手把手教你Spring Cloud集成Seata XA模式

前言

我们在前面的文章中已经教大家分别集成了Seata AT模式以及Seata TCC模式,这篇文章就教大家如何在自己的Spring Cloud项目中集成Seata XA模式。

同样我们还是以购物车下单的业务场景作为本次案例,当前Seata版本为1.5.2:


image.png

1.用户请求从Business业务入口进来,在业务入口中,我们会根据业务需求,先执行RPC扣款操作,然后再调用订单创建的功能;

2.在订单创建过程中,会先RPC执行扣减库存,成功后才会在订单表中插入订单数据;

3.在上述所有逻辑处理成功后,购物车下单逻辑就完成了,中途任何RM服务出现异常,将触发分布式事务回滚,导致下单失败;

注意:Seata XA模式不能与AT模式共存,两种模式在同一个服务中只能存在其一

创建数据表

在Seata XA模式中,除业务表外,不需要创建其他额外的数据表,该分布式事务完全依赖与XA协议;

  • Account服务:
-- ----------------------------
-- Table structure for wallet_xa_tbl
-- ----------------------------
DROP TABLE IF EXISTS `wallet_xa_tbl`;
CREATE TABLE `wallet_xa_tbl` (
  `id` int NOT NULL COMMENT '主键',
  `user_id` varchar(255) COLLATE utf8mb4_bin NOT NULL,
  `money` int NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Records of wallet_xa_tbl
-- ----------------------------
BEGIN;
INSERT INTO `wallet_xa_tbl` (`id`, `user_id`, `money`) VALUES (1, '123456', 100000);
COMMIT;
复制代码
  • Order服务
-- ----------------------------
-- Table structure for order_xa_tbl
-- ----------------------------
DROP TABLE IF EXISTS `order_xa_tbl`;
CREATE TABLE `order_xa_tbl` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `user_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID',
  `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码',
  `count` int NOT NULL COMMENT '数量',
  `unit_price` int NOT NULL COMMENT '单价',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
复制代码
  • Storage服务
-- ----------------------------
-- Table structure for stock_xa_tbl
-- ----------------------------
DROP TABLE IF EXISTS `stock_xa_tbl`;
CREATE TABLE `stock_xa_tbl` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码',
  `count` int NOT NULL COMMENT '库存数',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Records of stock_xa_tbl
-- ----------------------------
BEGIN;
INSERT INTO `stock_xa_tbl` (`id`, `commodity_code`, `count`) VALUES (1, 'CC-54321', 500);
COMMIT;
复制代码

创建RM服务

注意:XA模式在RM服务对应的配置文件中一定要单独指定,否则Seata模式是AT模式

# 分布式事务配置
seata:
  #  开启seata
  enabled: true
  # 一定要单独指定,默认是AT模式
  data-source-proxy-mode: XA
  #  注册中心找TC服务
  registry:
    type: nacos
    nacos:
      cluster: "default"
      username: 用户名
      password: 密码
      server-addr: "ip:端口"
      group: SEATA_GROUP
      namespace: seata-server
      application: seata-server
  application-id: ${spring.application.name}
  #  事务分组
  tx-service-group: shanghai
  service:
    vgroup-mapping:
      # 该分组对应的TC集群名称
      shanghai: default
复制代码
  • Account服务:
public interface IWalletService {
  /**
   * 扣钱
   *
   * @param userId
   * @param amount
   * @return
   */
  Boolean deductMoney4XA(String userId, long amount);
}
import com.example.awesomeaccount.service.IWalletService;
import com.example.awesomeaccount.dao.mapper.WalletXAEnhanceMapper;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
 * @author zouwei
 * @className WalletServiceImpl
 * @date: 2022/9/27 21:20
 * @description:
 */
@Service
public class WalletServiceImpl implements IWalletService {
  @Resource
  private WalletXAEnhanceMapper walletXAEnhanceMapper;
  /**
   * 扣款(XA模式)
   *
   * @param userId
   * @param amount
   * @return
   */
  @Transactional
  @Override
  public Boolean deductMoney4XA(String userId, long amount) {
    // 相关sql:update wallet_xa_tbl set money = money - #{amount,jdbcType=INTEGER} where user_id = #{userId,jdbcType=VARCHAR} and money <![CDATA[ >= ]]>#{amount,jdbcType=INTEGER}
    return walletXAEnhanceMapper.deductMoney(userId, amount) > 0;
  }
}
// 对TM暴露接口
import com.example.accountapi.model.AmountInfo;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
 * 供生产者和消费者共同使用
 *
 * @author zouwei
 * @className AccountApi
 * @date: 2022/9/18 14:31
 * @description:
 */
public interface WalletApi {
  /**
   * 扣款
   *
   * @param amountInfo
   * @return
   */
  @PostMapping("/deductMoney4XA")
  Boolean deductMoney4XA(@RequestBody AmountInfo amountInfo);
}
import com.example.accountapi.api.WalletApi;
import com.example.accountapi.model.AmountInfo;
import com.example.awesomeaccount.service.IWalletService;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * WalletApi接口在account-api模块中
 *
 * @author zouwei
 * @className WalletController
 * @date: 2022/9/18 14:58
 * @description:
 */
@Slf4j
@RestController
@RequestMapping("/wallet")
public class WalletController implements WalletApi {
  @Autowired
  private IWalletService walletService;
  @Override
  public Boolean deductMoney4XA(AmountInfo amountInfo) {
    String userId = amountInfo.getUserId();
    long amount = amountInfo.getAmount();
    // 打印分布式事务XID
    log.warn("XID:{}", RootContext.getXID());
    // 扣款
    return walletService.deductMoney4XA(userId, amount);
  }
}
复制代码

1.Account服务的扣款逻辑和平常的crud并没有任何区别,依然是调用mybatis的Mapper进行sql操作;

2.Account服务对外暴露给TM一个restful的扣款接口;

  • Storage服务:

Storage服务同样需要保证配置文件中的Seata.data-source-proxy-mode=XA,否则Seata将以AT模式启动服务;

public interface IStockService {
  // 扣减库存
  Boolean deductStock4XA(String commodityCode, int count);
}
import com.example.awesomestorage.dao.mapper.StockXAEnhanceMapper;
import com.example.awesomestorage.service.IStockService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
 * @author zouwei
 * @className StockServiceImpl
 * @date: 2022/9/28 00:06
 * @description:
 */
@Service
public class StockServiceImpl implements IStockService {
  @Resource
  private StockXAEnhanceMapper stockXAEnhanceMapper;
  /**
   * 扣减库存(XA模式)
   *
   * @param commodityCode
   * @param count
   * @return
   */
  @Transactional
  @Override
  public Boolean deductStock4XA(String commodityCode, int count) {
    // 扣减库存,判断影响行数是否大于0
    // sql如下: update stock_tbl set count = count - #{count,jdbcType=INTEGER} where commodity_code = #{commodityCode,jdbcType=VARCHAR} and count <![CDATA[ >= ]]> #{count,jdbcType=INTEGER}
    return stockXAEnhanceMapper.deductStock(commodityCode, count) > 0;
  }
}
// 对外暴露接口
import com.example.storageapi.model.OrderInfo;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
 * @author zouwei
 * @className StockApi
 * @date: 2022/9/28 00:26
 * @description:
 */
public interface StockApi {
  @PostMapping("/deduct4XA")
  Boolean deductStock4XA(@RequestBody OrderInfo orderInfo);
}
import com.example.awesomestorage.service.IStockService;
import com.example.storageapi.api.StockApi;
import com.example.storageapi.model.OrderInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author zouwei
 * @className StockController
 * @date: 2022/9/28 00:23
 * @description:
 */
@RestController
@RequestMapping("/stock")
public class StockController implements StockApi {
  @Autowired
  private IStockService stockService;
  @Override
  public Boolean deductStock4XA(OrderInfo orderInfo) {
    String commodityCode = orderInfo.getCommodityCode();
    int count = orderInfo.getCount();
    return stockService.deductStock4XA(commodityCode, count);
  }
}
复制代码

同样的,我们的Storage服务也和Account服务一样,暴露给TM一个扣减库存的接口;

  • Order服务

和上面两个服务一样,我们先配置Seata.data-source-proxy-mode=XA,保证我们能够正确地开启XA模式

public interface IOrderService {
  Boolean createOrder4XA(String userId, String commodityCode, int count, long unitPrice);
}
import com.example.awesomeorder.api.StockApiClient;
import com.example.awesomeorder.dao.entity.Order;
import com.example.awesomeorder.dao.mapper.OrderMapper;
import com.example.awesomeorder.service.IOrderService;
import com.example.awesomeorder.tcc.IOrderTccAction;
import com.example.storageapi.model.OrderInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
/**
 * @author zouwei
 * @className OrderServiceImpl
 * @date: 2022/9/27 22:47
 * @description:
 */
@Service
public class OrderServiceImpl implements IOrderService {
  @Resource
  private OrderXAMapper orderXAMapper;
  @Resource
  private StockApiClient stockApiClient;
  /**
   * 创建订单(XA模式)
   *
   * @param userId
   * @param commodityCode
   * @param count
   * @param unitPrice
   * @return
   */
  @Transactional
  @Override
  public Boolean createOrder4XA(String userId, String commodityCode, int count, long unitPrice) {
    // 构建待扣减的库存信息
    OrderInfo orderInfo = new OrderInfo();
    // 设置商品编码
    orderInfo.setCommodityCode(commodityCode);
    // 设置需要扣减的数量
    orderInfo.setCount(count);
    // 先构建库存
    if (stockApiClient.deductStock4XA(orderInfo)) {
      // 扣减库存成功后,准备创建订单
      OrderXA order = new OrderXA();
      // 创建时间
      order.setCreateTime(LocalDateTime.now());
      // 用户ID
      order.setUserId(userId);
      // 数量
      order.setCount(count);
      // 商品编码
      order.setCommodityCode(commodityCode);
      // 单价
      order.setUnitPrice(unitPrice);
      // 创建订单
      return orderXAMapper.insert(order) > 0;
    }
    // 扣减库存失败,订单创建失败
    return Boolean.FALSE;
  }
}
// 对外暴露接口
import com.example.orderapi.model.OrderInfo;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
 * @author zouwei
 * @className OrderApi
 * @date: 2022/9/27 22:57
 * @description:
 */
public interface OrderApi {
  @PutMapping("/create4At")
  Boolean createOrder4XA(@RequestBody OrderInfo orderInfo);
}
import com.example.awesomeorder.service.IOrderService;
import com.example.orderapi.api.OrderApi;
import com.example.orderapi.model.OrderInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author zouwei
 * @className OrderController
 * @date: 2022/9/27 22:55
 * @description:
 */
@RestController
@RequestMapping("/order")
public class OrderController implements OrderApi {
  @Autowired
  private IOrderService orderService;
  @Override
  public Boolean createOrder4XA(OrderInfo orderInfo) {
    String commodityCode = orderInfo.getCommodityCode();
    int count = orderInfo.getCount();
    long unitPrice = orderInfo.getUnitPrice();
    String userId = orderInfo.getUserId();
    return orderService.createOrder4XA(userId, commodityCode, count, unitPrice);
  }
}
复制代码

至此,我们所有的RM服务全部创建完毕,下面我们开始准备构建TM;

创建TM服务

在Seata TM服务中,我们不需要配置数据源,因为TM不需要跟数据源打交道,我们只需要与TC服务进行通讯,在业务调用上,通过openFeign与RM服务进行通讯;

在TM服务中,就不需要额外指定seata.data-source-proxy-mode=XA,因为TM不和数据源打交道:

seata:
  #  开启seata
  enabled: true
  #  注册中心找TC服务
  registry:
    type: nacos
    nacos:
      cluster: "default"
      username: "用户名"
      password: "密码"
      server-addr: "ip:端口"
      group: SEATA_GROUP
      namespace: seata-server
      application: seata-server
  application-id: ${spring.application.name}
  #  事务分组
  tx-service-group: shanghai
  service:
    vgroup-mapping:
      # 该分组对应的TC集群名称
      shanghai: default
复制代码

另外的话,还需要配置上OpenFeign。有疑问的小伙伴可查看文章OpenFeign的集成与优化

import io.seata.core.exception.GlobalTransactionException;
/**
 * @author zouwei
 * @className ShoppingCartService
 * @date: 2022/9/18 14:01
 * @description:
 */
public interface ShoppingCartService {
  // 下单
  String placeOrder4XA() throws GlobalTransactionException;
}
import com.example.accountapi.model.AmountInfo;
import com.example.awesomebusiness.api.OrderApiClient;
import com.example.awesomebusiness.api.WalletApiClient;
import com.example.awesomebusiness.service.ShoppingCartService;
import com.example.orderapi.model.OrderInfo;
import io.seata.core.exception.GlobalTransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
 * @author zouwei
 * @className ShoppingCartServiceImpl
 * @date: 2022/9/18 14:01
 * @description:
 */
@Service
public class ShoppingCartServiceImpl implements ShoppingCartService {
  // 钱包服务
  @Resource
  private WalletApiClient walletApiClient;
  // 订单服务
  @Resource
  private OrderApiClient orderApiClient;
  // 别忘记了这个注解,这是开启分布式事务的标记
  @GlobalTransactional
  @Override
  public String placeOrder4XA() throws GlobalTransactionException {
    // 模拟用户ID 123456,对应数据库初始化的用户ID
    String userId = "123456";
    // 构建订单数据
    OrderInfo orderInfo = new OrderInfo();
    // 数量15个
    orderInfo.setCount(15);
    // 商品编码,对应库存数据表的初始化数据
    orderInfo.setCommodityCode("CC-54321");
    // 单价299,默认是long类型,单位分;避免double精度丢失
    orderInfo.setUnitPrice(299);
    // 订单归属
    orderInfo.setUserId(userId);
    // 计算扣款金额,数量*单价
    long amount = orderInfo.getCount() * orderInfo.getUnitPrice();
    // 构建扣款数据
    AmountInfo amountInfo = new AmountInfo();
    // 设置扣款金额
    amountInfo.setAmount(amount);
    // 设置扣款主体
    amountInfo.setUserId(userId);
    // 先扣款,扣款成功就创建订单,扣减库存在创建订单的逻辑里面
    if (walletApiClient.deductMoney4XA(amountInfo) && orderApiClient.createOrder4XA(orderInfo)) {
      return "下单成功!";
    }
    // 1.扣款失败,抛异常,分布式事务回滚
    // 2.创建订单失败,抛异常,分布式事务回滚
    throw new GlobalTransactionException("下单失败!");
  }
}
import com.example.awesomebusiness.service.ShoppingCartService;
import io.seata.core.exception.GlobalTransactionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author zouwei
 * @className ShoppingCartController
 * @date: 2022/9/18 13:55
 * @description:
 */
@RestController
@RequestMapping(value = "/shoppingCart")
public class ShoppingCartController {
  @Autowired
  private ShoppingCartService shoppingCartService;
  @PostMapping("/placeOrder")
  public String placeOrder() throws GlobalTransactionException {
    return shoppingCartService.placeOrder4XA();
  }
}
复制代码

以上代码便是Spring Cloud集成Seata XA模式的核心代码,感兴趣的小伙伴可以访问awesome-seats下载源码。


相关文章
|
4天前
|
Java Spring 容器
Spring系列文章:Spring6集成MyBatis3.5
Spring系列文章:Spring6集成MyBatis3.5
|
4天前
|
负载均衡 Java 网络架构
在SpringCloud2023中快速集成SpringCloudGateway网关
本文主要简单介绍SpringCloud2023实战中SpringCoudGateway的搭建。后续的文章将会介绍在微服务中使用熔断Sentinel、鉴权OAuth2、SSO等技术。
41 2
在SpringCloud2023中快速集成SpringCloudGateway网关
|
4天前
|
安全 Java 测试技术
Spring Boot集成支付宝支付:概念与实战
【4月更文挑战第29天】在电子商务和在线业务应用中,集成有效且安全的支付解决方案是至关重要的。支付宝作为中国领先的支付服务提供商,其支付功能的集成可以显著提升用户体验。本篇博客将详细介绍如何在Spring Boot应用中集成支付宝支付功能,并提供一个实战示例。
41 2
|
4天前
|
NoSQL Java MongoDB
【MongoDB 专栏】MongoDB 与 Spring Boot 的集成实践
【5月更文挑战第11天】本文介绍了如何将非关系型数据库MongoDB与Spring Boot框架集成,以实现高效灵活的数据管理。Spring Boot简化了Spring应用的构建和部署,MongoDB则以其对灵活数据结构的处理能力受到青睐。集成步骤包括:添加MongoDB依赖、配置连接信息、创建数据访问对象(DAO)以及进行数据操作。通过这种方式,开发者可以充分利用两者优势,应对各种数据需求。在实际应用中,结合微服务架构等技术,可以构建高性能、可扩展的系统。掌握MongoDB与Spring Boot集成对于提升开发效率和项目质量至关重要,未来有望在更多领域得到广泛应用。
【MongoDB 专栏】MongoDB 与 Spring Boot 的集成实践
|
4天前
|
安全 Java 数据库连接
在IntelliJ IDEA中通过Spring Boot集成达梦数据库:从入门到精通
在IntelliJ IDEA中通过Spring Boot集成达梦数据库:从入门到精通
|
4天前
|
传感器 人工智能 前端开发
JAVA语言VUE2+Spring boot+MySQL开发的智慧校园系统源码(电子班牌可人脸识别)Saas 模式
智慧校园电子班牌,坐落于班级的门口,适合于各类型学校的场景应用,班级学校日常内容更新可由班级自行管理,也可由学校统一管理。让我们一起看看,电子班牌有哪些功能呢?
106 4
JAVA语言VUE2+Spring boot+MySQL开发的智慧校园系统源码(电子班牌可人脸识别)Saas 模式
|
4天前
|
缓存 Java Spring
单体项目中资源管理模块集成Spring Cache
该内容是关于将Spring Cache集成到资源管理模块以实现缓存同步的说明。主要策略包括:查询时添加到缓存,增删改时删除相关缓存。示例代码展示了@Service类中使用@Transactional和@Cacheable注解进行缓存操作,以及在RedisTemplate中处理缓存的示例。
24 5
|
4天前
|
消息中间件 缓存 运维
java+saas模式医院云HIS系统源码Java+Spring+MySQL + MyCat融合BS版电子病历系统,支持电子病历四级
云HIS系统是一款满足基层医院各类业务需要的健康云产品。该产品能帮助基层医院完成日常各类业务,提供病患预约挂号支持、病患问诊、电子病历、开药发药、会员管理、统计查询、医生工作站和护士工作站等一系列常规功能,还能与公卫、PACS等各类外部系统融合,实现多层机构之间的融合管理。
52 1
|
4天前
|
Java 测试技术 Spring
Spring系列文章:Spring集成Log4j2⽇志框架、整合JUnit
Spring系列文章:Spring集成Log4j2⽇志框架、整合JUnit
|
4天前
|
消息中间件 Java Linux
RabbitMQ教程:Linux下安装、基本命令与Spring Boot集成
RabbitMQ教程:Linux下安装、基本命令与Spring Boot集成

热门文章

最新文章