面试官问我:看过sharding-jdbc的源码吗?我吧啦吧啦说了一通!!

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 在产品初期快速迭代的过程中,往往为了快速上线而占据市场,在后端开发的过程中往往不会过多的考虑分布式和微服务,往往会将后端服务做成一个单体应用,而数据库也是一样,最初会把所有的业务数据都放到一个数据库中,即所谓的单实例数据库。随着业务的迅速发展,将所有数据都放在一个数据库中已经不足以支撑业务发展的需要。此时,就会对系统进行分布式改造,而数据库业务进行分库分表的拆分。那么,问题来了,如何更好的访问和管理拆分后的数据库呢?业界已经有很多成熟的解决方案,其中,一个非常优秀的解决方案就是:Apache ShardingSphere。今天,我们就从源码级别来共同探讨下sharding-jdbc的核心源码。

sharding-jdbc经典用法

Sharding-Jdbc 是一个轻量级的分库分表框架,使用时最关键的是配制分库分表策略,其余的和使用普通的 MySQL 驱动一样,几乎不用改代码。例如下面的代码片段。

try(DataSource dataSource =  ShardingDataSourceFactory.createDataSource(
    createDataSourceMap(), shardingRuleConfig, new Properties()) {
    Connection connection = dataSource.getConnection();
    ...
}

我们在程序中拿到Connection对象后,就可以像使用普通的JDBC一样来使用sharding-jdbc操作数据库了。

sharding-jdbc包结构

sharding-jdbc  
    ├── sharding-jdbc-core      重写DataSource/Connection/Statement/ResultSet四大对象
    └── sharding-jdbc-orchestration        配置中心
sharding-core
    ├── sharding-core-api       接口和配置类 
    ├── sharding-core-common    通用分片策略实现...
    ├── sharding-core-entry     SQL解析、路由、改写,核心类BaseShardingEngine
    ├── sharding-core-route     SQL路由,核心类StatementRoutingEngine
    ├── sharding-core-rewrite   SQL改写,核心类ShardingSQLRewriteEngine
    ├── sharding-core-execute   SQL执行,核心类ShardingExecuteEngine
    └── sharding-core-merge     结果合并,核心类MergeEngine
shardingsphere-sql-parser 
    ├── shardingsphere-sql-parser-spi       SQLParserEntry,用于初始化SQLParser
    ├── shardingsphere-sql-parser-engine    SQL解析,核心类SQLParseEngine
    ├── shardingsphere-sql-parser-relation
    └── shardingsphere-sql-parser-mysql     MySQL解析器,核心类MySQLParserEntry和MySQLParser
shardingsphere-underlying           基础接口和api
    ├── shardingsphere-rewrite      SQLRewriteEngine接口
    ├── shardingsphere-execute      QueryResult查询结果
    └── shardingsphere-merge        MergeEngine接口
shardingsphere-spi                  SPI加载工具类
sharding-transaction
    ├── sharding-transaction-core   接口ShardingTransactionManager,SPI加载  
    ├── sharding-transaction-2pc    实现类XAShardingTransactionManager
    └── sharding-transaction-base   实现类SeataATShardingTransactionManager

sharding-jdbc中的四大对象

所有的一切都从 ShardingDataSourceFactory 开始的,创建了一个 ShardingDataSource 的分片数据源。除了 ShardingDataSource(分片数据源),在 Sharding-Sphere 中还有  MasterSlaveDataSourceFactory(主从数据源)、EncryptDataSourceFactory(脱敏数据源)。

public static DataSource createDataSource(
        final Map<String, DataSource> dataSourceMap,
        final ShardingRuleConfiguration shardingRuleConfig,
        final Properties props) throws SQLException {
    return new ShardingDataSource(dataSourceMap,
               new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);
}

说明: 本文主要以 ShardingDataSource 为切入点分析 Sharding-Sphere 是如何对 JDBC 四大对象 DataSource、Connection、Statement、ResultSet 进行封装的。

DataSource

这里,涉及到两个比较重要的接口,一个是DataSource,一个是Connection。我们首先来看下它们的类图。

  • DataSource微信图片_20211119154445.jpg
  • Connection微信图片_20211119154452.jpg

DataSource 和 Connection 都比较简单,没有处理过多的逻辑,只是 dataSourceMap, shardingRule 进行简单的封装。

ShardingDataSource 持有对数据源和分片规则,可以通过 getConnection 方法获取 ShardingConnection 连接。

private final ShardingRuntimeContext runtimeContext = new ShardingRuntimeContext(
                dataSourceMap, shardingRule, props, getDatabaseType());
@Override
public final ShardingConnection getConnection() {
    return new ShardingConnection(getDataSourceMap(), runtimeContext,
            TransactionTypeHolder.get());
}

Connection

ShardingConnection 可以创建 Statement 和 PrepareStatement 两种运行方式,如下代码所示。

@Override
public Statement createStatement(final int resultSetType,
        final int resultSetConcurrency, final int resultSetHoldability) {
    return new ShardingStatement(this, resultSetType,
            resultSetConcurrency, resultSetHoldability);
}
@Override
public PreparedStatement prepareStatement(final String sql, final int resultSetType,
        final int resultSetConcurrency, final int resultSetHoldability)
        throws SQLException {
    return new ShardingPreparedStatement(this, sql, resultSetType,
            resultSetConcurrency, resultSetHoldability);
}

说明: ShardingConnection 主要是将创建 ShardingStatement 和  ShardingPreparedStatement 两个对象,主要的执行逻辑都在 Statement 对象中。另外,ShardingConnection 还有两个重要的功能,一个是获取真正的数据库连接,一个是事务提交功能。

Statement

Statement 相对来说比较复杂,因为它都是 JDBC 的真正执行器,所有逻辑都封装在 Statement 中。我们来看下Statement的类图

微信图片_20211119154502.jpg

对于Statement,我就不做过对的描述了,相信使用过JDBC的小伙伴,对Statement都不陌生了。

ResultSet

ResultSet类图如下所示。

微信图片_20211119154509.jpg

我们从源码中可以看出:ShardingResultSet 只是对 MergedResult 的简单封装。

private final MergedResult mergeResultSet;
@Override
public boolean next() throws SQLException {
    return mergeResultSet.next();
}

sharding-jdbc-core核心分析

ShardingStatement 内部有三个核心的类,一是 SimpleQueryShardingEngine 完成 SQL  解析、路由、改写;一是 StatementExecutor 进行 SQL 执行;最后调用 MergeEngine 对结果进行合并处理。

ShardingStatement

初始化

private final ShardingConnection connection;
private final StatementExecutor statementExecutor;
public ShardingStatement(final ShardingConnection connection) {
    this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
            ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
public ShardingStatement(final ShardingConnection connection, final int resultSetType,
        final int resultSetConcurrency, final int resultSetHoldability) {
    super(Statement.class);
    this.connection = connection;
    statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency,
            resultSetHoldability, connection);
}

ShardingStatement 内部执行 SQL 委托给了 statementExecutor。

执行

(1)executeQuery 执行过程

@Override
public ResultSet executeQuery(final String sql) throws SQLException {
    ResultSet result;
    try {
        clearPrevious();
        // 1. SQL 解析、路由、改写,最终生成 SQLRouteResult
        shard(sql);
        // 2. 生成执行计划 SQLRouteResult -> StatementExecuteUnit
        initStatementExecutor();
        // 3. statementExecutor.executeQuery() 执行任务
        MergeEngine mergeEngine = MergeEngineFactory.newInstance(
                connection.getRuntimeContext().getDatabaseType(),
                connection.getRuntimeContext().getRule(), sqlRouteResult,
                connection.getRuntimeContext().getMetaData().getRelationMetas(),
                statementExecutor.executeQuery());
        // 4. 结果合并
        result = getResultSet(mergeEngine);
    } finally {
        currentResultSet = null;
    }
    currentResultSet = result;
    return result;
}

(2)SQL 路由(包括 SQL 解析、路由、改写)

private SQLRouteResult sqlRouteResult;
private void shard(final String sql) {
    ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
    SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(
            runtimeContext.getRule(), runtimeContext.getProps(),
            runtimeContext.getMetaData(), runtimeContext.getParseEngine());
    sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());
}

