开篇
这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。
个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。这篇文章讲解Executor的实现类UpdateExecutor。
类依赖图
说明:
- 着重讲解UpdateExecutor实现类。
UpdateExecutor方法介绍
public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
public UpdateExecutor(StatementProxy statementProxy, StatementCallback statementCallback, SQLRecognizer sqlRecognizer) {
super(statementProxy, statementCallback, sqlRecognizer);
}
@Override
protected TableRecords beforeImage() throws SQLException {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;
TableMeta tmeta = getTableMeta();
List<String> updateColumns = recognizer.getUpdateColumns();
StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
if (!tmeta.containsPK(updateColumns)) {
// PK should be included.
selectSQLAppender.append(getColumnNameInSQL(tmeta.getPkName()) + ", ");
}
for (int i = 0; i < updateColumns.size(); i++) {
selectSQLAppender.append(updateColumns.get(i));
if (i < (updateColumns.size() - 1)) {
selectSQLAppender.append(", ");
}
}
String whereCondition = null;
ArrayList<Object> paramAppender = new ArrayList<>();
if (statementProxy instanceof ParametersHolder) {
whereCondition = recognizer.getWhereCondition((ParametersHolder)statementProxy, paramAppender);
} else {
whereCondition = recognizer.getWhereCondition();
}
selectSQLAppender.append(" FROM " + getFromTableInSQL() + " WHERE " + whereCondition + " FOR UPDATE");
String selectSQL = selectSQLAppender.toString();
TableRecords beforeImage = null;
PreparedStatement ps = null;
Statement st = null;
ResultSet rs = null;
try {
if (paramAppender.isEmpty()) {
st = statementProxy.getConnection().createStatement();
rs = st.executeQuery(selectSQL);
} else {
ps = statementProxy.getConnection().prepareStatement(selectSQL);
for (int i = 0; i< paramAppender.size(); i++) {
ps.setObject(i + 1, paramAppender.get(i));
}
rs = ps.executeQuery();
}
beforeImage = TableRecords.buildRecords(tmeta, rs);
} finally {
if (rs != null) {
rs.close();
}
if (st != null) {
st.close();
}
if (ps != null) {
ps.close();
}
}
return beforeImage;
}
}
说明:
- UpdateExecutor需要保存SQL执行前的镜像。
- 执行前镜像的准备整体思路是按照按照update的更新条件逆向拼接正向查询SQL。
- 查询字段当中必须包含主键原因是因为afterImage()操作当中需要根据主键进行查询。原因猜测因为update会把where条件中字段的值变更,保存主键最安全。
- 查询的SQL的select的字段从update字段中解析并加上主键列。
- 查询的SQL的条件通过解析update的条件生成。
- 根据TableRecords.buildRecords生成执行前镜像。
public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
public UpdateExecutor(StatementProxy statementProxy, StatementCallback statementCallback, SQLRecognizer sqlRecognizer) {
super(statementProxy, statementCallback, sqlRecognizer);
}
@Override
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;
TableMeta tmeta = getTableMeta();
if (beforeImage == null || beforeImage.size() == 0) {
return TableRecords.empty(getTableMeta());
}
List<String> updateColumns = recognizer.getUpdateColumns();
StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
if (!tmeta.containsPK(updateColumns)) {
// PK should be included.
selectSQLAppender.append(getColumnNameInSQL(tmeta.getPkName()) + ", ");
}
for (int i = 0; i < updateColumns.size(); i++) {
selectSQLAppender.append(updateColumns.get(i));
if (i < (updateColumns.size() - 1)) {
selectSQLAppender.append(", ");
}
}
List<Field> pkRows = beforeImage.pkRows();
selectSQLAppender.append(
" FROM " + getFromTableInSQL() + " WHERE " + buildWhereConditionByPKs(pkRows) + " FOR UPDATE");
String selectSQL = selectSQLAppender.toString();
TableRecords afterImage = null;
PreparedStatement pst = null;
ResultSet rs = null;
try {
pst = statementProxy.getConnection().prepareStatement(selectSQL);
int index = 0;
for (Field pkField : pkRows) {
index++;
pst.setObject(index, pkField.getValue(), pkField.getType());
}
rs = pst.executeQuery();
afterImage = TableRecords.buildRecords(tmeta, rs);
} finally {
if (rs != null) {
rs.close();
}
if (pst != null) {
pst.close();
}
}
return afterImage;
}
}
说明:
- UpdateExecutor需要保存SQL执行后的镜像。
- UpdateExecutor执行后的镜像通过查询更新的字段字段生成的。
- 查询更新后镜像的SQL的select的字段从update字段中解析并加上主键列。
- 查询更新后镜像的SQL的条件是beforeImage()查询得到的主键key。
- 根据TableRecords.buildRecords生成执行前镜像。
期待
UpdateExecutor生成执行前后镜像的过程分析完后,接下去会分析生成镜像日志的流程。