Spring Cloud集成分布式事务框架Seata 1.5.2(二)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Spring Cloud集成分布式事务框架Seata 1.5.2
  • account-apiorder-apistorage-api作为桥接层,只需要spring mvc的功能即可,所以它们对应的依赖为:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <!-- 记得修改对应的artifactId -->
    <artifactId>account-api</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>account-api</name>
    <description>account-api</description>
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- spring mvc -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
</project>
复制代码

至此,所有服务对应的依赖就全部搞定。

数据表的建立

awesome-business是不需要操作数据库的,所以不存在数据表,只有awesome-accountawesome-orderawesome-storage三个RM服务需要操作数据库,所以需要建立数据表。

  • awesome-account需要创建钱包对应的表,另外就是seata的undo_log
DROP TABLE IF EXISTS `wallet_tbl`;
CREATE TABLE `wallet_tbl` (
  `id` int NOT NULL COMMENT '主键',
  `user_id` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID',
  `money` int NOT NULL COMMENT '账户余额',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- 初始化钱包数据
-- ----------------------------
INSERT INTO `wallet_tbl` (`id`, `user_id`, `money`) VALUES (1, '123456', 100000);
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `branch_id` bigint NOT NULL COMMENT 'branch transaction id',
  `xid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
  `rollback_info` longblob NOT NULL COMMENT 'rollback info',
  `log_status` int NOT NULL COMMENT '0:normal status,1:defense status',
  `log_created` datetime(6) NOT NULL COMMENT 'create datetime',
  `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='AT transaction mode undo table';
复制代码
  • awesome-order需要创建订单表与undo_log
DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `user_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID',
  `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码',
  `count` int NOT NULL COMMENT '数量',
  `unit_price` int NOT NULL COMMENT '单价',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `branch_id` bigint NOT NULL COMMENT 'branch transaction id',
  `xid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
  `rollback_info` longblob NOT NULL COMMENT 'rollback info',
  `log_status` int NOT NULL COMMENT '0:normal status,1:defense status',
  `log_created` datetime(6) NOT NULL COMMENT 'create datetime',
  `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='AT transaction mode undo table';
复制代码
  • awesome-storage需要创建库存表与undo_log
DROP TABLE IF EXISTS `stock_tbl`;
CREATE TABLE `stock_tbl` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码',
  `count` int NOT NULL COMMENT '库存数',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- 初始化库存数据
-- ----------------------------
INSERT INTO `stock_tbl` (`id`, `commodity_code`, `count`) VALUES (1, 'CC-54321', 100);
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `branch_id` bigint NOT NULL COMMENT 'branch transaction id',
  `xid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
  `rollback_info` longblob NOT NULL COMMENT 'rollback info',
  `log_status` int NOT NULL COMMENT '0:normal status,1:defense status',
  `log_created` datetime(6) NOT NULL COMMENT 'create datetime',
  `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='AT transaction mode undo table';
复制代码

至此,所有服务的数据表已经建立完毕。

服务配置

接下来,我们需要根据需要给每个服务加上对应的配置。

  • awesome-business作为业务入口服务,需要配置nacos服务发现、openFeign远程RPC调用工具、seata分布式框架:
spring:
  cloud:
    nacos:
      discovery:
        enabled: true
        server-addr: "host:端口"
        # 需要在nacos中创建awesome-seata命名空间
        namespace: "awesome-seata"
        group: ${spring.profiles.active}
        username: "用户名"
        password: "密码"
    # openFeign优化,是否开启缓存,如果服务发现已经有缓存了,就可以不用开启缓存
    loadbalancer:
      cache:
        enabled: true
        # 过期时间10s
        ttl: 10
        # 容量256M
        capacity: 256
        caffeine:
          #          initialCapacity=[integer]: sets Caffeine.initialCapacity.
          #          maximumSize=[long]: sets Caffeine.maximumSize.
          #          maximumWeight=[long]: sets Caffeine.maximumWeight.
          #          expireAfterAccess=[duration]: sets Caffeine.expireAfterAccess(long, java.util.concurrent.TimeUnit).
          #          expireAfterWrite=[duration]: sets Caffeine.expireAfterWrite(long, java.util.concurrent.TimeUnit).
          #          refreshAfterWrite=[duration]: sets Caffeine.refreshAfterWrite(long, java.util.concurrent.TimeUnit).
          #          weakKeys: sets Caffeine.weakKeys().
          #          weakValues: sets Caffeine.weakValues().
          #          softValues: sets Caffeine.softValues().
          #          recordStats: sets Caffeine.recordStats().
          #         initialCapacity初始化键值对的数量
          spec: initialCapacity=500,expireAfterWrite=5s
# openFeign优化
feign:
  # 开启压缩功能
  compression:
    request:
      enabled: true
      mime-types: text/xml,application/xml,application/json
      min-request-size: 2048
    response:
      enabled: true
  # 不使用httpclient,改用okhttp
  httpclient:
    enabled: false
  okhttp:
    enabled: true
    # 是否禁用重定向
    follow-redirects: true
    connect-timeout: 5000
    # 链接失败是否重试
    retry-on-connection-failure: true
    read-timeout: 5000
    write-timeout: 5000
    # 最大空闲数量
    max-idle-connections: 5
    # 生存时间
    keep-alive-duration: 15000
  client:
    config:
      # 设置超时,囊括了okhttp的超时,okhttp属于真正执行的超时,openFeign属于服务间的超时
      # 设置全局超时时间
      default:
        connectTimeout: 2000
        readTimeout: 5000
      #   针对contextId设置超时时间
      walletApi:
        connectTimeout: 1000
        readTimeout: 2000
#        loggerLevel: FULL
seata:
  #  开启seata
  enabled: true
  #  注册中心找TC服务
  registry:
    type: nacos
    nacos:
      cluster: "default"
      username: "用户名"
      password: "密码"
      server-addr: "host:端口"
      group: SEATA_GROUP
      # 需要在nacos中创建awesome-seata命名空间
      namespace: seata-server
      application: seata-server
  application-id: ${spring.application.name}
  #  事务分组,需要对应vgroup-mapping下面的key
  tx-service-group: shanghai
  service:
    vgroup-mapping:
      # 该分组对应的TC集群名称
      # default对应TC的cluster字段
      shanghai: default
# 日志配置
logging:
  pattern:
    # 日志输出格式
    console: "%d{yyyy-MM-dd HH:mm:ss} %clr(%5p) [%thread] %clr(%logger){cyan} : %msg%n"
  level:
    #    trace < debug < info < warn < error < fatal
    # 全局日志级别
    root: info
    # 指定包日志级别
    com.example.awesomeabusiness: warn
复制代码

该配置中OpenFeign优化思路可以参考博客:openFeign的集成与优化

  • awesome-accountawesome-orderawesome-storage三个服务需要操作数据库、以及相关的分布式事务服务,所以对应的配置如下:
spring:
  # 数据库链接配置
  datasource:
    url: jdbc:mysql://host:端口/awesome-order?useUnicode=true&characterEncoding=UTF8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=UTC
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: "用户名"
    password: "密码"
    # 链接池
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      # 链接池初始化大小
      initial-size: 8
      # 最大活跃数
      max-active: 16
      # 最小空闲数
      min-idle: 1
      # 最大等待时间
      max-wait: 60000
  # 微服务配置
  cloud:
    nacos:
      discovery:
        enabled: true
        server-addr: "host:端口"
        namespace: "awesome-seata"
        group: ${spring.profiles.active}
        username: nacos
        password: nacos
        service: ${spring.application.name}
        register-enabled: true
        # 配置注册的目标ip,根据自己的需求设置
        ip: "127.0.0.1"
        # 注册的目标端口
        port: ${server.port}
        metadata:
          environment: ${spring.profiles.active}
          version: ${project.version}
    # openFeign优化
    loadbalancer:
      cache:
        enabled: true
        # 过期时间10s
        ttl: 10
        # 容量256M
        capacity: 256
        caffeine:
          #          initialCapacity=[integer]: sets Caffeine.initialCapacity.
          #          maximumSize=[long]: sets Caffeine.maximumSize.
          #          maximumWeight=[long]: sets Caffeine.maximumWeight.
          #          expireAfterAccess=[duration]: sets Caffeine.expireAfterAccess(long, java.util.concurrent.TimeUnit).
          #          expireAfterWrite=[duration]: sets Caffeine.expireAfterWrite(long, java.util.concurrent.TimeUnit).
          #          refreshAfterWrite=[duration]: sets Caffeine.refreshAfterWrite(long, java.util.concurrent.TimeUnit).
          #          weakKeys: sets Caffeine.weakKeys().
          #          weakValues: sets Caffeine.weakValues().
          #          softValues: sets Caffeine.softValues().
          #          recordStats: sets Caffeine.recordStats().
          #         initialCapacity初始化键值对的数量
          spec: initialCapacity=500,expireAfterWrite=5s
#openFeign优化
feign:
  # 开启压缩功能
  compression:
    request:
      enabled: true
      mime-types: text/xml,application/xml,application/json
      min-request-size: 2048
    response:
      enabled: true
  # 不使用httpclient,改用okhttp
  httpclient:
    enabled: false
  okhttp:
    enabled: true
    # 是否禁用重定向
    follow-redirects: true
    connect-timeout: 5000
    # 链接失败是否重试
    retry-on-connection-failure: true
    read-timeout: 5000
    write-timeout: 5000
    # 最大空闲数量
    max-idle-connections: 5
    # 生存时间
    keep-alive-duration: 15000
  client:
    config:
      # 设置超时,囊括了okhttp的超时,okhttp属于真正执行的超时,openFeign属于服务间的超时
      # 设置全局超时时间
      default:
        connectTimeout: 2000
        readTimeout: 5000
# mybatis配置
mybatis:
  check-config-location: true
  #  mybatis框架配置文件,对mybatis的生命周期起作用
  config-location: "classpath:mybatis/mybatis-config.xml"
  #  配置xml路径
  mapper-locations: "classpath:mybatis/mapper/*Mapper.xml"
  #  配置model包路径
  type-aliases-package: "com.example.awesomeorder.dao.entity.*"
# 分布式事务配置
seata:
  #  开启seata
  enabled: true
  #  注册中心找TC服务
  registry:
    type: nacos
    nacos:
      cluster: "default"
      username: "nacos"
      password: "nacos"
      server-addr: "host:端口"
      group: SEATA_GROUP
      namespace: seata-server
      application: seata-server
  application-id: ${spring.application.name}
  #  事务分组
  tx-service-group: shanghai
  service:
    vgroup-mapping:
      # 该分组对应的TC集群名称
      shanghai: default
# 日志配置
logging:
  pattern:
    # 日志输出格式
    console: "%d{yyyy-MM-dd HH:mm:ss} %clr(%5p) [%thread] %clr(%logger){cyan} : %msg%n"
  level:
    #    trace < debug < info < warn < error < fatal
    # 全局日志级别
    root: info
    # 指定包日志级别
    com.example.awesomeorder: warn
复制代码

上述配置属于awesome-orderawesome-accountawesome-storage没有rpc调用的可以删掉OpenFeign相关的配置:

spring:
  # 数据库链接配置
  datasource:
    url: jdbc:mysql://host:端口/数据库名称?useUnicode=true&characterEncoding=UTF8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=UTC
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: "用户名"
    password: "密码"
    # 链接池
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      # 链接池初始化大小
      initial-size: 8
      # 最大活跃数
      max-active: 16
      # 最小空闲数
      min-idle: 1
      # 最大等待时间
      max-wait: 60000
  # 微服务配置
  cloud:
    nacos:
      discovery:
        enabled: true
        server-addr: "host:端口"
        namespace: "awesome-seata"
        group: ${spring.profiles.active}
        username: nacos
        password: nacos
        service: ${spring.application.name}
        register-enabled: true
        # 配置注册的目标ip,根据自己的需求设置
        ip: "127.0.0.1"
        # 注册的目标端口
        port: ${server.port}
        metadata:
          environment: ${spring.profiles.active}
          version: ${project.version}
# mybatis配置
mybatis:
  check-config-location: true
  #  mybatis框架配置文件,对mybatis的生命周期起作用
  config-location: "classpath:mybatis/mybatis-config.xml"
  #  配置xml路径
  mapper-locations: "classpath:mybatis/mapper/*Mapper.xml"
  #  配置model包路径,根据包路径修改
  type-aliases-package: "com.example.awesomeaccount.dao.entity.*"
# 分布式事务配置
seata:
  #  开启seata
  enabled: true
  #  注册中心找TC服务
  registry:
    type: nacos
    nacos:
      cluster: "default"
      username: "nacos"
      password: "nacos"
      server-addr: "host:端口"
      group: SEATA_GROUP
      namespace: seata-server
      application: seata-server
  application-id: ${spring.application.name}
  #  事务分组
  tx-service-group: shanghai
  service:
    vgroup-mapping:
      # 该分组对应的TC集群名称
      shanghai: default
# 日志配置
logging:
  pattern:
    # 日志输出格式
    console: "%d{yyyy-MM-dd HH:mm:ss} %clr(%5p) [%thread] %clr(%logger){cyan} : %msg%n"
  level:
    #    trace < debug < info < warn < error < fatal
    # 全局日志级别
    root: info
    # 指定包日志级别,根据包路径修改
    com.example.awesomeaccount: warn
复制代码

代码实现

  • 首先我们主要看一下awesome-business项目中的service实现:
package com.example.awesomebusiness.service.impl;
import com.example.accountapi.model.AmountInfo;
import com.example.awesomebusiness.api.OrderApiClient;
import com.example.awesomebusiness.api.WalletApiClient;
import com.example.awesomebusiness.service.ShoppingCartService;
import com.example.orderapi.model.OrderInfo;
import io.seata.core.exception.GlobalTransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
 * @author zouwei
 * @className ShoppingCartServiceImpl
 * @date: 2022/9/18 14:01
 * @description:
 */
@Service
public class ShoppingCartServiceImpl implements ShoppingCartService {
  // 钱包服务
  @Resource
  private WalletApiClient walletApiClient;
  // 订单服务
  @Resource
  private OrderApiClient orderApiClient;
  // 别忘记了这个注解,这是开启分布式事务的标记,前提是当前业务逻辑不处于任何的分布式事务当中才能开启新的分布式事务
  @GlobalTransactional
  public String placeOrder() throws GlobalTransactionException {
    // 模拟用户ID 123456,对应数据库初始化的用户ID
    String userId = "123456";
    // 构建订单数据
    OrderInfo orderInfo = new OrderInfo();
    // 数量15个
    orderInfo.setCount(15);
    // 商品编码,对应库存数据表的初始化数据
    orderInfo.setCommodityCode("CC-54321");
    // 单价299,默认是long类型,单位分;避免double精度丢失
    orderInfo.setUnitPrice(299);
    // 订单归属
    orderInfo.setUserId(userId);
    // 计算扣款金额,数量*单价
    long amount = orderInfo.getCount() * orderInfo.getUnitPrice();
    // 构建扣款数据
    AmountInfo amountInfo = new AmountInfo();
    // 设置扣款金额
    amountInfo.setAmount(amount);
    // 设置扣款主体
    amountInfo.setUserId(userId);
    // 先扣款,扣款成功就创建订单,扣减库存在创建订单的逻辑里面
    if (walletApiClient.deductMoney(amountInfo) && orderApiClient.createOrder(orderInfo)) {
      return "下单成功!";
    }
    // 1.扣款失败,抛异常,分布式事务回滚
    // 2.创建订单失败,抛异常,分布式事务回滚
    throw new GlobalTransactionException("下单失败!");
  }
}
复制代码


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
存储 数据可视化 Java
基于MicrometerTracing门面和Zipkin实现集成springcloud2023的服务追踪
Sleuth将会停止维护,Sleuth最新版本也只支持springboot2。作为替代可以使用MicrometerTracing在微服务中作为服务追踪的工具。
107 1
|
22天前
|
机器学习/深度学习 自然语言处理 并行计算
DeepSpeed分布式训练框架深度学习指南
【11月更文挑战第6天】随着深度学习模型规模的日益增大,训练这些模型所需的计算资源和时间成本也随之增加。传统的单机训练方式已难以应对大规模模型的训练需求。
68 3
|
26天前
|
机器学习/深度学习 并行计算 Java
谈谈分布式训练框架DeepSpeed与Megatron
【11月更文挑战第3天】随着深度学习技术的不断发展,大规模模型的训练需求日益增长。为了应对这种需求,分布式训练框架应运而生,其中DeepSpeed和Megatron是两个备受瞩目的框架。本文将深入探讨这两个框架的背景、业务场景、优缺点、主要功能及底层实现逻辑,并提供一个基于Java语言的简单demo例子,帮助读者更好地理解这些技术。
52 2
|
1月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
48 1
|
2月前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
2月前
|
数据采集 分布式计算 MaxCompute
MaxCompute 分布式计算框架 MaxFrame 服务正式商业化公告
MaxCompute 分布式计算框架 MaxFrame 服务于北京时间2024年09月27日正式商业化!
93 3
|
2月前
|
消息中间件 Java 对象存储
数据一致性挑战:Spring Cloud与Netflix OSS下的分布式事务管理
数据一致性挑战:Spring Cloud与Netflix OSS下的分布式事务管理
53 2
|
2月前
|
负载均衡 监控 Dubbo
分布式框架-dubbo
分布式框架-dubbo
|
1月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
49 0
|
2月前
|
运维 NoSQL Java
SpringBoot接入轻量级分布式日志框架GrayLog技术分享
在当今的软件开发环境中,日志管理扮演着至关重要的角色,尤其是在微服务架构下,分布式日志的统一收集、分析和展示成为了开发者和运维人员必须面对的问题。GrayLog作为一个轻量级的分布式日志框架,以其简洁、高效和易部署的特性,逐渐受到广大开发者的青睐。本文将详细介绍如何在SpringBoot项目中接入GrayLog,以实现日志的集中管理和分析。
247 1
下一篇
无影云桌面