SimpleQueryShardingEngine 进行 SQL 路由(包括 SQL 解析、路由、改写),生成 SQLRouteResult,当 ShardingStatement 完成 SQL 的路由,生成 SQLRouteResult 后,剩下的执行任务就全部交给 StatementExecutor 完成。

StatementExecutor

StatementExecutor 内部封装了 SQL 任务的执行过程,包括:SqlExecutePrepareTemplate 类生成执行计划 StatementExecuteUnit,以及  SQLExecuteTemplate 用于执行 StatementExecuteUnit。

类结构

重要属性

AbstractStatementExecutor 类中重要的属性:

// SQLExecutePrepareTemplate用于生成执行计划StatementExecuteUnit
private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
// 保存生成的执行计划StatementExecuteUnit
private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups =
            new LinkedList<>();
// SQLExecuteTemplate用于执行StatementExecuteUnit
private final SQLExecuteTemplate sqlExecuteTemplate;
// 保存查询结果
private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();

生成执行计划

// 执行前清理状态
private void clearPrevious() throws SQLException {
    statementExecutor.clear();
}
// 执行时初始化
private void initStatementExecutor() throws SQLException {
    statementExecutor.init(sqlRouteResult);
    replayMethodForStatements();
}

这里,需要注意的是: StatementExecutor 是有状态的,每次执行前都要调用 statementExecutor.clear()  清理上一次执行的状态,并调用 statementExecutor.init() 重新初始化。

