问题3: 可以在Executor阶段切换完数据源之后更改sql, 或者在StatementHandler阶段更改sql
对于分库:
原始sql:
insert into article (id, user_id, status,create_time,update_time) value(201333425976180992, 1, 1, '2020-05-17 00:00:00', '2020-05-17 00:00:00')
目标sql:
insert into ba_test_001.article (id, user_id, status,create_time,update_time) value(201333425976180992, 1, 1, '2020-05-17 00:00:00', '2020-05-17 00:00:00')
完整插件如下
package com.bytearch.mybatis.sharding.plugin; import com.bytearch.mybatis.sharding.annotation.DB; import com.bytearch.mybatis.sharding.annotation.ShardingBy; import com.bytearch.mybatis.sharding.annotation.UseMaster; import com.bytearch.mybatis.sharding.common.NodeNameEnum; import com.bytearch.mybatis.sharding.configuration.DynamicDataSourceContextHolder; import com.bytearch.mybatis.sharding.exception.ShardingException; import com.bytearch.mybatis.sharding.strategy.IDatabaseShardingStrategy; import com.bytearch.mybatis.sharding.strategy.IShardingStrategy; import com.bytearch.mybatis.sharding.strategy.ITableShardingStrategy; import com.bytearch.mybatis.sharding.strategy.ShardingStrategyUtils; import lombok.extern.slf4j.Slf4j; import org.apache.ibatis.executor.Executor; import org.apache.ibatis.mapping.BoundSql; import org.apache.ibatis.mapping.MappedStatement; import org.apache.ibatis.mapping.SqlCommandType; import org.apache.ibatis.mapping.SqlSource; import org.apache.ibatis.plugin.*; import org.apache.ibatis.session.ResultHandler; import org.apache.ibatis.session.RowBounds; import org.springframework.util.StringUtils; import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.Arrays; import java.util.Map; import java.util.Properties; /** * @author bytearch */ @Intercepts( { @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}), @Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}), }) @Slf4j public class ShardingInterceptor implements Interceptor { @Override public Object intercept(Invocation invocation) throws Throwable { Object[] args = invocation.getArgs(); MappedStatement ms = (MappedStatement) args[0]; if (Arrays.asList(SqlCommandType.INSERT, SqlCommandType.UPDATE, SqlCommandType.DELETE, SqlCommandType.SELECT).contains(ms.getSqlCommandType())) { // 读请求: 默认使用从库 // 
写请求(INSERT,UPDATE,DELETE): 使用主库 boolean useMaster = !SqlCommandType.SELECT.equals(ms.getSqlCommandType()); DB DB = null; String methodId = ms.getId(); String className = methodId.substring(0, methodId.lastIndexOf('.')); String methodName = methodId.substring(methodId.lastIndexOf('.') + 1); //是否使用了分库分表策略 Class clz = Class.forName(className); Annotation dbAnno = clz.getAnnotation(DB.class); if (dbAnno != null) { DB = (DB) dbAnno; } if (DB != null) { //方法是否使用了@UseMaster注解 @PartitionBy注解 String partitionName = null; for (Method declaredMethod : clz.getDeclaredMethods()) { if (!declaredMethod.getName().equals(methodName)) { continue; } if (declaredMethod.getAnnotation(UseMaster.class) != null) { useMaster = true; } ShardingBy shardingByAnno = declaredMethod.getAnnotation(ShardingBy.class); if (shardingByAnno != null) { partitionName = shardingByAnno.value(); if (DB == null) { throw new ShardingException("error! must @DB on :{}", clz); } } } //记录sql是否需要改变 boolean sqlNeedChanged = false; Object partitionKey = null; String schema = DB.schema(); String tableName = DB.tableName(); //获取partition Object pa = args[1]; if (pa instanceof Map) { //params中获取partitionKey Map<String, Object> paMap = (Map<String, Object>) pa; if (!StringUtils.isEmpty(partitionName)) { partitionKey = paMap.get(partitionName); } } else if (pa instanceof Object && partitionKey == null) { //Bean对象中获取partitionKey for (Field declaredField : pa.getClass().getDeclaredFields()) { ShardingBy annotation = declaredField.getAnnotation(ShardingBy.class); if (annotation != null) { declaredField.setAccessible(true); partitionKey = declaredField.get(pa); } } } if (partitionKey != null) { log.info("获取到shardingKey:{}]", partitionKey); //权重 分库 < 分表 < 分库分表(原则上同一Mapper策略只配置一种,如果配置多种依次覆盖) //分库 IDatabaseShardingStrategy databaseShardingStrategy = ShardingStrategyUtils.getDatabaseShardingStrategy(DB); if (databaseShardingStrategy != null) { schema = databaseShardingStrategy.getSchemaName(DB.schema(), partitionKey); 
databaseShardingStrategy.changeDatasourceByPartitionKey(partitionKey, useMaster); sqlNeedChanged = true; } //分表 ITableShardingStrategy ITableShardingStrategy = ShardingStrategyUtils.getTableShardingStrategy(DB); if (ITableShardingStrategy != null) { tableName = ITableShardingStrategy.getTargetTable(DB.tableName(), partitionKey); sqlNeedChanged = true; NodeNameEnum nodeNameEnum = NodeNameEnum.valueOf(DB.schema()); if (nodeNameEnum != null) { DynamicDataSourceContextHolder.useDataSourceByNodeNum(nodeNameEnum, useMaster); } } //分库分表 IShardingStrategy shardingStategy = ShardingStrategyUtils.getShardingStategy(DB); if (shardingStategy != null) { schema = shardingStategy.getSchemaName(DB.schema(), partitionKey); tableName = shardingStategy.getTargetTable(DB.tableName(), partitionKey); databaseShardingStrategy.changeDatasourceByPartitionKey(partitionKey, useMaster); sqlNeedChanged = true; } } else { //不分库也不分表 NodeNameEnum nodeNameEnum = NodeNameEnum.valueOf(DB.schema()); if (nodeNameEnum != null) { DynamicDataSourceContextHolder.useDataSourceByNodeNum(nodeNameEnum, useMaster); } } if (sqlNeedChanged) { BoundSql boundSql = ms.getBoundSql(pa); String originSql = boundSql.getSql(); log.info("[原始SQL] sql:{}", originSql); String sql = originSql.replaceAll(DB.tableName(), schema + '.' 
+ tableName); log.info("[更改SQL] sql:{}", sql); BoundSql boundSqlNew = new BoundSql(ms.getConfiguration(), sql, boundSql.getParameterMappings(), boundSql.getParameterObject()); MappedStatement mappedStatement = copyFromMappedStatement(ms, new BoundSqlSqlSource(boundSqlNew)); args[0] = mappedStatement; } } } return invocation.proceed(); } @Override public Object plugin(Object target) { if (target instanceof Executor) { return Plugin.wrap(target, this); } return target; } @Override public void setProperties(Properties properties) { } private MappedStatement copyFromMappedStatement(MappedStatement ms, SqlSource newSqlSource) { MappedStatement.Builder builder = new MappedStatement.Builder(ms.getConfiguration(), ms.getId(), newSqlSource, ms.getSqlCommandType()); builder.resource(ms.getResource()); builder.fetchSize(ms.getFetchSize()); builder.statementType(ms.getStatementType()); builder.keyGenerator(ms.getKeyGenerator()); if (ms.getKeyProperties() != null && ms.getKeyProperties().length > 0) { builder.keyProperty(ms.getKeyProperties()[0]); } builder.timeout(ms.getTimeout()); builder.parameterMap(ms.getParameterMap()); builder.resultMaps(ms.getResultMaps()); builder.resultSetType(ms.getResultSetType()); builder.cache(ms.getCache()); builder.flushCacheRequired(ms.isFlushCacheRequired()); builder.useCache(ms.isUseCache()); return builder.build(); } public static class BoundSqlSqlSource implements SqlSource { private BoundSql boundSql; public BoundSqlSqlSource(BoundSql boundSql) { this.boundSql = boundSql; } @Override public BoundSql getBoundSql(Object parameterObject) { return boundSql; } } }
其中定义了三个注解
@UseMaster 是否强制读主
/**
 * Marker annotation for mapper methods: a method carrying it is forced to read from the master.
 */
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface UseMaster {
}
@ShardingBy 分片标识
/**
 * Marks where the sharding key comes from.
 *
 * <p>{@code ShardingBy} may be placed on a mapper method or on a bean field; the method-level
 * annotation takes precedence over the field-level one.
 *
 * @author yarw
 */
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD})
public @interface ShardingBy {

