【图文并茂】源码解析MyBatis Sharding-Jdbc SQL语句执行流程详解

简介: 【图文并茂】源码解析MyBatis Sharding-Jdbc SQL语句执行流程详解

本文将详细介绍Mybatis SQL语句执行的全流程,本文与上篇具有一定的关联性,建议先阅读该系列中的前面3篇文章,重点掌握Mybatis Mapper类的初始化过程,因为在Mybatis中,Mapper是执行SQL语句的入口,类似下面这段代码:


1@Service
2public UserService implements IUserService {
3     @Autowired
4    private UserMapper userMapper;
5    public User findUser(Integer id) {
6        return userMapper.find(id);
7    }
8}

开始进入本文的主题,以源码为手段,分析Mybatis执行SQL语句的流行,并且使用了数据库分库分表中间件sharding-jdbc,其版本为sharding-jdbc1.4.1。


为了方便大家对本文的源码分析,先给出Mybatis层面核心类的方法调用序列图。

image.png

cb01fc32f703c88a8c4ec943fcb8100b.jpg

image.png

接下来从从源码的角度对其进行剖析。


温馨提示:在本文的末尾,还会给出一张详细的Mybatis Shardingjdbc语句执行流程图。(请勿错过哦)。


2.1 MapperProxy#invoker


1public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
 2    if (Object.class.equals(method.getDeclaringClass())) {
 3      try {
 4        return method.invoke(this, args);
 5      } catch (Throwable t) {
 6        throw ExceptionUtil.unwrapThrowable(t);
 7      }
 8    }
 9    final MapperMethod mapperMethod = cachedMapperMethod(method);   // @1
10    return mapperMethod.execute(sqlSession, args);                                     // @2
11  }

代码@1:创建并缓存MapperMethod对象。


代码@2:调用MapperMethod对象的execute方法,即mapperInterface中定义的每一个方法最终会对应一个MapperMethod。


2.2 MapperMethod#execute


1public Object execute(SqlSession sqlSession, Object[] args) {
 2    Object result;
 3    if (SqlCommandType.INSERT == command.getType()) { 
 4      Object param = method.convertArgsToSqlCommandParam(args);
 5      result = rowCountResult(sqlSession.insert(command.getName(), param));
 6    } else if (SqlCommandType.UPDATE == command.getType()) {
 7      Object param = method.convertArgsToSqlCommandParam(args);
 8      result = rowCountResult(sqlSession.update(command.getName(), param));
 9    } else if (SqlCommandType.DELETE == command.getType()) {
10      Object param = method.convertArgsToSqlCommandParam(args);
11      result = rowCountResult(sqlSession.delete(command.getName(), param));
12    } else if (SqlCommandType.SELECT == command.getType()) {
13      if (method.returnsVoid() && method.hasResultHandler()) {
14        executeWithResultHandler(sqlSession, args);
15        result = null;
16      } else if (method.returnsMany()) {
17        result = executeForMany(sqlSession, args);
18      } else if (method.returnsMap()) {
19        result = executeForMap(sqlSession, args);
20      } else {
21        Object param = method.convertArgsToSqlCommandParam(args);
22        result = sqlSession.selectOne(command.getName(), param);
23      }
24    } else {
25      throw new BindingException("Unknown execution method for: " + command.getName());
26    }
27    if (result == null && method.getReturnType().isPrimitive() && !method.returnsVoid()) {
28      throw new BindingException("Mapper method '" + command.getName() 
29          + " attempted to return null from a method with a primitive return type (" + method.getReturnType() + ").");
30    }
31    return result;
32  }

该方法主要是根据SQL类型,insert、update、select等操作,执行对应的逻辑,本文我们以查询语句,进行跟踪,进入executeForMany(sqlSession, args)方法。


2.3 MapperMethod#executeForMany


1private <E> Object executeForMany(SqlSession sqlSession, Object[] args) {
 2    List<E> result;
 3    Object param = method.convertArgsToSqlCommandParam(args);
 4    if (method.hasRowBounds()) {
 5      RowBounds rowBounds = method.extractRowBounds(args);
 6      result = sqlSession.<E>selectList(command.getName(), param, rowBounds);
 7    } else {
 8      result = sqlSession.<E>selectList(command.getName(), param);
 9    }
10    // issue #510 Collections & arrays support
11    if (!method.getReturnType().isAssignableFrom(result.getClass())) {
12      if (method.getReturnType().isArray()) {
13        return convertToArray(result);
14      } else {
15        return convertToDeclaredCollection(sqlSession.getConfiguration(), result);
16      }
17    }
18    return result;
19  }

