5 Mybatis的Executor介绍(一)
目录
Mybatis中所有的Mapper语句的执行都是通过Executor进行的,Executor是Mybatis的一个核心接口,其定义如下。从其定义的接口方法我们可以看出,对应的增删改语句是通过Executor接口的update方法进行的,查询是通过query方法进行的。虽然Executor接口的实现类有BaseExecutor和CachingExecutor,而BaseExecutor的子类又有SimpleExecutor、ReuseExecutor和BatchExecutor,但BaseExecutor是一个抽象类,其只实现了一些公共的封装,而把真正的核心实现都通过方法抽象出来给子类实现,如doUpdate()、doQuery();CachingExecutor只是在Executor的基础上加入了缓存的功能,底层还是通过Executor调用的,所以真正有作用的Executor只有SimpleExecutor、ReuseExecutor和BatchExecutor。它们都是自己实现的Executor核心功能,没有借助任何其它的Executor实现,它们的实现不同,也就注定了它们的功能也不一样。Executor是跟SqlSession绑定在一起的,每一个SqlSession都拥有一个新的Executor对象,由Configuration创建。
public interface Executor {
ResultHandler NO_RESULT_HANDLER = null;
int update(MappedStatement ms, Object parameter) throws SQLException;
<E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey cacheKey, BoundSql boundSql) throws SQLException;
<E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException;
List<BatchResult> flushStatements() throws SQLException;
void commit(booleanrequired) throws SQLException;
void rollback(booleanrequired) throws SQLException;
CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql);
boolean isCached(MappedStatement ms, CacheKey key);
void clearLocalCache();
void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType);
Transaction getTransaction();
void close(booleanforceRollback);
boolean isClosed();
void setExecutorWrapper(Executor executor);
}
下面是BaseExecutor的源码,我们可以看看它是怎么实现Executor接口的,是怎么开放接口给子类实现的。
public abstract class BaseExecutor implements Executor {
private static final Log log = LogFactory.getLog(BaseExecutor.class);
protected Transaction transaction;
protected Executor wrapper;
protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;
protected PerpetualCache localCache;
protected PerpetualCache localOutputParameterCache;
protected Configuration configuration;
protected int queryStack = 0;
private boolean closed;
protected BaseExecutor(Configuration configuration, Transaction transaction) {
this.transaction = transaction;
this.deferredLoads = new ConcurrentLinkedQueue<DeferredLoad>();
this.localCache = new PerpetualCache("LocalCache");
this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache");
this.closed = false;
this.configuration = configuration;
this.wrapper = this;
}
@Override
public Transaction getTransaction() {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
returntransaction;
}
@Override
public void close(boolean forceRollback) {
try {
try {
rollback(forceRollback);
} finally {
if (transaction != null) {
transaction.close();
}
}
} catch (SQLException e) {
// Ignore. There's nothing that can be done at this point.
log.warn("Unexpected exception on closing transaction. Cause: " + e);
} finally {
transaction = null;
deferredLoads = null;
localCache = null;
localOutputParameterCache = null;
closed = true;
}
}
@Override
public boolean isClosed() {
return closed;
}
@Override
public int update(MappedStatement ms, Object parameter) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
clearLocalCache();
return doUpdate(ms, parameter);
}
@Override
public List<BatchResult> flushStatements() throws SQLException {
return flushStatements(false);
}
public List<BatchResult> flushStatements(boolean isRollBack) throws SQLException {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
return doFlushStatements(isRollBack);
}
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
BoundSql boundSql = ms.getBoundSql(parameter);
CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
@SuppressWarnings("unchecked")
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
List<E> list;
try {
queryStack++;
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) {
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
if (queryStack == 0) {
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
// issue #601
deferredLoads.clear();
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
// issue #482
clearLocalCache();
}
}
return list;
}
@Override
public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
DeferredLoad deferredLoad = new DeferredLoad(resultObject, property, key, localCache, configuration, targetType);
if (deferredLoad.canLoad()) {
deferredLoad.load();
} else {
deferredLoads.add(new DeferredLoad(resultObject, property, key, localCache, configuration, targetType));
}
}
@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
CacheKey cacheKey = new CacheKey();
cacheKey.update(ms.getId());
cacheKey.update(Integer.valueOf(rowBounds.getOffset()));
cacheKey.update(Integer.valueOf(rowBounds.getLimit()));
cacheKey.update(boundSql.getSql());
List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
// mimic DefaultParameterHandler logic
for (int i = 0; i < parameterMappings.size(); i++) {
ParameterMapping parameterMapping = parameterMappings.get(i);
if (parameterMapping.getMode() != ParameterMode.OUT) {
Object value;
String propertyName = parameterMapping.getProperty();
if (boundSql.hasAdditionalParameter(propertyName)) {
value = boundSql.getAdditionalParameter(propertyName);
} else if (parameterObject == null) {
value = null;
} else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
value = parameterObject;
} else {
MetaObject metaObject = configuration.newMetaObject(parameterObject);
value = metaObject.getValue(propertyName);
}
cacheKey.update(value);
}
}
if (configuration.getEnvironment() != null) {
// issue #176
cacheKey.update(configuration.getEnvironment().getId());
}
return cacheKey;
}
@Override
public boolean isCached(MappedStatement ms, CacheKey key) {
return localCache.getObject(key) != null;
}
@Override
public void commit(boolean required) throws SQLException {
if (closed) {
throw new ExecutorException("Cannot commit, transaction is already closed");
}
clearLocalCache();
flushStatements();
if (required) {
transaction.commit();
}
}
@Override
public void rollback(boolean required) throws SQLException {
if (!closed) {
try {
clearLocalCache();
flushStatements(true);
} finally {
if (required) {
transaction.rollback();
}
}
}
}
@Override
public void clearLocalCache() {
if (!closed) {
localCache.clear();
localOutputParameterCache.clear();
}
}
protected abstract int doUpdate(MappedStatement ms, Object parameter)
throws SQLException;
protected abstract List<BatchResult> doFlushStatements(boolean isRollback)
throws SQLException;
protected abstract <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
throws SQLException;
protected void closeStatement(Statement statement) {
if (statement != null) {
try {
statement.close();
} catch (SQLException e) {
// ignore
}
}
}
private void handleLocallyCachedOutputParameters(MappedStatement ms, CacheKey key, Object parameter, BoundSql boundSql) {
if (ms.getStatementType() == StatementType.CALLABLE) {
final Object cachedParameter = localOutputParameterCache.getObject(key);
if (cachedParameter != null && parameter != null) {
final MetaObject metaCachedParameter = configuration.newMetaObject(cachedParameter);
final MetaObject metaParameter = configuration.newMetaObject(parameter);
for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) {
if (parameterMapping.getMode() != ParameterMode.IN) {
final String parameterName = parameterMapping.getProperty();
final Object cachedValue = metaCachedParameter.getValue(parameterName);
metaParameter.setValue(parameterName, cachedValue);
}
}
}
}
}
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
List<E> list;
localCache.putObject(key, EXECUTION_PLACEHOLDER);
try {
list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
} finally {
localCache.removeObject(key);
}
localCache.putObject(key, list);
if (ms.getStatementType() == StatementType.CALLABLE) {
localOutputParameterCache.putObject(key, parameter);
}
returnlist;
}
protected Connection getConnection(Log statementLog) throws SQLException {
Connection connection = transaction.getConnection();
if (statementLog.isDebugEnabled()) {
return ConnectionLogger.newInstance(connection, statementLog, queryStack);
} else {
return connection;
}
}
@Override
public void setExecutorWrapper(Executor wrapper) {
this.wrapper = wrapper;
}
private static class DeferredLoad {
private final MetaObject resultObject;
private final String property;
private final Class<?> targetType;
private final CacheKey key;
private final PerpetualCache localCache;
private final ObjectFactory objectFactory;
private final ResultExtractor resultExtractor;
// issue #781
public DeferredLoad(MetaObject resultObject,
String property,
CacheKey key,
PerpetualCache localCache,
Configuration configuration,
Class<?> targetType) {
this.resultObject = resultObject;
this.property = property;
this.key = key;
this.localCache = localCache;
this.objectFactory = configuration.getObjectFactory();
this.resultExtractor = new ResultExtractor(configuration, objectFactory);
this.targetType = targetType;
}
public boolean canLoad() {
return localCache.getObject(key) != null && localCache.getObject(key) != EXECUTION_PLACEHOLDER;
}
public void load() {
@SuppressWarnings( "unchecked" )
// we suppose we get back a List
List<Object> list = (List<Object>) localCache.getObject(key);
Object value = resultExtractor.extractObjectFromList(list, targetType);
resultObject.setValue(property, value);
}
}
}
5.1 SimpleExecutor
SimpleExecutor是Mybatis执行Mapper语句时默认使用的Executor。它提供最基本的Mapper语句执行功能,没有过多的封装。SimpleExecutor的源码如下。
public class SimpleExecutor extends BaseExecutor {
public SimpleExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
stmt = prepareStatement(handler, ms.getStatementLog());
returnhandler.update(stmt);
} finally {
closeStatement(stmt);
}
}
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
stmt = prepareStatement(handler, ms.getStatementLog());
return handler.<E>query(stmt, resultHandler);
} finally {
closeStatement(stmt);
}
}
@Override
public List<BatchResult> doFlushStatements(booleanisRollback) throws SQLException {
return Collections.emptyList();
}
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection);
handler.parameterize(stmt);
returnstmt;
}
}
5.2 ReuseExecutor
ReuseExecutor,顾名思义,是可以重用的Executor。它重用的是Statement对象,它会在内部利用一个Map把创建的Statement都缓存起来,每次在执行一条SQL语句时,它都会去判断之前是否存在基于该SQL缓存的Statement对象,存在而且之前缓存的Statement对象对应的Connection还没有关闭的时候就继续用之前的Statement对象,否则将创建一个新的Statement对象,并将其缓存起来。因为每一个新的SqlSession都有一个新的Executor对象,所以我们缓存在ReuseExecutor上的Statement的作用域是同一个SqlSession。以下是ReuseExecutor的源码。
public class ReuseExecutor extends BaseExecutor {
private final Map<String, Statement> statementMap = new HashMap<String, Statement>();
public ReuseExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
Statement stmt = prepareStatement(handler, ms.getStatementLog());
return handler.update(stmt);
}
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
Statement stmt = prepareStatement(handler, ms.getStatementLog());
return handler.<E>query(stmt, resultHandler);
}
@Override
public List<BatchResult> doFlushStatements(booleanisRollback) throws SQLException {
for (Statement stmt : statementMap.values()) {
closeStatement(stmt);
}
statementMap.clear();
return Collections.emptyList();
}
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
BoundSql boundSql = handler.getBoundSql();
String sql = boundSql.getSql();
if (hasStatementFor(sql)) {
stmt = getStatement(sql);
} else {
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection);
putStatement(sql, stmt);
}
handler.parameterize(stmt);
return stmt;
}
private boolean hasStatementFor(String sql) {
try {
return statementMap.keySet().contains(sql) && !statementMap.get(sql).getConnection().isClosed();
} catch (SQLException e) {
return false;
}
}
private Statement getStatement(String s) {
return statementMap.get(s);
}
private void putStatement(String sql, Statement stmt) {
statementMap.put(sql, stmt);
}
}
5.3 BatchExecutor
BatchExecutor的设计主要是用于做批量更新操作的。其底层会调用Statement的executeBatch()方法实现批量操作。以下是BatchExecutor的源码。
public class BatchExecutor extends BaseExecutor { public static final int BATCH_UPDATE_RETURN_VALUE = Integer.MIN_VALUE + 1002; private final List<Statement> statementList = new ArrayList<Statement>(); private final List<BatchResult> batchResultList = new ArrayList<BatchResult>(); private String currentSql; private MappedStatement currentStatement; public BatchExecutor(Configuration configuration, Transaction transaction) { super(configuration, transaction); } @Override public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException { final Configuration configuration = ms.getConfiguration(); final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null); final BoundSql boundSql = handler.getBoundSql(); final String sql = boundSql.getSql(); final Statement stmt; if (sql.equals(currentSql) && ms.equals(currentStatement)) { int last = statementList.size() - 1; stmt = statementList.get(last); handler.parameterize(stmt);//fix Issues 322 BatchResult batchResult = batchResultList.get(last); batchResult.addParameterObject(parameterObject); } else { Connection connection = getConnection(ms.getStatementLog()); stmt = handler.prepare(connection); handler.parameterize(stmt); //fix Issues 322 currentSql = sql; currentStatement = ms; statementList.add(stmt); batchResultList.add(new BatchResult(ms, sql, parameterObject)); } // handler.parameterize(stmt); handler.batch(stmt); return BATCH_UPDATE_RETURN_VALUE; } @Override public <E> List<E> doQuery(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException { Statement stmt = null; try { flushStatements(); Configuration configuration = ms.getConfiguration(); StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameterObject, rowBounds, resultHandler, boundSql); Connection connection = getConnection(ms.getStatementLog()); stmt = handler.prepare(connection); 
handler.parameterize(stmt); return handler.<E>query(stmt, resultHandler); } finally { closeStatement(stmt); } } @Override public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException { try { List<BatchResult> results = new ArrayList<BatchResult>(); if (isRollback) { return Collections.emptyList(); } for (int i = 0, n = statementList.size(); i < n; i++) { Statement stmt = statementList.get(i); BatchResult batchResult = batchResultList.get(i); try { batchResult.setUpdateCounts(stmt.executeBatch()); MappedStatement ms = batchResult.getMappedStatement(); List<Object> parameterObjects = batchResult.getParameterObjects(); KeyGenerator keyGenerator = ms.getKeyGenerator(); if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) { Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator; jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects); } else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141 for (Object parameter : parameterObjects) { keyGenerator.processAfter(this, ms, stmt, parameter); } } } catch (BatchUpdateException e) { StringBuilder message = new StringBuilder(); message.append(batchResult.getMappedStatement().getId()) .append(" (batch index #") .append(i + 1) .append(")") .append(" failed."); if (i > 0) { message.append(" ") .append(i) .append(" prior sub executor(s) completed successfully, but will be rolled back."); } throw new BatchExecutorException(message.toString(), e, results, batchResult); } results.add(batchResult); } return results; } finally { for (Statement stmt : statementList) { closeStatement(stmt); } currentSql = null; statementList.clear(); batchResultList.clear(); } } }
5.4 Executor的选择
既然BaseExecutor下面有SimpleExecutor、ReuseExecutor和BatchExecutor,Executor还有一个CachingExecutor的实现,那我们怎么选择使用哪个Executor呢?默认情况下Mybatis的全局配置cacheEnabled="true",这就意味着默认情况下我们就会使用一个CachingExecutor来包装我们真正使用的Executor,这个在后续介绍Mybatis的缓存的文章中会介绍。那我们真正使用的BaseExecutor是怎么确定的呢?是通过我们在创建SqlSession的时候确定的。SqlSession都是通过SqlSessionFactory的openSession()创建的,SqlSessionFactory提供了一系列的openSession()方法。
/**
 * Factory for {@link SqlSession} instances.
 *
 * <p>The {@code openSession} overloads fall into two groups: those without an
 * {@link ExecutorType} parameter and those that take one.
 * NOTE(review): per the surrounding article, the first group uses the executor type configured
 * through the {@code defaultExecutorType} setting and the second uses the requested type; the
 * implementation is not shown here, confirm against it.
 */
