一、Seata 分布式事务
1.1 概述
2019 年 1 月,阿里巴巴中间件团队发起了开源项目 Fescar(Fast & EaSy Commit And Rollback),其愿景是让分布式事务的使用像本地事务的使用一样,简单和高效,并逐步解决开发者们 遇到的分布式事务方面的所有难题。后来更名为 Seata,意为:Simple Extensible Autonomous Transaction Architecture,是一套分布式事务解决方案。
1.2 Seata的执行流程如下
A服务的TM向TC申请开启一个全局事务,TC就会创建一个全局事务并返回一个唯一的XID
A服务的RM向TC注册分支事务,并及其纳入XID对应全局事务的管辖
A服务执行分支事务,向数据库做操作
A服务开始远程调用B服务,此时XID会在微服务的调用链上传播
B服务的RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖
B服务执行分支事务,向数据库做操作
全局事务调用链处理完毕,TM根据有无异常向TC发起全局事务的提交或者回滚
TC协调其管辖之下的所有分支事务, 决定是否回滚
二、模拟下单扣减库存
Order服务
2.1 OrderController
@RestController @Slf4j public class OrderController5 { @Autowired private OrderServiceImpl5 orderService; //下单 @RequestMapping("/order/prod/{pid}") public Order order(@PathVariable("pid") Integer pid) { log.info("接收到{}号商品的下单请求,接下来调用商品微服务查询此商品信息", pid); return orderService.createOrder(pid); } }
2.2 OrderServiceImpl
@Service @Slf4j public class OrderServiceImpl5{ @Autowired private OrderDao orderDao; @Autowired private ProductService productService; @Autowired private RocketMQTemplate rocketMQTemplate; //@GlobalTransactional // seata全局事务控制 @Transactional // 本地事务控制 public Order createOrder(Integer pid) { //1 调用商品微服务,查询商品信息 Product product = productService.findByPid(pid); log.info("查询到{}号商品的信息,内容是:{}", pid, JSON.toJSONString(product)); //2 下单(创建订单) Order order = new Order(); order.setUid(1); order.setUsername(" 测 试 用 户 "); order.setPid(pid); order.setPname(product.getPname()); order.setPprice(product.getPprice()); order.setNumber(1); orderDao.save(order); log.info("创建订单成功,订单信息为{}", JSON.toJSONString(order)); //3 扣库存 productService.reduceInventory(pid, order.getNumber()); //4 向mq中投递一个下单成功的消息 rocketMQTemplate.convertAndSend("order-topic", order); return order; } }
2.3 ProductService
@FeignClient(value = "service-product") public interface ProductService { //减库存 @RequestMapping("/product/reduceInventory") void reduceInventory(@RequestParam("pid") Integer pid, @RequestParam("num") int num); }
Product微服务
2.4 ProductController
//减少库存 @RequestMapping("/product/reduceInventory") public void reduceInventory(Integer pid, int num) { productService.reduceInventory(pid, num); }
2.5 ProductService
@Override public void reduceInventory(Integer pid, int num) { Product product = productDao.findById(pid).get(); product.setStock(product.getStock() - num);//减库存 productDao.save(product); }
2.6 模拟下单异常
在ProductServiceImpl的代码中模拟一个异常, 然后调用下单接口
@Override public void reduceInventory(Integer pid, Integer number) { Product product = productDao.findById(pid).get(); if (product.getStock() < number) { throw new RuntimeException("库存不足"); } int i = 1 / 0; // 异常所在 //扣减库存操作 product.setStock(product.getStock() - number); productDao.save(product); }
http://localhost:8091/order/prod/1
在2.2中的本地事务@Transactional,那么数据库会得到神魔结果呢???
答案不言而喻:订单创建成功,然后库存并没有得到响应的减少。
三、Seata引入
3.1 下载
下载地址:https://github.com/seata/seata/releases/v0.9.0/
3.2 修改配置文件
由于我这里使用的nacos注册中心作为管理。
将下载得到的压缩包进行解压,进入conf目录,调整下面的配置文件:
1.registry.conf
registry { type = "nacos" nacos { serverAddr = "localhost" namespace = "public" cluster = "default" } } config { type = "nacos" nacos { serverAddr = "localhost" namespace = "public" cluster = "default" } }
2.nacos.conf
service.vgroup_mapping.service-product=default service.vgroup_mapping.service-order=default
3.3 初始化seata在nacos中的配置
# 初始化seata 的nacos配置 # 注意: 这里要保证nacos是已经正常运行的 cd conf nacos-config.sh 127.0.0.1 # cmd 运行如下命令
执行成功后可以打开Nacos的控制台,在配置列表中,可以看到初始化了很多Group为SEATA_GROUP 的配置。
3.4 启动Seata
cd bin seata-server.bat -p 9000 -m file
启动后在 Nacos 的服务列表下面可以看到一个名为 serverAddr 的服务。
四、Seata实现事务控制
4.1 初始化数据表
在我们的数据库中加入一张undo_log表,这是Seata记录事务日志要用到的表
CREATE TABLE `undo_log` ( `id` BIGINT(20) NOT NULL AUTO_INCREMENT, `branch_id` BIGINT(20) NOT NULL, `xid` VARCHAR(100) NOT NULL, `context` VARCHAR(128) NOT NULL, `rollback_info` LONGBLOB NOT NULL, `log_status` INT(11) NOT NULL, `log_created` DATETIME NOT NULL, `log_modified` DATETIME NOT NULL, `ext` VARCHAR(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`) ) ENGINE = INNODB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
4.2 添加配置
在需要进行分布式控制的微服务中进行下面几项配置:
4.2.1 pom依赖
这里两个[订单order服务,库存product服务],都需要加入这个依赖。
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency>
4.2.2 DataSourceProxyConfig
Seata 是通过代理数据源实现事务分支的,所以需要配置 io.seata.rm.datasource.DataSourceProxy 的
Bean,且是 @Primary默认的数据源,否则事务不会回滚,无法实现分布式事务
这里两个[订单order服务,库存product服务]
@Configuration public class DataSourceProxyConfig { @Bean @ConfigurationProperties(prefix = "spring.datasource") public DruidDataSource druidDataSource() { return new DruidDataSource(); } @Primary @Bean public DataSourceProxy dataSource(DruidDataSource druidDataSource) { return new DataSourceProxy(druidDataSource); } }
4.2.3 registry.conf
这里两个[订单order服务,库存product服务]的
在resources下添加Seata的配置文件 registry.conf
registry { type = "nacos" nacos { serverAddr = "localhost" namespace = "public" cluster = "default" } } config { type = "nacos" nacos { serverAddr = "localhost" namespace = "public" } } cluster = "default"
4.2.4 bootstrap.yml
这里两个[订单order服务,库存product服务]的
这个配置文件会优先进行读取,只要引入了nacos的conf这个依赖,一定要添加这个配置文件,否则无法注册服务。
spring: application: name: service-product和service-order cloud: nacos: config: server-addr: localhost:8848 # nacos的服务端地址 namespace: public group: SEATA_GROUP alibaba: seata: tx-service-group: ${spring.application.name}
4.2.5 在2.2 中的order微服务开启全局事务
@GlobalTransactional//全局事务控制 public Order createOrder(Integer pid) {}
http://localhost:8091/order/prod/1
测试下单服务,会发现要 下单和库存一起成功,要么一起失败,只要出现了异常 int i = 1/0; 两者都不会写入成功。