该方法也比较简单,最终通过SqlSession调用selectList方法。


2.4 DefaultSqlSession#selectList


1public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
 2    try {
 3      MappedStatement ms = configuration.getMappedStatement(statement);   // @1
 4      List<E> result = executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);   // @2
 5      return result;
 6    } catch (Exception e) {
 7      throw ExceptionFactory.wrapException("Error querying database.  Cause: " + e, e);
 8    } finally {
 9      ErrorContext.instance().reset();
10    }
11  }

代码@1:根据资源名称获取对应的MappedStatement对象,此时的statement为资源名称,例如com.demo.UserMapper.findUser。至于MappedStatement对象的生成在上一节初始化时已详细介绍过,此处不再重复介绍。


代码@2:调用Executor的query方法。这里说明一下,其实一开始会进入到CachingExecutor#query方法,由于CachingExecutor的Executor delegate属性默认是SimpleExecutor,故最终还是会进入到SimpleExecutor#query中。


接下来我们进入到SimpleExecutor的父类BaseExecutor的query方法中。


2.5 BaseExecutor#query


1public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {   // @1
 2    ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
 3    if (closed) throw new ExecutorException("Executor was closed.");
 4    if (queryStack == 0 && ms.isFlushCacheRequired()) {
 5      clearLocalCache();
 6    }
 7    List<E> list;
 8    try {
 9      queryStack++;
10      list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;                                            // @2
11      if (list != null) {
12        handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
13      } else {
14        list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);                   // @3
15      }
16    } finally {
17      queryStack--;
18    }
19    if (queryStack == 0) {
20      for (DeferredLoad deferredLoad : deferredLoads) {
21        deferredLoad.load();
22      }
23      deferredLoads.clear(); // issue #601
24      if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {                         // @4
25        clearLocalCache(); // issue #482
26      }
27    }
28    return list;
29  }

代码@1:首先介绍一下该方法的入参,这些类都是Mybatis的重要类:


  • MappedStatement ms
    映射语句,一个MappedStatemnet对象代表一个Mapper中的一个方法,是映射的最基本对象。
  • Object parameter
    SQL语句的参数列表。
  • RowBounds rowBounds
    行边界对象,其实就是分页参数limit与size。
  • ResultHandler resultHandler
    结果处理Handler。
  • CacheKey key
    Mybatis缓存Key
  • BoundSql boundSql
    SQL与参数绑定信息,从该对象可以获取在映射文件中的SQL语句。


代码@2:首先从缓存中获取,Mybatis支持一级缓存(SqlSession)与二级缓存(多个SqlSession共享)。


代码@3:从数据库查询结果,然后进入到doQuery方法,执行真正的查询动作。


代码@4:如果一级缓存是语句级别的,则语句执行完毕后,删除缓存。


2.6 SimpleExecutor#doQuery


1public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
 2    Statement stmt = null;
 3    try {
 4      Configuration configuration = ms.getConfiguration();
 5      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);   // @1
 6      stmt = prepareStatement(handler, ms.getStatementLog());                                                                                                                   // @2
 7      return handler.<E>query(stmt, resultHandler);                                                                                                                                        // @3
 8    } finally {
 9      closeStatement(stmt);
10    }
11  }

代码@1:创建StatementHandler,这里会加入Mybatis的插件扩展机制(将在下篇详细介绍),如图所示:

f76c34a6934fad687df821a9900c302a.jpg

代码@2:创建Statement对象,注意,这里就是JDBC协议的java.sql.Statement对象了。


代码@3:使用Statment对象执行SQL语句。


接下来详细介绍Statement对象的创建过程与执行过程,即分布详细跟踪代码@2与代码@3。

image.png


3.1 java.sql.Connection对象创建


SimpleExecutor#prepareStatement


1private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
2    Statement stmt;
3    Connection connection = getConnection(statementLog);  // @1
4    stmt = handler.prepare(connection);                                  // @2
5    handler.parameterize(stmt);                                               // @3
6    return stmt;
7}

创建Statement对象,分成三步:


代码@1:创建java.sql.Connection对象。


代码@2:使用Connection对象创建Statment对象。


