RM在Seata AT模式中的sql执行流程

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: RM在Seata AT模式中的sql执行流程

初始化

  • SeataDataSourceProxy的创建
    在Seata Client初始化过程中,会通过io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration创建SeataAutoDataSourceProxyCreator,在SeataAutoDataSourceProxyCreatorwrapIfNecessary()方法中,会针对DataSource做一层代理,这样的话,就可以在构建出来的代理对象中为实现分布式事务插入自定义逻辑;
  • RMClient的初始化
    Seata Client初始化的另一部分逻辑中,也就是io.seata.spring.boot.autoconfigure.SeataAutoConfiguration中,会构建GlobalTransactionScanner对象,在该对象的初始化方法中,会调用RMClient.init(),该方法的作用是建立与TC服务的通信,以便后续分支事务的注册、状态上报、提交或回滚;

业务执行过程

在业务执行过程中,在执行数据库操作时,主要关注SeataDataSourceProxy,也就是AT模式实现类:

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
  @Override
    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }
    @Override
    public ConnectionProxy getConnection(String username, String password) throws SQLException {
        Connection targetConnection = targetDataSource.getConnection(username, password);
        return new ConnectionProxy(this, targetConnection);
    }
}
复制代码

DataSourceProxy重写了getConnection()方法,针对获得的Connection也做了一个代理层ConnectionProxy,因为ConnectionProxy继承了AbstractConnectionProxy,核心逻辑还是在AbstractConnectionProxy中:

public abstract class AbstractConnectionProxy implements Connection {
    @Override
    public Statement createStatement() throws SQLException {
        Statement targetStatement = getTargetConnection().createStatement();
        // 包装Statement
        return new StatementProxy(this, targetStatement);
    }
    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        String dbType = getDbType();
        // support oracle 10.2+
        PreparedStatement targetPreparedStatement = null;
        if (BranchType.AT == RootContext.getBranchType()) {
            List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
            if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
                    TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
                            sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
                    String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
                    tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
                    // 调用原生Connection.prepareStatement()实现预编译逻辑
                    targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
                }
            }
        }
        if (targetPreparedStatement == null) {
            targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        }
        // 同样针对PreparedStatement做一层代理
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }
}
复制代码

根据上述源码,可以知道,其实RM层的大体框架就是利用了Proxy方式针对DataSource、Connection、

Statement、PreparedStatement做一层代理,在执行数据库操作的过程中插入分布式事务逻辑;

逐步深入源码,我们需要了解PreparedStatementProxy的部分实现逻辑:

public class PreparedStatementProxy extends AbstractPreparedStatementProxy
    implements PreparedStatement, ParametersHolder {
  // 采用模版模式
  @Override
    public int executeUpdate() throws SQLException {
        return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());
    }
}
复制代码

当执行sql语句的时候,PreparedStatementProxy采用了模版模式:

public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
            // 没有全局锁,也不是AT模式,直接使用原生statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
        String dbType = statementProxy.getConnectionProxy().getDbType();
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            sqlRecognizers = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    dbType);
        }
        // 根据不同的sql语句,使用不同类型的Executor,相当于就是策略模式
        Executor<T> executor;
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            executor = new PlainExecutor<>(statementProxy, statementCallback);
        } else {
            if (sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        // 插入
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                    new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                    new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        // 更新
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        // 删除
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        // 当前读
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case INSERT_ON_DUPLICATE_UPDATE:
                        // 没有就插入,重复就更新
                        switch (dbType) {
                            case JdbcConstants.MYSQL:
                            case JdbcConstants.MARIADB:
                                executor =
                                    new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            default:
                                throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                        }
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }
            } else {
                // 批量处理
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }
        T rs;
        try {
            // 执行sql语句,重点看这里
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException) ex;
        }
        return rs;
    }
复制代码

