32-微服务技术栈(高级):分布式事务Seata的TCC模式

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,182元/月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
简介: 在分布式架构系统中,服务不止一个,一个完整的业务链路肯定也不止调用一个服务,此时每个服务都有自己的数据库增删改查,而每一个写操作对应一个本地事务。如果想要确保全部的业务状态一致,也就意味着需要所有的本地事务状态一致,这在我们之前的学习中肯定是不具备的,如何做到跨服务、跨数据源的事务一致性将是本章节的重点学习内容。

TCC模式与AT模式非常相似,每阶段都是独立事务,不同的是TCC通过人工编码来实现数据恢复。需要实现三个方法:

  • Try:资源的检测和预留;
  • Confirm:完成资源操作业务;要求 Try 成功 Confirm 一定要能成功。
  • Cancel:预留资源释放,可以理解为try的反向操作。

1.流程分析

举例,一个扣减用户余额的业务。假设账户A原来余额是100,需要余额扣减30元。

  • 阶段一( Try ):检查余额是否充足,如果充足则冻结金额增加30元,可用余额扣除30

初始余额:

余额充足,可以冻结:

此时,总金额 = 冻结金额 + 可用金额,数量依然是100不变。事务直接提交无需等待其它事务。

  • 阶段二(Confirm):假如要提交(Confirm),则冻结金额扣减30

确认可以提交,不过之前可用金额已经扣减过了,这里只要清除冻结金额就好了:

此时,总金额 = 冻结金额 + 可用金额 = 0 + 70  = 70元

  • 阶段二(Canncel):如果要回滚(Cancel),则冻结金额扣减30,可用余额增加30

需要回滚,那么就要释放冻结金额,恢复可用金额:

2.Seata的TCC模型

Seata中的TCC模型依然延续之前的事务架构,如图:

3.优缺点

TCC模式的每个阶段是做什么的?

  • Try:资源检查和预留
  • Confirm:业务执行和提交
  • Cancel:预留资源的释放

TCC的优点是什么?

  • 一阶段完成直接提交事务,释放数据库资源,性能好
  • 相比AT模型,无需生成快照,无需使用全局锁,性能最强
  • 不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库

TCC的缺点是什么?

  • 有代码侵入,需要人为编写try、Confirm和Cancel接口,太麻烦
  • 软状态,事务是最终一致
  • 需要考虑Confirm和Cancel的失败情况,做好幂等处理

4.事务悬挂和空回滚

1)空回滚

当某分支事务的try阶段阻塞时,可能导致全局事务超时而触发二阶段的cancel操作。在未执行try操作时先执行了cancel操作,这时cancel不能做回滚,就是空回滚

如图:

执行cancel操作时,应当判断try是否已经执行,如果尚未执行,则应该空回滚。

2)业务悬挂

对于已经空回滚的业务,之前被阻塞的try操作恢复,继续执行try,就永远不可能confirm或cancel ,事务一直处于中间状态,这就是业务悬挂

执行try操作时,应当判断cancel是否已经执行过了,如果已经执行,应当阻止空回滚后的try操作,避免悬挂

5.实现TCC模式

解决空回滚和业务悬挂问题,必须要记录当前事务状态,是在try、还是cancel?

1)思路分析

这里我们定义一张表;在 seata_demo数据库中创建如下表:

CREATE TABLE `account_freeze_tbl`(

   `xid` varchar(128) NOT NULL,

   `user_id` varchar(255) DEFAULT NULL COMMENT'用户id',

   `freeze_money`int(11) unsigned DEFAULT'0'COMMENT'冻结金额',

   `state`int(1) DEFAULT NULL COMMENT'事务状态,0:try,1:confirm,2:cancel',

   PRIMARY KEY(`xid`)USING BTREE

)ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;

其中:

  • xid:是全局事务id
  • freeze_money:用来记录用户冻结金额
  • state:用来记录事务状态