代码@3:对Statement进行额外处理,特别是PrepareStatement的参数设置(ParameterHandler)。


SimpleExecutor#getConnection


getConnection方法,根据上面流程图所示,先是进入到SpringManagedTransaction,再通过spring-jdbc框架,利用DataSourceUtils获取连接,其代码如下:

1org.mybatis.spring.transaction.SpringManagedTransaction#doGetConnection
 2public static Connection doGetConnection(DataSource dataSource) throws SQLException {  
 3        Assert.notNull(dataSource, "No DataSource specified");
 4        ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource); 
 5        if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
 6            conHolder.requested();
 7            if (!conHolder.hasConnection()) {
 8                conHolder.setConnection(dataSource.getConnection());
 9            }
10            return conHolder.getConnection();
11        }
12        // Else we either got no holder or an empty thread-bound holder here.
13
14        logger.debug("Fetching JDBC Connection from DataSource");
15        Connection con = dataSource.getConnection();      // @1
16
17        // 这里省略与事务处理相关的代码
18        return con;
19    }

代码@1:通过DataSource获取connection,那此处的DataSource是“谁”呢?看一下我们工程的配置:

4b2caad9ca58d664c01f0e396696bd38.jpg

d9de08282de9e0fde0c144ea42078267.jpg

故最终dataSouce.getConnection获取的连接,是从SpringShardingDataSource中获取连接。


1com.dangdang.ddframe.rdb.sharding.jdbc.ShardingDataSource#getConnection
2public ShardingConnection getConnection() throws SQLException {
3        MetricsContext.init(shardingProperties);
4        return new ShardingConnection(shardingContext);
5}

返回的结果如下:

710be40d27a950918ea182d3acf8ee8f.jpg

备注:这里只是返回了一个ShardingConnection对象,该对象包含了分库分表上下文,但此时并没有执行具体的分库操作(切换数据源)。


Connection的获取流程清楚后,我们继续来看一下Statemnet对象的创建。


3.2 java.sql.Statement对象创建


1stmt = prepareStatement(handler, ms.getStatementLog());

上面语句的调用链:RoutingStatementHandler -》BaseStatementHand


BaseStatementHandler#prepare


3public Statement prepare(Connection connection) throws SQLException {
 4    ErrorContext.instance().sql(boundSql.getSql());
 5    Statement statement = null;
 6    try {
 7      statement = instantiateStatement(connection);    // @1
 8      setStatementTimeout(statement);                         // @2
 9      setFetchSize(statement);                                      // @3
10      return statement;
11    } catch (SQLException e) {
12      closeStatement(statement);
13      throw e;
14    } catch (Exception e) {
15      closeStatement(statement);
16      throw new ExecutorException("Error preparing statement.  Cause: " + e, e);
17    }
18  }

代码@1:根据Connection对象(本文中是ShardingConnection)来创建Statement对象,其默认实现类:PreparedStatementHandler#instantiateStatement方法。


代码@2:为Statement设置超时时间。


代码@3:设置fetchSize。

1PreparedStatementHandler#instantiateStatement
 2protected Statement instantiateStatement(Connection connection) throws SQLException {
 3    String sql = boundSql.getSql();
 4    if (mappedStatement.getKeyGenerator() instanceof Jdbc3KeyGenerator) {
 5      String[] keyColumnNames = mappedStatement.getKeyColumns();
 6      if (keyColumnNames == null) {
 7        return connection.prepareStatement(sql, PreparedStatement.RETURN_GENERATED_KEYS);
 8      } else {
 9        return connection.prepareStatement(sql, keyColumnNames);
10      }
11    } else if (mappedStatement.getResultSetType() != null) {
12      return connection.prepareStatement(sql, mappedStatement.getResultSetType().getValue(), ResultSet.CONCUR_READ_ONLY);
13    } else {
14      return connection.prepareStatement(sql);
15    }
16  }

其实Statement对象的创建,就比较简单了,既然Connection是ShardingConnection,那就看一下其对应的prepareStatement方法即可。


ShardingConnection#prepareStatement


1
 3public PreparedStatement prepareStatement(final String sql) throws SQLException {   // sql,为配置在mybatis xml文件中的sql语句
 4        return new ShardingPreparedStatement(this, sql);
 5}
 6ShardingPreparedStatement(final ShardingConnection shardingConnection, 
 7            final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
 8        super(shardingConnection, resultSetType, resultSetConcurrency, resultSetHoldability);
 9        preparedSQLRouter = shardingConnection.getShardingContext().getSqlRouteEngine().prepareSQL(sql);
10}

