浅谈mysql数据库分库分表那些事(下)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 浅谈mysql数据库分库分表那些事
问题3: 可以在Executor切换完数据库完成之后, 更改sql, 或者在StatementHandler阶段更改sql

对于分库:

原始sql:

insert into article(id, uid, status,create_time,update_time) value(201333425976180992L, 1, 1, '2020-05-17 00:00:00', '2020-05-17 00:00:00')

目标sql:

insert into ba_test_001.article (id, user_id, status,create_time,update_time) value(201333425976180992L, 1, 1, '2020-05-17 00:00:00', '2020-05-17 00:00:00')

完整插件如下

package com.bytearch.mybatis.sharding.plugin;
import com.bytearch.mybatis.sharding.annotation.DB;
import com.bytearch.mybatis.sharding.annotation.ShardingBy;
import com.bytearch.mybatis.sharding.annotation.UseMaster;
import com.bytearch.mybatis.sharding.common.NodeNameEnum;
import com.bytearch.mybatis.sharding.configuration.DynamicDataSourceContextHolder;
import com.bytearch.mybatis.sharding.exception.ShardingException;
import com.bytearch.mybatis.sharding.strategy.IDatabaseShardingStrategy;
import com.bytearch.mybatis.sharding.strategy.IShardingStrategy;
import com.bytearch.mybatis.sharding.strategy.ITableShardingStrategy;
import com.bytearch.mybatis.sharding.strategy.ShardingStrategyUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.mapping.SqlCommandType;
import org.apache.ibatis.mapping.SqlSource;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.springframework.util.StringUtils;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
/**
 * @author bytearch
 */
@Intercepts(
        {
                @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
                @Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}),
        })
