分布式事务Seata源码解析八:本地事务执行流程(AT模式下)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 分布式事务Seata源码解析八:本地事务执行流程(AT模式下)

@[TOC]

一、前言

至此,seata系列的内容包括:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
  8. 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
  9. 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
  10. 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
  11. 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
  12. 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务

Seata最核心的全局事务执行流程,前面我们已经聊到了Seata全局事务的开启,本文接着聊Seata全局事务中执行具体业务操作时,DB操作是如何执行的(含:全局锁keys、undologs的构建(AT模式))?

在这里插入图片描述

二、本地事务SQL执行流程

全局事务的整体执行流程体现在TransactionalTemplate#execute()方法中:

在这里插入图片描述

具体代码 和 注释:

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    // 获取当前事务,根据ThreadLocal,获取当前线程本地变量副本中的xid,进而判断是否存在一个全局事务
    // 刚开始一个全局事务时,肯定是没有全局事务的
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 1.2 Handle the transaction propagation.
    // 从全局事务的配置里 获取事务传播级别,默认是REQUIRED(如果存在则加入,否则开启一个新的)
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        // 根据事务的隔离级别做不同的处理
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    // 事务存在,则挂起事务(默认将xid从RootContext中移除)
                    suspendedResourcesHolder = tx.suspend();
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                , tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
        if (tx == null) {
            // 创建全局事务(角色为事务发起者),并关联全局事务管理器
            tx = GlobalTransactionContext.createNew();
        }

        // set current tx config to holder
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            //    else do nothing. Of course, the hooks will still be triggered.
            // 开启全局事务,如果事务角色是'GlobalTransactionRole.Launcher',发送开始事务请求到seata-server(TC)
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // Do Your Business
                // 执行业务方法,把全局事务ID通过 MVC拦截器 / dubbo filter传递到后面的分支事务;
                // 每个分支事务都会去运行
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. The needed business exception to rollback.
                // 如果全局事务执行发生了异常,则回滚;
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            // 全局事务和分支事务运行无误,提交事务;
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            // 全局事务完成之后做一些清理工作
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            // 如果有挂起的全局事务,则恢复全局事务
            tx.resume(suspendedResourcesHolder);
        }
    }
}

在前一篇文章: 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务,我们已经聊到了开启全局事务,本文继续聊开启全局事务之后,本地事务中的SQL执行流程。

在这里插入图片描述

1、DataSourceProxy 数据库资源代理入口

Spring Cloud 整合Seata实现分布式事务一文中有聊到Spring Cloud 集成Seata 的AT模式,需要写一个配置类DataSourceConfig,其中会注入一个Bean(DataSourceProxy):

在这里插入图片描述

到这里,博主有一个问题:注入DataSourceProxy到Spring容器中之后,哪里会用到它?执行数据增删改查时如何切换到代理数据源?

1)哪里使用了DataSourceProxy?

在这里插入图片描述

从源码来看,有一个Spring AOP抽象类AbstractAutoProxyCreator的子类SeataAutoDataSourceProxyCreator

Spring 通过 AbstractAutoProxyCreator来创建 AOP 代理,其实现了BeanPostProcessor 接口,在 bean 初始化完成之后会创建它的代理,让后将代理对象增加到Spring容器。

在Seata 中,SeataAutoDataSourceProxyCreator的主要作用是为数据源DataSource添加Advisor,当数据源执行操作时,便会进入到SeataAutoDataSourceProxyAdvice类中处理;

在这里插入图片描述

因此,当数据源执行CRUD操作时,由于添加了AOP代理,会进入到SeataAutoDataSourceProxyAdvice#invoke()方法中:

在这里插入图片描述

咦,这里没有DataSourceProxy呀,只有SeataDataSourceProxy,从命名来看,这俩类总感觉有点关系!

2)SeataDataSourceProxy

在这里插入图片描述

从DataSourceProxy类的继承结构来看,DataSourceProxy实现了SeataDataSourceProxy接口;因此SeataAutoDataSourceProxyAdvice#invoke()方法中动态代理类实际就是DataSourceProxy

2、本地事务SQL的执行流程(execute)

1)执行本地事务SQL的入口

