技术经验解读:三、TransactionalTemplate处理全局事务

简介: 技术经验解读:三、TransactionalTemplate处理全局事务

目录


所有文章正文execute方法getCurrentOrCreatebeginTransactioncommitTransactioncompleteTransactionAfterThrowing总结


正文


回到顶部所有文章


回到顶部正文


上一篇文章中,我们看了一下GlobalTransactionalInterceptor这个拦截器,知道了@GlobalTransactional注解的主体逻辑被委托给了TransactionalTemplate来实现。


本文,将看一下TransationalTemplate这个模板方法处理全局事务的主体逻辑。


回到顶部execute方法


首先,我们跟进TransactionalTemplate的execute方法


public Object execute(TransactionalExecutor business) throws Throwable {


// 1. 获取或者创建一个全局事务


GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();


// 1.1 获取事务信息


TransactionInfo txInfo = business.getTransactionInfo();


if (txInfo == null) {


throw new ShouldNeverHappenException("transactionInfo does not exist");


}


try {


// 2. 开始全局事务


beginTransaction(txInfo, tx);


Object rs = null;


try {


// 执行业务逻辑


rs = business.execute();


} catch (Throwable ex) {


// 3.rollback全局事务


completeTransactionAfterThrowing(txInfo,tx,ex);


throw ex;


}


// 4. commit全局事务


commitTransaction(tx);


return rs;


} finally {


//5. 清理


triggerAfterCompletion();


cleanUp();


}


}


execute方法的逻辑我们应该非常的熟悉,这和JDBC的API非常的相似。同样是经历:begin -> commit || rollback,这样一个逻辑。


步骤主要分为如下几个:


1)获取或者创建一个全局事务;


2)begin全局事务;


3)异常rollback事务;


4)正常commit事务;


下面,我们将逐步阅读对应步骤的代码


getCurrentOrCreate


GlobalTransactionContext作为一个上下文,提供了获取或者创建全局事务的方法,跟进它的getCurrentOrCreate方法


public static GlobalTransaction getCurrentOrCreate() {


GlobalTransaction tx = getCurrent();


// 如果获取不到现有的全局事务,那么创建一个


if (tx == null) {


return createNew();


}


return tx;


}


我们,先看看它是如何获取到已经存在的全局事务的,跟进getCurrent方法


private static GlobalTransaction getCurrent() {


// 获取全局事务的XID


String xid = RootContext.getXID();


// XID不存在,表示不存在全局事务


if (xid == null) {


return null;


}


// 否则以参与者的身份加入全局事务中


return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);


}


显然,判断是否存在于全局事务中是根据传递而来的XID是否存在而决定的。我们跟进getXID看看从哪里获取XID


private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();


public static String getXID() {


String xid = CONTEXT_HOLDER.get(KEY_XID);


if (StringUtils.isNotBlank(xid)) {


return xid;


}


String xidType = //代码效果参考:http://www.jhylw.com.cn/451823643.html

CONTEXT_HOLDER.get(KEY_XID_INTERCEPTOR TYPE);

if (StringUtils.isNotBlank(xidType) && xidType.indexOf("") > -1) {


return xidType.split("_")【0】;


}


return null;


}


可以看到,CONTEXT_HOLDER是一个静态对象,通过load方法加载的。跟进load,我们看一下加载逻辑


private static class ContextCoreHolder {


private static ContextCore instance;


static {


// SPI机制加载自定义配置


ContextCore contextCore = EnhancedServiceLoader.load(ContextCore.class);


if (contextCore == null) {


// 默认采用ThreadLocal实现


contextCore = new ThreadLocalContextCore();


}


instance = contextCore;


}


}


public static ContextCore load() {


return ContextCoreHolder.instance;


}


seata没有直接采用JAVA提供的SPI机制,而是自己写了一套。通过load方法加载自定义实现,不过默认是选择ThreadLocal来实现的。也就是说,在同一个线程中可以直接获取XID,不同线程中通过set到threadLocal中实现传递。


总得来说,只要有XID,那么就表示已经存在全局事务。


我们再回到getCurrentOrCreate方法中


public static GlobalTransaction getCurrentOrCreate() {


GlobalTransaction tx = getCurrent();


// 如果获取不到现有的全局事务,那么创建一个


if (tx == null) {


return createNew();


}


return tx;


}


