分布式事务中的tcc模式理论介绍的文章非常多,但是网上找到一个代码实现的demo很难,包括阿里的seata官方示例都没有TCC模式的具体实现。今天我们来看一下微服务环境下使用seata TCC模式解决分布式事务的场景,同时提供一个详细的实现。
本文使用的实验环境跟上篇《springcloud+eureka整合分布式事务中间件seata》类似,都是订单、库存和账户3个微服务,全局事务从订单发起:
springboot:2.1.6.RELEASE
orm框架:jdbc
数据库:mysql
数据库连接池:HikariCP
seata server:1.3.0
springcloud:Greenwich.SR2
注:因为微服务采用跟上篇介绍的一样,所以环境搭建就不再重复写了,大家实验过程中有问题的可以参考上篇文章,或者号内留言。
理论回顾
前面我讲了2篇关于seata的文章,都是使用了seata的AT模式,seata AT模式依赖的还是单个服务或单个数据源自己的事务控制(分支事务),采用的是wal的思想,提交事务的时候同时记录undolog,如果全局事务成功,则删除undolog,如果失败,则使用undolog的数据回滚分支事务,最后删除undolog。
TCC模式的特点是不再依赖于undolog,采用2阶段提交的方式,第一阶段使用prepare尝试事务提交,第二阶段使用commit或者rollback让事务提交或者回滚。官方的示例图如下:
从示例图可以看到,TM对全局事务进行管理,RM对分支事务进行管理,而TC管理着全局事务和分支事务的状态,RM需要注册到TC。TM发起全局事务后,调用TM(每个分支事务)的prepare进行try操作,成功后TC会调用RM的commit方法,失败后TC会调用分支事务的rollback方法。
使用spring的事务管理进行尝试
我试图使用spring的编程式事务来实现2阶段提交,我们先看一下prepare方法,代码如下:
public boolean decrease(String xid, Long userId, BigDecimal payAmount) { LOGGER.info("------->尝试扣减账户开始account"); //尝试扣减账户金额,事务不提交 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); TransactionStatus status = transactionManager.getTransaction(def); try { accountDao.decrease(userId,payAmount); //此处不提交事务 transactionStatusMap.put(xid, status); } catch (Exception e) { LOGGER.error("decrease parepare failure:", e); return false; } LOGGER.info("------->尝试扣减账户结束account"); return true; }
这样我在这个方法中不提交事务,等到请求调用commit方法时,再提交事务,commit方法代码如下:
public boolean commit(String xid){ LOGGER.info("commit, xid:{}", xid); if (null == transactionStatusMap.get(xid)){ return true; } transactionManager.commit(transactionStatusMap.get(xid)); transactionStatusMap.remove(xid); return true; }
但是spring是不允许这么做的,第二次http请求到来时,线程跟第一次请求的线程不一样了,所以抛出下面异常:
java.lang.IllegalStateException: No value for key [HikariDataSource (HikariPool-1)] bound to thread [http-nio-8181-exec-2] at org.springframework.transaction.support.TransactionSynchronizationManager.unbindResource(TransactionSynchronizationManager.java:213) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCleanupAfterCompletion(DataSourceTransactionManager.java:367) ~[spring-jdbc-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.transaction.support.AbstractPlatformTransactionManager.cleanupAfterCompletion(AbstractPlatformTransactionManager.java:1007) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:793) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:714) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:534) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:305) ~[spring-tx-5.1.8.RELEASE.jar:5.1.8.RELEASE]
这个异常源码如下:
//TransactionSynchronizationManager类 public static Object unbindResource(Object key) throws IllegalStateException { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doUnbindResource(actualKey); if (value == null) { throw new IllegalStateException( "No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } return value; } private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources"); private static Object doUnbindResource(Object actualKey) { Map<Object, Object> map = resources.get();//resources是ThreadLocal变量,所以第二个线程不可能取到第一个线程绑定的值 if (map == null) { return null;//此处直接返回null } Object value = map.remove(actualKey); // Remove entire ThreadLocal if empty... if (map.isEmpty()) { resources.remove(); } // Transparently suppress a ResourceHolder that was marked as void... if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { value = null; } if (value != null && logger.isTraceEnabled()) { logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" + Thread.currentThread().getName() + "]"); } return value; }
使用jdbc进行尝试
整个项目的sql语句跟上篇文章中基本一样,只是少了undo_log表:
#########################seata_order库 use database seata_order; CREATE TABLE `orders` ( `id` mediumint(11) NOT NULL AUTO_INCREMENT, `user_id` int(11) DEFAULT NULL, `product_id` int(11) DEFAULT NULL, `COUNT` int(11) DEFAULT NULL COMMENT '数量', `pay_amount` decimal(10,2) DEFAULT NULL, `status` varchar(100) DEFAULT NULL, `add_time` datetime DEFAULT CURRENT_TIMESTAMP, `last_update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 #########################seata_pay库 use database seata_pay; DROP TABLE account; CREATE TABLE `account` ( `id` BIGINT(11) NOT NULL AUTO_INCREMENT COMMENT 'id', `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id', `total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度', `used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额', `balance` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度', `last_update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; INSERT INTO `seata_pay`.`account` (`id`, `user_id`, `total`, `used`, `balance`) VALUES ('1', '1', '1000', '0', '100'); #########################seata_storage库 use database seata_storage; CREATE TABLE `storage` ( `id` BIGINT(11) NOT NULL AUTO_INCREMENT, `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id', `total` INT(11) DEFAULT NULL COMMENT '总库存', `used` INT(11) DEFAULT NULL COMMENT '已用库存', `residue` INT(11) DEFAULT NULL COMMENT '剩余库存', PRIMARY KEY (`id`) ) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; INSERT INTO `seata_storage`.`storage` (`id`, `product_id`, `total`, `used`, `residue`) VALUES ('1', '1', '100', '0', '100');
回顾一下实验环境的架构图:
可以看到,order-server既是一个RM,也是一个TM,因为全局事务从这里发起。
这里重点有几个地方说明一下:
1.全局事务从订单服务发起,OrderServiceImpl类create方法,代码如下:
@GlobalTransactional public boolean create(Order order) { String xid = RootContext.getXID(); LOGGER.info("------->交易开始"); BusinessActionContext actionContext = new BusinessActionContext(); actionContext.setXid(xid); boolean result = orderSaveImpl.saveOrder(actionContext, order);//订单服务prepare if(!result){ throw new RuntimeException("保存订单失败"); } //远程方法 扣减库存 LOGGER.info("------->扣减库存开始storage中"); result = storageApi.decrease(actionContext, order.getProductId(), order.getCount());//库存服务prepare if(!result){ throw new RuntimeException("扣减库存失败"); } LOGGER.info("------->扣减库存结束storage中"); //远程方法 扣减账户余额 LOGGER.info("------->扣减账户开始account中"); result = accountApi.prepare(actionContext, order.getUserId(),order.getPayAmount());//账户服务prepare LOGGER.info("------->扣减账户结束account中" + result); LOGGER.info("------->交易结束"); return true; }
可以看到,全局事务发起的地方需要加@GlobalTransactional注解,这个事务首先获取了全局事务id,也就是xid,然后分别调了3个服务的prepare方法,只要有一个服务prepare返回失败,则抛出异常。