在构建ShardingPreparedStatement对象的时候,会根据SQL语句创建解析SQL路由的解析器对象,但此时并不会执行相关的路由计算,PreparedStatement对象创建完成后,就开始进入SQL执行流程中。

image.png

接下来我们继续看SimpleExecutor#doQuery方法的第3步,执行SQL语句:


1handler.<E>query(stmt, resultHandler)。


首先会进入RoutingStatementHandler这个类中,进行Mybatis层面的路由(主要是根据Statement类型)

c0c661b212be9c361c85ed9fced3d46a.jpg

然后进入到PreparedStatementHandler#query中


PreparedStatementHandler#query


3public <E> List<E> query(Statement statement, ResultHandler resultHandler) throws SQLException {
4    PreparedStatement ps = (PreparedStatement) statement;
5    ps.execute();  // @1
6    return resultSetHandler.<E> handleResultSets(ps);  // @2
7}

代码@1:调用PreparedStatement的execute方法,由于本例是使用了Sharding-jdbc分库分表,此时调用的具体实现为:ShardingPreparedStatement。


代码@2:处理结果。


我们接下来分别来跟进execute与结果处理方法。


ShardingPreparedStatement#execute


2public boolean execute() throws SQLException {
3    try {
4        return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routeSQL()).execute(); // @1
5    } finally {
6        clearRouteContext();
7    }
8}

这里奥妙无穷,其关键点如下:


1)创造PreparedStatementExecutor对象,其两个核心参数:

  • ExecutorEngine executorEngine:shardingjdbc执行引擎。
  • Collection< PreparedStatementExecutorWrapper> preparedStatemenWrappers
    一个集合,每一个集合是PreparedStatement的包装类,这个集合如何而来?

2)preparedStatemenWrappers是通过routeSQL方法产生的。

3)最终调用PreparedStatementExecutor方法的execute来执行。


接下来分别看一下routeSQL与execute方法。


ShardingPreparedStatement#routeSQL


3private List<PreparedStatementExecutorWrapper> routeSQL() throws SQLException {
 4        List<PreparedStatementExecutorWrapper> result = new ArrayList<>();
 5        SQLRouteResult sqlRouteResult = preparedSQLRouter.route(getParameters());   // @1
 6        MergeContext mergeContext = sqlRouteResult.getMergeContext();                      
 7        setMergeContext(mergeContext);
 8        setGeneratedKeyContext(sqlRouteResult.getGeneratedKeyContext());
 9        for (SQLExecutionUnit each : sqlRouteResult.getExecutionUnits()) {                      // @2          
10            PreparedStatement preparedStatement = (PreparedStatement) getStatement(getShardingConnection().getConnection(each.getDataSource(), sqlRouteResult.getSqlStatementType()), each.getSql());     // @3
11            replayMethodsInvocation(preparedStatement);
12            getParameters().replayMethodsInvocation(preparedStatement);
13            result.add(wrap(preparedStatement, each));
14        }
15        return result;
16}

代码@1:根据SQL参数进行路由计算,本文暂不关注其具体实现细节,这些将在具体分析Sharding-jdbc时具体详解,在这里就直观看一下其结果:


代码@2、@3:对分库分表的结果进行遍历,然后使用底层Datasource来创建Connection,创建PreparedStatement 对象。


routeSQL就暂时讲到这,从这里我们得知,会在这里根据路由结果,使用底层的具体数据源创建对应的Connection与PreparedStatement 对象。


PreparedStatementExecutor#execute


1
 3public boolean execute() {
 4    Context context = MetricsContext.start("ShardingPreparedStatement-execute");
 5    eventPostman.postExecutionEvents();
 6    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
 7    final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
 8    try {
 9        if (1 == preparedStatementExecutorWrappers.size()) {     // @1
10            PreparedStatementExecutorWrapper preparedStatementExecutorWrapper = preparedStatementExecutorWrappers.iterator().next();
11            return executeInternal(preparedStatementExecutorWrapper, isExceptionThrown, dataMap);
12        }
13        List<Boolean> result = executorEngine.execute(preparedStatementExecutorWrappers, new ExecuteUnit<PreparedStatementExecutorWrapper, Boolean>() {    // @2
14
15            @Override
16            public Boolean execute(final PreparedStatementExecutorWrapper input) throws Exception {
17                synchronized (input.getPreparedStatement().getConnection()) {
18                    return executeInternal(input, isExceptionThrown, dataMap);
19                }
20            }
21        });
22        return (null == result || result.isEmpty()) ? false : result.get(0);
23    } finally {
24        MetricsContext.stop(context);
25    }
26 }