如果XID不存在,那么tx就会为null,这时候将会创建初始的GlobalTransaction。跟进createNew方法看看创建过程


private static GlobalTransaction createNew() {


GlobalTransaction tx = new DefaultGlobalTransaction();


return tx;


}


创建过程非常简单,直接new了一个默认实现类。那么,DefaultGlobalTransaction的构造方法有没有做什么?


DefaultGlobalTransaction() {


// status是unknow表示初始状态,role是launcher表示发起者


this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);


}


可以看到,和加入一个事务不同,创建一个新的事务状态是unknow而不是begin,角色是创建人而不是参与者。


到这里我们应该明白,分布式事务中XID的传递显得非常重要,这样你才能把不同机器中的本地事务关联到全局事务当中,而如果存在XID丢失的问题就比较可怕了。


beginTransaction


经过getCurrentOrCreate以后,我们已经获得了一个新的或者既有的全局事务。在业务逻辑执行之前,我们需要先做一次begin。跟进beginTransaction


private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {


try {


triggerBeforeBegin();


// 调用的是全局事务的begin方法


tx.begin(txInfo.getTimeOut(), txInfo.getName());


triggerAfterBegin();


} catch (TransactionException txe) {


throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);


}


}


继续跟进DefaultGlobalTransaction的begin方法


public void begin(int timeout, String name) throws TransactionException {


// 参与者不做后续处理


if (role != GlobalTransactionRole.Launcher) {


return;


}


// 省略


// 调用TransactionManager,begin全局事务


xid = transactionManager.begin(null, null, name, timeout);


// status从UNKNOW变成BEGIN


status = GlobalStatus.Begin;


// 绑定XID到threadLocal


RootContext.bind(xid);


}


可以看到,参与者这里被return了,所以全局事务只会begin一次。调用TransactionManager以后会生成一个XID标识这个全局事务,并绑定到context当中,状态变成BEGIN。


我们跟进TransactionManager的begin方法,看看做了啥


@Override


public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {


// 构建一个begin请求


GlobalBeginRequest request = new GlobalBeginRequest();


request.setTransactionName(name);


request.setTimeout(timeout);


// 同步请求seata的服务端


GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);


if (response.getResultCode() == ResultCode.Failed) {


throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());


}


// 获取服务端的XID


return response.getXid();


}


TransactionManager将发起一个请求到服务端,由服务端来生成一个XID。可见,开启一个全局事务会在服务端生成一份全局事务的信息。这时候全局事务处于begin状态了


commitTransaction


业务逻辑处理完毕,将由commitTransaction来提交全局事务。跟进commitTransaction方法看看


private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {


try {


triggerBeforeCommit();


// 提交全局事务


tx.commit();


triggerAfterCommit();


} catch (TransactionException txe) {


throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);


}


}


和begin一样,将调用GlobalTransaction的commit方法,跟进commit方法


@Override


public void commit() throws TransactionException {


if (role == GlobalTransactionRole.Participant) {


return;


}


// 省略


int retry = COMMIT_RETRY_COUNT;


try {


while (retry > 0) {


try {


// 提交该XID的全局事务


status = transactionManager.commit(xid);


break;


} catch (Throwable ex) {


retry--;


if (retry == 0) {


throw new TransactionException("Failed to report global commit", ex);


}


}


}


} finally {


if (RootContext.getXID() != null) {


// 解绑XID


if (xid.equals(RootContext.getXID())) {


RootContext.unbind();


}


}


}


}


GlobalTransaction也是调用了TransactionManager的commit方法,finally块将解绑XID,可见如果到finally部分基本上算是结束了。而while部分做了一些重试操作。


再看看TransactionManager的commit方法


@Override


public GlobalStatus commit(String xid) throws TransactionException {


// 全局事务提交的请求


GlobalCommitRequest globalCommit = new GlobalCommitRequest();


globalCommit.setXid(xid);


// 同步提交全局事务提交请求


GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit);


return response.getGlobalStatus();


}


请求Server端,响应回来的status将作为本地全局事务的status。如果一切顺利,到GlobalTransaction的commit这里就已经结束了。


completeTransactionAfterThrowing


标准的事务逻辑除了commit还存在rollback的逻辑,当业务逻辑执行异常的时候要进行rollback回滚操作。