statementExecutor.init() 初始化主要是生成执行计划 StatementExecuteUnit。

public void init(final SQLRouteResult routeResult) throws SQLException {
    setSqlStatementContext(routeResult.getSqlStatementContext());
    getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
    cacheStatements();
}
private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(
        final Collection<RouteUnit> routeUnits) throws SQLException {
    return getSqlExecutePrepareTemplate().getExecuteUnitGroups(
            routeUnits, new SQLExecutePrepareCallback() {
                // 获取连接
                @Override
                public List<Connection> getConnections(
                        final ConnectionMode connectionMode,
                        final String dataSourceName, final int connectionSize)
                        throws SQLException {
                    return StatementExecutor.super.getConnection().getConnections(
                            connectionMode, dataSourceName, connectionSize);
                }
                // 生成执行计划RouteUnit -> StatementExecuteUnit
                @Override
                public StatementExecuteUnit createStatementExecuteUnit(
                        final Connection connection, final RouteUnit routeUnit,
                        final ConnectionMode connectionMode) throws SQLException {
                    return new StatementExecuteUnit(
                            routeUnit, connection.createStatement(
                            getResultSetType(), getResultSetConcurrency(),
                            getResultSetHoldability()), connectionMode);
                }
            });
}

SqlExecutePrepareTemplate 是 sharding-core-execute  工程中提供的一个工具类,专门用于生成执行计划,将 RouteUnit 转化为 StatementExecuteUnit。同时还提供了另一个工具类 SQLExecuteTemplate 用于执行 StatementExecuteUnit,在任务执行时我们会看到这个类。

任务执行

public List<QueryResult> executeQuery() throws SQLException {
    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    SQLExecuteCallback<QueryResult> executeCallback = 
        new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
        @Override
        protected QueryResult executeSQL(final String sql, final Statement statement,
                final ConnectionMode connectionMode) throws SQLException {
            return getQueryResult(sql, statement, connectionMode);
        }
    };
    // 执行StatementExecuteUnit
    return executeCallback(executeCallback);
}
// sqlExecuteTemplate 执行 executeGroups(即StatementExecuteUnit)
protected final <T> List<T> executeCallback(
        final SQLExecuteCallback<T> executeCallback) throws SQLException {
    // 执行所有的任务 StatementExecuteUnit
    List<T> result = sqlExecuteTemplate.executeGroup(
            (Collection) executeGroups, executeCallback);
    refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
    return result;
}

SqlExecuteTemplate 执行 StatementExecuteUnit 会回调 SQLExecuteCallback#executeSQL 方法,最终调用 getQueryResult 方法。