我们需要重点关注executor.execute(args)方法,这里使用到的是策略模式,不同的sql语句处理逻辑不一样,不过我们可以找到比较典型的UpdateExecutor,在它继承的抽象类AbstractDMLBaseExecutor中:

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
  @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        // 如果实现没有设置的话,模式是自动提交
        if (connectionProxy.getAutoCommit()) {
            // 默认执行当前分支
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }
  protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            // 把自动提交设置成false
            connectionProxy.changeAutoCommit();
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                // 执行sql语句
                T result = executeAutoCommitFalse(args);
                // 提交本地事务
                connectionProxy.commit();
                // 返回执行结果
                return result;
            });
        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            // 异常抛出
            throw e;
        } finally {
            // 恢复现场
            // 重置connectionContext
            connectionProxy.getContext().reset();
            // 恢复自动提交
            connectionProxy.setAutoCommit(true);
        }
    }
}
复制代码

上述代码使用模版模式完成了sql的执行,但是我们重点需要关注两部分逻辑:

  • 执行sql语句T result = executeAutoCommitFalse(args):
protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        // 获取前镜像,也就是sql执行前的快照
        TableRecords beforeImage = beforeImage();
        // 真正地执行sql语句
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        // 获取影响行数
        int updateCount = statementProxy.getUpdateCount();
        if (updateCount > 0) {
            // 如果影响行数大于0,那么获取执行后的快照
            TableRecords afterImage = afterImage(beforeImage);
            // 计算行锁,设置到ConnectionContext中,下面需要取出来
            // 计算undoLogItem,后面要用
            prepareUndoLog(beforeImage, afterImage);
        }
        return result;
    }
复制代码
  • 提交本地事务connectionProxy.commit()
@Override
    public void commit() throws SQLException {
        try {
            // 重试策略
            lockRetryPolicy.execute(() -> {
                // 真正地提交本地事务
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                // 失败后回滚
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }
private void doCommit() throws SQLException {
        // 如果使用的@GlobalTransaction
        if (context.inGlobalTransaction()) {
            // AT模式下,肯定执行当前分支
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
          // 如果使用的@GlobalLock
            processLocalCommitWithGlobalLocks();
        } else {
          // 不在分布式事务中,原生connection提交
            targetConnection.commit();
        }
    }
private void processGlobalTransactionCommit() throws SQLException {
        try {
            // 向远程TC服务注册当前分支事务,并加上行锁
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {//插入undoLog
          UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            // 提交本地事务
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            // 提交失败,上报分支事务状态
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            // 提交成功,上报分支事务状态
            report(true);
        }
        // 恢复ConnectionContext
        context.reset();
    }
复制代码
  • 综上所述,RM在分支事务执行过程中,主要有以下几步:

1.先获取业务sql执行前的快照BeforeImage

2.执行业务sql

3.如果业务sql影响行数大于0,那么需要获取业务sql执行后的快照AfterImage;并计算行锁以及后续需要插入的undolog;

4.向TC服务注册分支事务,并添加行锁;此处可能会存在锁冲突导致注册失败,直至注册成功或者分布式事务超时;

5.注册成功后,插入undolog日志表记录;

6.提交本地事务,此时业务sql和undolog日志一起提交;

7.上报分支事务状态;

8.恢复现场;


相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
26天前
|
自然语言处理 监控 Dubbo
Seata常见问题之使用tcc模式配置yml如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
69 4
|
2月前
|
数据库
|
23天前
|
监控 API 数据库
Seata常见问题之Seata AT的设计不支持使用临时表如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
25天前
|
NoSQL Java 数据库
Seata常见问题之xa模式下插入一条数据再更新这条数据会报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
108 2
|
25天前
|
Java 关系型数据库 微服务
Seata常见问题之项目一直启动不成功如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
53 0
|
25天前
|
SQL JSON Kubernetes
Seata常见问题之服务端 error日志没有输出,客户端执行sql报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
97 0
|
25天前
|
SQL 存储 Kubernetes
Seata常见问题之mybatisplus的批量插入方法报SQL错误如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
25 0
|
26天前
|
存储 Java Nacos
Seata常见问题之xa模式出现错误xid is not valid如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
60 4
|
2月前
|
数据库 开发者
Seata的 TCC 模式
Seata的 TCC 模式
|
2月前
|
存储 Java Apache
Seata 的 AT 模式
Seata 的 AT 模式