我们跟进completeTransactionAfterThrowing方法看看


private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable ex) throws TransactionalExecutor.ExecutionException {


// 判断是否回滚


if (txInfo != null && txInfo.rollbackOn(ex)) {


try {


// 回滚事务


rollbackTransaction(tx, ex);


} catch (TransactionException txe) {


throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, ex);


}


} else {


// 如果不需要回滚,直接commit


commitTransaction(tx);


}


}


可以看到,并不是所有异常都需要回滚的,txInfo包含的回滚规则会判断该异常是否回滚,如果不需要回滚直接commit全局事务。


跟进rollbackTransaction看看回滚操作


private void rollbackTransaction(GlobalTransaction tx, Throwable ex) throws TransactionException, TransactionalExecutor.ExecutionException {


triggerBeforeRollback();


// 调用GlobalTransaction的rollback操作


<span style="color: rgba(0, 0

相关文章
|
SQL OceanBase Python
OBCP第五章 分布式事务高级技术-分布式两阶段提交
OBCP第五章 分布式事务高级技术-分布式两阶段提交
205 0
|
6月前
|
消息中间件 Dubbo 应用服务中间件
分布式事物【Hmily实现TCC分布式事务、Hmily实现TCC事务、最终一致性分布式事务解决方案】(七)-全面详解(学习总结---从入门到深化)
分布式事物【Hmily实现TCC分布式事务、Hmily实现TCC事务、最终一致性分布式事务解决方案】(七)-全面详解(学习总结---从入门到深化)
196 0
|
4月前
|
存储 监控 安全
【实战经验】记录项目开发常见的8个难题
风沙席地起,战马踏风归!
187 21
|
6月前
|
Dubbo 应用服务中间件 微服务
分布式事物【Hmily实现TCC分布式事务、Hmily实现TCC事务、最终一致性分布式事务解决方案】(七)-全面详解(学习总结---从入门到深化)(上)
分布式事物【Hmily实现TCC分布式事务、Hmily实现TCC事务、最终一致性分布式事务解决方案】(七)-全面详解(学习总结---从入门到深化)
86 1
|
存储 消息中间件 关系型数据库
聊一聊分布式事务的解决方案
分布式事务是分布式系统中非常重要的一部分,最典型的例子是银行转账和扣款,A 和 B 的账户信息在不同的服务器上,A 给 B 转账 100 元,要完成这个操作,需要两个步骤,从 A 的账户上扣款,以及在 B 的账户上增加金额,两个步骤必须全部执行成功;否则如果有一个失败,那么另一个操作也不能执行。 那么像这种转账扣款的例子,在业务中如何保证一致性,有哪些解决方案呢?
256 0
|
6月前
|
消息中间件 RocketMQ 微服务
分布式事物【Hmily实现TCC分布式事务、Hmily实现TCC事务、最终一致性分布式事务解决方案】(七)-全面详解(学习总结---从入门到深化)(下)
分布式事物【Hmily实现TCC分布式事务、Hmily实现TCC事务、最终一致性分布式事务解决方案】(七)-全面详解(学习总结---从入门到深化)
196 1
|
6月前
|
SQL 关系型数据库 MySQL
分布式事物【XA强一致性分布式事务实战、分布式架构的理论知识、TCC核心组成】(六)-全面详解(学习总结---从入门到深化)
分布式事物【XA强一致性分布式事务实战、分布式架构的理论知识、TCC核心组成】(六)-全面详解(学习总结---从入门到深化)
60 0
|
存储 缓存 NoSQL
实战经验之线上对事务的处理
事务是我们平时项目中对数据操作最为直接、常用的方式,现在无论是大小公司都离不开对事务的操作,伴随业务的提升,客户量的积累也大大增加了对事务管理的难度。
189 0
实战经验之线上对事务的处理
|
存储 调度
【攻破技术盲点】一起学习和巩固TCC分布式事务模型
【攻破技术盲点】一起学习和巩固TCC分布式事务模型
166 0
【攻破技术盲点】一起学习和巩固TCC分布式事务模型
|
SQL 前端开发 分布式数据库
18.【学习心得】学习心得-柔性事务落地
【学习心得】学习心得-柔性事务落地
18.【学习心得】学习心得-柔性事务落地
下一篇
无影云桌面