springcloud+eureka整合seata-tcc模式(上)

简介: springcloud+eureka整合seata-tcc模式

微信图片_20221212142643.jpg

分布式事务中的tcc模式理论介绍的文章非常多,但是网上找到一个代码实现的demo很难,包括阿里的seata官方示例都没有TCC模式的具体实现。今天我们来看一下微服务环境下使用seata TCC模式解决分布式事务的场景,同时提供一个详细的实现。


本文使用的实验环境跟上篇《springcloud+eureka整合分布式事务中间件seata》类似,都是订单、库存和账户3个微服务,全局事务从订单发起:

springboot:2.1.6.RELEASE

orm框架:jdbc

数据库:mysql

数据库连接池:HikariCP

seata server:1.3.0

springcloud:Greenwich.SR2


注:因为微服务采用跟上篇介绍的一样,所以环境搭建就不再重复写了,大家实验过程中有问题的可以参考上篇文章,或者号内留言。


理论回顾


前面我讲了2篇关于seata的文章,都是使用了seata的AT模式,seata AT模式依赖的还是单个服务或单个数据源自己的事务控制(分支事务),采用的是wal的思想,提交事务的时候同时记录undolog,如果全局事务成功,则删除undolog,如果失败,则使用undolog的数据回滚分支事务,最后删除undolog。


TCC模式的特点是不再依赖于undolog,采用2阶段提交的方式,第一阶段使用prepare尝试事务提交,第二阶段使用commit或者rollback让事务提交或者回滚。官方的示例图如下:

微信图片_20221212142737.png

从示例图可以看到,TM对全局事务进行管理,RM对分支事务进行管理,而TC管理着全局事务和分支事务的状态,RM需要注册到TC。TM发起全局事务后,调用TM(每个分支事务)的prepare进行try操作,成功后TC会调用RM的commit方法,失败后TC会调用分支事务的rollback方法。


使用spring的事务管理进行尝试


我试图使用spring的编程式事务来实现2阶段提交,我们先看一下prepare方法,代码如下:

public boolean decrease(String xid, Long userId, BigDecimal payAmount) {
    LOGGER.info("------->尝试扣减账户开始account");
    //尝试扣减账户金额,事务不提交
    DefaultTransactionDefinition def = new DefaultTransactionDefinition();
    def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
    TransactionStatus status = transactionManager.getTransaction(def);
    try {
        accountDao.decrease(userId,payAmount);
        //此处不提交事务
        transactionStatusMap.put(xid, status);
    } catch (Exception e) {
        LOGGER.error("decrease parepare failure:", e);
        return false;
    }
    LOGGER.info("------->尝试扣减账户结束account");
    return true;
}

这样我在这个方法中不提交事务,等到请求调用commit方法时,再提交事务,commit方法代码如下:

public boolean commit(String xid){
    LOGGER.info("commit, xid:{}", xid);
    if (null == transactionStatusMap.get(xid)){
        return true;
    }
    transactionManager.commit(transactionStatusMap.get(xid));
    transactionStatusMap.remove(xid);
    return true;
}

但是spring是不允许这么做的,第二次http请求到来时,线程跟第一次请求的线程不一样了,所以抛出下面异常:

java.lang.IllegalStateException: No value for key [HikariDataSource (HikariPool-1)] bound to thread [http-nio-8181-exec-2]
 at org.springframework.transaction.support.TransactionSynchronizationManager.unbindResource(TransactionSynchronizationManager.java:213) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE]
 at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCleanupAfterCompletion(DataSourceTransactionManager.java:367) ~[spring-jdbc-5.1.8.RELEASE.jar:5.1.8.RELEASE]
 at org.springframework.transaction.support.AbstractPlatformTransactionManager.cleanupAfterCompletion(AbstractPlatformTransactionManager.java:1007) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE]
 at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:793) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE]
 at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:714) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE]
 at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:534) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE]
 at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:305) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE]

这个异常源码如下:

//TransactionSynchronizationManager类
public static Object unbindResource(Object key) throws IllegalStateException {
  Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
  Object value = doUnbindResource(actualKey);
  if (value == null) {
    throw new IllegalStateException(
        "No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
  }
  return value;
}
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
private static Object doUnbindResource(Object actualKey) {
  Map<Object, Object> map = resources.get();//resources是ThreadLocal变量,所以第二个线程不可能取到第一个线程绑定的值
  if (map == null) {
    return null;//此处直接返回null
  }
  Object value = map.remove(actualKey);
  // Remove entire ThreadLocal if empty...
  if (map.isEmpty()) {
    resources.remove();
  }
  // Transparently suppress a ResourceHolder that was marked as void...
  if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
    value = null;
  }
  if (value != null && logger.isTraceEnabled()) {
    logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +
        Thread.currentThread().getName() + "]");
  }
  return value;
}

使用jdbc进行尝试


整个项目的sql语句跟上篇文章中基本一样,只是少了undo_log表:

#########################seata_order库
use database seata_order;
CREATE TABLE `orders` (
  `id` mediumint(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `product_id` int(11) DEFAULT NULL,
  `COUNT` int(11) DEFAULT NULL COMMENT '数量',
  `pay_amount` decimal(10,2) DEFAULT NULL,
  `status` varchar(100) DEFAULT NULL,
  `add_time` datetime DEFAULT CURRENT_TIMESTAMP,
  `last_update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8
#########################seata_pay库
use database seata_pay;
DROP TABLE account;
CREATE TABLE `account` (
  `id` BIGINT(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
  `total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度',
  `used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额',
  `balance` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度',
  `last_update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
INSERT INTO `seata_pay`.`account` (`id`, `user_id`, `total`, `used`, `balance`) VALUES ('1', '1', '1000', '0', '100');
#########################seata_storage库
use database seata_storage;
CREATE TABLE `storage` (
  `id` BIGINT(11) NOT NULL AUTO_INCREMENT,
  `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
  `total` INT(11) DEFAULT NULL COMMENT '总库存',
  `used` INT(11) DEFAULT NULL COMMENT '已用库存',
  `residue` INT(11) DEFAULT NULL COMMENT '剩余库存',
  PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
INSERT INTO `seata_storage`.`storage` (`id`, `product_id`, `total`, `used`, `residue`) VALUES ('1', '1', '100', '0', '100');

回顾一下实验环境的架构图:

微信图片_20221212144338.png

可以看到,order-server既是一个RM,也是一个TM,因为全局事务从这里发起。


这里重点有几个地方说明一下:

1.全局事务从订单服务发起,OrderServiceImpl类create方法,代码如下:

@GlobalTransactional
public boolean create(Order order) {
    String xid = RootContext.getXID();
    LOGGER.info("------->交易开始");
    BusinessActionContext actionContext = new BusinessActionContext();
    actionContext.setXid(xid);
    boolean result = orderSaveImpl.saveOrder(actionContext, order);//订单服务prepare
    if(!result){
        throw new RuntimeException("保存订单失败");
    }
    //远程方法 扣减库存
    LOGGER.info("------->扣减库存开始storage中");
    result = storageApi.decrease(actionContext, order.getProductId(), order.getCount());//库存服务prepare
    if(!result){
        throw new RuntimeException("扣减库存失败");
    }
    LOGGER.info("------->扣减库存结束storage中");
    //远程方法 扣减账户余额
    LOGGER.info("------->扣减账户开始account中");
    result = accountApi.prepare(actionContext, order.getUserId(),order.getPayAmount());//账户服务prepare
    LOGGER.info("------->扣减账户结束account中" + result);
    LOGGER.info("------->交易结束");
    return true;
}

可以看到,全局事务发起的地方需要加@GlobalTransactional注解,这个事务首先获取了全局事务id,也就是xid,然后分别调了3个服务的prepare方法,只要有一个服务prepare返回失败,则抛出异常。


相关文章
|
23天前
|
数据库 微服务
SEATA模式
Seata 是一款开源的分布式事务解决方案,支持多种事务模式以适应不同的应用场景。其主要模式包括:AT(TCC)模式,事务分三阶段执行;TCC 模式,提供更灵活的事务控制;SAGA 模式,基于状态机实现跨服务的事务一致性;XA 模式,采用传统两阶段提交协议确保数据一致性。
41 5
|
26天前
|
负载均衡 Java 开发者
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
88 5
|
29天前
Seata框架在AT模式下是如何保证数据一致性的?
通过以上这些机制的协同作用,Seata 在 AT 模式下能够有效地保证数据的一致性,确保分布式事务的可靠执行。你还可以进一步深入研究 Seata 的具体实现细节,以更好地理解其数据一致性保障的原理。
40 3
|
3月前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
3月前
|
负载均衡 Java Nacos
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
微服务介绍、SpringCloud、服务拆分和远程调用、Eureka注册中心、Ribbon负载均衡、Nacos注册中心
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
|
2月前
|
负载均衡 算法 Nacos
SpringCloud 微服务nacos和eureka
SpringCloud 微服务nacos和eureka
78 0
|
2月前
|
负载均衡 Java API
【Spring Cloud生态】Spring Cloud Gateway基本配置
【Spring Cloud生态】Spring Cloud Gateway基本配置
62 0
|
4月前
|
Java Spring
【Azure Spring Cloud】Spring Cloud Azure 4.0 调用Key Vault遇见认证错误 AADSTS90002: Tenant not found.
【Azure Spring Cloud】Spring Cloud Azure 4.0 调用Key Vault遇见认证错误 AADSTS90002: Tenant not found.
|
4月前
|
Java Spring 容器
【Azure Spring Cloud】在Azure Spring Apps上看见 App Memory Usage 和 jvm.menory.use 的指标的疑问及OOM
【Azure Spring Cloud】在Azure Spring Apps上看见 App Memory Usage 和 jvm.menory.use 的指标的疑问及OOM
|
4月前
|
存储 Java Spring
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?