代码@1:如果计算出来的路由信息为1个,则同步执行。


代码@2:如果计算出来的路由信息有多个,则使用线程池异步执行。


那还有一个问题,通过PreparedStatement#execute方法执行后,如何返回结果呢?特别是异步执行的。


在上文其实已经谈到:


DefaultResultSetHandler#handleResultSets


1
 3public List<Object> handleResultSets(Statement stmt) throws SQLException {
 4    ErrorContext.instance().activity("handling results").object(mappedStatement.getId());
 5
 6    final List<Object> multipleResults = new ArrayList<Object>();
 7
 8    int resultSetCount = 0;
 9    ResultSetWrapper rsw = getFirstResultSet(stmt);         // @1
10    //省略部分代码,完整代码可以查看DefaultResultSetHandler方法。
11    return collapseSingleResultList(multipleResults);
12  }
13
14private ResultSetWrapper getFirstResultSet(Statement stmt) throws SQLException {
15    ResultSet rs = stmt.getResultSet();              // @2
16    while (rs == null) {
17      // move forward to get the first resultset in case the driver
18      // doesn't return the resultset as the first result (HSQLDB 2.1)
19      if (stmt.getMoreResults()) {
20        rs = stmt.getResultSet();
21      } else {
22        if (stmt.getUpdateCount() == -1) {
23          // no more results. Must be no resultset
24          break;
25        }
26      }
27    }
28    return rs != null ? new ResultSetWrapper(rs, configuration) : null;
29  }

我们看一下其关键代码如下:


代码@1:调用Statement#getResultSet()方法,如果使用shardingJdbc,则会调用ShardingStatement#getResultSet(),并会处理分库分表结果集的合并,在这里就不详细进行介绍,该部分会在shardingjdbc专栏详细分析。


代码@2:jdbc statement中获取结果集的通用写法,这里也不过多的介绍。

mybatis shardingjdbc SQL执行流程就介绍到这里了,为了方便大家对上述流程的理解,最后给出SQL执行的流程图:

fa6c97eb835a057f1fbc996305be670e.jpg

Mybatis Sharding-Jdbc的SQL执行流程就介绍到这里了,从图中也能清晰看到Mybatis的拆件机制,将在下文详细介绍。

相关文章
|
19天前
|
SQL XML Java
mybatis 调用修改SQL时 出现了一个问题 没有修改成功也没有报错
mybatis 调用修改SQL时 出现了一个问题 没有修改成功也没有报错
19 0
|
2天前
|
SQL Java 数据库连接
mybatis动态sql
mybatis动态sql
|
4天前
|
SQL Java 数据库连接
MyBatis #与$的区别以及动态SQL
MyBatis #与$的区别以及动态SQL
8 0
|
5天前
|
SQL Java 数据库连接
【mybatis】动态sql之批量增删改查
【mybatis】动态sql之批量增删改查
10 0
|
16天前
|
SQL 安全 Java
【Mybatis】Mybatis如何防止sql注入
【Mybatis】Mybatis如何防止sql注入
|
17天前
|
SQL Java 数据库连接
【Mybatis】动态sql之sql的复用
【Mybatis】动态sql之sql的复用
12 0
|
18天前
|
SQL Java 关系型数据库
mybatis-plus启动时自动执行sql脚本
mybatis-plus启动时自动执行sql脚本
20 1
|
20天前
|
SQL 分布式计算 资源调度
一文解析 ODPS SQL 任务优化方法原理
本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,覆盖了部分调优方法的分析,从知道怎么优化,到为什么这样优化,以及还能怎样优化。
103482 1
|
20天前
|
SQL Java 数据库连接
Javaweb之Mybatis的动态SQL的详细解析
Javaweb之Mybatis的动态SQL的详细解析
11 0
|
20天前
|
XML Java 数据库连接
Javaweb之Mybatis的XML配置文件的详细解析
Javaweb之Mybatis的XML配置文件的详细解析
17 0