@[TOC]
一、前言
至此,seata系列的内容包括:
- can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决;
- Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
- Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
- 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
- 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
- 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
- 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
- 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
- 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
- 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
- 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
本文正式进入Seata最核心的全局事务执行流程。
二、全局事务执行的入口
在【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务一文,我们知道了所谓的@GlobalTransactional
注解开启全局事务,其实就是给类 或 类的方法上标注了@GlobalTransactional
注解的类创建动态代理对象。但是动态代理对象是针对类的;
1、拦截器GlobalTransactionalInterceptor
当一个类中有多个方法并且类没有被@GlobalTransactional
注解标注,但只有一个方法被@GlobalTransactional
注解标注时,这里针对整个类生成了动态代理对象,当调用Bean时,拦截器GlobalTransactionalInterceptor
会做进一步处理,保证只有加了@GlobalTransactional
注解的方法才会开启全局事务。
GlobalTransactionalInterceptor
类的继承图:
GlobalTransactionalInterceptor
实现了MethodInterceptor
接口,所以当每次执行添加了 GlobalTransactionalInterceptor
拦截器的Bean的方法时,都会进入到GlobalTransactionalInterceptor
类覆写MethodInterceptor
接口的invoke()
方法;
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
// method invocation是一次方法调用,一定是针对某个对象的方法调用;
// methodInvocation.getThis()就是拿到当前方法所属的对象;
// AopUtils.getTargetClass()获取到当前实例对象所对应的Class
Class<?> targetClass =
methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
// 通过反射获取到被调用目标Class的method方法
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
// 如果目标method不为空,并且方法的DeclaringClass不是Object
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
// 通过BridgeMethodResolver寻找method的桥接方法
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
// 获取目标方法的@GlobalTransactional注解
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
// 如果目标方法被@GlobalLock注解标注,获取到@GlobalLock注解内容
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
// 如果禁用了全局事务 或 开启了事务降级检查并且降级检查次数大于等于降级检查允许的次数
// 则localDisable等价于全局事务被禁用了
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
// 如果全局事务没有被禁用
if (!localDisable) {
// 全局事务注解不为空 或者 AOP切面全局事务核心配置不为空
if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
AspectTransactional transactional;
if (globalTransactionalAnnotation != null) {
// 构建一个AOP切面全局事务核心配置,配置的数据从全局事务注解中取
transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
globalTransactionalAnnotation.rollbackForClassName(),
globalTransactionalAnnotation.noRollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
globalTransactionalAnnotation.propagation(),
globalTransactionalAnnotation.lockRetryInterval(),
globalTransactionalAnnotation.lockRetryTimes());
} else {
transactional = this.aspectTransactional;
}
// 真正处理全局事务的入口
return handleGlobalTransaction(methodInvocation, transactional);
} else if (globalLockAnnotation != null) {
// 获取事务锁
return handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
// 直接运行目标方法
return methodInvocation.proceed();
}
invoke()方法解析
1)方法入参--MethodInvocation
invoke()方法的入参为MethodInvocation
,MethodInvocation是一次方法调用,并且是针对某个对象的方法调用;
- methodInvocation.getThis()会拿到当前方法所属的对象;
在通过methodInvocation.getThis()
会拿到当前方法所属的对象时,如果获取到的是null,则使用AopUtils.getTargetClass()
获取到当前实例对象所对应的Class(如果被AOP代理,则是代理类,否则是普通类)。
2)判断目标方法是否需要开启全局事务
直接通过反射拿到目标Class的method方法;如果method不为空,并且method所属的类不是Object类;再判断如果method直接或间接被GlobalTransactional
注解标注,并且没有禁用全局事务,则再进一步判断全局事务是否被禁用,如果没有被禁用则执行全局事务。
3)开始处理全局事务
handleGlobalTransaction()方法中真正开始进行全局事务的处理。方法具体内容见<三、全局事务执行>
2、不用开启全局事务的情况
1)全局事务被禁用
在判断完method直接或间接被@GlobalTransactional
标注之后,会判断全局事务是否被禁用,如果被禁用则至今运行目标方法。
禁用全局事务有两种方式:
1> 显示的设置disable属性
- 配置
service.disableGlobalTransaction
,默认为false,表示不禁用全局事务;
2> 开启了事务降级检查,并且降级检查次数大于等于降级检查允许的次数
- 配置
client.tm.degradeCheck
,默认为false,表示不开启事务降级检查; - 配置
client.tm.degradeCheckAllowTimes
,只有当开启事务降级检查,这个配置才有意义;
2)某一个类被标注的注解,但Object超类下的所有方法仍都不会开启全局事务
在GlobalTransactionalInterceptor#invoke()方法中会判断如果目标类的方法是Object类下的方法,则不会执行全局事务;
3)某一个方法标注了事务注解,其余方法没标注,并且类没有被标注,其余方法都不会开启全局事务
假如我们调用TradeService
类中没有标注@GlobalTransactional
注解的test()方法(且 TradeService类也没有标注@GlobalTransaction注解);
invoke()方法中会再次判断 当前调用的bean的方法 或 方法所处的类上是否标注了@GlobalTransactional
注解,如果没有标注,则执行运行目标方法;否则才会以全局事务的方式执行方法。
三、全局事务执行
在上面我们聊了GlobalTransactionalInterceptor#handleGlobalTransaction()方法会进行全局事务的处理;
全局事务的执行会交给全局事务执行业务逻辑的模板TransactionalTemplate
,并将目标方法封装到TransactionalExecutor
中作为全局事务中执行业务逻辑的回调。
全局事务执行模板TransactionalTemplate
全局事务的整体执行流程体现在TransactionalTemplate#execute()方法中:
具体代码 和 注释:
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. Get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
// 获取当前事务,根据ThreadLocal,获取当前线程本地变量副本中的xid,进而判断是否存在一个全局事务
// 刚开始一个全局事务时,肯定是没有全局事务的
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 Handle the transaction propagation.
// 从全局事务的配置里 获取事务传播级别,默认是REQUIRED(如果存在则加入,否则开启一个新的)
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
// 根据事务的隔离级别做不同的处理
switch (propagation) {
case NOT_SUPPORTED:
// If transaction is existing, suspend it.
if (existingTransaction(tx)) {
// 事务存在,则挂起事务(默认将xid从RootContext中移除)
suspendedResourcesHolder = tx.suspend();
}
// Execute without transaction and return.
return business.execute();
case REQUIRES_NEW:
// If transaction is existing, suspend it, and then begin new transaction.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
// Continue and execute with new transaction
break;
case SUPPORTS:
// If transaction is not existing, execute without transaction.
if (notExistingTransaction(tx)) {
return business.execute();
}
// Continue and execute with new transaction
break;
case REQUIRED:
// If current transaction is existing, execute with current transaction,
// else continue and execute with new transaction.
break;
case NEVER:
// If transaction is existing, throw exception.
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
// Execute without transaction and return.
return business.execute();
}
case MANDATORY:
// If transaction is not existing, throw exception.
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
if (tx == null) {
// 创建全局事务(角色为事务发起者),并关联全局事务管理器
tx = GlobalTransactionContext.createNew();
}
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
// 开启全局事务,如果事务角色是'GlobalTransactionRole.Launcher',发送开始事务请求到seata-server(TC)
beginTransaction(txInfo, tx);
Object rs;
try {
// Do Your Business
// 执行业务方法,把全局事务ID通过 MVC拦截器 / dubbo filter传递到后面的分支事务;
// 每个分支事务都会去运行
rs = business.execute();
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
// 如果全局事务执行发生了异常,则回滚;
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
// 全局事务和分支事务运行无误,提交事务;
commitTransaction(tx);
return rs;
} finally {
//5. clear
// 全局事务完成之后做一些清理工作
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}
} finally {
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
// 如果有挂起的全局事务,则恢复全局事务
tx.resume(suspendedResourcesHolder);
}
}
}
整个全局事务的执行由八步组成:
- 从线程本地变量副本中获取到xid,进而判断是否存在一个全局事务;
- 根据事务的隔离级别,对已存在的全局事务做不同的处理,包括:挂起事务、新建一个事务.....
最后如果事务为空,则创建一个新的全局事务(刚开始一个新的全局事务时,会走进这个逻辑)- 开启一个全局事务;
- 执行业务方法,把全局事务ID通过 MVC拦截器 / dubbo filter传递到后面的分支事务;
- 如果全局事务执行发生了异常,则通知TC回滚全局事务和所有的分支事务;
- 如果全局事务和分支事务运行无误,提交事务;
- 无论全局事务是否运行成功,都需要清理占用的全局锁资源;
- 最后,如果存在被挂起的全局事务,则恢复全局事务。
下面我们针对每一步具体来看;
1、第一步:判断是否存在一个全局事务
因为执行分支事务时,分支事务的业务方法也有可能被@GlobalTransactional注解直接或间接修饰,进而导致分支事务和全局事务的执行入口是一样的;所以需要先判断是否存在一个全局事务(而当存在全局事务时,分支事务应该如何执行,我们下一篇文章讨论)。
刚开始执行一个全局事务时,当前线程本地变量副本中的xid为null,即不存在一个全局事务。
2、第二步:根据事务的隔离级别做不同的处理
默认事务的隔离级别为REQUIRED
:即:如果当前存在一个事务,则加入事务;否者新建一个事务。
由于刚开始执行一个全局事务时,不存在事务,所以默认会新建一个全局事务。
GlobalTransactionContext.createNew()
负责新建一个全局事务:
6种事务隔离级别的具体逻辑
1> NOT_SUPPORTED
- 不支持事务: 如果事务存在,则挂起事务(默认将xid从RootContext中移除,记录下挂起的事务资源)
2> REQUIRES_NEW
- 新建一个事务:如果事务存在,则挂起事务,再新建一个事务。
3> SUPPORTS
- 支持事务:如果当前存在事务,则加入事务,不存在事务,则以非事务方式执行。
4> REQUIRED(默认事务模式)
- 必须有事务:如果当前存在一个事务,则加入事务;否者新建一个事务。
5> NEVER
- 不支持事务:如果当前存在事务,则报错;否则以非事务方式执行。
6> MANDATORY
- 强制使用事务:如果当前不存在事务,则报错;否则加入事务执行。
3、第三步:开启全局事务
在开启全局事务前后会有钩子函数,默认开启全局事务前后的两个钩子中没有任何实现,如果有需要可以自己定制。这个业务执行前后的钩子函数在Spring体系中随处可见。
整个开启全局事务的逻辑如下:
开启全局事务时,会首先判断事务的角色是否
Launcher
,即全局事务;刚开始执行一个全局事务时,创建出来的DefaultGlobalTransaction
,其role就是Launcher
,也就是说事务角色为全局事务。
- 如果事务的角色不是全局事务,则会断言xid不许为null,否者抛出异常
IllegalStateException
;- 当事务为全局事务时,首选断言xid为null,否者抛出异常
IllegalStateException
;因为超时重试机制的缘故,会再次判断线程本地上下文中的xid是否为null,如果不为null,同样抛出异常IllegalStateException
。- 请求TC(seata-server)开启全局事务,并获取到全局事务xid。
- 请求TC开启全局事务之后,设置事务的状态为开启,并将全局事务xid绑定到线程本地变量副本上。
下面着重看一下TM如何请求TC开启全局事务并获取到xid?
TM如何请求TC开启全局事务
全局事务发起者TM,会通过netty和TC进行网络通信;其中包括对seata-server集群的负载均衡,在获取到相应seata-server实例对应的channel之后,会进步处理请求的发送和相应结果的接收。
在写Channel之前,channelWritableCheck()
方法会检查channel是否可写。
TM / RM 和TC的RPC通信均是异步进行的:
- TM / RM 发送请求时,将封装了
CompletableFuture
的MessageFuture放到futures
(ConcurrentHashMap<Integer, MessageFuture>)中;- TC处理完请求之后,会通过netty框架发送响应到TM / RM 的
AbstractNettyRemoting
中,其再将futures
中的MessageFuture完成,发送请求的代码段中messageFuture.get()
会获取到返回值,停止阻塞。
TM发送请求之后,TC如何接收请求,如何处理请求?
TC接收到TM的请求如何开启全局事务
在【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信一文中,我们聊了Seata Client 如何和Seata Server建立连接、通信;
又在【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么一文中,我们知道了TC(Seata Server)启动之后,AbstractNettyRemotingServer的内部类ServerHandler负责接收并处理请求。
ServerHandler类上有个@ChannelHandler.Sharable
注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。
processMessage(ctx, (RpcMessage) msg)
方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。
/**
* Rpc message processing.
*
* @param ctx Channel handler context.
* @param rpcMessage rpc message.
* @throws Exception throws exception process message error.
* @since 1.3.0
*/
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
}
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
// 根据消息的类型获取到请求处理组件和请求处理线程池组成的Pair
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
// 如果消息对应的处理器设置了线程池,则放到线程池中执行
if (pair.getSecond() != null) {
try {
pair.getSecond().execute(() -> {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
// 线程池拒绝策略之一,抛出异常:RejectedExecutionException
LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
"thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
if (allowDumpStack) {
String name = ManagementFactory.getRuntimeMXBean().getName();
String pid = name.split("@")[0];
long idx = System.currentTimeMillis();
try {
String jstackFile = idx + ".log";
LOGGER.info("jstack command will dump to " + jstackFile);
Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
} catch (IOException exx) {
LOGGER.error(exx.getMessage());
}
allowDumpStack = false;
}
}
} else {
// 对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
}
}
} else {
LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
}
} else {
LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
}
}
Seata Serer接收到请求的执行链路为:
又由于TM发送开启事务请求时的RPCMessage的body为GlobalBeginRequest:
所以进入到:
又由于在DefaultCoordinator#onRequest()
方法中,将DefaultCoordinator
自身绑定到了AbstractTransactionRequestToTC
的handler
属性中:
所以进入到:
而AbstractExceptionHandler#exceptionHandleTemplate()
方法只是运行方法的入参Callback
,即接着会进入到:
DefaultCore执行开启全局事务的业务逻辑
DefaultCore#begin()方法负责开启全局事务的业务逻辑,方法的入参包括:开启全局事务的应用程序名称、事务服务分组名称、事务名称(开启全局事务的方法名以及方法的入参类型)、事务超时时间。
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
// 创建一个全局事务会话
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
timeout);
// 通过MDC把XID放入线程本地变量ThreadLocal中(MDC是Slf4j提供的工具)
MDC.put(RootContext.MDC_KEY_XID, session.getXid());
// 添加对全局事务会话生命周期的监听
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// 开启全局事务会话
session.begin();
// transaction start event
// 发布全局事务开启事件 做指标监控
MetricsPublisher.postSessionDoingEvent(session, false);
// 返回全局事务会话的xid
return session.getXid();
}
seata-server开启全局事务的流程:
- 创建一个全局会话GlobalSession;
- 通过MDC把XID放入线程本地变量ThreadLocal中,并添加对全局事务会话生命周期的监听;
- 开启全局事务会话;
- 发布全局事务开启事件 做指标监控;
- 返回全局事务会话的xid。
1> 第一步:创建全局会话GlobalSession
创建全局会话的最主要的点是根据雪花算法生成全局事务ID(transactionId)、XID(seata server的IP、Port和transactionId使用:拼接到一起)。
Seata如何使用雪花算法生成全局事务ID的见文章:【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?
2> 第二步:把XID放入线程本地变量副本,并添加对全局事务会话生命周期的监听
3> 第三步:开启全局事务会话
开启全局事务会话的逻辑主要在遍历所有的生成周期监听函数,执行begin事件;
根据我们启动Seata Server时选择的store.mode
,会执行不同的SessionLifecycleListener
:
博主启动Seata Server时store.mode = db
,所以我这里的SessionLifecycleListener
为DataBaseSessionManager
:
DataBaseSessionManager
执行begin事件的链路如下:
这里其实就是将全局事务会话信息持久化到DB中:
- 首先将全局事务会话信息封装到
GlobalTransactionDO
模型中; - 然后使用JDBC将全局事务会话信息持久化到表
global_table
中;
所谓的开启全局事务会话,其实就是将全局事务会话信息持久化到Store.mode中。
4> 第四步:发布全局事务开启事件 做指标监控
这一块对了解seata事务的执行主流程没影响,不需要耗费特别大的精力关注,如果有指标监控的需求再重点看。
5> 返回全局事务会话的xid
4、第四步 --- 第八步:见下一篇博文
点个关注、订阅订阅专栏,下一篇系列文章更精彩。
执行业务方法(AT模式下)、全局事务回滚、全局事务提交、全局锁资源释放见下一篇博文。
四、总结
本文重点聊了Seata事务执行流程中TM、TC中如何开启全局事务;其中设计几个比较关键的类:
- TransactionalExecutor --> 全局事务执行组件
- TransactionalTemplate --> 全局事务生命周期模板管理组件,负责管理事务的生命周期;
- TransactionManager --> 全局事务管理组件,负责执行事务的业务逻辑;
- DefaultCore --> Seata Server端事务业务的执行逻辑,封装了AT、TCC、Saga、XA分布式事务模式的具体实现。