@Slf4j
public class ShardingInterceptor implements Interceptor {
    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        Object[] args = invocation.getArgs();
        MappedStatement ms = (MappedStatement) args[0];
        if (Arrays.asList(SqlCommandType.INSERT, SqlCommandType.UPDATE, SqlCommandType.DELETE, SqlCommandType.SELECT).contains(ms.getSqlCommandType())) {
            // 读请求: 默认使用从库
            // 写请求(INSERT,UPDATE,DELETE): 使用主库
            boolean useMaster = !SqlCommandType.SELECT.equals(ms.getSqlCommandType());
            DB DB = null;
            String methodId = ms.getId();
            String className = methodId.substring(0, methodId.lastIndexOf('.'));
            String methodName = methodId.substring(methodId.lastIndexOf('.') + 1);
            //是否使用了分库分表策略
            Class clz = Class.forName(className);
            Annotation dbAnno = clz.getAnnotation(DB.class);
            if (dbAnno != null) {
                DB = (DB) dbAnno;
            }
            if (DB != null) {
                //方法是否使用了@UseMaster注解 @PartitionBy注解
                String partitionName = null;
                for (Method declaredMethod : clz.getDeclaredMethods()) {
                    if (!declaredMethod.getName().equals(methodName)) {
                        continue;
                    }
                    if (declaredMethod.getAnnotation(UseMaster.class) != null) {
                        useMaster = true;
                    }
                    ShardingBy shardingByAnno = declaredMethod.getAnnotation(ShardingBy.class);
                    if (shardingByAnno != null) {
                        partitionName = shardingByAnno.value();
                        if (DB == null) {
                            throw new ShardingException("error! must @DB on :{}", clz);
                        }
                    }
                }
                //记录sql是否需要改变
                boolean sqlNeedChanged = false;
                Object partitionKey = null;
                String schema = DB.schema();
                String tableName = DB.tableName();
                //获取partition
                Object pa = args[1];
                if (pa instanceof Map) {
                    //params中获取partitionKey
                    Map<String, Object> paMap = (Map<String, Object>) pa;
                    if (!StringUtils.isEmpty(partitionName)) {
                        partitionKey = paMap.get(partitionName);
                    }
                } else if (pa instanceof Object && partitionKey == null) {
                    //Bean对象中获取partitionKey
                    for (Field declaredField : pa.getClass().getDeclaredFields()) {
                        ShardingBy annotation = declaredField.getAnnotation(ShardingBy.class);
                        if (annotation != null) {
                            declaredField.setAccessible(true);
                            partitionKey = declaredField.get(pa);
                        }
                    }
                }
                if (partitionKey != null) {
                     log.info("获取到shardingKey:{}]", partitionKey);
                    //权重 分库 < 分表 < 分库分表(原则上同一Mapper策略只配置一种,如果配置多种依次覆盖)
                    //分库
                    IDatabaseShardingStrategy databaseShardingStrategy = ShardingStrategyUtils.getDatabaseShardingStrategy(DB);
                    if (databaseShardingStrategy != null) {
                        schema = databaseShardingStrategy.getSchemaName(DB.schema(), partitionKey);
                        databaseShardingStrategy.changeDatasourceByPartitionKey(partitionKey, useMaster);
                        sqlNeedChanged = true;
                    }
                    //分表
                    ITableShardingStrategy ITableShardingStrategy = ShardingStrategyUtils.getTableShardingStrategy(DB);
                    if (ITableShardingStrategy != null) {
                        tableName = ITableShardingStrategy.getTargetTable(DB.tableName(), partitionKey);
                        sqlNeedChanged = true;
                        NodeNameEnum nodeNameEnum = NodeNameEnum.valueOf(DB.schema());
                        if (nodeNameEnum != null) {
                            DynamicDataSourceContextHolder.useDataSourceByNodeNum(nodeNameEnum, useMaster);
                        }
                    }
                    //分库分表
                    IShardingStrategy shardingStategy = ShardingStrategyUtils.getShardingStategy(DB);
                    if (shardingStategy != null) {
                        schema = shardingStategy.getSchemaName(DB.schema(), partitionKey);
                        tableName = shardingStategy.getTargetTable(DB.tableName(), partitionKey);
                        databaseShardingStrategy.changeDatasourceByPartitionKey(partitionKey, useMaster);
                        sqlNeedChanged = true;
                    }
                } else {
                    //不分库也不分表
                    NodeNameEnum nodeNameEnum = NodeNameEnum.valueOf(DB.schema());
                    if (nodeNameEnum != null) {
                        DynamicDataSourceContextHolder.useDataSourceByNodeNum(nodeNameEnum, useMaster);
                    }
                }
                if (sqlNeedChanged) {
                    BoundSql boundSql = ms.getBoundSql(pa);
                    String originSql = boundSql.getSql();
                    log.info("[原始SQL] sql:{}", originSql);
                    String sql = originSql.replaceAll(DB.tableName(), schema + '.' + tableName);
                    log.info("[更改SQL] sql:{}", sql);
                    BoundSql boundSqlNew = new BoundSql(ms.getConfiguration(), sql, boundSql.getParameterMappings(), boundSql.getParameterObject());
                    MappedStatement mappedStatement = copyFromMappedStatement(ms, new BoundSqlSqlSource(boundSqlNew));
                    args[0] = mappedStatement;
                }
            }
        }
        return invocation.proceed();
    }
    @Override
    public Object plugin(Object target) {
        if (target instanceof Executor) {
            return Plugin.wrap(target, this);
        }
        return target;
    }
    @Override
    public void setProperties(Properties properties) {
    }
    private MappedStatement copyFromMappedStatement(MappedStatement ms, SqlSource newSqlSource) {
        MappedStatement.Builder builder = new MappedStatement.Builder(ms.getConfiguration(), ms.getId(), newSqlSource, ms.getSqlCommandType());
        builder.resource(ms.getResource());
        builder.fetchSize(ms.getFetchSize());
        builder.statementType(ms.getStatementType());
        builder.keyGenerator(ms.getKeyGenerator());
        if (ms.getKeyProperties() != null && ms.getKeyProperties().length > 0) {
            builder.keyProperty(ms.getKeyProperties()[0]);
        }
        builder.timeout(ms.getTimeout());
        builder.parameterMap(ms.getParameterMap());
        builder.resultMaps(ms.getResultMaps());
        builder.resultSetType(ms.getResultSetType());
        builder.cache(ms.getCache());
        builder.flushCacheRequired(ms.isFlushCacheRequired());
        builder.useCache(ms.isUseCache());
        return builder.build();
    }
    public static class BoundSqlSqlSource implements SqlSource {
        private BoundSql boundSql;
        public BoundSqlSqlSource(BoundSql boundSql) {
            this.boundSql = boundSql;
        }
        @Override
        public BoundSql getBoundSql(Object parameterObject) {
            return boundSql;
        }
    }
}

其中定义了三个注解

@useMaster 是否强制读主

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface UseMaster {
}

@shardingBy 分片标识

/**
 *
 * @ShardingBy作用于方法 和 Bean属性  优先级 方法 > 属性
 * @author yarw
 */