那此时,我们的业务该怎么做呢?

  • Try业务:
  • 记录冻结金额和事务状态到account_freeze表
  • 扣减account表可用金额
  • Confirm业务
  • 根据xid删除account_freeze表的冻结记录
  • Cancel业务
  • 修改account_freeze表,冻结金额为0,state为2
  • 修改account表,恢复可用金额
  • 如何判断是否空回滚?
  • cancel业务中,根据xid查询account_freeze,如果为null则说明try还没做,需要空回滚
  • 如何避免业务悬挂?
  • try业务中,根据xid查询account_freeze ,如果已经存在则证明Cancel已经执行,拒绝执行try业务

接下来,我们改造account-service,利用TCC实现余额扣减功能。

2)声明TCC接口

TCC的Try、Confirm、Cancel方法都需要在接口中基于注解来声明,

我们在account-service项目中的cn.itcast.account.service包中新建一个接口,声明TCC三个接口:

package cn.itcast.account.service;


import io.seata.rm.tcc.api.BusinessActionContext;

import io.seata.rm.tcc.api.BusinessActionContextParameter;

import io.seata.rm.tcc.api.LocalTCC;

import io.seata.rm.tcc.api.TwoPhaseBusinessAction;


@LocalTCC

public interface AccountTCCService {


   /**

    * 根据用户id扣减余额,记录冻结金额

    * commitMethod 提交时对于的方法;rollbackMethod回滚时对于的方法

    * @param userId 用户id

    * @param money 扣减金额

    */

   @TwoPhaseBusinessAction(name = "deduct", commitMethod = "confirm", rollbackMethod = "cancel")

   void deduct(@BusinessActionContextParameter("userId")String userId,

               @BusinessActionContextParameter("money")int money);


   /**

    * 执行TCC事务时,提交事务时执行的方法

    * @param ctx 上下文对象

    * @return 执行结果

    */

   Boolean confirm(BusinessActionContext ctx);

   /**

    * 执行TCC事务时,回滚事务时执行的方法

    * @param ctx 上下文对象

    * @return 执行结果

    */

   Boolean cancel(BusinessActionContext ctx);

}

3)编写实现类

account-service服务中的cn.itcast.account.service.impl包下新建一个类,实现TCC业务:

package cn.itcast.account.service.impl;


import cn.itcast.account.entity.AccountFreeze;

import cn.itcast.account.mapper.AccountFreezeMapper;

import cn.itcast.account.mapper.AccountMapper;

import cn.itcast.account.service.AccountTCCService;

import io.seata.core.context.RootContext;

import io.seata.rm.tcc.api.BusinessActionContext;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import org.springframework.transaction.annotation.Transactional;


@Service

public class AccountTCCServiceImpl implements AccountTCCService {


   @Autowired

   private AccountMapper accountMapper;


   @Autowired

   private AccountFreezeMapper accountFreezeMapper;


   @Override

   @Transactional

   public void deduct(String userId, int money) {

       //获取事务id

       String xid = RootContext.getXID();

       //处理业务悬挂

       AccountFreeze freeze = accountFreezeMapper.selectById(xid);

       if (freeze != null) {

           //已经cancel过,拒绝业务

           return;

       }


       //1 扣减余额

       accountMapper.deduct(userId, money);


       //2 记录冻结金额

       AccountFreeze accountFreeze = new AccountFreeze();

       accountFreeze.setXid(xid);

       accountFreeze.setUserId(userId);

       accountFreeze.setFreezeMoney(money);

       accountFreeze.setState(AccountFreeze.State.TRY);

       accountFreezeMapper.insert(accountFreeze);

   }


   @Override

   public Boolean confirm(BusinessActionContext ctx) {

       //删除冻结金额

       String xid = ctx.getXid();

       int count = accountFreezeMapper.deleteById(xid);

       return count==1;

   }


   @Override

