一、认识Seata
1.1 Seata 是什么?
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
1.2 了解AT、TCC、SAGA事务模式?
AT 模式
前提
- 基于支持本地 ACID 事务的关系型数据库。
- Java 应用,通过 JDBC 访问数据库。
整体机制
两阶段提交协议的演变:
- 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
- 二阶段:
提交异步化,非常快速地完成。
回滚通过一阶段的回滚日志进行反向补偿。
如何实现写隔离
过程:
- 一阶段本地事务提交前,需要确保先拿到 全局锁 。
- 拿不到 全局锁 ,不能提交本地事务。
- 拿 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。
举个栗子:
两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。
tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。
下面来看下官方的两张图来加深下理解:
tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。
如何实现读隔离
在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。
如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。
见官方图:
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。
出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
TCC 模式
一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:
- 一阶段 prepare 行为
- 二阶段 commit 或 rollback 行为
TCC 模式,不依赖于底层数据资源的事务支持:
- 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
- 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
- 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。
Saga 模式
Saga模式是SEATA提供的长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。
官方图:
Saga 模式适用场景
- 业务流程长、业务流程多
- 参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口
Saga 模式优势
- 一阶段提交本地事务,无锁,高性能
- 事件驱动架构,参与者可异步执行,高吞吐
- 补偿服务易于实现
Saga 模式缺点
- 不保证隔离性
二、Seata安装
2.1 下载
下载地址:
- https://github.com/seata/seata/releases
- https://github.com/seata/seata/releases/download/v1.4.2/seata-server-1.4.2.zip
我这边下载的是v1.4.2版本,大家下载时需要注意下seata版本和springcloud alibaba的版本,根据自己的alibaba的版本选择对应的seata
给大家贴出组件版本关系对应:
2.2 创建所需数据表
2.2.1 创建 分支表、全局表、锁表
首先需要创建 分支表、全局表、锁表
创建sql如下
-- -------------------------------- The script used when storeMode is 'db' -------------------------------- -- the table to store GlobalSession data CREATE TABLE IF NOT EXISTS `global_table` ( `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `status` TINYINT NOT NULL, `application_id` VARCHAR(32), `transaction_service_group` VARCHAR(32), `transaction_name` VARCHAR(128), `timeout` INT, `begin_time` BIGINT, `application_data` VARCHAR(2000), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`xid`), KEY `idx_gmt_modified_status` (`gmt_modified`, `status`), KEY `idx_transaction_id` (`transaction_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; -- the table to store BranchSession data CREATE TABLE IF NOT EXISTS `branch_table` ( `branch_id` BIGINT NOT NULL, `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `resource_group_id` VARCHAR(32), `resource_id` VARCHAR(256), `branch_type` VARCHAR(8), `status` TINYINT, `client_id` VARCHAR(64), `application_data` VARCHAR(2000), `gmt_create` DATETIME(6), `gmt_modified` DATETIME(6), PRIMARY KEY (`branch_id`), KEY `idx_xid` (`xid`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; -- the table to store lock data CREATE TABLE IF NOT EXISTS `lock_table` ( `row_key` VARCHAR(128) NOT NULL, `xid` VARCHAR(128), `transaction_id` BIGINT, `branch_id` BIGINT NOT NULL, `resource_id` VARCHAR(256), `table_name` VARCHAR(32), `pk` VARCHAR(36), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`row_key`), KEY `idx_branch_id` (`branch_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8;
2.2.2 创建 UNDO_LOG 表
SEATA AT 模式需要 UNDO_LOG 表
创建sql如下:
CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
2.3 修改配置文件
2.3.1 修改 registry.conf 文件
conf目录下找到registry.conf文件
首先将type类型改为 nacos,type默认file (这种方式需要把源码的file.conf文件复制到项目中,比较麻烦不推荐) ,然后修改seata的注册中心的相关配置
其次修改seata的配置中心的相关配置,同样type类型改为nacos
2.3.2 修改 file.conf 文件
下面还需要修改seata的DB类型
我们在conf目录下找到file.conf文件
mode类型改为db
然后修改自己的数据库配置
2.4 启动seata
在bin目录下找到 seata-server.bat 双击启动
看到日志输出 Server started 应该就启动成功了
查看nacos注册中心
观察服务列表,发现seata服务已经成功注册
三、Seata的应用
3.1 springcloud项目整合seata
我们简单模拟下用户从下单到扣减库存的流程,来看看seata在项目中是如何应用的
3.1.1 服务架构
先看下我的项目的整体模块架构
springcloud版本
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <spring-cloud.version>2021.0.1</spring-cloud.version> <spring-cloud-alibaba.version>2021.0.1.0</spring-cloud-alibaba.version> </properties>
因为我的项目中已经有order的相关服务了, 为了故事的延续性我在建一个仓储的服务用来扣减库存
想参考我的项目架构的同学可以点击下面的地址
3.1.2 创建仓储服务
创建一个maven模块
为服务添加启动类配置文件和seata依赖等
seata依赖
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> <version>${spring-cloud-alibaba.version}</version> </dependency>
仓储服务application.yml文件
server: port: 9092 spring: application: name: mdx-shop-storage cloud: nacos: discovery: server-addr: localhost:8848 namespace: mdx group: mdx datasource: type: com.alibaba.druid.pool.DruidDataSource url: jdbc:mysql://localhost:3306/mdx_storage?autoRec&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 driverClassName: com.mysql.cj.jdbc.Driver username: root password: Bendi+Ceshi+ jpa: show-sql: true #打印执行的sql语句,false则不打印sql properties: hibernate: ddl-auto: none dialect: org.hibernate.dialect.MySQL5InnoDBDialect open-in-view: true seata: tx-service-group: my_test_tx_group enabled: true registry: type: nacos nacos: application: mdx-seata-server #注册在nacos服务名 server-addr: localhost:8848 group : mdx namespace: mdx #注册在nacos命名空间
3.1.3 创建仓储和订单数据库及数据表
我们为仓储服务和订单服务分别创建数据表
数据库自己提前建好
// 仓储 DROP TABLE IF EXISTS `storage_tbl`; CREATE TABLE `storage_tbl` ( `id` int(11) NOT NULL AUTO_INCREMENT, `commodity_code` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `count` int(11) NULL DEFAULT 0, PRIMARY KEY (`id`) USING BTREE, UNIQUE INDEX `commodity_code`(`commodity_code`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of storage_tbl -- ---------------------------- INSERT INTO `storage_tbl` VALUES (1, 'S123434455666777', 10); // 订单 DROP TABLE IF EXISTS `order_tbl`; CREATE TABLE `order_tbl` ( `id` int(11) NOT NULL AUTO_INCREMENT, `user_id` varchar(255) DEFAULT NULL, `commodity_code` varchar(255) DEFAULT NULL, `count` int(11) DEFAULT 0, `money` int(11) DEFAULT 0, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3.1.4 仓储服务相关实现
这里只贴出仓储服务的主要几个方法,具体的项目结构可以参考 https://gitee.com/Ji_Agang/mdx-shop
对于数据库的操作我们使用Spring Data Jpa来实现
依赖参考
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency>
启动类
注意启动类一定要加 @EnableAutoDataSourceProxy 注解,来开启数据源代理
/** * @author : jiagang * @date : Created in 2022/7/1 11:25 */ @SpringBootApplication @EnableFeignClients @EnableAutoDataSourceProxy public class MdxShopStorageApplication { public static void main(String[] args) { SpringApplication.run(MdxShopStorageApplication.class, args); } }
controller
/** * @author : jiagang * @date : Created in 2022/7/1 18:42 */ @RestController @RequestMapping("/storage") public class StorageController { @Autowired private StorageService service; @GetMapping("/deduct") public CommonResponse deduct(String commodityCode, int count){ service.deduct(commodityCode, count); return CommonResponse.success(); } }
接口
/** * @author : jiagang * @date : Created in 2022/7/1 18:40 */ public interface StorageService { /** * 扣除存储数量 */ void deduct(String commodityCode, int count); }
实现类
/** * @author : jiagang * @date : Created in 2022/7/1 18:42 */ @Service public class StorageServiceImpl implements StorageService { @Autowired private StorageRepository storageRepository; /** * 扣减库存 * @param commodityCode * @param count */ @Override public void deduct(String commodityCode, int count) { StorageTbl storageTbl = storageRepository.findByCommodityCode(commodityCode); if (storageTbl == null){ throw new BizException("storageTbl is null"); } // 这里先不考虑超卖的情况 storageTbl.setCount(storageTbl.getCount() - count); // 使用jpa 存在就更新 storageRepository.save(storageTbl); } }
数据层
/** * @author : jiagang * @date : Created in 2023/1/16 15:44 */ @Repository public interface StorageRepository extends JpaRepository<StorageTbl,Integer> { /** * 通过商品code查询库存 * @param commodityCode * @return */ @Query StorageTbl findByCommodityCode(String commodityCode); }
3.1.5 订单服务相关实现
这里只贴出订单服务的主要几个方法,具体的项目结构可以参考 https://gitee.com/Ji_Agang/mdx-shop
对于数据库的操作我们同样使用Jpa来实现
application.yml 配置文件
server: port: 9091 spring: application: name: mdx-shop-order cloud: nacos: discovery: server-addr: localhost:8848 namespace: mdx group: mdx datasource: type: com.alibaba.druid.pool.DruidDataSource url: jdbc:mysql://localhost:3306/mdx_order?autoRec&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 driverClassName: com.mysql.cj.jdbc.Driver username: root password: Bendi+Ceshi+ jpa: show-sql: true #打印执行的sql语句,false则不打印sql properties: hibernate: ddl-auto: none dialect: org.hibernate.dialect.MySQL5InnoDBDialect open-in-view: true seata: tx-service-group: my_test_tx_group enabled: true registry: type: nacos nacos: application: mdx-seata-server #注册在nacos服务名 server-addr: localhost:8848 group : mdx namespace: mdx #注册在nacos命名空间 feign: sentinel: enabled: true
启动类
注意启动类一定要加 @EnableAutoDataSourceProxy 注解,来开启数据源代理
/** * @author : jiagang * @date : Created in 2022/7/1 11:25 */ @SpringBootApplication @EnableFeignClients @EnableAutoDataSourceProxy public class MdxShopOrderApplication { public static void main(String[] args) { SpringApplication.run(MdxShopOrderApplication.class, args); } }
controller
/** * @author : jiagang * @date : Created in 2022/7/1 18:42 */ @RestController @RequestMapping("/order") public class OrderController { @Autowired private OrderService orderService; /** * 用户下单接口 * @param userId * @param commodityCode * @return */ @PostMapping("createOrder") public CommonResponse<String> createOrder(String userId, String commodityCode){ return CommonResponse.success(orderService.createOrder(userId,commodityCode)); } }
接口
/** * @author : jiagang * @date : Created in 2022/7/1 18:40 */ public interface OrderService { /** * 下单接口 * @param userId 用户id * @param commodityCode 商品代码 * @return */ String createOrder(String userId, String commodityCode); }
实现类
/** * @author : jiagang * @date : Created in 2022/7/1 18:42 */ @Service public class OrderServiceImpl implements OrderService { @Resource private OrderRepository orderRepository; @Resource private StorageFeign storageFeign; /** * 下单接口 * @param userId 用户id * @param commodityCode 商品代码 * @return */ @Override public String createOrder(String userId, String commodityCode) { try { System.out.println("事务id---------------------->" + RootContext.getXID()); // 创建订单 OrderTbl orderTbl = new OrderTbl(); orderTbl.setUserId(userId); orderTbl.setCommodityCode(commodityCode); orderTbl.setCount(1); // 假设为1件 orderTbl.setMoney(10); // 假设为十元 // 保存订单 orderRepository.save(orderTbl); // 保存订单成功后扣减库存 storageFeign.deduct(commodityCode,orderTbl.getCount()); return "success"; }catch (Exception e){ throw new BizException("创建订单失败"); } } }
数据层
/** * @author : jiagang * @date : Created in 2023/1/16 15:44 */ @Repository public interface OrderRepository extends JpaRepository<OrderTbl,Integer> { }
feign接口
对微服务之间使用feign来调用还不熟悉的同学可以点下面的链接
springcloud alibaba微服务 – openfeign的使用(保姆级)
/** * @author : jiagang * @date : Created in 2022/7/4 10:26 */ @FeignClient(value = "mdx-shop-storage") @Component public interface StorageFeign { /** * 扣减库存 * @param commodityCode * @param count * @return */ @GetMapping("storage/deduct") String deduct(@RequestParam String commodityCode,@RequestParam Integer count); }
3.2 模拟异常测试分布式事务
在进行测试之前,我们先来看下业务逻辑,我们使用postman来调用下单接口进行下单,接口地址为 http://localhost:9091/order/createOrder?userId=admin&commodityCode=S123434455666777 (POST请求) ,然后下单接口保存订单,并通过feign接口调用仓储服务扣减库存。
3.2.1 测试正常流程
正常流程下用户下单,订单数据库增加订单,仓储数据库为下单的商品扣减库存。
首先看一下订单数据库order_tbl表和仓储数据库storage_tbl表
订单表没有数据
仓储表有一条商品,库存为10
正常情况下,下单之后(我们只买一件商品)订单增加一条数据,仓储的S123434455666777商品库存减1 为 9
POST 请求调用 下单接口 http://localhost:9091/order/createOrder?userId=admin&commodityCode=S123434455666777
postman提示成功
看一下数据库
订单新增成功
库存减为9
3.2.2 模拟失败流程
先将数据库订单表清空,仓储表库存继续设置为10(这里不操作也行,大家记住之前的状态就可以了)
我们在仓储服务的扣减库存的方法中手动写一个异常,异常如下
System.out.println(1 / 0);
/** * 扣减库存 * @param commodityCode * @param count */ @Override public void deduct(String commodityCode, int count) { System.out.println("事务id---------------------->" + RootContext.getXID()); StorageTbl storageTbl = storageRepository.findByCommodityCode(commodityCode); if (storageTbl == null){ throw new BizException("storageTbl is null"); } // 模拟异常 System.out.println(1 / 0); // 这里先不考虑超卖的情况 storageTbl.setCount(storageTbl.getCount() - count); // 使用jpa 存在就更新 storageRepository.save(storageTbl); }
此时,再来调用下单接口
http://localhost:9091/order/createOrder?userId=admin&commodityCode=S123434455666777
可以看到服务报错,提示创建订单失败
再来观察一下数据库
发现订单依然创建成功,但是库存缺没有减少,还是10,这就导致了用户下单成功了,但是没给人减库存,造成数据不一致,可能会发生超卖。
3.2.2 添加分布式事务注解
先将数据库订单表清空(这里不操作也行,大家记住之前的状态就可以了)
为了解决上面的问题,我们为创建订单方法增加seata的分布式注解
@GlobalTransactional
/** * 下单接口 * @param userId 用户id * @param commodityCode 商品代码 * @return */ @Override @GlobalTransactional public String createOrder(String userId, String commodityCode) { try { System.out.println("事务id---------------------->" + RootContext.getXID()); // 创建订单 OrderTbl orderTbl = new OrderTbl(); orderTbl.setUserId(userId); orderTbl.setCommodityCode(commodityCode); orderTbl.setCount(1); // 假设为1件 orderTbl.setMoney(10); // 假设为十元 // 保存订单 orderRepository.save(orderTbl); // 保存订单成功后扣减库存 storageFeign.deduct(commodityCode,orderTbl.getCount()); return "success"; }catch (Exception e){ throw new BizException("创建订单失败"); } }
加上注解之后重启服务继续调用下单接口
可以看到创建订单失败
然后再来观察下数据库
发现订单表没有此商品的订单,库存也没变,那就表示事务已经成功回滚了,不会再出现订单创建成功了但库存没减的情况。
这篇文章查阅资料、测试、发现问题、解决问题花了大约两天时间,创作不易,点个赞吧👍
最后的最后送大家一句话
白驹过隙,沧海桑田