RM在Seata AT模式中的sql执行流程

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: RM在Seata AT模式中的sql执行流程

初始化

  • SeataDataSourceProxy的创建
    在Seata Client初始化过程中,会通过io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration创建SeataAutoDataSourceProxyCreator,在SeataAutoDataSourceProxyCreatorwrapIfNecessary()方法中,会针对DataSource做一层代理,这样的话,就可以在构建出来的代理对象中为实现分布式事务插入自定义逻辑;
  • RMClient的初始化
    Seata Client初始化的另一部分逻辑中,也就是io.seata.spring.boot.autoconfigure.SeataAutoConfiguration中,会构建GlobalTransactionScanner对象,在该对象的初始化方法中,会调用RMClient.init(),该方法的作用是建立与TC服务的通信,以便后续分支事务的注册、状态上报、提交或回滚;

业务执行过程

在业务执行过程中,在执行数据库操作时,主要关注SeataDataSourceProxy,也就是AT模式实现类:

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
  @Override
    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }
    @Override
    public ConnectionProxy getConnection(String username, String password) throws SQLException {
        Connection targetConnection = targetDataSource.getConnection(username, password);
        return new ConnectionProxy(this, targetConnection);
    }
}
复制代码

DataSourceProxy重写了getConnection()方法,针对获得的Connection也做了一个代理层ConnectionProxy,因为ConnectionProxy继承了AbstractConnectionProxy,核心逻辑还是在AbstractConnectionProxy中:

public abstract class AbstractConnectionProxy implements Connection {
    @Override
    public Statement createStatement() throws SQLException {
        Statement targetStatement = getTargetConnection().createStatement();
        // 包装Statement
        return new StatementProxy(this, targetStatement);
    }
    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        String dbType = getDbType();
        // support oracle 10.2+
        PreparedStatement targetPreparedStatement = null;
        if (BranchType.AT == RootContext.getBranchType()) {
            List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
            if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
                    TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
                            sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
                    String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
                    tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
                    // 调用原生Connection.prepareStatement()实现预编译逻辑
                    targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
                }
            }
        }
        if (targetPreparedStatement == null) {
            targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        }
        // 同样针对PreparedStatement做一层代理
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }
}
复制代码

根据上述源码,可以知道,其实RM层的大体框架就是利用了Proxy方式针对DataSource、Connection、

Statement、PreparedStatement做一层代理,在执行数据库操作的过程中插入分布式事务逻辑;

逐步深入源码,我们需要了解PreparedStatementProxy的部分实现逻辑:

public class PreparedStatementProxy extends AbstractPreparedStatementProxy
    implements PreparedStatement, ParametersHolder {
  // 采用模版模式
  @Override
    public int executeUpdate() throws SQLException {
        return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());
    }
}
复制代码

当执行sql语句的时候,PreparedStatementProxy采用了模版模式:

public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
            // 没有全局锁,也不是AT模式,直接使用原生statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
        String dbType = statementProxy.getConnectionProxy().getDbType();
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            sqlRecognizers = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    dbType);
        }
        // 根据不同的sql语句,使用不同类型的Executor,相当于就是策略模式
        Executor<T> executor;
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            executor = new PlainExecutor<>(statementProxy, statementCallback);
        } else {
            if (sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        // 插入
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                    new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                    new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        // 更新
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        // 删除
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        // 当前读
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case INSERT_ON_DUPLICATE_UPDATE:
                        // 没有就插入,重复就更新
                        switch (dbType) {
                            case JdbcConstants.MYSQL:
                            case JdbcConstants.MARIADB:
                                executor =
                                    new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            default:
                                throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                        }
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }
            } else {
                // 批量处理
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }
        T rs;
        try {
            // 执行sql语句,重点看这里
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException) ex;
        }
        return rs;
    }
复制代码

我们需要重点关注executor.execute(args)方法,这里使用到的是策略模式,不同的sql语句处理逻辑不一样,不过我们可以找到比较典型的UpdateExecutor,在它继承的抽象类AbstractDMLBaseExecutor中:

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
  @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        // 如果实现没有设置的话,模式是自动提交
        if (connectionProxy.getAutoCommit()) {
            // 默认执行当前分支
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }
  protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            // 把自动提交设置成false
            connectionProxy.changeAutoCommit();
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                // 执行sql语句
                T result = executeAutoCommitFalse(args);
                // 提交本地事务
                connectionProxy.commit();
                // 返回执行结果
                return result;
            });
        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            // 异常抛出
            throw e;
        } finally {
            // 恢复现场
            // 重置connectionContext
            connectionProxy.getContext().reset();
            // 恢复自动提交
            connectionProxy.setAutoCommit(true);
        }
    }
}
复制代码

上述代码使用模版模式完成了sql的执行,但是我们重点需要关注两部分逻辑:

  • 执行sql语句T result = executeAutoCommitFalse(args):
protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        // 获取前镜像,也就是sql执行前的快照
        TableRecords beforeImage = beforeImage();
        // 真正地执行sql语句
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        // 获取影响行数
        int updateCount = statementProxy.getUpdateCount();
        if (updateCount > 0) {
            // 如果影响行数大于0,那么获取执行后的快照
            TableRecords afterImage = afterImage(beforeImage);
            // 计算行锁,设置到ConnectionContext中,下面需要取出来
            // 计算undoLogItem,后面要用
            prepareUndoLog(beforeImage, afterImage);
        }
        return result;
    }
复制代码
  • 提交本地事务connectionProxy.commit()