private QueryResult getQueryResult(final String sql, final Statement statement,
        final ConnectionMode connectionMode) throws SQLException {
    ResultSet resultSet = statement.executeQuery(sql);
    getResultSets().add(resultSet);
    return ConnectionMode.MEMORY_STRICTLY == connectionMode
            ? new StreamQueryResult(resultSet)
            : new MemoryQueryResult(resultSet);
}

ConnectionMode 有两种模式:内存限制(MEMORY_STRICTLY)和连接限制(CONNECTION_STRICTLY),如果一个连接执行多个 StatementExecuteUnit  则为内存限制(MEMORY_STRICTLY),采用流式处理,即  StreamQueryResult ,反之则为连接限制(CONNECTION_STRICTLY),此时会将所有从 MySQL  服务器返回的数据都加载到内存中。特别是在 Sharding-Proxy 中特别有用,避免将代理服务器撑爆。

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
4月前
|
存储 安全 Java
Java 集合面试题从数据结构到 HashMap 源码剖析详解及长尾考点梳理
本文深入解析Java集合框架,涵盖基础概念、常见集合类型及HashMap的底层数据结构与源码实现。从Collection、Map到Iterator接口,逐一剖析其特性与应用场景。重点解读HashMap在JDK1.7与1.8中的数据结构演变,包括数组+链表+红黑树优化,以及put方法和扩容机制的实现细节。结合订单管理与用户权限管理等实际案例,展示集合框架的应用价值,助你全面掌握相关知识,轻松应对面试与开发需求。
197 3
|
11月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
269 2
|
JavaScript 前端开发
【Vue面试题二十五】、你了解axios的原理吗?有看过它的源码吗?
这篇文章主要讨论了axios的使用、原理以及源码分析。 文章中首先回顾了axios的基本用法,包括发送请求、请求拦截器和响应拦截器的使用,以及如何取消请求。接着,作者实现了一个简易版的axios,包括构造函数、请求方法、拦截器的实现等。最后,文章对axios的源码进行了分析,包括目录结构、核心文件axios.js的内容,以及axios实例化过程中的配置合并、拦截器的使用等。
【Vue面试题二十五】、你了解axios的原理吗?有看过它的源码吗?
|
JavaScript 前端开发
【Vue面试题二十七】、你了解axios的原理吗?有看过它的源码吗?
文章讨论了Vue项目目录结构的设计原则和实践,强调了项目结构清晰的重要性,提出了包括语义一致性、单一入口/出口、就近原则、公共文件的绝对路径引用等原则,并展示了单页面和多页面Vue项目的目录结构示例。
|
10月前
|
Java 数据库连接 Maven
最新版 | 深入剖析SpringBoot3源码——分析自动装配原理(面试常考)
自动装配是现在面试中常考的一道面试题。本文基于最新的 SpringBoot 3.3.3 版本的源码来分析自动装配的原理,并在文未说明了SpringBoot2和SpringBoot3的自动装配源码中区别,以及面试回答的拿分核心话术。
最新版 | 深入剖析SpringBoot3源码——分析自动装配原理(面试常考)
|
10月前
|
存储 缓存 Java
Spring面试必问:手写Spring IoC 循环依赖底层源码剖析
在Spring框架中,IoC(Inversion of Control,控制反转)是一个核心概念,它允许容器管理对象的生命周期和依赖关系。然而,在实际应用中,我们可能会遇到对象间的循环依赖问题。本文将深入探讨Spring如何解决IoC中的循环依赖问题,并通过手写源码的方式,让你对其底层原理有一个全新的认识。
208 2
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
810 37
面试官: 请你手写一份 Call()源码,看完此篇不用担心!
面试官: 请你手写一份 Call()源码,看完此篇不用担心!
|
存储 安全 Java
Android面试题之ArrayList源码详解
ArrayList是Java中基于数组实现的列表,提供O(1)的索引访问,但插入和删除操作平均时间复杂度为O(n)。默认容量为10,当需要时会通过System.arraycopy扩容。允许存储null,非线程安全。面试常问:List是接口,ArrayList是其实现之一,推荐使用List接口编程以实现更好的灵活性。更多详情见[ArrayList源码](http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8u40-b25/java/util/ArrayList.java#ArrayList.Node)。
85 2
|
存储 JavaScript 前端开发
JS浅拷贝及面试时手写源码
JS浅拷贝及面试时手写源码