在这里插入图片描述
JDBC的执行流程:

  1. 第一步:注册驱动;
  2. 第二步:获取与数据库的连接Connection;
  3. 第三步:获取数据库操作对象Statement;
  4. 第四步:执行sql语句(DQL、DML…),并且返回结果集;
  5. 第五步:处理查询结果集;
  6. 第六步:释放资源、关闭连接;
try {
    //加载数据库驱动
    Class.forName("com.mysql.cj.jdbc.Driver");
} catch (ClassNotFoundException e) {
    // do something
}
Connection conn = DriverManager.getConnection(URL, USER_NAME, PASSWORD);
PreparedStatement pst = conn.prepareStatement("update user set name=? where id = ?");
pst.setString(1, "bobDog");
pst.setInt(2, 1);
int updateRes = pst.executeUpdate();
if (updateRes > 0) {
    System.out.println("更新成功!");
}

Seata代理的数据库资源DataSource底层也是JDBC操作数据库,所以也需要先获取数据库连接Connection、再根据数据库连接获取数据库操作对象Statement、接着再通过Statement#execute()执行SQL。在Seata中的表现为:

  1. 先获取seata代理的数据库连接ConnectionProxy;
    在这里插入图片描述
  2. 再根据ConnectionProxy获取一个数据库操作对象 StatementProxyPreparedStatementProxy
    在这里插入图片描述
  3. 然后再利用数据库操作对象 StatementProxyPreparedStatementProxy 的execute() 或 executeUpdate() 方法执行SQL语句。
    在这里插入图片描述
    在这里插入图片描述

StatementProxyPreparedStatementProxy 增强了所有的execute方法,由ExecuteTemplate选择需要的Executor执行来sql。

下面以常见的更新操作(PreparedStatementProxy#executeUpdate())为例:

ExecuteTemplate#execute()重载方法调用链路如下:

在这里插入图片描述

public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                 StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {
    // 没获取到全局锁,并且事务模式不是AT
    if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
        // Just work as original statement
        return statementCallback.execute(statementProxy.getTargetStatement(), args);
    }

    // 获取DB的类型
    String dbType = statementProxy.getConnectionProxy().getDbType();
    if (CollectionUtils.isEmpty(sqlRecognizers)) {
        sqlRecognizers = SQLVisitorFactory.get(
                statementProxy.getTargetSQL(),
                dbType);
    }
    Executor<T> executor;
    if (CollectionUtils.isEmpty(sqlRecognizers)) {
        executor = new PlainExecutor<>(statementProxy, statementCallback);
    } else {
        if (sqlRecognizers.size() == 1) {
            SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
            // 数据库操作类型
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                new Object[]{statementProxy, statementCallback, sqlRecognizer});
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case INSERT_ON_DUPLICATE_UPDATE:
                    switch (dbType) {
                        case JdbcConstants.MYSQL:
                        case JdbcConstants.MARIADB:
                            executor =
                                new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        default:
                            throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                    }
                    break;
                default:
                    executor = new PlainExecutor<>(statementProxy, statementCallback);
                    break;
            }
        } else {
            executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
        }
    }
    T rs;
    try {
        // 通过Executor真正的执行
        rs = executor.execute(args);
    } catch (Throwable ex) {
        if (!(ex instanceof SQLException)) {
            // Turn other exception into SQLException
            ex = new SQLException(ex);
        }
        throw (SQLException) ex;
    }
    return rs;
}
  1. 如果当前事务不需要获取全局锁,并且不是AT模式,则以original statement的方式执行。默认Seata Client层面不需要获取全局锁,事务模式是AT模式。
  2. 获取到的DB类型,比如MySQL、Oracle.....,博主的项目DBType是MYSQL。
  3. 获取SQL DML类型,并根据DML类型,选择不同的Executor。这里可以看做是策略模式。

因为示例是Update类型,所以最终选择的Executor是UpdateExecutor。

### 2)执行本地事务SQL逻辑

UpdateExecutor#execute()方法中会执行本地事务SQL,UpdateExecutor的类继承图如下:

在这里插入图片描述

除了数据更新前后的Image构造体现在UpdateExecutor类的方法中,其余方法均在其父类BaseTransactionalExecutor中,包括execute()方法。

@Override
public T execute(Object... args) throws Throwable {
    // 从全局事务上下文中获取xid
    String xid = RootContext.getXID();
    if (xid != null) {
        // 将xid绑定到ConnectionContext中,后续提交本地事务时会用到
        statementProxy.getConnectionProxy().bind(xid);
    }

    // RootContext.requireGlobalLock()检查是否需要全局锁,默认不需要
    statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
    return doExecute(args);
}