    /**
     * Name of the parameter that carries the sharding key.
     *
     * @return the parameter name; defaults to {@code ShardingConstant.DEFAULT_PARTITION_KEY_NAME}
     */
    String value() default ShardingConstant.DEFAULT_PARTITION_KEY_NAME;
}
@DB 定义逻辑表名 库名以及分片策略
package com.bytearch.mybatis.sharding.annotation; import com.bytearch.mybatis.sharding.strategy.IDatabaseShardingStrategy; import com.bytearch.mybatis.sharding.strategy.IShardingStrategy; import com.bytearch.mybatis.sharding.strategy.ITableShardingStrategy; import com.bytearch.mybatis.sharding.strategy.impl.NotUseDatabaseShardingStrategy; import com.bytearch.mybatis.sharding.strategy.impl.NotUseShardingStrategy; import com.bytearch.mybatis.sharding.strategy.impl.NotUseTableShardingStrategy; import java.lang.annotation.*; /** * @author yarw */ @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface DB { /** * 分表切分策略 * * @return */ Class<? extends ITableShardingStrategy> tableShardingStrategy() default NotUseTableShardingStrategy.class; /** * 分库切分策略 * * @return */ Class<? extends IDatabaseShardingStrategy> databaseShardingStrategy() default NotUseDatabaseShardingStrategy.class; /** * 分库&分表切分策略 * @return */ Class<? extends IShardingStrategy> shardingStrategy() default NotUseShardingStrategy.class; /** * 逻辑表名 * * @return */ String tableName(); /** * 逻辑库名 * * @return */ String schema(); }
测试走一波
1)编写entity
package com.bytearch.mybatis.sharding.entity;

