开篇
这篇文章的目的是介绍ExecuteTemplate中调用的Executor的执行过程。
Executor的类图
说明:
- BaseTransactionalExecutor提供解析SQL语句获取元数据的功能。
- AbstractDMLBaseExecutor提供执行SQL的功能包括doExecute。
- Insert/Delete/UpdateExecutor提供SQL执行前后镜像获取功能。
InsertExecutor执行过程
说明:
- Executor的整个执行的过程(以InsertExecutor为例),整个是父类调用子类的过程。
- 对比InsertExecutor的类图的接口能够更好的理解整个调用过程。
Executor源码介绍 - 执行过程
StatementProxy
public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {
@Override
public ResultSet executeQuery(String sql) throws SQLException {
this.targetSQL = sql;
return ExecuteTemplate.execute(this, new StatementCallback<ResultSet, T>() {
@Override
public ResultSet execute(Statement statement, Object... args) throws SQLException {
return statement.executeQuery((String) args[0]);
}
}, sql);
}
}
说明:
- StatementProxy的核心调用逻辑。
- ExecuteTemplate.execute负责调用ExecuteTemplate。
- StatementCallback()负责在Executor当中被回调。
ExecuteTemplate
public class ExecuteTemplate {
public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
return execute(null, statementProxy, statementCallback, args);
}
public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
if (!RootContext.inGlobalTransaction()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
// sqlRecognizer 执行过程中传入值为null,走的分支代码
if (sqlRecognizer == null) {
// SQLVisitorFactory.get是一个核心代码,会具体分析
sqlRecognizer = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
statementProxy.getConnectionProxy().getDbType());
}
Executor<T> executor = null;
if (sqlRecognizer == null) {
executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
} else {
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
case UPDATE:
executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
break;
}
}
T rs = null;
try {
rs = executor.execute(args);
} catch (Throwable ex) {
if (ex instanceof SQLException) {
throw (SQLException) ex;
} else {
// Turn everything into SQLException
new SQLException(ex);
}
}
return rs;
}
}
说明:
- ExecuteTemplate 执行InsertExecutor的整个过程如上述代码。
- 获取SQLRecognizer sqlRecognizer对象,sqlRecognizer = SQLVisitorFactory.get()。
- 创建InsertExecutor对象,executor = new InsertExecutor()。
- 执行InsertExecutor对象并返回结果,rs = executor.execute(args);
BaseTransactionalExecutor
public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {
protected StatementProxy<S> statementProxy;
protected StatementCallback<T, S> statementCallback;
protected SQLRecognizer sqlRecognizer;
private TableMeta tableMeta;
public BaseTransactionalExecutor(StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
this.statementProxy = statementProxy;
this.statementCallback = statementCallback;
this.sqlRecognizer = sqlRecognizer;
}
@Override
public Object execute(Object... args) throws Throwable {
String xid = RootContext.getXID();
statementProxy.getConnectionProxy().bind(xid);
return doExecute(args);
}
protected abstract Object doExecute(Object... args) throws Throwable;
}
说明:
- 关注BaseTransactionalExecutor的构造函数。
- BaseTransactionalExecutor的execute是执行的入口函数,调用抽象方法doExecute()
- BaseTransactionalExecutor的doExecute()在子类中实现。
AbstractDMLBaseExecutor
public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDMLBaseExecutor.class);
public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
super(statementProxy, statementCallback, sqlRecognizer);
}
@Override
public T doExecute(Object... args) throws Throwable {
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
if (connectionProxy.getAutoCommit()) {
return executeAutoCommitTrue(args);
} else {
return executeAutoCommitFalse(args);
}
}
protected T executeAutoCommitFalse(Object[] args) throws Throwable {
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
}
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
T result = null;
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
LockRetryController lockRetryController = new LockRetryController();
try {
connectionProxy.setAutoCommit(false);
while (true) {
try {
result = executeAutoCommitFalse(args);
connectionProxy.commit();
break;
} catch (LockConflictException lockConflict) {
connectionProxy.getTargetConnection().rollback();
lockRetryController.sleep(lockConflict);
}
}
} catch (Exception e) {
// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("exception occur", e);
throw e;
} finally {
connectionProxy.setAutoCommit(true);
}
return result;
}
}
说明:
- BaseTransactionalExecutor的调用子类AbstractDMLBaseExecutor的doExecute方法。
- AbstractDMLBaseExecutor的doExecute()执行executeAutoCommitFalse或executeAutoCommitTrue方法。
- executeAutoCommitTrue方法内部也是通过executeAutoCommitFalse实现的。
- executeAutoCommitFalse()按照保存beforeImage()、statementCallback.execute()、afterImage()、prepareUndoLog()顺序执行。
- beforeImage()负责保存执行前镜像、afterImage()负责保存执行后镜像、prepareUndoLog()负责保存回滚日志。
- statementCallback.execute()负责执行statement操作。
InsertExecutor
public class InsertExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
public InsertExecutor(StatementProxy statementProxy,
StatementCallback statementCallback, SQLRecognizer sqlRecognizer) {
super(statementProxy, statementCallback, sqlRecognizer);
}
@Override
protected TableRecords beforeImage() throws SQLException {
return TableRecords.empty(getTableMeta());
}
@Override
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
SQLInsertRecognizer recogizier = (SQLInsertRecognizer)sqlRecognizer;
List<String> insertColumns = recogizier.getInsertColumns();
TableMeta tmeta = getTableMeta();
TableRecords afterImage = null;
if (tmeta.containsPK(insertColumns)) {
// insert values including PK
List<Object> pkValues = null;
String pk = tmeta.getPkName();
for (int paramIdx = 0; paramIdx < insertColumns.size(); paramIdx++) {
if (insertColumns.get(paramIdx).equalsIgnoreCase(pk)) {
if (statementProxy instanceof PreparedStatementProxy) {
pkValues = ((PreparedStatementProxy) statementProxy).getParamsByIndex(paramIdx);
} else {
List<List<Object>> insertRows = recogizier.getInsertRows();
pkValues = new ArrayList<>(insertRows.size());
for (List<Object> row : insertRows) {
pkValues.add(row.get(paramIdx));
}
}
break;
}
}
if (pkValues == null) {
throw new ShouldNeverHappenException();
}
afterImage = getTableRecords(pkValues);
} else {
// PK is just auto generated
Map<String, ColumnMeta> pkMetaMap = getTableMeta().getPrimaryKeyMap();
if (pkMetaMap.size() != 1) {
throw new NotSupportYetException();
}
ColumnMeta pkMeta = pkMetaMap.values().iterator().next();
if (!pkMeta.isAutoincrement()) {
throw new ShouldNeverHappenException();
}
ResultSet genKeys = null;
try {
genKeys = statementProxy.getTargetStatement().getGeneratedKeys();
} catch (SQLException e) {
// java.sql.SQLException: Generated keys not requested. You need to
// specify Statement.RETURN_GENERATED_KEYS to
// Statement.executeUpdate() or Connection.prepareStatement().
if ("S1009".equalsIgnoreCase(e.getSQLState())) {
genKeys = statementProxy.getTargetStatement().executeQuery("SELECT LAST_INSERT_ID()");
} else {
throw e;
}
}
List<Object> pkValues = new ArrayList<>();
while (genKeys.next()) {
Object v = genKeys.getObject(1);
pkValues.add(v);
}
afterImage = getTableRecords(pkValues);
}
if (afterImage == null) {
throw new SQLException("Failed to build after-image for insert");
}
return afterImage;
}
}
说明:
- InsertExecutor主要实现的功能是beforeImage()和afterImage()。
- beforeImage()保存插入前镜像。
- afterImage()保存插入后镜像。
期待
该篇文章把Executor的执行过程讲解清楚了,后续针对Executor中涉及的通用功能代码进行介绍。