Fescar - RM Executor执行过程介绍

简介: 开篇 这篇文章的目的是介绍ExecuteTemplate中调用的Executor的执行过程。Executor的类图说明:BaseTransactionalExecutor提供解析SQL语句获取元数据的功能。

开篇

 这篇文章的目的是介绍ExecuteTemplate中调用的Executor的执行过程。


Executor的类图

Executor.png

说明:

  • BaseTransactionalExecutor提供解析SQL语句获取元数据的功能。
  • AbstractDMLBaseExecutor提供执行SQL的功能包括doExecute。
  • Insert/Delete/UpdateExecutor提供SQL执行前后镜像获取功能。


InsertExecutor执行过程

InsertExecutor.jpg

说明:

  • Executor的整个执行的过程(以InsertExecutor为例),整个是父类调用子类的过程。
  • 对比InsertExecutor的类图的接口能够更好的理解整个调用过程。


Executor源码介绍 - 执行过程

StatementProxy

public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {

    @Override
    public ResultSet executeQuery(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<ResultSet, T>() {
            @Override
            public ResultSet execute(Statement statement, Object... args) throws SQLException {
                return statement.executeQuery((String) args[0]);
            }
        }, sql);
    }
}

说明:

  • StatementProxy的核心调用逻辑。
  • ExecuteTemplate.execute负责调用ExecuteTemplate。
  • StatementCallback()负责在Executor当中被回调。


ExecuteTemplate

public class ExecuteTemplate {

    public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        return execute(null, statementProxy, statementCallback, args);
    }


    public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {

        if (!RootContext.inGlobalTransaction()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }

        // sqlRecognizer 执行过程中传入值为null,走的分支代码
        if (sqlRecognizer == null) {

           //  SQLVisitorFactory.get是一个核心代码,会具体分析
            sqlRecognizer = SQLVisitorFactory.get(
                statementProxy.getTargetSQL(),
                statementProxy.getConnectionProxy().getDbType());
        }
        Executor<T> executor = null;
        if (sqlRecognizer == null) {
            executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
        } else {
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                    break;
                default:
                    executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                    break;
            }
        }
        T rs = null;
        try {
            rs = executor.execute(args);

        } catch (Throwable ex) {
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                // Turn everything into SQLException
                new SQLException(ex);
            }
        }
        return rs;
    }
}

说明:

  • ExecuteTemplate 执行InsertExecutor的整个过程如上述代码。
  • 获取SQLRecognizer sqlRecognizer对象,sqlRecognizer = SQLVisitorFactory.get()。
  • 创建InsertExecutor对象,executor = new InsertExecutor()。
  • 执行InsertExecutor对象并返回结果,rs = executor.execute(args);


BaseTransactionalExecutor

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {

    protected StatementProxy<S> statementProxy;
    protected StatementCallback<T, S> statementCallback;
    protected SQLRecognizer sqlRecognizer;
    private TableMeta tableMeta;

    public BaseTransactionalExecutor(StatementProxy<S> statementProxy, 
                   StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
        this.statementProxy = statementProxy;
        this.statementCallback = statementCallback;
        this.sqlRecognizer = sqlRecognizer;
    }

    @Override
    public Object execute(Object... args) throws Throwable {
        String xid = RootContext.getXID();
        statementProxy.getConnectionProxy().bind(xid);
        return doExecute(args);
    }

    protected abstract Object doExecute(Object... args) throws Throwable;
}

说明:

  • 关注BaseTransactionalExecutor的构造函数。
  • BaseTransactionalExecutor的execute是执行的入口函数,调用抽象方法doExecute()
  • BaseTransactionalExecutor的doExecute()在子类中实现。


AbstractDMLBaseExecutor

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDMLBaseExecutor.class);

    public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, 
       StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }

    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }

    protected T executeAutoCommitFalse(Object[] args) throws Throwable {
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        T result = null;
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        LockRetryController lockRetryController = new LockRetryController();
        try {
            connectionProxy.setAutoCommit(false);
            while (true) {
                try {
                    result = executeAutoCommitFalse(args);
                    connectionProxy.commit();
                    break;
                } catch (LockConflictException lockConflict) {
                    connectionProxy.getTargetConnection().rollback();
                    lockRetryController.sleep(lockConflict);
                }
            }

        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("exception occur", e);
            throw e;
        } finally {
            connectionProxy.setAutoCommit(true);
        }
        return result;
    }
}

说明:

  • BaseTransactionalExecutor的调用子类AbstractDMLBaseExecutor的doExecute方法。
  • AbstractDMLBaseExecutor的doExecute()执行executeAutoCommitFalse或executeAutoCommitTrue方法。
  • executeAutoCommitTrue方法内部也是通过executeAutoCommitFalse实现的。
  • executeAutoCommitFalse()按照保存beforeImage()、statementCallback.execute()、afterImage()、prepareUndoLog()顺序执行。
  • beforeImage()负责保存执行前镜像、afterImage()负责保存执行后镜像、prepareUndoLog()负责保存回滚日志。
  • statementCallback.execute()负责执行statement操作。


InsertExecutor

public class InsertExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {

    public InsertExecutor(StatementProxy statementProxy, 
        StatementCallback statementCallback, SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }

    @Override
    protected TableRecords beforeImage() throws SQLException {
        return TableRecords.empty(getTableMeta());
    }

    @Override
    protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
        SQLInsertRecognizer recogizier = (SQLInsertRecognizer)sqlRecognizer;
        List<String> insertColumns = recogizier.getInsertColumns();
        TableMeta tmeta = getTableMeta();
        TableRecords afterImage = null;
        if (tmeta.containsPK(insertColumns)) {
            // insert values including PK
            List<Object> pkValues = null;
            String pk = tmeta.getPkName();
            for (int paramIdx = 0; paramIdx < insertColumns.size(); paramIdx++) {
                if (insertColumns.get(paramIdx).equalsIgnoreCase(pk)) {
                    if (statementProxy instanceof PreparedStatementProxy) {
                        pkValues = ((PreparedStatementProxy) statementProxy).getParamsByIndex(paramIdx);
                    } else {
                        List<List<Object>> insertRows = recogizier.getInsertRows();
                        pkValues = new ArrayList<>(insertRows.size());
                        for (List<Object> row : insertRows) {
                            pkValues.add(row.get(paramIdx));
                        }
                    }
                    break;
                }
            }
            if (pkValues == null) {
                throw new ShouldNeverHappenException();
            }
            afterImage = getTableRecords(pkValues);

        } else {
            // PK is just auto generated
            Map<String, ColumnMeta> pkMetaMap = getTableMeta().getPrimaryKeyMap();
            if (pkMetaMap.size() != 1) {
                throw new NotSupportYetException();
            }
            ColumnMeta pkMeta = pkMetaMap.values().iterator().next();
            if (!pkMeta.isAutoincrement()) {
                throw new ShouldNeverHappenException();
            }

            ResultSet genKeys = null;
            try {
                genKeys = statementProxy.getTargetStatement().getGeneratedKeys();
            } catch (SQLException e) {
                // java.sql.SQLException: Generated keys not requested. You need to
                // specify Statement.RETURN_GENERATED_KEYS to
                // Statement.executeUpdate() or Connection.prepareStatement().
                if ("S1009".equalsIgnoreCase(e.getSQLState())) {
                    genKeys = statementProxy.getTargetStatement().executeQuery("SELECT LAST_INSERT_ID()");
                } else {
                    throw e;
                }
            }
            List<Object> pkValues = new ArrayList<>();
            while (genKeys.next()) {
                Object v = genKeys.getObject(1);
                pkValues.add(v);
            }

            afterImage = getTableRecords(pkValues);

        }

        if (afterImage == null) {
            throw new SQLException("Failed to build after-image for insert");
        }

        return afterImage;
    }
}

说明:

  • InsertExecutor主要实现的功能是beforeImage()和afterImage()。
  • beforeImage()保存插入前镜像。
  • afterImage()保存插入后镜像。


期待

 该篇文章把Executor的执行过程讲解清楚了,后续针对Executor中涉及的通用功能代码进行介绍。

目录
相关文章
|
5月前
|
Java
解析Java线程池:参数详解与执行流程
解析Java线程池:参数详解与执行流程
58 1
|
6月前
|
消息中间件 存储 机器人
Flink执行问题之执行checkpoint失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
SQL 存储 Java
Flowable 已经执行完毕的流程去哪找?
Flowable 已经执行完毕的流程去哪找?
Flowable 已经执行完毕的流程去哪找?
CacheCloud中Task任务执行过程
CacheCloud中Task任务执行过程实质是先将task进行堆积到taskQueue中,然后进行统一多线程进行处理。
119 0
|
SQL 数据库
RM在Seata AT模式中的sql执行流程
RM在Seata AT模式中的sql执行流程
95 0
|
Java Spring
spring中事务执行完成后/回滚后执行
有时候业务场景需要 在事务结束后执行一些更新操作; 或者在事务失败回滚后执行一些更新表状态的操作;
339 0
|
存储 算法 Unix
bthread源码剖析(四): 通过ParkingLot实现Worker间任务状态同步
通过之前的文章我们知道TaskGroup(以下简称TG)是在死循环等待任务,然后切换栈去执行任务。在当前TG没有任务的时候会进行“工作窃取”窃取其他TG的任务。在没有任务的时候TG会“休眠”,当任务出现的时候被唤醒然后消费。
318 0
|
监控 FESCAR
Fescar - RM 提交本地事务流程
开篇  这篇文章的目的是介绍Fescar的提交流程(Commit)和回滚流程(Rollback),这两个流程其实是Fescar中RM的核心逻辑,涉及和TC交互的流程。  由于RM和TC交互涉及到网络通信,所以这块我们暂时只关注RM端的处理流程而暂时忽略网络通信的过程,网络通信的过程值得通过一篇文章单独进行描述。
1523 0
|
分布式计算
Spark2.4.0源码分析之WorldCount 触发作业提交(二)
Final RDD作为参数,通过RDD.collect()函数触发作业提交
1374 0
|
Java 数据库连接 FESCAR
Fescar - RM实现原理介绍
开篇  这个系列开始介绍Fescar当中的RM(Resource Manager),RM负责控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。 Fescar RM介绍 说明: RM负责注册分支事务并上报分支事务状态。
2590 0