import com.bytearch.mybatis.sharding.annotation.ShardingBy;
import lombok.Data;

import java.util.Date;

/**
 * Article row; accessors, {@code equals}/{@code hashCode} and {@code toString} come from Lombok.
 */
@Data
public class Article {

    /** Article id. */
    private Long id;

    /**
     * Author id. The {@link ShardingBy} annotation here designates this field as the
     * sharding key.
     */
    @ShardingBy
    private Long userId;

    /** Article status: -1 = deleted, 1 = draft, 2 = published. */
    private Byte status;

    private Date createTime;

    private Date updateTime;
}
2) 编写mapper
/**
 * Mapper for the logical table {@code article} in logical schema {@code blog}, configured with
 * {@code LongHashDatabasePartitionStrategy} as its database sharding strategy.
 *
 * @author yarw
 */
@Mapper
@DB(
        schema = "blog",
        tableName = "article",
        databaseShardingStrategy = LongHashDatabasePartitionStrategy.class
)
public interface ArticleShardingMapper {

    /**
     * Loads one article by id. The sharding key can also be passed as a statement parameter:
     * {@code @ShardingBy("shardingKey")} names the parameter that carries it.
     */
    @ShardingBy("shardingKey")
    @Select("select * from article where id = #{id}")
    Article selectById(@Param("id") Long id,
                       @Param("shardingKey") Long shardingKey);

    /** Inserts one article row. */
    @Insert("insert into article (id, user_id, status,create_time,update_time) value(#{id}, #{userId}, #{status}, #{createTime}, #{updateTime})")
    int insert(Article kv);
}
3) 编写测试类
/** Inserts one article for user 1 through the sharded mapper. */
@Test
public void insertArticleTest() {
    Long userId = 1L;

    Article article = new Article();
    article.setId(SeqIdUtil.nextId(userId));
    article.setUserId(userId);
    article.setStatus((byte) 1);
    article.setCreateTime(new Date());
    article.setUpdateTime(new Date());

    articleShardingMapper.insert(article);
}

