@[TOC]
一、前言
至此,seata系列的内容包括:
- can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决;
- Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
- Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
- 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
- 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
- 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
- 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
- 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
- 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
- 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
- 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
- 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务
Seata最核心的全局事务执行流程,前面我们已经聊到了Seata全局事务的开启,本文接着聊Seata全局事务中执行具体业务操作时,DB操作是如何执行的(含:全局锁keys、undologs的构建(AT模式))?
二、本地事务SQL执行流程
全局事务的整体执行流程体现在TransactionalTemplate#execute()方法中:
具体代码 和 注释:
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. Get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
// 获取当前事务,根据ThreadLocal,获取当前线程本地变量副本中的xid,进而判断是否存在一个全局事务
// 刚开始一个全局事务时,肯定是没有全局事务的
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 Handle the transaction propagation.
// 从全局事务的配置里 获取事务传播级别,默认是REQUIRED(如果存在则加入,否则开启一个新的)
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
// 根据事务的隔离级别做不同的处理
switch (propagation) {
case NOT_SUPPORTED:
// If transaction is existing, suspend it.
if (existingTransaction(tx)) {
// 事务存在,则挂起事务(默认将xid从RootContext中移除)
suspendedResourcesHolder = tx.suspend();
}
// Execute without transaction and return.
return business.execute();
case REQUIRES_NEW:
// If transaction is existing, suspend it, and then begin new transaction.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
// Continue and execute with new transaction
break;
case SUPPORTS:
// If transaction is not existing, execute without transaction.
if (notExistingTransaction(tx)) {
return business.execute();
}
// Continue and execute with new transaction
break;
case REQUIRED:
// If current transaction is existing, execute with current transaction,
// else continue and execute with new transaction.
break;
case NEVER:
// If transaction is existing, throw exception.
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
// Execute without transaction and return.
return business.execute();
}
case MANDATORY:
// If transaction is not existing, throw exception.
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
if (tx == null) {
// 创建全局事务(角色为事务发起者),并关联全局事务管理器
tx = GlobalTransactionContext.createNew();
}
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
// 开启全局事务,如果事务角色是'GlobalTransactionRole.Launcher',发送开始事务请求到seata-server(TC)
beginTransaction(txInfo, tx);
Object rs;
try {
// Do Your Business
// 执行业务方法,把全局事务ID通过 MVC拦截器 / dubbo filter传递到后面的分支事务;
// 每个分支事务都会去运行
rs = business.execute();
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
// 如果全局事务执行发生了异常,则回滚;
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
// 全局事务和分支事务运行无误,提交事务;
commitTransaction(tx);
return rs;
} finally {
//5. clear
// 全局事务完成之后做一些清理工作
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}
} finally {
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
// 如果有挂起的全局事务,则恢复全局事务
tx.resume(suspendedResourcesHolder);
}
}
}
在前一篇文章: 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务,我们已经聊到了开启全局事务,本文继续聊开启全局事务之后,本地事务中的SQL执行流程。
1、DataSourceProxy 数据库资源代理入口
在Spring Cloud 整合Seata实现分布式事务一文中有聊到Spring Cloud 集成Seata 的AT模式,需要写一个配置类DataSourceConfig
,其中会注入一个Bean(DataSourceProxy
):
到这里,博主有一个问题:注入DataSourceProxy到Spring容器中之后,哪里会用到它?执行数据增删改查时如何切换到代理数据源?
1)哪里使用了DataSourceProxy?
从源码来看,有一个Spring AOP抽象类AbstractAutoProxyCreator
的子类SeataAutoDataSourceProxyCreator
;
Spring 通过 AbstractAutoProxyCreator来创建 AOP 代理,其实现了BeanPostProcessor 接口,在 bean 初始化完成之后会创建它的代理,让后将代理对象增加到Spring容器。
在Seata 中,SeataAutoDataSourceProxyCreator的主要作用是为数据源DataSource
添加Advisor
,当数据源执行操作时,便会进入到SeataAutoDataSourceProxyAdvice
类中处理;
因此,当数据源执行CRUD操作时,由于添加了AOP代理,会进入到SeataAutoDataSourceProxyAdvice#invoke()方法中:
咦,这里没有DataSourceProxy呀,只有SeataDataSourceProxy,从命名来看,这俩类总感觉有点关系!
2)SeataDataSourceProxy
从DataSourceProxy类的继承结构来看,DataSourceProxy实现了SeataDataSourceProxy
接口;因此SeataAutoDataSourceProxyAdvice#invoke()方法中
动态代理类实际就是DataSourceProxy
。
2、本地事务SQL的执行流程(execute)
1)执行本地事务SQL的入口
JDBC的执行流程:
- 第一步:注册驱动;
- 第二步:获取与数据库的连接Connection;
- 第三步:获取数据库操作对象Statement;
- 第四步:执行sql语句(DQL、DML…),并且返回结果集;
- 第五步:处理查询结果集;
- 第六步:释放资源、关闭连接;
try {
//加载数据库驱动
Class.forName("com.mysql.cj.jdbc.Driver");
} catch (ClassNotFoundException e) {
// do something
}
Connection conn = DriverManager.getConnection(URL, USER_NAME, PASSWORD);
PreparedStatement pst = conn.prepareStatement("update user set name=? where id = ?");
pst.setString(1, "bobDog");
pst.setInt(2, 1);
int updateRes = pst.executeUpdate();
if (updateRes > 0) {
System.out.println("更新成功!");
}
Seata代理的数据库资源DataSource底层也是JDBC操作数据库,所以也需要先获取数据库连接Connection、再根据数据库连接获取数据库操作对象Statement、接着再通过Statement#execute()执行SQL。在Seata中的表现为:
- 先获取seata代理的数据库连接ConnectionProxy;
- 再根据ConnectionProxy获取一个数据库操作对象
StatementProxy
或PreparedStatementProxy
;- 然后再利用数据库操作对象
StatementProxy
或PreparedStatementProxy
的execute() 或 executeUpdate() 方法执行SQL语句。
StatementProxy
或 PreparedStatementProxy
增强了所有的execute方法,由ExecuteTemplate选择需要的Executor执行来sql。
下面以常见的更新操作(PreparedStatementProxy#executeUpdate()
)为例:
ExecuteTemplate#execute()
重载方法调用链路如下:
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
// 没获取到全局锁,并且事务模式不是AT
if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
// 获取DB的类型
String dbType = statementProxy.getConnectionProxy().getDbType();
if (CollectionUtils.isEmpty(sqlRecognizers)) {
sqlRecognizers = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
dbType);
}
Executor<T> executor;
if (CollectionUtils.isEmpty(sqlRecognizers)) {
executor = new PlainExecutor<>(statementProxy, statementCallback);
} else {
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
// 数据库操作类型
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case INSERT_ON_DUPLICATE_UPDATE:
switch (dbType) {
case JdbcConstants.MYSQL:
case JdbcConstants.MARIADB:
executor =
new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
break;
default:
throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
}
break;
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}
}
T rs;
try {
// 通过Executor真正的执行
rs = executor.execute(args);
} catch (Throwable ex) {
if (!(ex instanceof SQLException)) {
// Turn other exception into SQLException
ex = new SQLException(ex);
}
throw (SQLException) ex;
}
return rs;
}
- 如果当前事务不需要获取全局锁,并且不是AT模式,则以original statement的方式执行。默认Seata Client层面不需要获取全局锁,事务模式是AT模式。
- 获取到的DB类型,比如MySQL、Oracle.....,博主的项目DBType是MYSQL。
- 获取SQL DML类型,并根据DML类型,选择不同的Executor。这里可以看做是策略模式。
因为示例是Update类型,所以最终选择的Executor是UpdateExecutor。
### 2)执行本地事务SQL逻辑
UpdateExecutor#execute()方法中会执行本地事务SQL,UpdateExecutor的类继承图如下:
除了数据更新前后的Image构造体现在UpdateExecutor类的方法中,其余方法均在其父类BaseTransactionalExecutor
中,包括execute()方法。
@Override
public T execute(Object... args) throws Throwable {
// 从全局事务上下文中获取xid
String xid = RootContext.getXID();
if (xid != null) {
// 将xid绑定到ConnectionContext中,后续提交本地事务时会用到
statementProxy.getConnectionProxy().bind(xid);
}
// RootContext.requireGlobalLock()检查是否需要全局锁,默认不需要
statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
return doExecute(args);
}
开始执行本地事务SQL时:
- 首先从全局事务上下文RootContext中获取到xid,如果存在全局事务xid,则将xid绑定到数据库连接的上下文ConnectionContext中;
- 从全局事务上下文RootContext获取是否全局锁标识,默认不需要;如果需要获取全局锁,则将数据库连接上下文ConnectionContext中的
isGlobalLockRequire
设置为true;- 调用doExecute()方法真正开始执行SQL;
UpdateExecutor#doExecutor()方法:
开启了全局事务之后,DML语句的本地事务不会自动提交。
即使自动提交没有关闭,AbstractDMLBaseExecutor#doExecute(Object… args)
方法中也会先将其关闭,然后再以非自动提交的方式执行SQL,走ConnectionProxy提交本地事务,然后再将自动提交设置为true;这一块逻辑体现在executeAutoCommitTrue()
方法中:
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {
connectionProxy.changeAutoCommit();
return new LockRetryPolicy(connectionProxy).execute(() -> {
T result = executeAutoCommitFalse(args);
connectionProxy.commit();
return result;
});
} catch (Exception e) {
// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
connectionProxy.getTargetConnection().rollback();
}
throw e;
} finally {
connectionProxy.getContext().reset();
connectionProxy.setAutoCommit(true);
}
}
正常情况下都是直接以非自动提交的方式执行,即执行executeAutoCommitFalse()方法:
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
throw new NotSupportYetException("multi pk only support mysql!");
}
// 根据SQL语句构建before image,目标SQL执行之前的数据镜像:从数据库根据ID主键等信息查询出更新前的数据;
TableRecords beforeImage = beforeImage();
// 真正的去执行SQL语句,但是本地事务还没有提交
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
int updateCount = statementProxy.getUpdateCount();
if (updateCount > 0) {
// 目标SQL执行之后的数据镜像:从数据库根据ID主键等信息查询出更新后的数据;
TableRecords afterImage = afterImage(beforeImage);
// 准备好undo log数据
prepareUndoLog(beforeImage, afterImage);
}
return result;
}
由于AbstractDMLBaseExecutor提供了公用的executeAutoCommitFalse()
给Insert、Delete、Update类型的Executor使用,所以无论是Insert、Delete还是Update操作都会走AbstractDMLBaseExecutor#executeAutoCommitFalse()
方法执行SQL。不过MySQL的MySQLInsertOrUpdateExecutor
是个个例,其执行SQL的逻辑由自己实现(有兴趣可以自己看一下MySQLInsertOrUpdateExecutor)。
以非自动提交执行SQL的流程如下:
beforeImage()
-- 根据SQL语句构建before image,查询目标sql执行前的数据快照;
- Update、Delete操作从数据库根据ID主键等信息查询出更新前的数据;
- Insert操作直接返回空的TableRecords,其中只包含TableMeta,没有数据记录;
- 执行SQL语句,但是本地事务还没有提交;
afterImage()
-- 构建after image,查询目标SQL执行之后的数据快照;
- Insert、Update操作从数据库根据ID主键等信息查询出更新后的数据;
- Delete操作直接返回空的TableRecords,其中只包含TableMeta,没有数据记录;
prepareUndoLog(beforeImage, afterImage)
--> 将before image 和 after image合并作为回滚日志undo log,保存到当前数据库连接上下文ConnectionContext中。
其中还包括构建当前本地事务要占用所有全局锁key信息,然后将其保存到当前数据库连接上下文ConnectionContext中。
下面就这几步展开看一看;
1> 构建before image
此处依旧以Update为例:
点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html),具体细节见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志)
2> 执行SQL
最终使用源Statement执行SQL;
3> 构建after image
执行完SQL之后,再构建SQL查询出当前最新的数据记录作为after image;
点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html),具体细节见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志)
4> 预处理undo log
将before image 和 after image合并作为回滚日志undo log,存储到当前数据库连接上下文ConnectionContext中。
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
return;
}
if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
if (beforeImage.getRows().size() != afterImage.getRows().size()) {
throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
}
}
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
// 1、构建全局锁key信息,针对更新的一批数据主键ID构建这批数据的全局锁key
// 例如:table_name:id_1101
String lockKeys = buildLockKey(lockKeyRecords);
if (null != lockKeys) {
// 将lockKeys信息保存到ConnectionContext中,在注册分支事务时,再将全局锁信息放入到TC中进行检查、存储
connectionProxy.appendLockKey(lockKeys);
// 2、构建undo log
SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
// 将undo log信息保存到ConnectionContext中
connectionProxy.appendUndoLog(sqlUndoLog);
}
}
点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html),具体细节见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志)
由于关闭了AutoCommit,所以在Statement.execute()执行完SQL之后,需要“手动”提交本地事务。
3、本地事务SQL的提交(commit)
回到ConnectionProxy#commit()方法,这里是“手动”提交本地事务的入口;
@Override
public void commit() throws SQLException {
try {
// 由LockRetryPolicy负责提交事务,LockRetryPolicy中包含全局锁的概念,支持retry重试策略
lockRetryPolicy.execute(() -> {
doCommit();
return null;
});
} catch (SQLException e) {
if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
rollback();
}
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
}
本地事务的提交又会委托给LockRetryPolicy的execute方法来执行;
1)LockRetryPolicy重试机制
LockRetryPolicy是ConnectionProxy的静态内部类,其中包含了全局锁的概念,支持retry策略,当出现全局锁冲突时支持多次重试获取全局锁。
默认情况下execute()方法中:
LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT
为TRUE,可以通过配置client.rm.lock.retryPolicyBranchRollbackOnConflict=false
属性改变;connection.getContext().isAutoCommitChanged()
为FALSE;
所以默认情况下,都会走重试获取全局锁的逻辑:doRetryOnLockConflict()
方法。(当然可以选择开启自动提交事务、并设置属性client.rm.lock.retryPolicyBranchRollbackOnConflict=true
,这样便不会走重试获取全局锁逻辑。)
protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
LockRetryController lockRetryController = new LockRetryController();
while (true) {
try {
return callable.call();
} catch (LockConflictException lockConflict) {
// 出现全局锁冲突,回滚本地事务
onException(lockConflict);
// AbstractDMLBaseExecutor#executeAutoCommitTrue the local lock is released
if (connection.getContext().isAutoCommitChanged()
&& lockConflict.getCode() == TransactionExceptionCode.LockKeyConflictFailFast) {
lockConflict.setCode(TransactionExceptionCode.LockKeyConflict);
}
// 线程睡眠10ms,然后再重试,超过重试次数,抛出异常结束流程
lockRetryController.sleep(lockConflict);
} catch (Exception e) {
// 出现非全局锁冲突的异常,则直接报错返回
onException(e);
throw e;
}
}
}
在doRetryOnLockConflict()
方法中:
- 如果因为全局锁冲突导致提交本地事务失败,先回滚本地事务,然后会判断重试次数(
lockRetryTimes
,默认30次)再进行重试,重试之前会让线程睡眠一段时间(lockRetryInterval
,默认10ms)。如果重试次数已经够了,则直接抛出异常结束流程。 - 如果因为其他异常(包括超过重试次数)导致提交本地事务失败,直接回滚本地事务、抛出异常结束流程。
上面的ConnectionProxy#onException()
方法中负责回滚本地事务、清理当前连接的ConnectionContext中的undo log信息、全局锁keys信息;
了解完了全局锁冲突引起的重试机制,下面接着看本地事务的提交流程。
2)本地事务提交流程
LockRetryPolicy#execute()方法中会运行方法的入参Callable,在ConnectionProxy#commit()方法中传入的到LockRetryPolicy#execute()方法中的Callable为:
() -> {
doCommit();
return null;
}
doCommit()方法:
private void doCommit() throws SQLException {
// 当前DML操作在全局事务中时,判定条件:ConnectionContext中包含xid
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
// 如果使用了@GlobalLock,需要获取全局锁
processLocalCommitWithGlobalLocks();
} else {
// 不在分布式事务中,则以原生connection提交本地事务
targetConnection.commit();
}
}
doCommit()方法中分三种情况进行不同的处理:
- 如果当前DML操作在全局事务中,即:当前连接的ConnectionContext中包含xid,则以处理全局事务方式(
processGlobalTransactionCommit(
)提交本地事务;- 如果使用了@GlobalLock,需要获取全局锁,再以原生connection提交本地事务;
- 否则如果事务不在分布式事务中,则以原生connection提交本地事务;
正常我们使用分布式事务,一般肯定是要以全局事务的方式执行DML操作;即:默认会进入到processGlobalTransactionCommit():
private void processGlobalTransactionCommit() throws SQLException {
try {
// 向远程的TC中注册分支事务,并检查、增加全局行锁
register();
} catch (TransactionException e) {
// 出现异常时,回滚本地事务 再重试。
// 大多数情况是因为全局锁冲突走到这里。
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
// 回滚日志管理组件,持久化undo log
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
// 提交本地事务
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
// 上报分支事务执行失败,用于监控
report(false);
throw new SQLException(ex);
}
// 上报分支事务执行成功,默认不会上报
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
// 重置连接的ConnectionContext
context.reset();
}
以全局事务的方式提交本地事务会做四件事:
通过netty请求TC,注册分支事务,并检查、增加全局行锁;
- 如果出现异常,则回滚本地事务。若异常类型为全局锁冲突
LockConflictException
,则进入重试策略;其他异常类型则直接抛出SQLException
;- 将执行SQL时保存到ConnectionContext中的undo log 回滚日志 保存到DB;
- 提交本地事务,真正将业务数据和回滚日志 持久化到DB;
向TC上报本地事务提交结果;
- 如果持久化undo log 或 提交本地事务出现异常,则上报分支事务执行失败;
- 如果本地事务提交成功,上报分支事务执行成功;默认并不会上报。
最后,清空当前数据库连接的ConnectionContext。
点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html);
- 分支事务的注册细节见下一篇文章(【微服务41】分布式事务Seata源码解析九:分支事务如何注册到全局事务);
- undo log持久化细节 见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志);
三、总结
AT模式下本地事务的SQL执行流程,即RM的分支事务执行流程,主要包括一下几步:
- 开始执行本地事务的SQL之前,从全局事务上下文RootContext中获取到xid,然后将xid绑定到数据库连接的上下文ConnectionContext中;
- 构建before image,查询目标sql执行前的数据快照;
- 执行目标SQL语句,但是本地事务还没有提交;
- 构建after image,查询目标SQL执行之后的数据快照;
- 将before image 和 after image合并作为回滚日志undo log,保存到当前数据库连接上下文ConnectionContext中;
- 构建当前本地事务要占用所有全局锁key信息,然后将其保存到当前数据库连接上下文ConnectionContext中;
- 通过netty请求TC,注册分支事务,并检查、增加全局行锁;这里可能会出现全局锁冲突 导致注册分支事务失败,所以有一个重试机制;
- 将执行SQL时保存到ConnectionContext中的undo log 回滚日志 保存到DB(undo_log表);
- 提交本地事务;
- 向TC上报本地事务提交结果;
- 最后清空当前数据库连接的ConnectionContext,恢复现场。
整个SQL提交可以理解为两阶段提交:
- 一阶段:先注册分支事务,检查全局锁。
- 二阶段:插入undolog、提交本地事务。