Spring Cloud集成分布式事务框架Seata 1.5.2(二)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Spring Cloud集成分布式事务框架Seata 1.5.2
  • account-apiorder-apistorage-api作为桥接层,只需要spring mvc的功能即可,所以它们对应的依赖为:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <!-- 记得修改对应的artifactId -->
    <artifactId>account-api</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>account-api</name>
    <description>account-api</description>
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- spring mvc -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
</project>
复制代码

至此,所有服务对应的依赖就全部搞定。

数据表的建立

awesome-business是不需要操作数据库的,所以不存在数据表,只有awesome-accountawesome-orderawesome-storage三个RM服务需要操作数据库,所以需要建立数据表。

  • awesome-account需要创建钱包对应的表,另外就是seata的undo_log
DROP TABLE IF EXISTS `wallet_tbl`;
CREATE TABLE `wallet_tbl` (
  `id` int NOT NULL COMMENT '主键',
  `user_id` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID',
  `money` int NOT NULL COMMENT '账户余额',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- 初始化钱包数据
-- ----------------------------
INSERT INTO `wallet_tbl` (`id`, `user_id`, `money`) VALUES (1, '123456', 100000);
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `branch_id` bigint NOT NULL COMMENT 'branch transaction id',
  `xid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
  `rollback_info` longblob NOT NULL COMMENT 'rollback info',
  `log_status` int NOT NULL COMMENT '0:normal status,1:defense status',
  `log_created` datetime(6) NOT NULL COMMENT 'create datetime',
  `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='AT transaction mode undo table';
复制代码
  • awesome-order需要创建订单表与undo_log
DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `user_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID',
  `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码',
  `count` int NOT NULL COMMENT '数量',
  `unit_price` int NOT NULL COMMENT '单价',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `branch_id` bigint NOT NULL COMMENT 'branch transaction id',
  `xid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
  `rollback_info` longblob NOT NULL COMMENT 'rollback info',
  `log_status` int NOT NULL COMMENT '0:normal status,1:defense status',
  `log_created` datetime(6) NOT NULL COMMENT 'create datetime',
  `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='AT transaction mode undo table';
复制代码
  • awesome-storage需要创建库存表与undo_log
DROP TABLE IF EXISTS `stock_tbl`;
CREATE TABLE `stock_tbl` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码',
  `count` int NOT NULL COMMENT '库存数',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- 初始化库存数据
-- ----------------------------
INSERT INTO `stock_tbl` (`id`, `commodity_code`, `count`) VALUES (1, 'CC-54321', 100);
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `branch_id` bigint NOT NULL COMMENT 'branch transaction id',
  `xid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
  `rollback_info` longblob NOT NULL COMMENT 'rollback info',
  `log_status` int NOT NULL COMMENT '0:normal status,1:defense status',
  `log_created` datetime(6) NOT NULL COMMENT 'create datetime',
  `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='AT transaction mode undo table';
复制代码

至此,所有服务的数据表已经建立完毕。

服务配置

接下来,我们需要根据需要给每个服务加上对应的配置。

  • awesome-business作为业务入口服务,需要配置nacos服务发现、openFeign远程RPC调用工具、seata分布式框架:
spring:
  cloud:
    nacos:
      discovery:
        enabled: true
        server-addr: "host:端口"
        # 需要在nacos中创建awesome-seata命名空间
        namespace: "awesome-seata"
        group: ${spring.profiles.active}
        username: "用户名"
        password: "密码"
    # openFeign优化,是否开启缓存,如果服务发现已经有缓存了,就可以不用开启缓存
    loadbalancer:
      cache:
        enabled: true
        # 过期时间10s
        ttl: 10
        # 容量256M
        capacity: 256
        caffeine:
          #          initialCapacity=[integer]: sets Caffeine.initialCapacity.
          #          maximumSize=[long]: sets Caffeine.maximumSize.
          #          maximumWeight=[long]: sets Caffeine.maximumWeight.
          #          expireAfterAccess=[duration]: sets Caffeine.expireAfterAccess(long, java.util.concurrent.TimeUnit).
          #          expireAfterWrite=[duration]: sets Caffeine.expireAfterWrite(long, java.util.concurrent.TimeUnit).
          #          refreshAfterWrite=[duration]: sets Caffeine.refreshAfterWrite(long, java.util.concurrent.TimeUnit).
          #          weakKeys: sets Caffeine.weakKeys().
          #          weakValues: sets Caffeine.weakValues().
          #          softValues: sets Caffeine.softValues().
          #          recordStats: sets Caffeine.recordStats().
          #         initialCapacity初始化键值对的数量
          spec: initialCapacity=500,expireAfterWrite=5s
# openFeign优化
feign:
  # 开启压缩功能
  compression:
    request:
      enabled: true
      mime-types: text/xml,application/xml,application/json
      min-request-size: 2048
    response:
      enabled: true
  # 不使用httpclient,改用okhttp
  httpclient:
    enabled: false
  okhttp:
    enabled: true
    # 是否禁用重定向
    follow-redirects: true
    connect-timeout: 5000
    # 链接失败是否重试
    retry-on-connection-failure: true
    read-timeout: 5000
    write-timeout: 5000
    # 最大空闲数量
    max-idle-connections: 5
    # 生存时间
    keep-alive-duration: 15000
  client:
    config:
      # 设置超时,囊括了okhttp的超时,okhttp属于真正执行的超时,openFeign属于服务间的超时
      # 设置全局超时时间
      default:
        connectTimeout: 2000
        readTimeout: 5000
      #   针对contextId设置超时时间
      walletApi:
        connectTimeout: 1000
        readTimeout: 2000
#        loggerLevel: FULL
seata:
  #  开启seata
  enabled: true
  #  注册中心找TC服务
  registry:
    type: nacos
    nacos:
      cluster: "default"
      username: "用户名"
      password: "密码"
      server-addr: "host:端口"
      group: SEATA_GROUP
      # 需要在nacos中创建awesome-seata命名空间
      namespace: seata-server
      application: seata-server
  application-id: ${spring.application.name}
  #  事务分组,需要对应vgroup-mapping下面的key
  tx-service-group: shanghai
  service:
    vgroup-mapping:
      # 该分组对应的TC集群名称
      # default对应TC的cluster字段
      shanghai: default
# 日志配置
logging:
  pattern:
    # 日志输出格式
    console: "%d{yyyy-MM-dd HH:mm:ss} %clr(%5p) [%thread] %clr(%logger){cyan} : %msg%n"
  level:
    #    trace < debug < info < warn < error < fatal
    # 全局日志级别
    root: info
    # 指定包日志级别
    com.example.awesomeabusiness: warn
复制代码

该配置中OpenFeign优化思路可以参考博客:openFeign的集成与优化

  • awesome-accountawesome-orderawesome-storage三个服务需要操作数据库、以及相关的分布式事务服务,所以对应的配置如下:
spring:
  # 数据库链接配置
  datasource:
    url: jdbc:mysql://host:端口/awesome-order?useUnicode=true&characterEncoding=UTF8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=UTC
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: "用户名"
    password: "密码"
    # 链接池
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      # 链接池初始化大小
      initial-size: 8
      # 最大活跃数
      max-active: 16
      # 最小空闲数
      min-idle: 1
      # 最大等待时间
      max-wait: 60000
  # 微服务配置
  cloud:
    nacos:
      discovery:
        enabled: true
        server-addr: "host:端口"
        namespace: "awesome-seata"
        group: ${spring.profiles.active}
        username: nacos
        password: nacos
        service: ${spring.application.name}
        register-enabled: true
        # 配置注册的目标ip,根据自己的需求设置
        ip: "127.0.0.1"
        # 注册的目标端口
        port: ${server.port}
        metadata:
          environment: ${spring.profiles.active}
          version: ${project.version}
    # openFeign优化
    loadbalancer:
      cache:
        enabled: true
        # 过期时间10s
        ttl: 10
        # 容量256M
        capacity: 256
        caffeine:
          #          initialCapacity=[integer]: sets Caffeine.initialCapacity.
          #          maximumSize=[long]: sets Caffeine.maximumSize.
          #          maximumWeight=[long]: sets Caffeine.maximumWeight.
          #          expireAfterAccess=[duration]: sets Caffeine.expireAfterAccess(long, java.util.concurrent.TimeUnit).
          #          expireAfterWrite=[duration]: sets Caffeine.expireAfterWrite(long, java.util.concurrent.TimeUnit).
          #          refreshAfterWrite=[duration]: sets Caffeine.refreshAfterWrite(long, java.util.concurrent.TimeUnit).
          #          weakKeys: sets Caffeine.weakKeys().
          #          weakValues: sets Caffeine.weakValues().
          #          softValues: sets Caffeine.softValues().
          #          recordStats: sets Caffeine.recordStats().
          #         initialCapacity初始化键值对的数量
          spec: initialCapacity=500,expireAfterWrite=5s
#openFeign优化
feign:
  # 开启压缩功能
  compression:
    request:
      enabled: true
      mime-types: text/xml,application/xml,application/json
      min-request-size: 2048
    response:
      enabled: true
  # 不使用httpclient,改用okhttp
  httpclient:
    enabled: false
  okhttp:
    enabled: true
    # 是否禁用重定向
    follow-redirects: true
    connect-timeout: 5000
    # 链接失败是否重试
    retry-on-connection-failure: true
    read-timeout: 5000
    write-timeout: 5000
    # 最大空闲数量
    max-idle-connections: 5
    # 生存时间
    keep-alive-duration: 15000
  client:
    config:
      # 设置超时,囊括了okhttp的超时,okhttp属于真正执行的超时,openFeign属于服务间的超时
      # 设置全局超时时间
      default:
        connectTimeout: 2000
        readTimeout: 5000
# mybatis配置
mybatis:
  check-config-location: true
  #  mybatis框架配置文件,对mybatis的生命周期起作用
  config-location: "classpath:mybatis/mybatis-config.xml"
  #  配置xml路径
  mapper-locations: "classpath:mybatis/mapper/*Mapper.xml"
  #  配置model包路径
  type-aliases-package: "com.example.awesomeorder.dao.entity.*"
# 分布式事务配置
seata:
  #  开启seata
  enabled: true
  #  注册中心找TC服务
  registry:
    type: nacos
    nacos:
      cluster: "default"
      username: "nacos"
      password: "nacos"
      server-addr: "host:端口"
      group: SEATA_GROUP
      namespace: seata-server
      application: seata-server
  application-id: ${spring.application.name}
  #  事务分组
  tx-service-group: shanghai
  service:
    vgroup-mapping:
      # 该分组对应的TC集群名称
      shanghai: default
# 日志配置
logging:
  pattern:
    # 日志输出格式
    console: "%d{yyyy-MM-dd HH:mm:ss} %clr(%5p) [%thread] %clr(%logger){cyan} : %msg%n"
  level:
    #    trace < debug < info < warn < error < fatal
    # 全局日志级别
    root: info
    # 指定包日志级别
    com.example.awesomeorder: warn
复制代码

上述配置属于awesome-orderawesome-accountawesome-storage没有rpc调用的可以删掉OpenFeign相关的配置:

spring:
  # 数据库链接配置
  datasource:
    url: jdbc:mysql://host:端口/数据库名称?useUnicode=true&characterEncoding=UTF8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=UTC
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: "用户名"
    password: "密码"
    # 链接池
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      # 链接池初始化大小
      initial-size: 8
      # 最大活跃数
      max-active: 16
      # 最小空闲数
      min-idle: 1
      # 最大等待时间
      max-wait: 60000
  # 微服务配置
  cloud:
    nacos:
      discovery:
        enabled: true
        server-addr: "host:端口"
        namespace: "awesome-seata"
        group: ${spring.profiles.active}
        username: nacos
        password: nacos
        service: ${spring.application.name}
        register-enabled: true
        # 配置注册的目标ip,根据自己的需求设置
        ip: "127.0.0.1"
        # 注册的目标端口
        port: ${server.port}
        metadata:
          environment: ${spring.profiles.active}
          version: ${project.version}
# mybatis配置
mybatis:
  check-config-location: true
  #  mybatis框架配置文件,对mybatis的生命周期起作用
  config-location: "classpath:mybatis/mybatis-config.xml"
  #  配置xml路径
  mapper-locations: "classpath:mybatis/mapper/*Mapper.xml"
  #  配置model包路径,根据包路径修改
  type-aliases-package: "com.example.awesomeaccount.dao.entity.*"
# 分布式事务配置
seata:
  #  开启seata
  enabled: true
  #  注册中心找TC服务
  registry:
    type: nacos
    nacos:
      cluster: "default"
      username: "nacos"
      password: "nacos"
      server-addr: "host:端口"
      group: SEATA_GROUP
      namespace: seata-server
      application: seata-server
  application-id: ${spring.application.name}
  #  事务分组
  tx-service-group: shanghai
  service:
    vgroup-mapping:
      # 该分组对应的TC集群名称
      shanghai: default
# 日志配置
logging:
  pattern:
    # 日志输出格式
    console: "%d{yyyy-MM-dd HH:mm:ss} %clr(%5p) [%thread] %clr(%logger){cyan} : %msg%n"
  level:
    #    trace < debug < info < warn < error < fatal
    # 全局日志级别
    root: info
    # 指定包日志级别,根据包路径修改
    com.example.awesomeaccount: warn
复制代码

代码实现

  • 首先我们主要看一下awesome-business项目中的service实现:
package com.example.awesomebusiness.service.impl;
import com.example.accountapi.model.AmountInfo;
import com.example.awesomebusiness.api.OrderApiClient;
import com.example.awesomebusiness.api.WalletApiClient;
import com.example.awesomebusiness.service.ShoppingCartService;
import com.example.orderapi.model.OrderInfo;
import io.seata.core.exception.GlobalTransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
 * @author zouwei
 * @className ShoppingCartServiceImpl
 * @date: 2022/9/18 14:01
 * @description:
 */
@Service
public class ShoppingCartServiceImpl implements ShoppingCartService {
  // 钱包服务
  @Resource
  private WalletApiClient walletApiClient;
  // 订单服务
  @Resource
  private OrderApiClient orderApiClient;
  // 别忘记了这个注解,这是开启分布式事务的标记,前提是当前业务逻辑不处于任何的分布式事务当中才能开启新的分布式事务
  @GlobalTransactional
  public String placeOrder() throws GlobalTransactionException {
    // 模拟用户ID 123456,对应数据库初始化的用户ID
    String userId = "123456";
    // 构建订单数据
    OrderInfo orderInfo = new OrderInfo();
    // 数量15个
    orderInfo.setCount(15);
    // 商品编码,对应库存数据表的初始化数据
    orderInfo.setCommodityCode("CC-54321");
    // 单价299,默认是long类型,单位分;避免double精度丢失
    orderInfo.setUnitPrice(299);
    // 订单归属
    orderInfo.setUserId(userId);
    // 计算扣款金额,数量*单价
    long amount = orderInfo.getCount() * orderInfo.getUnitPrice();
    // 构建扣款数据
    AmountInfo amountInfo = new AmountInfo();
    // 设置扣款金额
    amountInfo.setAmount(amount);
    // 设置扣款主体
    amountInfo.setUserId(userId);
    // 先扣款,扣款成功就创建订单,扣减库存在创建订单的逻辑里面
    if (walletApiClient.deductMoney(amountInfo) && orderApiClient.createOrder(orderInfo)) {
      return "下单成功!";
    }
    // 1.扣款失败,抛异常,分布式事务回滚
    // 2.创建订单失败,抛异常,分布式事务回滚
    throw new GlobalTransactionException("下单失败!");
  }
}
复制代码


相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
4天前
|
存储 关系型数据库 MySQL
基于Seata实现分布式事务
通过以上步骤,你可以使用 Seata 实现分布式事务,确保在微服务架构中的事务一致性。Seata 支持多种语言和框架,能够满足不同业务场景的需求。欢迎关注威哥爱编程,一起学习成长。
|
4天前
|
安全 Java 测试技术
Spring Boot集成支付宝支付:概念与实战
【4月更文挑战第29天】在电子商务和在线业务应用中,集成有效且安全的支付解决方案是至关重要的。支付宝作为中国领先的支付服务提供商,其支付功能的集成可以显著提升用户体验。本篇博客将详细介绍如何在Spring Boot应用中集成支付宝支付功能,并提供一个实战示例。
41 2
|
2天前
|
消息中间件 Java 数据安全/隐私保护
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
|
2天前
|
负载均衡 监控 Java
我把Spring Cloud的超详细资料介绍给你,面试官不会生气吧?geigei
我把Spring Cloud的超详细资料介绍给你,面试官不会生气吧?geigei
|
3天前
|
负载均衡 Java 应用服务中间件
Spring Cloud 负载平衡的意义什么?
负载平衡是指将网络流量在多个服务器之间分布,以达到提高系统性能、增强可靠性和提供更好用户体验的目的。在负载平衡的架构中,多个服务器被组织成一个集群,共同处理用户的请求。
26 4
|
4天前
|
NoSQL Java MongoDB
【MongoDB 专栏】MongoDB 与 Spring Boot 的集成实践
【5月更文挑战第11天】本文介绍了如何将非关系型数据库MongoDB与Spring Boot框架集成,以实现高效灵活的数据管理。Spring Boot简化了Spring应用的构建和部署,MongoDB则以其对灵活数据结构的处理能力受到青睐。集成步骤包括:添加MongoDB依赖、配置连接信息、创建数据访问对象(DAO)以及进行数据操作。通过这种方式,开发者可以充分利用两者优势,应对各种数据需求。在实际应用中,结合微服务架构等技术,可以构建高性能、可扩展的系统。掌握MongoDB与Spring Boot集成对于提升开发效率和项目质量至关重要,未来有望在更多领域得到广泛应用。
【MongoDB 专栏】MongoDB 与 Spring Boot 的集成实践
|
4天前
|
监控 安全 Java
Spring cloud原理详解
Spring cloud原理详解
18 0
|
4天前
|
安全 Java 数据库连接
在IntelliJ IDEA中通过Spring Boot集成达梦数据库:从入门到精通
在IntelliJ IDEA中通过Spring Boot集成达梦数据库:从入门到精通
|
4天前
|
消息中间件 负载均衡 Java
【Spring Cloud 初探幽】
【Spring Cloud 初探幽】
16 1
|
4天前
|
安全 Java Docker

热门文章

最新文章