@[toc]
1 分布式事务理论基础
事务的AICD原则
- 原子性(Atomicity):事务中的所有操作,要么全部成功,要么全部失败
- 隔离性(Isolation):多个事务之间不会产生影响,两个事务操作同一个元素,不会产生影响,对同一资源操作的事务不能同时发生
- 一致性(Consistency):操作之前和操作之后总量是不变的
- 持久性(Durability):事务提交后,表中数据发生相应的变化
分布式事务:在分布式系统下,一个业务跨越多个服务或数据,每个服务都是一个分支事务,要保证所有分支事务最终状态一致,这样的事务就是分布式事务。1.1 CAP定理
Consistency
(一致性):用户访问分布式系统中的任意节点,得到的数据必须一致,因此,当修改数据是要保持备份的同步。Availability
(可用性):用户访问集群中的任意健康节点,必须能得到响应,而不是超时或拒绝。Partition tolerance
(分区容错性):分区:因为网络故障或其他原因导致分布式系统中的部分节点与其他节点失去连接,形成独立分区。容错:在集群出现分区时,整个系统也要持续对外提供服务。
分布式系统无法同时满足者三个指标,这个结论就叫做CAP定理。1.2 BASE理论
BASE理论是对CAP的一种解决思路,包含三个思想:Basically Available
(基本可用):分布式系统在出现故障时,允许损失部分可用性,即保证核心可用。Soft State
(软状态):在一定时间内,允许出现中间状态,比如临时的不一致状态。Eventually Consistent
(最终一致性):虽然无法保证强一致性,但是在软状态结束后,最终达到数据一致。1.3 借鉴CAP定理和BASS理论解决分布式事务
- AP模式:各子事务分别执行和提交,允许出现结果不一致,然后采用弥补措施恢复数据即可,实现
最终一致
。 - CP模式:各个子事务执行后互相等待,同时提交,同时回滚,达成
强一致
。但事务等待过程中,处于弱可用状态
。1.4 事务协调者
解决分布式事务,各个子系统之间必须能感知到彼此的事务状态,才能保证状态一致,因此需要一个事务协调者来协调每一个事务的参与者(子系统事务)。 - 全局事务:整个分布式事务
- 分支事务:分布式事务中包含的每个子系统的事务
- 最终一致思想(AP模式):各分支事务分别执行并提交,如果有不一致的情况,再想办法恢复数据
- 强一致思想(CP模式):各分支事务执行完业务不要提交,等待彼此结果。而后统一提交或回滚。
2 seata简介
Seata事务管理中有三个重要的角色: - TC(Transaction Coordinator)- 事务协调者:
维护
全局和分支事务的状态
,协调
全局事务提交和回滚
。 - TM(Transaction Manager)- 事务管理器:
定义
全局事务的范围、开始
全局事务、提交或回滚全局事务。 - RM(Resource Manager)- 资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
Seata提供了四种不同的分布式事务解决方案: - XA模式:强一致性分阶段事务模式,牺牲了一定的可用性,无业务侵入
- TCC模式:最终一致的分阶段事务模式,有业务侵入
- AT模式:最终一致的分阶段事务模式,无业务侵入,也是Seata的默认模式
- SAGA模式:长事务模式,有业务侵入
2.1 seata服务启动
配置如下:
```yaml
server:
port: 7091
spring:
application:
name: seata-server
logging:
config: classpath:logback-spring.xml
file:
path: ${user.home}/logs/seata
extend:
logstash-appender:
destination: 127.0.0.1:4560
kafka-appender:
bootstrap-servers: 127.0.0.1:9092
topic: logback_to_logstash
console:
user:
username: seata
password: seata
seata:
security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
ignore:
urls: /,//*.css,//.js,/**/.html,//*.map,//.svg,/**/.png,//*.ico,/console-fe/public/,/api/v1/auth/login
config:
# support: nacos 、 consul 、 apollo 、 zk 、 etcd3
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace:
group: SEATA_GROUP
username: nacos
password: nacos
context-path:
data-id: seataServer.properties
registry:
# support: nacos 、 eureka 、 redis 、 zk 、 consul 、 etcd3 、 sofa
type: nacos
preferred-networks: 30.240.*
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
group: DEFAULT_GROUP
namespace:
cluster: SH
username: nacos
password: nacos
context-path:
store:
# support: file 、 db 、 redis
mode: db
db:
datasource: druid
db-type: mysql
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/yf-seata?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
user: root
password: yuan159951.
min-conn: 10
max-conn: 100
global-table: global_table
branch-table: branch_table
lock-table: lock_table
distributed-lock-table: distributed_lock
query-limit: 1000
max-wait: 5000
## 2.2 seata客户端启动
### 2.2.1 引入依赖
```xml
<!-- SpringBoot Seata -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
2.2.2 配置如下
# spring配置
spring:
redis:
host: localhost
port: 6379
password:
datasource:
druid:
stat-view-servlet:
enabled: true
loginUsername: admin
loginPassword: 123456
dynamic:
druid:
initial-size: 5
min-idle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT 1 FROM DUAL
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
filters: stat,slf4j
connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
datasource:
# 主库数据源
master:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/ry-cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: yuan159951.
# seata_order数据源
order:
username: root
password: yuan159951.
url: jdbc:mysql://localhost:3306/seata_order?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
driver-class-name: com.mysql.cj.jdbc.Driver
# seata_account数据源
account:
username: root
password: yuan159951.
url: jdbc:mysql://localhost:3306/seata_account?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
driver-class-name: com.mysql.cj.jdbc.Driver
# seata_product数据源
product:
username: root
password: yuan159951.
url: jdbc:mysql://localhost:3306/seata_product?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
driver-class-name: com.mysql.cj.jdbc.Driver
#开启seata代理,开启后默认每个数据源都代理,如果某个不需要代理可单独关闭
seata: true
# seata配置
seata:
enabled: true
# Seata 应用编号,默认为 ${spring.application.name}
application-id: ${
spring.application.name}
# Seata 事务组编号,用于 TC 集群名
tx-service-group: ${
spring.application.name}-group
# 关闭自动代理
enable-auto-data-source-proxy: false
# 服务配置项
service:
# 虚拟组和分组的映射
vgroupMapping:
ruoyi-system-group: SH
# 分组和 Seata 服务的映射
grouplist:
default: 127.0.0.1:8091
# config:
# type: file
registry:
type: nacos
preferred-networks: 30.240.*
nacos:
server-addr: 127.0.0.1:8848
namespace:
group: DEFAULT_GROUP
application: seata-server
username: nacos
password: nacos
# config:
# type: nacos
# nacos:
# serverAddr: 127.0.0.1:8848
# data-id: seata-Server
# namespace:
# group: SEATA_GROUP
# cluster: default
# username: nacos
# password: nacos
2.2.3 业务代码参考
@Override
@Transactional
@GlobalTransactional // 重点 第一个开启事务的需要添加seata全局事务注解
public void placeOrder(PlaceOrderRequest request)
{
log.info("=============ORDER START=================");
Long userId = request.getUserId();
Long productId = request.getProductId();
Integer amount = request.getAmount();
log.info("收到下单请求,用户:{}, 商品:{},数量:{}", userId, productId, amount);
log.info("当前 XID: {}", RootContext.getXID());
Order order = new Order(userId, productId, 0, amount);
orderMapper.insert(order);
log.info("订单一阶段生成,等待扣库存付款中");
// 扣减库存并计算总价
Double totalPrice = productService.reduceStock(productId, amount);
// 扣减余额
accountService.reduceBalance(userId, totalPrice);
order.setStatus(1);
order.setTotalPrice(totalPrice);
orderMapper.updateById(order);
log.info("订单已成功下单");
log.info("=============ORDER END=================");
}
/**
* 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
*/
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void reduceBalance(Long userId, Double price)
{
log.info("=============ACCOUNT START=================");
log.info("当前 XID: {}", RootContext.getXID());
Account account = accountMapper.selectById(userId);
Double balance = account.getBalance();
log.info("下单用户{}余额为 {},商品总价为{}", userId, balance, price);
if (balance < price)
{
log.warn("用户 {} 余额不足,当前余额:{}", userId, balance);
throw new RuntimeException("余额不足");
}
log.info("开始扣减用户 {} 余额", userId);
double currentBalance = account.getBalance() - price;
account.setBalance(currentBalance);
accountMapper.updateById(account);
log.info("扣减用户 {} 余额成功,扣减后用户账户余额为{}", userId, currentBalance);
log.info("=============ACCOUNT END=================");
}
/**
* 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
*/
@DS("product")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public Double reduceStock(Long productId, Integer amount)
{
log.info("=============PRODUCT START=================");
log.info("当前 XID: {}", RootContext.getXID());
// 检查库存
Product product = productMapper.selectById(productId);
Integer stock = product.getStock();
log.info("商品编号为 {} 的库存为{},订单商品数量为{}", productId, stock, amount);
if (stock < amount)
{
log.warn("商品编号为{} 库存不足,当前库存:{}", productId, stock);
throw new RuntimeException("库存不足");
}
log.info("开始扣减商品编号为 {} 库存,单价商品价格为{}", productId, product.getPrice());
// 扣减库存
int currentStock = stock - amount;
product.setStock(currentStock);
productMapper.updateById(product);
double totalPrice = product.getPrice() * amount;
log.info("扣减商品编号为 {} 库存成功,扣减后库存为{}, {} 件商品总价为 {} ", productId, currentStock, amount, totalPrice);
log.info("=============PRODUCT END=================");
return totalPrice;
}
3 seata的四种模式
3.1 XA模式
优点:
- 事务的强一致性,满足ACID原则
- 常用数据库都支持,实现简单,并且没有代码侵入
缺点: - 因为一阶段需要锁定数据库资源,等待二阶段结束才释放,性能较差
- 依赖关系型数据库实现事务
seata: data-source-proxy-mode: XA # 开启数据源代理的XA模式
3.2 AT模式
- 一阶段:TM开启全局事务并调用分支事务,RM注册分支事务,直接执行sql并提交,记录undo-log快照。
- 二阶段:TM提交或回滚全局事务,若成功,则直接删除undo-log,若失败,则根据undo-log快照回滚,然后删除undo-log快照。
AT模式和XA模式最大的区别是: - XA模式一阶段不提交事务,锁定资源;AT模式一阶段直接提交,不锁定资源。
- XA模式依赖数据库机制实现回滚;AT模式利用数据快照实现数据回滚。
- XA模式强一直;AT模式最终一致。
- AT模式增加了全局锁,避免脏读(修改失败等)。可以人工介入。
AT模式的优点: - 一阶段完成直接提交事务,释放数据库资源,性能比较好
- 利用全局锁实现读写隔离
- 没有代码侵入,框架自动完成回滚和提交
AT模式的缺点: - 两阶段之间属于软状态,属于最终一致
- 框架的快照功能会影响性能,但比XA模式要好很多
seata: data-source-proxy-mode: AT # 开启数据源代理的AT模式
3.3 TCC模式
TCC模式与AT模式非常相似,每阶段都是独立事务,不同的是TCC通过人工编码来实现数据恢复。需要实现三个方法: - Try:资源的检测和预留。
- Confirm:完成资源操作业务;要求Try成功Confirm一定要能成功。
- Cancel:预留资源释放,可以理解为try的反向操作。
- 空回滚:根据xid查询补偿表,如果为null,则try还没执行,需要空回滚,根据Cancel和xid做幂等操作。
- 业务悬挂:根据xid查询补偿表,如果已经存在,则证明Cancel已经执行,拒绝执行try业务。
TCC的优点: - 一阶段完成直接提交事务,释放数据库资源,性能好
- 相比AT模式,无需生成快照,无需使用全局锁,性能最强
- 不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库
TCC的缺点: - 有代码侵入,需要人为编写try、Confirm、Cancel接口,太麻烦
- 软状态,事务是最终一致
- 需要考虑Confirm和Cancel的失败情况,做好幂等处理
3.4 Saga模式
应用场景:时间跨度大的业务,例如银行业务。
Saga模式是seata提供的长事务解决方案。也分为两个阶段: - 一阶段:直接提交本地事务
- 二阶段:成功则什么都不做,失败则通过编写补偿业务来回滚
Saga模式优点: - 事务参与者可以基于时间驱动实现异步调用,吞吐高
- 一阶段直接提交事务,无锁,性能好
- 不用编写TCC中的三个阶段,实现简单
Saga模式缺点 - 软状态持续时间不确定,时效性差
- 没有锁,没有事务隔离,会有脏写
4 四种模式对比