@Target({ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface ShardingBy {
    /**
     * 指定分片参数
     * @return
     */
    String value() default ShardingConstant.DEFAULT_PARTITION_KEY_NAME;
}

@DB 定义逻辑表名 库名以及分片策略

package com.bytearch.mybatis.sharding.annotation;
import com.bytearch.mybatis.sharding.strategy.IDatabaseShardingStrategy;
import com.bytearch.mybatis.sharding.strategy.IShardingStrategy;
import com.bytearch.mybatis.sharding.strategy.ITableShardingStrategy;
import com.bytearch.mybatis.sharding.strategy.impl.NotUseDatabaseShardingStrategy;
import com.bytearch.mybatis.sharding.strategy.impl.NotUseShardingStrategy;
import com.bytearch.mybatis.sharding.strategy.impl.NotUseTableShardingStrategy;
import java.lang.annotation.*;
/**
 * @author yarw
 */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface DB {
    /**
     * 分表切分策略
     *
     * @return
     */
    Class<? extends ITableShardingStrategy> tableShardingStrategy() default NotUseTableShardingStrategy.class;
    /**
     * 分库切分策略
     *
     * @return
     */
    Class<? extends IDatabaseShardingStrategy> databaseShardingStrategy() default NotUseDatabaseShardingStrategy.class;
    /**
     * 分库&分表切分策略
     * @return
     */
    Class<? extends IShardingStrategy> shardingStrategy() default NotUseShardingStrategy.class;
    /**
     * 逻辑表名
     *
     * @return
     */
    String tableName();
    /**
     * 逻辑库名
     *
     * @return
     */
    String schema();
}

测试走一波

1)编写entity

package com.bytearch.mybatis.sharding.entity;
import java.util.Date;
import com.bytearch.mybatis.sharding.annotation.ShardingBy;
import lombok.Data;
@Data
public class Article {
    /**
     * 文章id
     */
    private Long id;
    /**
     * 作者id
     * 可以在此处通过注解指定shardingKey
     */
    @ShardingBy
    private Long userId;
    /**
     * 文章状态 -1: 删除 1:草稿 2:已发布
     */
    private Byte status;
    private Date createTime;
    private Date updateTime;
}

2) 编写mapper

/**
 * @author yarw
 */
@DB(databaseShardingStrategy = LongHashDatabasePartitionStrategy.class, schema = "blog", tableName = "article")
@Mapper
public interface ArticleShardingMapper {
    /**
    * 也可以通过参数指定shardingKey参数
    */
    @Select("select * from article where id = #{id}")
    @ShardingBy("shardingKey")
    Article selectById(@Param("id") Long id, @Param("shardingKey") Long shardingKey);
    @Insert("insert into article (id, user_id, status,create_time,update_time) value(#{id}, #{userId}, #{status}, #{createTime}, #{updateTime})")
    int insert(Article kv);
}

3) 编写测试类

@Test
    public void insertArticleTest() {
        Article article = new Article();
        Long userId = 1L;
        article.setId(SeqIdUtil.nextId(userId));
        article.setUserId(userId);
        article.setStatus((byte)1);
        article.setCreateTime(new Date());
        article.setUpdateTime(new Date());
        articleShardingMapper.insert(article);
    }
    @Test
    public void selectArticleTest() {
        Article article = articleShardingMapper.selectById(201364919411081472L, SeqIdUtil.decodeId(201364919411081472L).getExtraId());
        System.out.println(article);
    }

4) 测试结果

Insert


select


以上顺利实现mysql分库,同样的道理实现同时分库分表也很容易实现。

此插件具体实现方案已开源: https://github.com/bytearch/mybatis-sharding

目录如下:

.
├── bytearch_article.sql
├── mybatis-sharding.iml
├── pom.xml
├── readme.md
├── sharding.sql
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── bytearch
│   │   │           └── mybatis
│   │   │               └── sharding
│   │   │                   ├── ShardingApplication.java
│   │   │                   ├── annotation
│   │   │                   │   ├── DB.java    
│   │   │                   │   ├── ShardingBy.java  //分片标识注解
│   │   │                   │   └── UseMaster.java  //强制读主注解
│   │   │                   ├── common
│   │   │                   │   ├── NodeNameEnum.java
│   │   │                   │   └── ShardingConstant.java
│   │   │                   ├── configuration
│   │   │                   │   ├── DynamicDataSourceContextHolder.java
│   │   │                   │   ├── DynamicDatasource.java
│   │   │                   │   ├── NormalDateSourceConfig.java
│   │   │                   │   ├── ShardingConfiguration.java
│   │   │                   │   └── ShardingDateSourceConfig.java
│   │   │                   ├── dao
│   │   │                   │   ├── KVShardingMapper.java
│   │   │                   │   └── KvShardingMapper.xml
│   │   │                   ├── dto
│   │   │                   │   ├── DataSourceKeyNodeDTO.java
│   │   │                   │   └── DataSourceNodeDTO.java
│   │   │                   ├── entity
│   │   │                   │   └── Kv.java
│   │   │                   ├── exception
│   │   │                   │   └── ShardingException.java
│   │   │                   ├── plugin  //插件
│   │   │                   │   ├── ShardingInterceptor.java
│   │   │                   ├── sequence  //唯一id生成器
│   │   │                   │   ├── IdEntity.java
│   │   │                   │   ├── IpUtil.java
│   │   │                   │   └── SeqIdUtil.java
│   │   │                   └── strategy //策略类 
│   │   │                       ├── IDatabaseShardingStrategy.java
│   │   │                       ├── IShardingStrategy.java
│   │   │                       ├── ITableShardingStrategy.java
│   │   │                       ├── ShardingStrategyUtils.java
│   │   │                       └── impl
│   │   │                           ├── LongHashDatabasePartitionStrategy.java
│   │   │                           ├── LongHashTableShardingStrategy.java
│   │   │                           ├── NotUseDatabaseShardingStrategy.java
│   │   │                           ├── NotUseShardingStrategy.java
│   │   │                           └── NotUseTableShardingStrategy.java
│   │   └── resources
│   │       ├── application.yml
│   │       └── mybatis
│   │           └── mybatis-config.xml
│   └── test
│       └── java
│           └── com
│               └── bytearch
│                   └── mybatis
│                       └── sharding
│                           └── DBApplicationTests.java