/** Reads an article back, passing the extra id decoded from the article id as sharding key. */
@Test
public void selectArticleTest() {
    long articleId = 201364919411081472L;
    Article article = articleShardingMapper.selectById(
            articleId, SeqIdUtil.decodeId(articleId).getExtraId());
    System.out.println(article);
}
4) 测试结果
Insert
select
以上顺利实现了mysql分库, 同样的道理, 同时分库分表也很容易实现。
此插件具体实现方案已开源: https://github.com/bytearch/mybatis-sharding
目录如下:
.
├── bytearch_article.sql
├── mybatis-sharding.iml
├── pom.xml
├── readme.md
├── sharding.sql
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── bytearch
│   │   │           └── mybatis
│   │   │               └── sharding
│   │   │                   ├── ShardingApplication.java
│   │   │                   ├── annotation
│   │   │                   │   ├── DB.java
│   │   │                   │   ├── ShardingBy.java //分片标识注解
│   │   │                   │   └── UseMaster.java //强制读主注解
│   │   │                   ├── common
│   │   │                   │   ├── NodeNameEnum.java
│   │   │                   │   └── ShardingConstant.java
│   │   │                   ├── configuration
│   │   │                   │   ├── DynamicDataSourceContextHolder.java
│   │   │                   │   ├── DynamicDatasource.java
│   │   │                   │   ├── NormalDateSourceConfig.java
│   │   │                   │   ├── ShardingConfiguration.java
│   │   │                   │   └── ShardingDateSourceConfig.java
│   │   │                   ├── dao
│   │   │                   │   ├── KVShardingMapper.java
│   │   │                   │   └── KvShardingMapper.xml
│   │   │                   ├── dto
│   │   │                   │   ├── DataSourceKeyNodeDTO.java
│   │   │                   │   └── DataSourceNodeDTO.java
│   │   │                   ├── entity
│   │   │                   │   └── Kv.java
│   │   │                   ├── exception
│   │   │                   │   └── ShardingException.java
│   │   │                   ├── plugin //插件
│   │   │                   │   ├── ShardingInterceptor.java
│   │   │                   ├── sequence //唯一id生成器
│   │   │                   │   ├── IdEntity.java
│   │   │                   │   ├── IpUtil.java
│   │   │                   │   └── SeqIdUtil.java
│   │   │                   └── strategy //策略类
│   │   │                       ├── IDatabaseShardingStrategy.java
│   │   │                       ├── IShardingStrategy.java
│   │   │                       ├── ITableShardingStrategy.java
│   │   │                       ├── ShardingStrategyUtils.java
│   │   │                       └── impl
│   │   │                           ├── LongHashDatabasePartitionStrategy.java
│   │   │                           ├── LongHashTableShardingStrategy.java
│   │   │                           ├── NotUseDatabaseShardingStrategy.java
│   │   │                           ├── NotUseShardingStrategy.java
│   │   │                           └── NotUseTableShardingStrategy.java
│   │   └── resources
│   │       ├── application.yml
│   │       └── mybatis
│   │           └── mybatis-config.xml
│   └── test
│       └── java
│           └── com
│               └── bytearch
│                   └── mybatis
│                       └── sharding
│                           └── DBApplicationTests.java
五.总结
mysql分库分表,首先得找到瓶颈在哪里(IO or CPU),是分库还是分表,分多少?不能为了分库分表而拆分。
原则上是尽量先垂直拆分 后 水平拆分。
以上基于mybatis插件分库分表是一种实现思路,还有很多不完善的地方,
例如: 目前sql是直接做字符串替换的, 这里有很大隐患; 分库后, 跨库事务如何处理等等。
以上仅供参考! 欢迎有其它思路的朋友联系我一起交流。