@Override
    public void commit() throws SQLException {
        try {
            // 重试策略
            lockRetryPolicy.execute(() -> {
                // 真正地提交本地事务
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                // 失败后回滚
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }
private void doCommit() throws SQLException {
        // 如果使用的@GlobalTransaction
        if (context.inGlobalTransaction()) {
            // AT模式下,肯定执行当前分支
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
          // 如果使用的@GlobalLock
            processLocalCommitWithGlobalLocks();
        } else {
          // 不在分布式事务中,原生connection提交
            targetConnection.commit();
        }
    }
private void processGlobalTransactionCommit() throws SQLException {
        try {
            // 向远程TC服务注册当前分支事务,并加上行锁
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {//插入undoLog
          UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            // 提交本地事务
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            // 提交失败,上报分支事务状态
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            // 提交成功,上报分支事务状态
            report(true);
        }
        // 恢复ConnectionContext
        context.reset();
    }
复制代码
  • 综上所述,RM在分支事务执行过程中,主要有以下几步:

1.先获取业务sql执行前的快照BeforeImage

2.执行业务sql

3.如果业务sql影响行数大于0,那么需要获取业务sql执行后的快照AfterImage;并计算行锁以及后续需要插入的undolog;

4.向TC服务注册分支事务,并添加行锁;此处可能会存在锁冲突导致注册失败,直至注册成功或者分布式事务超时;

5.注册成功后,插入undolog日志表记录;

6.提交本地事务,此时业务sql和undolog日志一起提交;

7.上报分支事务状态;

8.恢复现场;


相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
22天前
|
数据库 微服务
SEATA模式
Seata 是一款开源的分布式事务解决方案,支持多种事务模式以适应不同的应用场景。其主要模式包括:AT(TCC)模式,事务分三阶段执行;TCC 模式,提供更灵活的事务控制;SAGA 模式,基于状态机实现跨服务的事务一致性;XA 模式,采用传统两阶段提交协议确保数据一致性。
39 5
|
28天前
Seata框架在AT模式下是如何保证数据一致性的?
通过以上这些机制的协同作用,Seata 在 AT 模式下能够有效地保证数据的一致性,确保分布式事务的可靠执行。你还可以进一步深入研究 Seata 的具体实现细节,以更好地理解其数据一致性保障的原理。
39 3
|
3月前
|
SQL 缓存 关系型数据库
揭秘MySQL一条SQL语句的执行流程
以上步骤共同构成了MySQL处理SQL语句的完整流程,理解这一流程有助于更有效地使用MySQL数据库,优化查询性能,及时解决可能出现的性能瓶颈问题。
117 7
|
4月前
|
Java 测试技术 容器
从零到英雄:Struts 2 最佳实践——你的Web应用开发超级变身指南!
【8月更文挑战第31天】《Struts 2 最佳实践:从设计到部署的全流程指南》深入介绍如何利用 Struts 2 框架从项目设计到部署的全流程。从初始化配置到采用 MVC 设计模式,再到性能优化与测试,本书详细讲解了如何构建高效、稳定的 Web 应用。通过最佳实践和代码示例,帮助读者掌握 Struts 2 的核心功能,并确保应用的安全性和可维护性。无论是在项目初期还是后期运维,本书都是不可或缺的参考指南。
55 0
|
4月前
|
SQL 存储 数据管理
掌握SQL Server Integration Services (SSIS)精髓:从零开始构建自动化数据提取、转换与加载(ETL)流程,实现高效数据迁移与集成——轻松上手SSIS打造企业级数据管理利器
【8月更文挑战第31天】SQL Server Integration Services (SSIS) 是 Microsoft 提供的企业级数据集成平台,用于高效完成数据提取、转换和加载(ETL)任务。本文通过简单示例介绍 SSIS 的基本使用方法,包括创建数据包、配置数据源与目标以及自动化执行流程。首先确保安装了 SQL Server Data Tools (SSDT),然后在 Visual Studio 中创建新的 SSIS 项目,通过添加控制流和数据流组件,实现从 CSV 文件到 SQL Server 数据库的数据迁移。
348 0
|
4月前
|
SQL 数据库 数据库管理
SQL查询是否都需要解析:深入解析SQL执行流程与优化技巧
在数据库管理系统中,SQL(Structured Query Language)查询是用户与数据库交互的主要方式
|
6月前
|
Java 微服务 Spring
Seata 客户端需要同时启动 TM 和 RM 吗?
Seata 客户端需要同时启动 TM 和 RM 吗?
|
6月前
|
Apache 开发者
Apache Seata 如何解决 TCC 模式的幂等、悬挂和空回滚问题
【6月更文挑战第8天】Apache Seata 是一款分布式事务框架,解决TCC模式下的幂等、悬挂和空回滚问题。通过记录事务状态处理幂等,设置超时机制避免悬挂,明确标记Try操作成功来处理空回滚。Seata 提供丰富配置和管理功能,确保分布式事务的可靠性和效率,支持复杂事务处理场景,为企业业务发展提供支持。
240 7
|
7月前
|
SQL 算法
基于若依的ruoyi-nbcio流程管理系统修改代码生成的sql菜单id修改成递增id(谨慎修改,大并发分布式有弊端)
基于若依的ruoyi-nbcio流程管理系统修改代码生成的sql菜单id修改成递增id(谨慎修改,大并发分布式有弊端)
118 1
|
7月前
|
SQL
flowable的流程任务统计sql(续)
flowable的流程任务统计sql(续)
55 0