五.总结

mysql分库分表,首先得找到瓶颈在哪里(IO or CPU),是分库还是分表,分多少?不能为了分库分表而拆分。

原则上是尽量先垂直拆分 后 水平拆分。

以上基于mybatis插件分库分表是一种实现思路,还有很多不完善的地方,

例如: 目前sql是直接替换的,这里有很大隐患。分库后,跨库事务的如何处理等等


以上仅供参考! 有其它思路的朋友可以欢迎联系我一起交流.

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
5月前
|
存储 缓存 负载均衡
数据库分库分表:提升系统性能的必由之路
数据库分库分表:提升系统性能的必由之路
183 1
|
6月前
|
SQL 存储 关系型数据库
Mysql系列-5.Mysql分库分表(中)
Mysql系列-5.Mysql分库分表
67 0
|
3月前
|
存储 算法 关系型数据库
(二十二)全解MySQL之分库分表后带来的“副作用”一站式解决方案!
上篇《分库分表的正确姿势》中已经将分库分表的方法论全面阐述清楚了,总体看下来用一个字形容,那就是爽!尤其是分库分表技术能够让数据存储层真正成为三高架构,但前面爽是爽了,接着一起来看看分库分表后产生一系列的后患问题,注意我这里的用词,是一系列而不是几个,也就是分库分表虽然好,但你要解决的问题是海量的。
360 3
|
2月前
|
存储 SQL 关系型数据库
一篇文章搞懂MySQL的分库分表,从拆分场景、目标评估、拆分方案、不停机迁移、一致性补偿等方面详细阐述MySQL数据库的分库分表方案
MySQL如何进行分库分表、数据迁移?从相关概念、使用场景、拆分方式、分表字段选择、数据一致性校验等角度阐述MySQL数据库的分库分表方案。
402 15
一篇文章搞懂MySQL的分库分表,从拆分场景、目标评估、拆分方案、不停机迁移、一致性补偿等方面详细阐述MySQL数据库的分库分表方案
|
6月前
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版操作报错之同步MySQL分库分表500张表报连接超时,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
缓存 关系型数据库 Java
不要将数据库中的“分库分表”理论盲目应用到 Elasticsearch
不要将数据库中的“分库分表”理论盲目应用到 Elasticsearch
49 0
|
3月前
|
SQL 算法 Java
(二十六)MySQL分库篇:Sharding-Sphere分库分表框架的保姆级教学!
前面《MySQL主从原理篇》、《MySQL主从实践篇》两章中聊明白了MySQL主备读写分离、多主多写热备等方案,但如果这些高可用架构依旧无法满足业务规模,或业务增长的需要,此时就需要考虑选用分库分表架构。
2501 4
|
3月前
|
存储 SQL 关系型数据库
(二十一)MySQL之高并发大流量情况下海量数据分库分表的正确姿势
从最初开设《全解MySQL专栏》到现在,共计撰写了二十个大章节详细讲到了MySQL各方面的进阶技术点,从最初的数据库架构开始,到SQL执行流程、库表设计范式、索引机制与原理、事务与锁机制剖析、日志与内存详解、常用命令与高级特性、线上调优与故障排查.....,似乎涉及到了MySQL的方方面面。但到此为止就黔驴技穷了吗?答案并非如此,以《MySQL特性篇》为分割线,整个MySQL专栏从此会进入“高可用”阶段的分析,即从上篇之后会开启MySQL的新内容,主要讲述分布式、高可用、高性能方面的讲解。
246 1
|
4月前
|
算法 搜索推荐 NoSQL
面试题MySQL问题之分库分表后的富查询问题处理如何解决
面试题MySQL问题之分库分表后的富查询问题处理如何解决
49 3
|
4月前
|
关系型数据库 分布式数据库 数据库
PolarDB产品使用问题之是否支持分库分表创建数据库
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。