RM在Seata AT模式中的sql执行流程

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: RM在Seata AT模式中的sql执行流程

初始化

  • SeataDataSourceProxy的创建
    在Seata Client初始化过程中,会通过io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration创建SeataAutoDataSourceProxyCreator,在SeataAutoDataSourceProxyCreatorwrapIfNecessary()方法中,会针对DataSource做一层代理,这样的话,就可以在构建出来的代理对象中为实现分布式事务插入自定义逻辑;
  • RMClient的初始化
    Seata Client初始化的另一部分逻辑中,也就是io.seata.spring.boot.autoconfigure.SeataAutoConfiguration中,会构建GlobalTransactionScanner对象,在该对象的初始化方法中,会调用RMClient.init(),该方法的作用是建立与TC服务的通信,以便后续分支事务的注册、状态上报、提交或回滚;

业务执行过程

在业务执行过程中,在执行数据库操作时,主要关注SeataDataSourceProxy,也就是AT模式实现类:

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
  @Override
    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }
    @Override
    public ConnectionProxy getConnection(String username, String password) throws SQLException {
        Connection targetConnection = targetDataSource.getConnection(username, password);
        return new ConnectionProxy(this, targetConnection);
    }
}
复制代码

DataSourceProxy重写了getConnection()方法,针对获得的Connection也做了一个代理层ConnectionProxy,因为ConnectionProxy继承了AbstractConnectionProxy,核心逻辑还是在AbstractConnectionProxy中:

public abstract class AbstractConnectionProxy implements Connection {
    @Override
    public Statement createStatement() throws SQLException {
        Statement targetStatement = getTargetConnection().createStatement();
        // 包装Statement
        return new StatementProxy(this, targetStatement);
    }
    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        String dbType = getDbType();
        // support oracle 10.2+
        PreparedStatement targetPreparedStatement = null;
        if (BranchType.AT == RootContext.getBranchType()) {
            List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
            if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
                    TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
                            sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
                    String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
                    tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
                    // 调用原生Connection.prepareStatement()实现预编译逻辑
                    targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
                }
            }
        }
        if (targetPreparedStatement == null) {
            targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        }
        // 同样针对PreparedStatement做一层代理
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }
}
复制代码

根据上述源码,可以知道,其实RM层的大体框架就是利用了Proxy方式针对DataSource、Connection、

Statement、PreparedStatement做一层代理,在执行数据库操作的过程中插入分布式事务逻辑;

逐步深入源码,我们需要了解PreparedStatementProxy的部分实现逻辑:

public class PreparedStatementProxy extends AbstractPreparedStatementProxy
    implements PreparedStatement, ParametersHolder {
  // 采用模版模式
  @Override
    public int executeUpdate() throws SQLException {
        return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());
    }
}
复制代码

当执行sql语句的时候,PreparedStatementProxy采用了模版模式:

public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
            // 没有全局锁,也不是AT模式,直接使用原生statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
        String dbType = statementProxy.getConnectionProxy().getDbType();
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            sqlRecognizers = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    dbType);
        }
        // 根据不同的sql语句,使用不同类型的Executor,相当于就是策略模式
        Executor<T> executor;
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            executor = new PlainExecutor<>(statementProxy, statementCallback);
        } else {
            if (sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        // 插入
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                    new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                    new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        // 更新
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        // 删除
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        // 当前读
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case INSERT_ON_DUPLICATE_UPDATE:
                        // 没有就插入,重复就更新
                        switch (dbType) {
                            case JdbcConstants.MYSQL:
                            case JdbcConstants.MARIADB:
                                executor =
                                    new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            default:
                                throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                        }
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }
            } else {
                // 批量处理
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }
        T rs;
        try {
            // 执行sql语句,重点看这里
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException) ex;
        }
        return rs;
    }
复制代码

我们需要重点关注executor.execute(args)方法,这里使用到的是策略模式,不同的sql语句处理逻辑不一样,不过我们可以找到比较典型的UpdateExecutor,在它继承的抽象类AbstractDMLBaseExecutor中:

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
  @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        // 如果实现没有设置的话,模式是自动提交
        if (connectionProxy.getAutoCommit()) {
            // 默认执行当前分支
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }
  protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            // 把自动提交设置成false
            connectionProxy.changeAutoCommit();
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                // 执行sql语句
                T result = executeAutoCommitFalse(args);
                // 提交本地事务
                connectionProxy.commit();
                // 返回执行结果
                return result;
            });
        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            // 异常抛出
            throw e;
        } finally {
            // 恢复现场
            // 重置connectionContext
            connectionProxy.getContext().reset();
            // 恢复自动提交
            connectionProxy.setAutoCommit(true);
        }
    }
}
复制代码

上述代码使用模版模式完成了sql的执行,但是我们重点需要关注两部分逻辑:

  • 执行sql语句T result = executeAutoCommitFalse(args):
protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        // 获取前镜像,也就是sql执行前的快照
        TableRecords beforeImage = beforeImage();
        // 真正地执行sql语句
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        // 获取影响行数
        int updateCount = statementProxy.getUpdateCount();
        if (updateCount > 0) {
            // 如果影响行数大于0,那么获取执行后的快照
            TableRecords afterImage = afterImage(beforeImage);
            // 计算行锁,设置到ConnectionContext中,下面需要取出来
            // 计算undoLogItem,后面要用
            prepareUndoLog(beforeImage, afterImage);
        }
        return result;
    }
