Fescar - RM Executor执行过程介绍

简介: 开篇 这篇文章的目的是介绍ExecuteTemplate中调用的Executor的执行过程。Executor的类图说明:BaseTransactionalExecutor提供解析SQL语句获取元数据的功能。

开篇

 这篇文章的目的是介绍ExecuteTemplate中调用的Executor的执行过程。


Executor的类图

Executor.png

说明:

  • BaseTransactionalExecutor提供解析SQL语句获取元数据的功能。
  • AbstractDMLBaseExecutor提供执行SQL的功能包括doExecute。
  • Insert/Delete/UpdateExecutor提供SQL执行前后镜像获取功能。


InsertExecutor执行过程

InsertExecutor.jpg

说明:

  • Executor的整个执行的过程(以InsertExecutor为例),整个是父类调用子类的过程。
  • 对比InsertExecutor的类图的接口能够更好的理解整个调用过程。


Executor源码介绍 - 执行过程

StatementProxy

public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {

    @Override
    public ResultSet executeQuery(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<ResultSet, T>() {
            @Override
            public ResultSet execute(Statement statement, Object... args) throws SQLException {
                return statement.executeQuery((String) args[0]);
            }
        }, sql);
    }
}

说明:

  • StatementProxy的核心调用逻辑。
  • ExecuteTemplate.execute负责调用ExecuteTemplate。
  • StatementCallback()负责在Executor当中被回调。


ExecuteTemplate

public class ExecuteTemplate {

    public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        return execute(null, statementProxy, statementCallback, args);
    }


    public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {

        if (!RootContext.inGlobalTransaction()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }

        // sqlRecognizer 执行过程中传入值为null,走的分支代码
        if (sqlRecognizer == null) {

           //  SQLVisitorFactory.get是一个核心代码,会具体分析
            sqlRecognizer = SQLVisitorFactory.get(
                statementProxy.getTargetSQL(),
                statementProxy.getConnectionProxy().getDbType());
        }
        Executor<T> executor = null;
        if (sqlRecognizer == null) {
            executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
        } else {
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                    break;
                default:
                    executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                    break;
            }
        }
        T rs = null;
        try {
            rs = executor.execute(args);

        } catch (Throwable ex) {
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                // Turn everything into SQLException
                new SQLException(ex);
            }
        }
        return rs;
    }
}

说明:

  • ExecuteTemplate 执行InsertExecutor的整个过程如上述代码。
  • 获取SQLRecognizer sqlRecognizer对象,sqlRecognizer = SQLVisitorFactory.get()。
  • 创建InsertExecutor对象,executor = new InsertExecutor()。
  • 执行InsertExecutor对象并返回结果,rs = executor.execute(args);


BaseTransactionalExecutor

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {

    protected StatementProxy<S> statementProxy;
    protected StatementCallback<T, S> statementCallback;
    protected SQLRecognizer sqlRecognizer;
    private TableMeta tableMeta;

    public BaseTransactionalExecutor(StatementProxy<S> statementProxy, 
                   StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
        this.statementProxy = statementProxy;
        this.statementCallback = statementCallback;
        this.sqlRecognizer = sqlRecognizer;
    }

    @Override
    public Object execute(Object... args) throws Throwable {
        String xid = RootContext.getXID();
        statementProxy.getConnectionProxy().bind(xid);
        return doExecute(args);
    }

    protected abstract Object doExecute(Object... args) throws Throwable;
}

说明:

  • 关注BaseTransactionalExecutor的构造函数。
  • BaseTransactionalExecutor的execute是执行的入口函数,调用抽象方法doExecute()
  • BaseTransactionalExecutor的doExecute()在子类中实现。


AbstractDMLBaseExecutor

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDMLBaseExecutor.class);

    public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, 
       StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }

    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }

    protected T executeAutoCommitFalse(Object[] args) throws Throwable {
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        T result = null;
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        LockRetryController lockRetryController = new LockRetryController();
        try {
            connectionProxy.setAutoCommit(false);
            while (true) {
                try {
                    result = executeAutoCommitFalse(args);
                    connectionProxy.commit();
                    break;
                } catch (LockConflictException lockConflict) {
                    connectionProxy.getTargetConnection().rollback();
                    lockRetryController.sleep(lockConflict);
                }
            }

        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("exception occur", e);
            throw e;
        } finally {
            connectionProxy.setAutoCommit(true);
        }
        return result;
    }
}

说明:

  • BaseTransactionalExecutor的调用子类AbstractDMLBaseExecutor的doExecute方法。
  • AbstractDMLBaseExecutor的doExecute()执行executeAutoCommitFalse或executeAutoCommitTrue方法。
  • executeAutoCommitTrue方法内部也是通过executeAutoCommitFalse实现的。
  • executeAutoCommitFalse()按照保存beforeImage()、statementCallback.execute()、afterImage()、prepareUndoLog()顺序执行。
  • beforeImage()负责保存执行前镜像、afterImage()负责保存执行后镜像、prepareUndoLog()负责保存回滚日志。
  • statementCallback.execute()负责执行statement操作。


InsertExecutor

public class InsertExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {

    public InsertExecutor(StatementProxy statementProxy, 
        StatementCallback statementCallback, SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }

    @Override
    protected TableRecords beforeImage() throws SQLException {
        return TableRecords.empty(getTableMeta());
    }

    @Override
    protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
        SQLInsertRecognizer recogizier = (SQLInsertRecognizer)sqlRecognizer;
        List<String> insertColumns = recogizier.getInsertColumns();
        TableMeta tmeta = getTableMeta();
        TableRecords afterImage = null;
        if (tmeta.containsPK(insertColumns)) {
            // insert values including PK
            List<Object> pkValues = null;
            String pk = tmeta.getPkName();
            for (int paramIdx = 0; paramIdx < insertColumns.size(); paramIdx++) {
                if (insertColumns.get(paramIdx).equalsIgnoreCase(pk)) {
                    if (statementProxy instanceof PreparedStatementProxy) {
                        pkValues = ((PreparedStatementProxy) statementProxy).getParamsByIndex(paramIdx);
                    } else {
                        List<List<Object>> insertRows = recogizier.getInsertRows();
                        pkValues = new ArrayList<>(insertRows.size());
                        for (List<Object> row : insertRows) {
                            pkValues.add(row.get(paramIdx));
                        }
                    }
                    break;
                }
            }
            if (pkValues == null) {
                throw new ShouldNeverHappenException();
            }
            afterImage = getTableRecords(pkValues);

        } else {
            // PK is just auto generated
            Map<String, ColumnMeta> pkMetaMap = getTableMeta().getPrimaryKeyMap();
            if (pkMetaMap.size() != 1) {
                throw new NotSupportYetException();
            }
            ColumnMeta pkMeta = pkMetaMap.values().iterator().next();
            if (!pkMeta.isAutoincrement()) {
                throw new ShouldNeverHappenException();
            }

            ResultSet genKeys = null;
            try {
                genKeys = statementProxy.getTargetStatement().getGeneratedKeys();
            } catch (SQLException e) {
                // java.sql.SQLException: Generated keys not requested. You need to
                // specify Statement.RETURN_GENERATED_KEYS to
                // Statement.executeUpdate() or Connection.prepareStatement().
                if ("S1009".equalsIgnoreCase(e.getSQLState())) {
                    genKeys = statementProxy.getTargetStatement().executeQuery("SELECT LAST_INSERT_ID()");
                } else {
                    throw e;
                }
            }
            List<Object> pkValues = new ArrayList<>();
            while (genKeys.next()) {
                Object v = genKeys.getObject(1);
                pkValues.add(v);
            }

            afterImage = getTableRecords(pkValues);

        }

        if (afterImage == null) {
            throw new SQLException("Failed to build after-image for insert");
        }

        return afterImage;
    }
}

说明:

  • InsertExecutor主要实现的功能是beforeImage()和afterImage()。
  • beforeImage()保存插入前镜像。
  • afterImage()保存插入后镜像。


期待

 该篇文章把Executor的执行过程讲解清楚了,后续针对Executor中涉及的通用功能代码进行介绍。

目录
相关文章
CacheCloud中Task任务执行过程
CacheCloud中Task任务执行过程实质是先将task进行堆积到taskQueue中,然后进行统一多线程进行处理。
122 0
|
SQL 数据库
RM在Seata AT模式中的sql执行流程
RM在Seata AT模式中的sql执行流程
96 0
|
Java Spring
spring中事务执行完成后/回滚后执行
有时候业务场景需要 在事务结束后执行一些更新操作; 或者在事务失败回滚后执行一些更新表状态的操作;
349 0
|
监控 FESCAR
Fescar - RM 提交本地事务流程
开篇  这篇文章的目的是介绍Fescar的提交流程(Commit)和回滚流程(Rollback),这两个流程其实是Fescar中RM的核心逻辑,涉及和TC交互的流程。  由于RM和TC交互涉及到网络通信,所以这块我们暂时只关注RM端的处理流程而暂时忽略网络通信的过程,网络通信的过程值得通过一篇文章单独进行描述。
1524 0
|
Java 数据库连接 FESCAR
Fescar - RM 全局事务提交回滚流程
开篇  这篇文章的目的主要是讲解RM在接收TC的请求后执行全局分支事务提交(doBranchCommit)和全局分支事务回滚(doBranchRollback)的流程。  全局的分支事务提交过程和回滚过程也算RM处理流程中核心的一环,了解以后并结合之前讲解的本地事务提交流程就能够较好的理解整个过程了。
1530 0
|
FESCAR SQL
Fescar - RM SelectForUpdateExecutor介绍
开篇  这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。  个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。
1179 0
|
Java 数据库连接 FESCAR
Fescar - RM实现原理介绍
开篇  这个系列开始介绍Fescar当中的RM(Resource Manager),RM负责控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。 Fescar RM介绍 说明: RM负责注册分支事务并上报分支事务状态。
2592 0
|
SQL FESCAR
Fescar - RM DeleteExecutor介绍
开篇  这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。  个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。
1090 0
|
SQL FESCAR 安全
Fescar - RM UpdateExecutor介绍
开篇  这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。  个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。
1119 0
|
FESCAR SQL druid
Fescar - RM InsertExecutor介绍
开篇  这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。  个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。
1338 0