public interface SqlSessionFactory {
/** Opens a session without specifying an executor type. */
SqlSession openSession();
/**
 * Opens a session without specifying an executor type.
 *
 * @param autoCommit presumably whether the session auto-commits; TODO confirm against the implementation
 */
SqlSession openSession(boolean autoCommit);
/**
 * Opens a session without specifying an executor type.
 *
 * @param connection presumably the connection the session runs on; TODO confirm against the implementation
 */
SqlSession openSession(Connection connection);
/**
 * Opens a session without specifying an executor type.
 *
 * @param level presumably the transaction isolation level to apply; TODO confirm against the implementation
 */
SqlSession openSession(TransactionIsolationLevel level);
/**
 * Opens a session for the given executor type.
 *
 * @param execType the executor type requested for the session
 */
SqlSession openSession(ExecutorType execType);
/**
 * Opens a session for the given executor type.
 *
 * @param execType the executor type requested for the session
 * @param autoCommit presumably whether the session auto-commits; TODO confirm against the implementation
 */
SqlSession openSession(ExecutorType execType, boolean autoCommit);
/**
 * Opens a session for the given executor type.
 *
 * @param execType the executor type requested for the session
 * @param level presumably the transaction isolation level to apply; TODO confirm against the implementation
 */
SqlSession openSession(ExecutorType execType, TransactionIsolationLevel level);
/**
 * Opens a session for the given executor type.
 *
 * @param execType the executor type requested for the session
 * @param connection presumably the connection the session runs on; TODO confirm against the implementation
 */
SqlSession openSession(ExecutorType execType, Connection connection);
/** Returns the {@link Configuration} associated with this factory. */
Configuration getConfiguration();
}
从上面SqlSessionFactory提供的方法来看,它一共提供了两类创建SqlSession的方法,一类是没有指定ExecutorType的,一类是指定了ExecutorType的。很显然,指定了ExecutorType时将使用ExecutorType对应类型的Executor。ExecutorType是一个枚举类型,有SIMPLE、REUSE和BATCH三个对象。
5.4.1 默认的Executor
而没有指定ExecutorType时将使用默认的Executor。Mybatis默认的Executor是SimpleExecutor,我们可以通过Mybatis的全局配置defaultExecutorType来进行配置,其可选值也是SIMPLE、REUSE和BATCH。
<setting name="defaultExecutorType" value="SIMPLE"/>
注意,当Mybatis整合Spring后,Spring扫描后生成的Mapper对象,底层使用的SqlSession都是用的默认的Executor。如果我们需要在程序中使用非默认的Executor时,我们可以在Spring的bean容器中声明SqlSessionFactoryBean,然后在需要指定Executor的类中注入SqlSessionFactory,通过SqlSessionFactory来创建指定ExecutorType的SqlSession。
参考文档
Mybatis源码
(注:本文是基于Mybatis3.3.1所写,写于2016年12月24日星期六)