复制代码
  • 提交本地事务connectionProxy.commit()
@Override
    public void commit() throws SQLException {
        try {
            // 重试策略
            lockRetryPolicy.execute(() -> {
                // 真正地提交本地事务
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                // 失败后回滚
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }
private void doCommit() throws SQLException {
        // 如果使用的@GlobalTransaction
        if (context.inGlobalTransaction()) {
            // AT模式下,肯定执行当前分支
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
          // 如果使用的@GlobalLock
            processLocalCommitWithGlobalLocks();
        } else {
          // 不在分布式事务中,原生connection提交
            targetConnection.commit();
        }
    }
private void processGlobalTransactionCommit() throws SQLException {
        try {
            // 向远程TC服务注册当前分支事务,并加上行锁
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {//插入undoLog
          UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            // 提交本地事务
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            // 提交失败,上报分支事务状态
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            // 提交成功,上报分支事务状态
            report(true);
        }
        // 恢复ConnectionContext
        context.reset();
    }
复制代码
  • 综上所述,RM在分支事务执行过程中,主要有以下几步:

1.先获取业务sql执行前的快照BeforeImage

2.执行业务sql

3.如果业务sql影响行数大于0,那么需要获取业务sql执行后的快照AfterImage;并计算行锁以及后续需要插入的undolog;

4.向TC服务注册分支事务,并添加行锁;此处可能会存在锁冲突导致注册失败,直至注册成功或者分布式事务超时;

5.注册成功后,插入undolog日志表记录;

6.提交本地事务,此时业务sql和undolog日志一起提交;

7.上报分支事务状态;

8.恢复现场;


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
6月前
|
SQL 数据库 数据库管理
【SQL server】玩转SQL server数据库:第三章 关系数据库标准语言SQL(一)模式、表、索引与视图
【SQL server】玩转SQL server数据库:第三章 关系数据库标准语言SQL(一)模式、表、索引与视图
178 11
|
2月前
|
SQL 缓存 关系型数据库
揭秘MySQL一条SQL语句的执行流程
以上步骤共同构成了MySQL处理SQL语句的完整流程,理解这一流程有助于更有效地使用MySQL数据库,优化查询性能,及时解决可能出现的性能瓶颈问题。
92 7
|
3月前
|
Java 测试技术 容器
从零到英雄:Struts 2 最佳实践——你的Web应用开发超级变身指南!
【8月更文挑战第31天】《Struts 2 最佳实践:从设计到部署的全流程指南》深入介绍如何利用 Struts 2 框架从项目设计到部署的全流程。从初始化配置到采用 MVC 设计模式,再到性能优化与测试,本书详细讲解了如何构建高效、稳定的 Web 应用。通过最佳实践和代码示例,帮助读者掌握 Struts 2 的核心功能,并确保应用的安全性和可维护性。无论是在项目初期还是后期运维,本书都是不可或缺的参考指南。
50 0
|
3月前
|
SQL 安全 数据库
|
3月前
|
SQL 数据处理 数据库
SQL正则表达式应用:文本数据处理的强大工具——深入探讨数据验证、模式搜索、字符替换等核心功能及性能优化和兼容性问题
【8月更文挑战第31天】SQL正则表达式是数据库管理和应用开发中处理文本数据的强大工具,支持数据验证、模式搜索和字符替换等功能。本文通过问答形式介绍了其基本概念、使用方法及注意事项,帮助读者掌握这一重要技能,提升文本数据处理效率。尽管功能强大,但在不同数据库系统中可能存在兼容性问题,需谨慎使用以优化性能。
56 0
|
3月前
|
SQL 存储 数据管理
掌握SQL Server Integration Services (SSIS)精髓:从零开始构建自动化数据提取、转换与加载(ETL)流程,实现高效数据迁移与集成——轻松上手SSIS打造企业级数据管理利器
【8月更文挑战第31天】SQL Server Integration Services (SSIS) 是 Microsoft 提供的企业级数据集成平台,用于高效完成数据提取、转换和加载(ETL)任务。本文通过简单示例介绍 SSIS 的基本使用方法,包括创建数据包、配置数据源与目标以及自动化执行流程。首先确保安装了 SQL Server Data Tools (SSDT),然后在 Visual Studio 中创建新的 SSIS 项目,通过添加控制流和数据流组件,实现从 CSV 文件到 SQL Server 数据库的数据迁移。
189 0
|
4月前
|
SQL 弹性计算 资源调度
云服务器 ECS产品使用问题之bin/spark-sql --master yarn如何进行集群模式运行
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。
|
3月前
|
SQL 数据库 数据库管理
SQL查询是否都需要解析:深入解析SQL执行流程与优化技巧
在数据库管理系统中,SQL(Structured Query Language)查询是用户与数据库交互的主要方式
|
5月前
|
存储 SQL 关系型数据库
【BUG记录】Cause: java.sql.SQLException: Incorrect string value: '\xF0\x9F\x90\xA6' for column 'name' at row 1
在MySQL中遇到`Incorrect string value`错误通常是因为尝试插入的字符串包含不被数据库字符集支持的字符,如表情符号。错误根源是MySQL默认的utf8不支持4字节的UTF-8字符(如Emoji)。
476 1
|
5月前
|
关系型数据库 MySQL 数据库
深入OceanBase分布式数据库:MySQL 模式下的 SQL 基本操作
深入OceanBase分布式数据库:MySQL 模式下的 SQL 基本操作