开始执行本地事务SQL时:

  1. 首先从全局事务上下文RootContext中获取到xid,如果存在全局事务xid,则将xid绑定到数据库连接的上下文ConnectionContext中;
  2. 从全局事务上下文RootContext获取是否全局锁标识,默认不需要;如果需要获取全局锁,则将数据库连接上下文ConnectionContext中的isGlobalLockRequire设置为true;
  3. 调用doExecute()方法真正开始执行SQL;

UpdateExecutor#doExecutor()方法:

在这里插入图片描述

开启了全局事务之后,DML语句的本地事务不会自动提交。

即使自动提交没有关闭,AbstractDMLBaseExecutor#doExecute(Object… args)方法中也会先将其关闭,然后再以非自动提交的方式执行SQL,走ConnectionProxy提交本地事务,然后再将自动提交设置为true;这一块逻辑体现在executeAutoCommitTrue()方法中:

protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        connectionProxy.changeAutoCommit();
        return new LockRetryPolicy(connectionProxy).execute(() -> {
            T result = executeAutoCommitFalse(args);
            connectionProxy.commit();
            return result;
        });
    } catch (Exception e) {
        // when exception occur in finally,this exception will lost, so just print it here
        LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        connectionProxy.getContext().reset();
        connectionProxy.setAutoCommit(true);
    }
}

正常情况下都是直接以非自动提交的方式执行,即执行executeAutoCommitFalse()方法:

protected T executeAutoCommitFalse(Object[] args) throws Exception {
    if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
        throw new NotSupportYetException("multi pk only support mysql!");
    }
    // 根据SQL语句构建before image,目标SQL执行之前的数据镜像:从数据库根据ID主键等信息查询出更新前的数据;
    TableRecords beforeImage = beforeImage();
    // 真正的去执行SQL语句,但是本地事务还没有提交
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    int updateCount = statementProxy.getUpdateCount();
    if (updateCount > 0) {
        // 目标SQL执行之后的数据镜像:从数据库根据ID主键等信息查询出更新后的数据;
        TableRecords afterImage = afterImage(beforeImage);
        // 准备好undo log数据
        prepareUndoLog(beforeImage, afterImage);
    }
    return result;
}

在这里插入图片描述

由于AbstractDMLBaseExecutor提供了公用的executeAutoCommitFalse()给Insert、Delete、Update类型的Executor使用,所以无论是Insert、Delete还是Update操作都会走AbstractDMLBaseExecutor#executeAutoCommitFalse()方法执行SQL。不过MySQL的MySQLInsertOrUpdateExecutor是个个例,其执行SQL的逻辑由自己实现(有兴趣可以自己看一下MySQLInsertOrUpdateExecutor)。

以非自动提交执行SQL的流程如下:

  1. beforeImage() -- 根据SQL语句构建before image,查询目标sql执行前的数据快照;

    • Update、Delete操作从数据库根据ID主键等信息查询出更新前的数据;
    • Insert操作直接返回空的TableRecords,其中只包含TableMeta,没有数据记录;
  2. 执行SQL语句,但是本地事务还没有提交;
  3. afterImage() -- 构建after image,查询目标SQL执行之后的数据快照;

    • Insert、Update操作从数据库根据ID主键等信息查询出更新后的数据;
    • Delete操作直接返回空的TableRecords,其中只包含TableMeta,没有数据记录;
  4. prepareUndoLog(beforeImage, afterImage) --> 将before image 和 after image合并作为回滚日志undo log,保存到当前数据库连接上下文ConnectionContext中。
    其中还包括构建当前本地事务要占用所有全局锁key信息,然后将其保存到当前数据库连接上下文ConnectionContext中。

下面就这几步展开看一看;

1> 构建before image

此处依旧以Update为例:

在这里插入图片描述

点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html),具体细节见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志)

2> 执行SQL

在这里插入图片描述
最终使用源Statement执行SQL;

3> 构建after image

执行完SQL之后,再构建SQL查询出当前最新的数据记录作为after image;

在这里插入图片描述

点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html),具体细节见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志)

4> 预处理undo log

将before image 和 after image合并作为回滚日志undo log,存储到当前数据库连接上下文ConnectionContext中。

protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
    if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
        return;
    }
    if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
        if (beforeImage.getRows().size() != afterImage.getRows().size()) {
            throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
        }
    }
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();

    TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
    // 1、构建全局锁key信息,针对更新的一批数据主键ID构建这批数据的全局锁key
    // 例如:table_name:id_1101
    String lockKeys = buildLockKey(lockKeyRecords);
    if (null != lockKeys) {
        // 将lockKeys信息保存到ConnectionContext中,在注册分支事务时,再将全局锁信息放入到TC中进行检查、存储
        connectionProxy.appendLockKey(lockKeys);

        // 2、构建undo log
        SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
        // 将undo log信息保存到ConnectionContext中
        connectionProxy.appendUndoLog(sqlUndoLog);
    }
}

点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html),具体细节见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志)

由于关闭了AutoCommit,所以在Statement.execute()执行完SQL之后,需要“手动”提交本地事务。

3、本地事务SQL的提交(commit)

回到ConnectionProxy#commit()方法,这里是“手动”提交本地事务的入口;

@Override
public void commit() throws SQLException {
    try {
        // 由LockRetryPolicy负责提交事务,LockRetryPolicy中包含全局锁的概念,支持retry重试策略
        lockRetryPolicy.execute(() -> {
            doCommit();
            return null;
        });
    } catch (SQLException e) {
        if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
            rollback();
        }
        throw e;
    } catch (Exception e) {
        throw new SQLException(e);
    }
}

本地事务的提交又会委托给LockRetryPolicy的execute方法来执行;

1)LockRetryPolicy重试机制

LockRetryPolicy是ConnectionProxy的静态内部类,其中包含了全局锁的概念,支持retry策略,当出现全局锁冲突时支持多次重试获取全局锁。

在这里插入图片描述

默认情况下execute()方法中:

  • LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT为TRUE,可以通过配置client.rm.lock.retryPolicyBranchRollbackOnConflict=false属性改变;
  • connection.getContext().isAutoCommitChanged()为FALSE;

所以默认情况下,都会走重试获取全局锁的逻辑:doRetryOnLockConflict()方法。(当然可以选择开启自动提交事务、并设置属性client.rm.lock.retryPolicyBranchRollbackOnConflict=true,这样便不会走重试获取全局锁逻辑。)

protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
    LockRetryController lockRetryController = new LockRetryController();
    while (true) {
        try {
            return callable.call();
        } catch (LockConflictException lockConflict) {
            // 出现全局锁冲突,回滚本地事务
            onException(lockConflict);
            // AbstractDMLBaseExecutor#executeAutoCommitTrue the local lock is released
            if (connection.getContext().isAutoCommitChanged()
                    && lockConflict.getCode() == TransactionExceptionCode.LockKeyConflictFailFast) {
                lockConflict.setCode(TransactionExceptionCode.LockKeyConflict);
            }
            // 线程睡眠10ms,然后再重试,超过重试次数,抛出异常结束流程
            lockRetryController.sleep(lockConflict);
        } catch (Exception e) {
            // 出现非全局锁冲突的异常,则直接报错返回
            onException(e);
            throw e;
        }
    }
}

doRetryOnLockConflict()方法中:

  • 如果因为全局锁冲突导致提交本地事务失败,先回滚本地事务,然后会判断重试次数(lockRetryTimes,默认30次)再进行重试,重试之前会让线程睡眠一段时间(lockRetryInterval,默认10ms)。如果重试次数已经够了,则直接抛出异常结束流程。
    在这里插入图片描述
  • 如果因为其他异常(包括超过重试次数)导致提交本地事务失败,直接回滚本地事务、抛出异常结束流程。

上面的ConnectionProxy#onException()方法中负责回滚本地事务、清理当前连接的ConnectionContext中的undo log信息、全局锁keys信息;

在这里插入图片描述

了解完了全局锁冲突引起的重试机制,下面接着看本地事务的提交流程。

2)本地事务提交流程

在这里插入图片描述

LockRetryPolicy#execute()方法中会运行方法的入参Callable,在ConnectionProxy#commit()方法中传入的到LockRetryPolicy#execute()方法中的Callable为:

() -> {
        doCommit();
        return null;
    }

doCommit()方法:

private void doCommit() throws SQLException {
    // 当前DML操作在全局事务中时,判定条件:ConnectionContext中包含xid
    if (context.inGlobalTransaction()) {
        processGlobalTransactionCommit();
    } else if (context.isGlobalLockRequire()) {
        // 如果使用了@GlobalLock,需要获取全局锁
        processLocalCommitWithGlobalLocks();
    } else {
        // 不在分布式事务中,则以原生connection提交本地事务
        targetConnection.commit();
    }
}

doCommit()方法中分三种情况进行不同的处理:

  1. 如果当前DML操作在全局事务中,即:当前连接的ConnectionContext中包含xid,则以处理全局事务方式(processGlobalTransactionCommit()提交本地事务;
  2. 如果使用了@GlobalLock,需要获取全局锁,再以原生connection提交本地事务;
    在这里插入图片描述
  3. 否则如果事务不在分布式事务中,则以原生connection提交本地事务;
    在这里插入图片描述

正常我们使用分布式事务,一般肯定是要以全局事务的方式执行DML操作;即:默认会进入到processGlobalTransactionCommit():

private void processGlobalTransactionCommit() throws SQLException {
    try {
        // 向远程的TC中注册分支事务,并检查、增加全局行锁
        register();
    } catch (TransactionException e) {
        // 出现异常时,回滚本地事务 再重试。
        // 大多数情况是因为全局锁冲突走到这里。
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        // 回滚日志管理组件,持久化undo log
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        // 提交本地事务
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        // 上报分支事务执行失败,用于监控
        report(false);
        throw new SQLException(ex);
    }
    // 上报分支事务执行成功,默认不会上报
    if (IS_REPORT_SUCCESS_ENABLE) {
        report(true);
    }
    // 重置连接的ConnectionContext
    context.reset();
}

以全局事务的方式提交本地事务会做四件事:

  1. 通过netty请求TC,注册分支事务,并检查、增加全局行锁;

    • 如果出现异常,则回滚本地事务。若异常类型为全局锁冲突LockConflictException,则进入重试策略;其他异常类型则直接抛出SQLException
  2. 将执行SQL时保存到ConnectionContext中的undo log 回滚日志 保存到DB;
  3. 提交本地事务,真正将业务数据和回滚日志 持久化到DB;
  4. 向TC上报本地事务提交结果;

    • 如果持久化undo log 或 提交本地事务出现异常,则上报分支事务执行失败;
    • 如果本地事务提交成功,上报分支事务执行成功;默认并不会上报。

最后,清空当前数据库连接的ConnectionContext。

点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html);

  • 分支事务的注册细节见下一篇文章(【微服务41】分布式事务Seata源码解析九:分支事务如何注册到全局事务);
  • undo log持久化细节 见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志);

三、总结

AT模式下本地事务的SQL执行流程,即RM的分支事务执行流程,主要包括一下几步:

  • 开始执行本地事务的SQL之前,从全局事务上下文RootContext中获取到xid,然后将xid绑定到数据库连接的上下文ConnectionContext中;
  1. 构建before image,查询目标sql执行前的数据快照;
  2. 执行目标SQL语句,但是本地事务还没有提交;
  3. 构建after image,查询目标SQL执行之后的数据快照;
  4. 将before image 和 after image合并作为回滚日志undo log,保存到当前数据库连接上下文ConnectionContext中;
  5. 构建当前本地事务要占用所有全局锁key信息,然后将其保存到当前数据库连接上下文ConnectionContext中;
  6. 通过netty请求TC,注册分支事务,并检查、增加全局行锁;这里可能会出现全局锁冲突 导致注册分支事务失败,所以有一个重试机制;
  7. 将执行SQL时保存到ConnectionContext中的undo log 回滚日志 保存到DB(undo_log表);
  8. 提交本地事务;
  9. 向TC上报本地事务提交结果;
  10. 最后清空当前数据库连接的ConnectionContext,恢复现场。

整个SQL提交可以理解为两阶段提交:

  • 一阶段:先注册分支事务,检查全局锁。
  • 二阶段:插入undolog、提交本地事务。
相关文章
|
18天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
18天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
18天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
19天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
3月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
5月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
143 2
基于Redis的高可用分布式锁——RedLock
|
1月前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
132 5
|
2月前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
76 8
|
2月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
64 16
|
2月前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
47 5

推荐镜像

更多