   public Boolean cancel(BusinessActionContext ctx) {

       //获取全局事务id

       String xid = ctx.getXid();

       //用户id

       String userId = ctx.getActionContext("userId").toString();

       AccountFreeze accountFreeze = accountFreezeMapper.selectById(xid);


       //处理空回滚

       if (accountFreeze == null) {

           accountFreeze = new AccountFreeze();

           accountFreeze.setXid(xid);

           accountFreeze.setUserId(userId);

           accountFreeze.setFreezeMoney(0);

           accountFreeze.setState(AccountFreeze.State.CANCEL);

           accountFreezeMapper.insert(accountFreeze);

           return true;

       }


       //幂等处理

       if (accountFreeze.getState() == AccountFreeze.State.CANCEL) {

           //说明已经回滚过了;不需要再处理

           return true;

       }


       //1 回退金额

       accountMapper.refund(userId, accountFreeze.getFreezeMoney());


       //2 更新冻结金额为0和状态

       accountFreeze.setFreezeMoney(0);

       accountFreeze.setState(AccountFreeze.State.CANCEL);

       int count = accountFreezeMapper.updateById(accountFreeze);


       return count==1;

   }

}

改造 src/main/java/cn/itcast/account/web/AccountController.java 如下:

相关文章
|
7月前
|
SQL 数据建模 BI
【YashanDB 知识库】用 yasldr 配置 Bulkload 模式作单线程迁移 300G 的业务数据到分布式数据库,迁移任务频繁出错
问题描述 详细版本:YashanDB Server Enterprise Edition Release 23.2.4.100 x86_64 6db1237 影响范围: 离线数据迁移场景,影响业务数据入库。 外场将部分 NewCIS 的报表业务放到分布式数据库,验证 SQL 性能水平。 操作系统环境配置: 125G 内存 32C CPU 2T 的 HDD 磁盘 问题出现的步骤/操作: 1、部署崖山分布式数据库 1mm 1cn 3dn 单线启动 yasldr 数据迁移任务,设置 32 线程的 bulk load 模式 2、观察 yasldr.log 是否出现如下错
|
6月前
|
SQL
seata是怎么进行分布式事务控制的
seata是怎么进行分布式事务控制的
|
8月前
|
Java 关系型数据库 数据库
微服务SpringCloud分布式事务之Seata
SpringCloud+SpringCloudAlibaba的Seata实现分布式事务,步骤超详细,附带视频教程
597 1
|
9月前
|
数据库 微服务
SEATA模式
Seata 是一款开源的分布式事务解决方案,支持多种事务模式以适应不同的应用场景。其主要模式包括:AT(TCC)模式,事务分三阶段执行;TCC 模式,提供更灵活的事务控制;SAGA 模式,基于状态机实现跨服务的事务一致性;XA 模式,采用传统两阶段提交协议确保数据一致性。
275 5
|
1月前
|
存储 负载均衡 NoSQL
【赵渝强老师】Redis Cluster分布式集群
Redis Cluster是Redis的分布式存储解决方案,通过哈希槽(slot)实现数据分片,支持水平扩展,具备高可用性和负载均衡能力,适用于大规模数据场景。
188 2
|
1月前
|
存储 缓存 NoSQL
【📕分布式锁通关指南 12】源码剖析redisson如何利用Redis数据结构实现Semaphore和CountDownLatch
本文解析 Redisson 如何通过 Redis 实现分布式信号量(RSemaphore)与倒数闩(RCountDownLatch),利用 Lua 脚本与原子操作保障分布式环境下的同步控制,帮助开发者更好地理解其原理与应用。
92 0
|
2月前
|
存储 缓存 NoSQL
Redis核心数据结构与分布式锁实现详解
Redis 是高性能键值数据库,支持多种数据结构,如字符串、列表、集合、哈希、有序集合等,广泛用于缓存、消息队列和实时数据处理。本文详解其核心数据结构及分布式锁实现,帮助开发者提升系统性能与并发控制能力。
|
16天前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)