account-api
、order-api
、storage-api
作为桥接层,只需要spring mvc
的功能即可,所以它们对应的依赖为:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.3</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <!-- 记得修改对应的artifactId --> <artifactId>account-api</artifactId> <version>0.0.1-SNAPSHOT</version> <name>account-api</name> <description>account-api</description> <properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> <dependencies> <!-- spring mvc --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> </project> 复制代码
至此,所有服务对应的依赖就全部搞定。
数据表的建立
awesome-business
是不需要操作数据库的,所以不存在数据表,只有awesome-account
、awesome-order
、awesome-storage
三个RM服务需要操作数据库,所以需要建立数据表。
awesome-account
需要创建钱包
对应的表,另外就是seata的undo_log
:
DROP TABLE IF EXISTS `wallet_tbl`; CREATE TABLE `wallet_tbl` ( `id` int NOT NULL COMMENT '主键', `user_id` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID', `money` int NOT NULL COMMENT '账户余额', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -- ---------------------------- -- 初始化钱包数据 -- ---------------------------- INSERT INTO `wallet_tbl` (`id`, `user_id`, `money`) VALUES (1, '123456', 100000); -- ---------------------------- -- Table structure for undo_log -- ---------------------------- DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `branch_id` bigint NOT NULL COMMENT 'branch transaction id', `xid` varchar(128) NOT NULL COMMENT 'global transaction id', `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` longblob NOT NULL COMMENT 'rollback info', `log_status` int NOT NULL COMMENT '0:normal status,1:defense status', `log_created` datetime(6) NOT NULL COMMENT 'create datetime', `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime', UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='AT transaction mode undo table'; 复制代码
awesome-order
需要创建订单表与undo_log
:
DROP TABLE IF EXISTS `order_tbl`; CREATE TABLE `order_tbl` ( `id` int NOT NULL AUTO_INCREMENT COMMENT '主键', `user_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID', `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码', `count` int NOT NULL COMMENT '数量', `unit_price` int NOT NULL COMMENT '单价', `create_time` datetime NOT NULL COMMENT '创建时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -- ---------------------------- -- Table structure for undo_log -- ---------------------------- DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `branch_id` bigint NOT NULL COMMENT 'branch transaction id', `xid` varchar(128) NOT NULL COMMENT 'global transaction id', `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` longblob NOT NULL COMMENT 'rollback info', `log_status` int NOT NULL COMMENT '0:normal status,1:defense status', `log_created` datetime(6) NOT NULL COMMENT 'create datetime', `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime', UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='AT transaction mode undo table'; 复制代码
awesome-storage
需要创建库存表与undo_log
:
DROP TABLE IF EXISTS `stock_tbl`; CREATE TABLE `stock_tbl` ( `id` int NOT NULL AUTO_INCREMENT COMMENT '主键', `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码', `count` int NOT NULL COMMENT '库存数', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -- ---------------------------- -- 初始化库存数据 -- ---------------------------- INSERT INTO `stock_tbl` (`id`, `commodity_code`, `count`) VALUES (1, 'CC-54321', 100); -- ---------------------------- -- Table structure for undo_log -- ---------------------------- DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `branch_id` bigint NOT NULL COMMENT 'branch transaction id', `xid` varchar(128) NOT NULL COMMENT 'global transaction id', `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` longblob NOT NULL COMMENT 'rollback info', `log_status` int NOT NULL COMMENT '0:normal status,1:defense status', `log_created` datetime(6) NOT NULL COMMENT 'create datetime', `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime', UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='AT transaction mode undo table'; 复制代码
至此,所有服务的数据表已经建立完毕。
服务配置
接下来,我们需要根据需要给每个服务加上对应的配置。
awesome-business
作为业务入口服务,需要配置nacos服务发现、openFeign远程RPC调用工具、seata分布式框架:
spring: cloud: nacos: discovery: enabled: true server-addr: "host:端口" # 需要在nacos中创建awesome-seata命名空间 namespace: "awesome-seata" group: ${spring.profiles.active} username: "用户名" password: "密码" # openFeign优化,是否开启缓存,如果服务发现已经有缓存了,就可以不用开启缓存 loadbalancer: cache: enabled: true # 过期时间10s ttl: 10 # 容量256M capacity: 256 caffeine: # initialCapacity=[integer]: sets Caffeine.initialCapacity. # maximumSize=[long]: sets Caffeine.maximumSize. # maximumWeight=[long]: sets Caffeine.maximumWeight. # expireAfterAccess=[duration]: sets Caffeine.expireAfterAccess(long, java.util.concurrent.TimeUnit). # expireAfterWrite=[duration]: sets Caffeine.expireAfterWrite(long, java.util.concurrent.TimeUnit). # refreshAfterWrite=[duration]: sets Caffeine.refreshAfterWrite(long, java.util.concurrent.TimeUnit). # weakKeys: sets Caffeine.weakKeys(). # weakValues: sets Caffeine.weakValues(). # softValues: sets Caffeine.softValues(). # recordStats: sets Caffeine.recordStats(). # initialCapacity初始化键值对的数量 spec: initialCapacity=500,expireAfterWrite=5s # openFeign优化 feign: # 开启压缩功能 compression: request: enabled: true mime-types: text/xml,application/xml,application/json min-request-size: 2048 response: enabled: true # 不使用httpclient,改用okhttp httpclient: enabled: false okhttp: enabled: true # 是否禁用重定向 follow-redirects: true connect-timeout: 5000 # 链接失败是否重试 retry-on-connection-failure: true read-timeout: 5000 write-timeout: 5000 # 最大空闲数量 max-idle-connections: 5 # 生存时间 keep-alive-duration: 15000 client: config: # 设置超时,囊括了okhttp的超时,okhttp属于真正执行的超时,openFeign属于服务间的超时 # 设置全局超时时间 default: connectTimeout: 2000 readTimeout: 5000 # 针对contextId设置超时时间 walletApi: connectTimeout: 1000 readTimeout: 2000 # loggerLevel: FULL seata: # 开启seata enabled: true # 注册中心找TC服务 registry: type: nacos nacos: cluster: "default" username: "用户名" password: "密码" server-addr: "host:端口" group: SEATA_GROUP # 需要在nacos中创建awesome-seata命名空间 namespace: seata-server application: seata-server application-id: ${spring.application.name} # 事务分组,需要对应vgroup-mapping下面的key tx-service-group: shanghai service: vgroup-mapping: # 该分组对应的TC集群名称 # default对应TC的cluster字段 shanghai: default # 日志配置 logging: pattern: # 日志输出格式 console: "%d{yyyy-MM-dd HH:mm:ss} %clr(%5p) [%thread] %clr(%logger){cyan} : %msg%n" level: # trace < debug < info < warn < error < fatal # 全局日志级别 root: info # 指定包日志级别 com.example.awesomeabusiness: warn 复制代码
该配置中OpenFeign优化思路可以参考博客:openFeign的集成与优化
awesome-account
、awesome-order
、awesome-storage
三个服务需要操作数据库、以及相关的分布式事务服务,所以对应的配置如下:
spring: # 数据库链接配置 datasource: url: jdbc:mysql://host:端口/awesome-order?useUnicode=true&characterEncoding=UTF8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=UTC driver-class-name: com.mysql.cj.jdbc.Driver username: "用户名" password: "密码" # 链接池 type: com.alibaba.druid.pool.DruidDataSource druid: # 链接池初始化大小 initial-size: 8 # 最大活跃数 max-active: 16 # 最小空闲数 min-idle: 1 # 最大等待时间 max-wait: 60000 # 微服务配置 cloud: nacos: discovery: enabled: true server-addr: "host:端口" namespace: "awesome-seata" group: ${spring.profiles.active} username: nacos password: nacos service: ${spring.application.name} register-enabled: true # 配置注册的目标ip,根据自己的需求设置 ip: "127.0.0.1" # 注册的目标端口 port: ${server.port} metadata: environment: ${spring.profiles.active} version: ${project.version} # openFeign优化 loadbalancer: cache: enabled: true # 过期时间10s ttl: 10 # 容量256M capacity: 256 caffeine: # initialCapacity=[integer]: sets Caffeine.initialCapacity. # maximumSize=[long]: sets Caffeine.maximumSize. # maximumWeight=[long]: sets Caffeine.maximumWeight. # expireAfterAccess=[duration]: sets Caffeine.expireAfterAccess(long, java.util.concurrent.TimeUnit). # expireAfterWrite=[duration]: sets Caffeine.expireAfterWrite(long, java.util.concurrent.TimeUnit). # refreshAfterWrite=[duration]: sets Caffeine.refreshAfterWrite(long, java.util.concurrent.TimeUnit). # weakKeys: sets Caffeine.weakKeys(). # weakValues: sets Caffeine.weakValues(). # softValues: sets Caffeine.softValues(). # recordStats: sets Caffeine.recordStats(). # initialCapacity初始化键值对的数量 spec: initialCapacity=500,expireAfterWrite=5s #openFeign优化 feign: # 开启压缩功能 compression: request: enabled: true mime-types: text/xml,application/xml,application/json min-request-size: 2048 response: enabled: true # 不使用httpclient,改用okhttp httpclient: enabled: false okhttp: enabled: true # 是否禁用重定向 follow-redirects: true connect-timeout: 5000 # 链接失败是否重试 retry-on-connection-failure: true read-timeout: 5000 write-timeout: 5000 # 最大空闲数量 max-idle-connections: 5 # 生存时间 keep-alive-duration: 15000 client: config: # 设置超时,囊括了okhttp的超时,okhttp属于真正执行的超时,openFeign属于服务间的超时 # 设置全局超时时间 default: connectTimeout: 2000 readTimeout: 5000 # mybatis配置 mybatis: check-config-location: true # mybatis框架配置文件,对mybatis的生命周期起作用 config-location: "classpath:mybatis/mybatis-config.xml" # 配置xml路径 mapper-locations: "classpath:mybatis/mapper/*Mapper.xml" # 配置model包路径 type-aliases-package: "com.example.awesomeorder.dao.entity.*" # 分布式事务配置 seata: # 开启seata enabled: true # 注册中心找TC服务 registry: type: nacos nacos: cluster: "default" username: "nacos" password: "nacos" server-addr: "host:端口" group: SEATA_GROUP namespace: seata-server application: seata-server application-id: ${spring.application.name} # 事务分组 tx-service-group: shanghai service: vgroup-mapping: # 该分组对应的TC集群名称 shanghai: default # 日志配置 logging: pattern: # 日志输出格式 console: "%d{yyyy-MM-dd HH:mm:ss} %clr(%5p) [%thread] %clr(%logger){cyan} : %msg%n" level: # trace < debug < info < warn < error < fatal # 全局日志级别 root: info # 指定包日志级别 com.example.awesomeorder: warn 复制代码
上述配置属于awesome-order
,awesome-account
和awesome-storage
没有rpc调用的可以删掉OpenFeign相关的配置:
spring: # 数据库链接配置 datasource: url: jdbc:mysql://host:端口/数据库名称?useUnicode=true&characterEncoding=UTF8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=UTC driver-class-name: com.mysql.cj.jdbc.Driver username: "用户名" password: "密码" # 链接池 type: com.alibaba.druid.pool.DruidDataSource druid: # 链接池初始化大小 initial-size: 8 # 最大活跃数 max-active: 16 # 最小空闲数 min-idle: 1 # 最大等待时间 max-wait: 60000 # 微服务配置 cloud: nacos: discovery: enabled: true server-addr: "host:端口" namespace: "awesome-seata" group: ${spring.profiles.active} username: nacos password: nacos service: ${spring.application.name} register-enabled: true # 配置注册的目标ip,根据自己的需求设置 ip: "127.0.0.1" # 注册的目标端口 port: ${server.port} metadata: environment: ${spring.profiles.active} version: ${project.version} # mybatis配置 mybatis: check-config-location: true # mybatis框架配置文件,对mybatis的生命周期起作用 config-location: "classpath:mybatis/mybatis-config.xml" # 配置xml路径 mapper-locations: "classpath:mybatis/mapper/*Mapper.xml" # 配置model包路径,根据包路径修改 type-aliases-package: "com.example.awesomeaccount.dao.entity.*" # 分布式事务配置 seata: # 开启seata enabled: true # 注册中心找TC服务 registry: type: nacos nacos: cluster: "default" username: "nacos" password: "nacos" server-addr: "host:端口" group: SEATA_GROUP namespace: seata-server application: seata-server application-id: ${spring.application.name} # 事务分组 tx-service-group: shanghai service: vgroup-mapping: # 该分组对应的TC集群名称 shanghai: default # 日志配置 logging: pattern: # 日志输出格式 console: "%d{yyyy-MM-dd HH:mm:ss} %clr(%5p) [%thread] %clr(%logger){cyan} : %msg%n" level: # trace < debug < info < warn < error < fatal # 全局日志级别 root: info # 指定包日志级别,根据包路径修改 com.example.awesomeaccount: warn 复制代码
代码实现
- 首先我们主要看一下
awesome-business
项目中的service实现:
package com.example.awesomebusiness.service.impl; import com.example.accountapi.model.AmountInfo; import com.example.awesomebusiness.api.OrderApiClient; import com.example.awesomebusiness.api.WalletApiClient; import com.example.awesomebusiness.service.ShoppingCartService; import com.example.orderapi.model.OrderInfo; import io.seata.core.exception.GlobalTransactionException; import io.seata.spring.annotation.GlobalTransactional; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * @author zouwei * @className ShoppingCartServiceImpl * @date: 2022/9/18 14:01 * @description: */ @Service public class ShoppingCartServiceImpl implements ShoppingCartService { // 钱包服务 @Resource private WalletApiClient walletApiClient; // 订单服务 @Resource private OrderApiClient orderApiClient; // 别忘记了这个注解,这是开启分布式事务的标记,前提是当前业务逻辑不处于任何的分布式事务当中才能开启新的分布式事务 @GlobalTransactional public String placeOrder() throws GlobalTransactionException { // 模拟用户ID 123456,对应数据库初始化的用户ID String userId = "123456"; // 构建订单数据 OrderInfo orderInfo = new OrderInfo(); // 数量15个 orderInfo.setCount(15); // 商品编码,对应库存数据表的初始化数据 orderInfo.setCommodityCode("CC-54321"); // 单价299,默认是long类型,单位分;避免double精度丢失 orderInfo.setUnitPrice(299); // 订单归属 orderInfo.setUserId(userId); // 计算扣款金额,数量*单价 long amount = orderInfo.getCount() * orderInfo.getUnitPrice(); // 构建扣款数据 AmountInfo amountInfo = new AmountInfo(); // 设置扣款金额 amountInfo.setAmount(amount); // 设置扣款主体 amountInfo.setUserId(userId); // 先扣款,扣款成功就创建订单,扣减库存在创建订单的逻辑里面 if (walletApiClient.deductMoney(amountInfo) && orderApiClient.createOrder(orderInfo)) { return "下单成功!"; } // 1.扣款失败,抛异常,分布式事务回滚 // 2.创建订单失败,抛异常,分布式事务回滚 throw new GlobalTransactionException("下单失败!"); } } 复制代码