目录
所有文章正文execute方法getCurrentOrCreatebeginTransactioncommitTransactioncompleteTransactionAfterThrowing总结
正文
回到顶部所有文章
回到顶部正文
上一篇文章中,我们看了一下GlobalTransactionalInterceptor这个拦截器,知道了@GlobalTransactional注解的主体逻辑被委托给了TransactionalTemplate来实现。
本文,将看一下TransationalTemplate这个模板方法处理全局事务的主体逻辑。
回到顶部execute方法
首先,我们跟进TransactionalTemplate的execute方法
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. 获取或者创建一个全局事务
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.1 获取事务信息
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
try {
// 2. 开始全局事务
beginTransaction(txInfo, tx);
Object rs = null;
try {
// 执行业务逻辑
rs = business.execute();
} catch (Throwable ex) {
// 3.rollback全局事务
completeTransactionAfterThrowing(txInfo,tx,ex);
throw ex;
}
// 4. commit全局事务
commitTransaction(tx);
return rs;
} finally {
//5. 清理
triggerAfterCompletion();
cleanUp();
}
}
execute方法的逻辑我们应该非常的熟悉,这和JDBC的API非常的相似。同样是经历:begin -> commit || rollback,这样一个逻辑。
步骤主要分为如下几个:
1)获取或者创建一个全局事务;
2)begin全局事务;
3)异常rollback事务;
4)正常commit事务;
下面,我们将逐步阅读对应步骤的代码
getCurrentOrCreate
GlobalTransactionContext作为一个上下文,提供了获取或者创建全局事务的方法,跟进它的getCurrentOrCreate方法
public static GlobalTransaction getCurrentOrCreate() {
GlobalTransaction tx = getCurrent();
// 如果获取不到现有的全局事务,那么创建一个
if (tx == null) {
return createNew();
}
return tx;
}
我们,先看看它是如何获取到已经存在的全局事务的,跟进getCurrent方法
private static GlobalTransaction getCurrent() {
// 获取全局事务的XID
String xid = RootContext.getXID();
// XID不存在,表示不存在全局事务
if (xid == null) {
return null;
}
// 否则以参与者的身份加入全局事务中
return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
}
显然,判断是否存在于全局事务中是根据传递而来的XID是否存在而决定的。我们跟进getXID看看从哪里获取XID
private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();
public static String getXID() {
String xid = CONTEXT_HOLDER.get(KEY_XID);
if (StringUtils.isNotBlank(xid)) {
return xid;
}
String xidType = //代码效果参考:http://www.jhylw.com.cn/451823643.html
CONTEXT_HOLDER.get(KEY_XID_INTERCEPTOR TYPE);if (StringUtils.isNotBlank(xidType) && xidType.indexOf("") > -1) {
return xidType.split("_")【0】;
}
return null;
}
可以看到,CONTEXT_HOLDER是一个静态对象,通过load方法加载的。跟进load,我们看一下加载逻辑
private static class ContextCoreHolder {
private static ContextCore instance;
static {
// SPI机制加载自定义配置
ContextCore contextCore = EnhancedServiceLoader.load(ContextCore.class);
if (contextCore == null) {
// 默认采用ThreadLocal实现
contextCore = new ThreadLocalContextCore();
}
instance = contextCore;
}
}
public static ContextCore load() {
return ContextCoreHolder.instance;
}
seata没有直接采用JAVA提供的SPI机制,而是自己写了一套。通过load方法加载自定义实现,不过默认是选择ThreadLocal来实现的。也就是说,在同一个线程中可以直接获取XID,不同线程中通过set到threadLocal中实现传递。
总得来说,只要有XID,那么就表示已经存在全局事务。
我们再回到getCurrentOrCreate方法中
public static GlobalTransaction getCurrentOrCreate() {
GlobalTransaction tx = getCurrent();
// 如果获取不到现有的全局事务,那么创建一个
if (tx == null) {
return createNew();
}
return tx;
}
如果XID不存在,那么tx就会为null,这时候将会创建初始的GlobalTransaction。跟进createNew方法看看创建过程
private static GlobalTransaction createNew() {
GlobalTransaction tx = new DefaultGlobalTransaction();
return tx;
}
创建过程非常简单,直接new了一个默认实现类。那么,DefaultGlobalTransaction的构造方法有没有做什么?
DefaultGlobalTransaction() {
// status是unknow表示初始状态,role是launcher表示发起者
this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
}
可以看到,和加入一个事务不同,创建一个新的事务状态是unknow而不是begin,角色是创建人而不是参与者。
到这里我们应该明白,分布式事务中XID的传递显得非常重要,这样你才能把不同机器中的本地事务关联到全局事务当中,而如果存在XID丢失的问题就比较可怕了。
beginTransaction
经过getCurrentOrCreate以后,我们已经获得了一个新的或者既有的全局事务。在业务逻辑执行之前,我们需要先做一次begin。跟进beginTransaction
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeBegin();
// 调用的是全局事务的begin方法
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);
}
}
继续跟进DefaultGlobalTransaction的begin方法
public void begin(int timeout, String name) throws TransactionException {
// 参与者不做后续处理
if (role != GlobalTransactionRole.Launcher) {
return;
}
// 省略
// 调用TransactionManager,begin全局事务
xid = transactionManager.begin(null, null, name, timeout);
// status从UNKNOW变成BEGIN
status = GlobalStatus.Begin;
// 绑定XID到threadLocal
RootContext.bind(xid);
}
可以看到,参与者这里被return了,所以全局事务只会begin一次。调用TransactionManager以后会生成一个XID标识这个全局事务,并绑定到context当中,状态变成BEGIN。
我们跟进TransactionManager的begin方法,看看做了啥
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
// 构建一个begin请求
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
// 同步请求seata的服务端
GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
// 获取服务端的XID
return response.getXid();
}
TransactionManager将发起一个请求到服务端,由服务端来生成一个XID。可见,开启一个全局事务会在服务端生成一份全局事务的信息。这时候全局事务处于begin状态了
commitTransaction
业务逻辑处理完毕,将由commitTransaction来提交全局事务。跟进commitTransaction方法看看
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeCommit();
// 提交全局事务
tx.commit();
triggerAfterCommit();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);
}
}
和begin一样,将调用GlobalTransaction的commit方法,跟进commit方法
@Override
public void commit() throws TransactionException {
if (role == GlobalTransactionRole.Participant) {
return;
}
// 省略
int retry = COMMIT_RETRY_COUNT;
try {
while (retry > 0) {
try {
// 提交该XID的全局事务
status = transactionManager.commit(xid);
break;
} catch (Throwable ex) {
retry--;
if (retry == 0) {
throw new TransactionException("Failed to report global commit", ex);
}
}
}
} finally {
if (RootContext.getXID() != null) {
// 解绑XID
if (xid.equals(RootContext.getXID())) {
RootContext.unbind();
}
}
}
}
GlobalTransaction也是调用了TransactionManager的commit方法,finally块将解绑XID,可见如果到finally部分基本上算是结束了。而while部分做了一些重试操作。
再看看TransactionManager的commit方法
@Override
public GlobalStatus commit(String xid) throws TransactionException {
// 全局事务提交的请求
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
// 同步提交全局事务提交请求
GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit);
return response.getGlobalStatus();
}
请求Server端,响应回来的status将作为本地全局事务的status。如果一切顺利,到GlobalTransaction的commit这里就已经结束了。
completeTransactionAfterThrowing
标准的事务逻辑除了commit还存在rollback的逻辑,当业务逻辑执行异常的时候要进行rollback回滚操作。
我们跟进completeTransactionAfterThrowing方法看看
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable ex) throws TransactionalExecutor.ExecutionException {
// 判断是否回滚
if (txInfo != null && txInfo.rollbackOn(ex)) {
try {
// 回滚事务
rollbackTransaction(tx, ex);
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, ex);
}
} else {
// 如果不需要回滚,直接commit
commitTransaction(tx);
}
}
可以看到,并不是所有异常都需要回滚的,txInfo包含的回滚规则会判断该异常是否回滚,如果不需要回滚直接commit全局事务。
跟进rollbackTransaction看看回滚操作
private void rollbackTransaction(GlobalTransaction tx, Throwable ex) throws TransactionException, TransactionalExecutor.ExecutionException {
triggerBeforeRollback();
// 调用GlobalTransaction的rollback操作
<span style="color: rgba(0, 0