大纲
1.单库版本到分库分表的演进介绍
2.订单系统项目模版
3.完成一次查询全过程
4.磁盘IO为什么这么慢
5.MySQL的索引是如何形成的
6.SQL优化
7.千万级数据优化之加缓存—理论
8.千万级数据优化之加缓存—实战
9.千万级数据优化之读写分离-理论
10.千万级数据优化之读写分离-实战
11.千万级数据优化-水平拆分和垂直拆分
12.分库分表多数据源实战
13.分库分表后订单路由策略和全局ID
14.MyCat和ShardingSphere介绍
15.创建库和表的小工具
16.ShardingSphere数据分片核心原理
17.C端ShardingSphere分片策略实现
18.C端ShardingSphere读写分离实现
19.异构订单实现
20.单库亿级数据怎么迁移到多库多表上
1.单库版本到分库分表的演进介绍
(1)单库版本的电商系统架构
(2)SQL优化阶段
(3)缓存优化阶段
(4)读写分离优化阶段
(5)垂直分库优化阶段
(6)分库分表版本的订单系统架构
2.订单系统项目模版
(1)创建订单的时序图
(2)查询订单列表的时序图
3.完成一次查询全过程
(1)用户发起查询请求到MySQL的流程
(2)MySQL收到查询语句的处理流程
(3)InnoDB引擎对查询语句的处理流程
4.磁盘IO为什么这么慢
(1)一次磁盘IO花费的时间
(2)一次内存IO花费的时间
(3)磁盘IO分为随机读写和顺序读写
(1)一次磁盘IO花费的时间
读写磁头在磁盘扇区上读取或者写入数据花费的时间,也就是一次完整的磁盘IO花费的时间,包括如下三个方面:
一.寻道时间
指的就是读写磁头移动到正确的半径上所需要的时间。寻道时间越短,磁盘IO操作越快。一般磁盘的寻道时间是3-15ms,主流的磁盘寻道时间是5ms。
二.旋转延迟时间
找到正确的磁道后,读写磁头移动到正确的位置上所消耗的时间。一般取磁盘旋转周期的一半作为旋转延迟的近似值:7200转/分 -> 120转/秒 -> 每转1/120秒 -> 每转的一半是1/240秒即4ms。
三.数据传输时间
指的是将数据从磁盘盘片读取或者写入的时间。一般是1ms以内,可以忽略不计。所以主流磁盘的一次磁盘IO的时间为:5ms + 4ms = 9ms。
(2)一次内存IO花费的时间
内存读取一次数据,一般是100ns以内,而1ms = 10^6ns = 100万ns。所以一次磁盘IO花费的时间是一次内存IO花费时间的约9万倍,几万倍。
(3)磁盘IO分为随机读写和顺序读写
一.顺序读写
就是读写磁头按照顺序读写磁盘盘片中的数据,速度还是很快的。MySQL里的binlog和redo日志就是顺序读写的。
二.随机读写
就是读写磁头会随机切换到不同的磁盘盘片的位置,速度比较缓慢耗时。
5.MySQL的索引是如何形成的
(1)数据页的结构
(2)数据区中存放的多个数据行组成单向链表
(3)多个数据页则组成双向链表
(4)索引页会存放数据页页号 + 其最小主键ID
(5)多个索引页的展示图
(1)数据页的结构
(2)数据区中存放的多个数据行组成单向链表
(3)多个数据页则组成双向链表
(4)索引页会存放数据页页号 + 其最小主键ID
(5)多个索引页的展示图
数据页和索引页组成的B+树中,叶子节点是数据页,非叶子节点是索引页。B+树的时间复杂度是log(n)。
6.SQL优化
(1)SQL优化流程
(2)SQL优化中的join优化原理算法
(1)SQL优化流程
判断join语句 -> 判断where条件 -> 判断聚合函数 -> 判断排序
(2)SQL优化中的join优化原理算法
SQL优化中不管是对where语句、聚合函数、还是排序操作的优化,优化起来相对而言会简单点,为对应的字段创建合适的索引即可。但是join语句这块的优化涉及到一些比较重要的如下原理了。
简单来说,在MySQL中使用join语句关联2张表的话,比如执行这条SQL:
select t1.order_no, t2.product_id from order_info t1 left join order_item_detail t2 on t1.order_no = t2.order_no
这个时候,join关联查询的过程是什么样子的呢?其实,这个就取决于当前join语句用到的算法了,join语句一共有3种算法。
第一种:最基础的是Simple Nested Loop算法
简单嵌套循环算法,相当于双重for循环。
第二种:Block Nested Loop算法
MySQL提供了一个Join Buffer, 但是Join Buffer大小为256K,内存有限。当然我们也可以通过join buffer size参数调节Join Buffer的大小。
第三种:Index Nested Loop算法
原来的匹配次数为:驱动表行数 * 被驱动表行数,而现在变成了:驱动表行数 * 被驱动表索引的高度。这样就极大地减少了被驱动表的匹配次数,极大地提升了join的性能。
总结:如果join关联查询能使用到索引,MySQL就会使用Index Nested Loop算法,查询效率会比较高。如果无法使用Index Nested Loop算法,MYSQL默认会使用Block Nested Loop算法,查询效率会很慢。
7.千万级数据优化之加缓存—理论
(1)高峰期导致数据库压力很大
(2)加缓存进行流量削峰
(3)如何提高缓存命中率
(1)高峰期导致数据库压力很大
通过对SQL优化特别是添加索引,可以提升查询的速度。此时已将SQL的一次查询稳定在300ms以下,但有一天DBA告知有一条SQL偶尔会超过2s。
为什么SQL查询时间会偶尔激增超过2s呢?通过排查发现,这条SQL在平时是没有问题的,一般稳定在300ms以下。经过分析和发现,在高峰期这条SQL才偶尔会超过2s。原因是在高峰期的时候,这台服务器的资源占用非常高。
所以定位到问题的原因是:高峰期时,大量请求会跑到MySQL数据库,从而导致这台服务器的CPU和内存占用率迅速飙升,最终导致数据库查询非常慢。
(2)加缓存进行流量削峰
解决方案:通过加缓存解决问题。
缓存的目的:为了流量削峰,减轻MySQL的负载,让MySQL稳定的去提供读写。
虽然说缓存非常好用,但是需要注意缓存命中率。缓存命中率 = 命中缓存的结果数 / 请求的缓存数。缓存命中率是衡量缓存有效性的重要指标。也就是说,命中率越高缓存的使用率越高。
(3)如何提高缓存命中率
第一:选择合适的业务场景
缓存适合读多写少的场景,最好是高频访问的场景,这里以已经完成订单为例。
第二:合理设置缓存容量
缓存容量如果很小则会触发Redis内存淘汰机制,导致key被删除,从而没能起到缓存效果。
一般都是二八原则,总数据量的20%会放在Redis里面,通常会根据业务场景取总数据量的15%到30%放到Redis中。
第三:控制缓存的粒度
单个key的数据单位越小,这个缓存就越不容容易更改,粒度越小缓存率越高。
第四:灵活设置缓存过期时间
如果缓存过期的时间设置不好,则可能会导致key同时失效。从而导致所有的请求都同时到数据库当中,这就是缓存击穿。
第五:避免缓存的穿透
查询一条数据,先从Redis缓存里查。如果没有,再从数据库中查,数据库中也没有,就说明缓存穿透。如果大量的请求过来,Redis中没有,数据库中也没有,就会把数据打穿。缓存穿透会造成缓存雪崩,导致最后服务崩溃。
解决办法:可以在缓存中给这个查询的请求设置一个空对象,让这个请求拿着空对象返回,然后设置到Redis缓存里。
第六:做好缓存预热
我们可以提前将数据库的数据(热点数据)放入到Redis缓存当中,这样第一次查询时就可以直接走缓存了。
下面是各个硬件的执行时间和容量:
CPU:20-50ns,1-32M级别
内存:100ns,32-96GB级别
磁盘:3-5ms,TB或PB级别
8.千万级数据优化之加缓存—实战
(1)业务场景是对历史订单进行查询
(2)Redis缓存的代码
(1)业务场景是对历史订单进行查询
由于历史订单的状态不会发生变化,符合读多写少的场景。所以可把用户的查询结果放入Redis缓存,并设置过期时间为1小时。只要缓存失效前再次查询,就会查Redis进行流量削峰来减轻数据库压力。
(2)Redis缓存的代码
缓存key的生成规则是:用户的ID+当前页+页数。
@RestController @RequestMapping(value = "/user/order") public class UserOrderController { @Autowired private UserOrderInfoService userOrderInfoService; ... // 查询订单列表 @PostMapping("/queryOrderInfoList") public PageResponse queryOrderInfoList(@RequestBody OrderInfoQuery orderInfoQuery) { //开始计时 long bTime = System.currentTimeMillis(); try { Page<OrderInfoVO> orderInfoPage = userOrderInfoService.queryOrderInfoList(orderInfoQuery); //关闭分段计时 long eTime = System.currentTimeMillis(); //输出 log.info("查询用户订单耗时:" + (eTime - bTime)); return PageResponse.success(orderInfoPage); } catch (Exception e) { log.info(e.getMessage()); return PageResponse.error(e.getMessage()); } } ... } //用户订单服务实现 @Service public class UserOrderInfoServiceImpl implements UserOrderInfoService { private final static Long FINISH = 50L; @Autowired private OrderInfoRepository orderInfoRepository; @Autowired private RedisUtils redisUtils; ... @Override public Page<OrderInfoVO> queryOrderInfoList(OrderInfoQuery orderInfoQuery) { OrderValidation.checkVerifyOrderQuery(orderInfoQuery); Page<OrderInfoVO> page = new Page<OrderInfoVO>(); page.setCurrent(orderInfoQuery.getCurrent()); page.setSize(orderInfoQuery.getSize()); //查询已完成的订单 if (FINISH.equals(orderInfoQuery.getOrderStatus())) { //组装redisKey String redisKey = orderInfoQuery.getUserId() + orderInfoQuery.getCurrent().toString() + orderInfoQuery.getSize().toString(); //获取redis缓存 Object redisObject = redisUtils.get(redisKey); //redis为空则从数据库中查询 if (Objects.isNull(redisObject)) { Page<OrderInfoVO> userOrderInfoVOPage = orderInfoRepository.queryOrderInfoList(page, orderInfoQuery); //设置redis缓存,过期时间为一小时 redisUtils.set(redisKey, userOrderInfoVOPage, 3600L, TimeUnit.SECONDS); return userOrderInfoVOPage; } log.info("从redis中获取数据, key: {}", redisKey); return (Page<OrderInfoVO>) redisObject; } return orderInfoRepository.queryOrderInfoList(page, orderInfoQuery); } ... }
9.千万级数据优化之读写分离-理论
(1)读写分离的业务背景
(2)主从复制的原理是什么
(3)主从复制的几种模式
(1)读写分离的业务背景
营销系统那边做了一些活动,导致订单请求量突增,大量下单的用户可能会不断刷新订单来查询订单是否配送等信息。
此时大量的请求会打到MySQL上,而单库又抗不了这么多读请求。这就会导致数据库负载很高,从而严重降低MySQL的查询效率。现在我们缓存也加过了,但是数据库负载还是很高,此时该怎么办?
其实很简单,既然单个库扛不住,那就搞2个库一起来抗。因为这对于订单系统来说是典型的读多写少场景,所以在这个场景下可以搞个一主两从的架构来进行优化,就像如下这样:
也就是写数据走主库,而读数据走从库,并且多个从库可以一起来抗大量的读请求。关键的一点是,从库会通过主从复制,从主库中不断的同步数据,以此来保证从库的数据和主库一致。所以想要实现读写分离,那么就先要了解主从复制的具体原理。
(2)主从复制的原理是什么
我们以MySQL一主两从架构为例,也就是一个Master节点下有两个Slave节点。在这套架构下,写请求统一交给Master节点处理,而读请求交给Slave节点处理。
为了保证Slave节点和Master节点的数据一致性:Master节点在写入数据后,同时会把数据复制一份到自己的各个Slave节点上。
在复制过程中一共会使用到三个线程:一个是Binlog Dump线程,位于Master节点上。另外两个线程分别是IO线程和SQL线程,它们都分别位于Slave节点上。如下图示:
主从复制的核心流程:
步骤一:首先,当Master节点接收到一个写请求时,这个写请求可能是增删改操作。此时会把写请求的操作都记录到BinLog日志中。
步骤二:然后,Master节点会把数据复制给Slave节点,这个过程首先要每个Slave节点连接到Master节点上。当Slave节点连接到Master节点上时,Master节点会为每一个Slave节点分别创建一个BinLog Dump线程。每个BinLog Dump线程用于向各个Slave节点发送BinLog日志。
步骤三:BinLog Dump线程会读取Master节点上的BinLog日志,并将BinLog日志发送给Slave节点上的IO线程。
步骤四:Slave节点上的IO线程接收到BinLog日志后,会将BinLog日志先写入到本地的RelayLog中。Slave节点的RelayLog中就保存了Master的BinLog Dump线程发送过来的BinLog日志。
步骤五:最后,Slave节点上的SQL线程就会来读取RelayLog中的BinLog日志,将其解析成具体的增删改操作。然后把这些在Master节点上进行过的操作,重新在Slave节点上也重做一遍,达到数据还原的效果。这样就可保证Master节点和Slave节点的数据一致性。
(3)主从复制的几种模式
MySQL的主从复制,分为全同步复制、异步复制、半同步复制和增强半同步复制这四种。
模式一:全同步复制
全同步复制就是当主库执行完一个事务后,所有从库也必须执行完该事务才可以返回结果给客户端。因此虽然全同步复制数据一致性得到保证,但主库完成一个事务需等待所有从库也完成,性能较低。
模式二:异步复制
异步复制就是当主库提交事务后,会通知BinLog Dump线程发送BinLog日志给从库。一旦BinLog Dump线程将BinLog日志发送给从库后,无需等从库也同步完事务,主库就会将处理结果返回给客户端。
因为主库只管自己执行完事务,就可以将处理结果返回给客户端,而不用关心从库是否执行完事务。这就可能导致短暂的主从数据不一致,比如刚在主库插入的新数据,如果马上在从库查询,就可能查询不到。
而且当主库提交事务后,如果宕机挂掉了,此时可能BinLog还没来得及同步给从库。这时如果为了恢复故障切换主从节点,就会出现数据丢失的问题。所以异步复制虽然性能高,但数据一致性上是较弱的。
不过MySQL主从复制,默认采用的就是异步复制这种复制策略。
模式三:半同步复制
半同步复制就是在同步和异步中做了折中选择,半同步主从复制的过程如下图示:
当主库提交事务后,至少还需要一个从库返回接收到BinLog日志,并成功写入到RelayLog的消息,这时主库才会将处理结果返回给客户端。相比前2种复制方式,半同步复制较好地兼顾了数据一致性以及性能损耗的问题。
但半同步复制也存在以下几个问题:
问题一:半同步复制的性能相比异步复制有所下降。因为异步复制是不需要等待任何从库是否接收到数据的响应,而半同步复制则需要等待至少一个从库确认接收到binlog日志的响应,性能上是损耗更大。
问题二:如果超过了配置的主库等待从库响应的最大时长,半同步复制就会变成异步复制,此时异步复制的问题同样会出现。
问题三:在MySQL 5.7.2之前的版本中,半同步复制存在着幻读问题。当主库成功提交事务并处于等待从库确认过程,这时从库都还没来得及返回处理结果给客户端,但因为主库存储引擎内部已经提交事务了,所以其他客户端是可以到从主库中读到数据的。但是如果下一秒主库突然挂了,那么下一次请求过来时,就只能把请求切换到从库中。而因为从库还没从主库同步完数据,所以从库就读不到这条数据了。和上一秒读取数据的结果对比,就造成了幻读的现象。注意这不是数据丢失,因为后续从库会同步完数据的。
模式四:增强半同步复制
增强半同步复制是MySQL 5.7.2后的版本对半同步复制做的一个改进。原理上几乎是一样的,主要是解决幻读的问题。主库配置了参数rpl_semi_sync_master_wait_point = AFTER_SYNC后,主库在提交事务前必须先收到从库数据同步完成的确认信息,才能提交事务,以此来解决幻读问题。增强半同步主从复制过程如下:
10.千万级数据优化之读写分离-实战
(1)读写分离配置核心组件流程图
(2)读写分离的实现步骤
(1)读写分离配置核心组件流程图
(2)读写分离的实现步骤
步骤一:配置文件中配置主从库连接信息
application.yaml配置文件
spring: datasource: masters: - url: jdbc:mysql://192.168.10.8:3307/order_db?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT username: root password: Sharding@Master#1990 driver-class-name: com.mysql.cj.jdbc.Driver slaves: - url: jdbc:mysql://192.168.10.8:3308/order_db?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT username: root password: Sharding@Slave#1990 driver-class-name: com.mysql.cj.jdbc.Driver
步骤二:注入数据源
DataSourceConfig类
//多数据源配置 @Configuration @ConfigurationProperties(prefix = "spring.datasource") public class DataSourceConfig { //主库数据源信息 private Map<String, String> masters; //从库数据源信息 private List<Map<String, String>> slaves; @Bean public DataSource masterDataSource() throws Exception { if (CollectionUtils.isEmpty(masters)) { throw new Exception("主库数据源不能为空"); } return DruidDataSourceFactory.createDataSource(masters); } @Bean public List<DataSource> slaveDataSources() throws Exception { if (CollectionUtils.isEmpty(slaves)) { throw new Exception("从库数据源不能为空"); } final List<DataSource> dataSources = new ArrayList<>(); for (Map map : slaves) { dataSources.add(DruidDataSourceFactory.createDataSource(map)); } return dataSources; } @Bean @Primary @DependsOn({"masterDataSource", "slaveDataSources"}) public DataSourceRouter routingDataSource() throws Exception { final Map<Object, Object> targetDataSources = new HashMap<>(); targetDataSources.put(DataSourceContextHolder.MASTER, masterDataSource()); for (int i = 0; i < slaveDataSources().size(); i++) { targetDataSources.put(DataSourceContextHolder.SLAVE + i, slaveDataSources().get(i)); } final DataSourceRouter routingDataSource = new DataSourceRouter(); routingDataSource.setTargetDataSources(targetDataSources); routingDataSource.setDefaultTargetDataSource(masterDataSource()); return routingDataSource; } //设置事务,事务需要知道当前使用的是哪个数据源才能进行事务处理 @Bean public DataSourceTransactionManager dataSourceTransactionManager() throws Exception { return new DataSourceTransactionManager(routingDataSource()); } public Map<String, String> getMasters() { return masters; } public void setMasters(Map<String, String> masters) { this.masters = masters; } public List<Map<String, String>> getSlaves() { return slaves; } public void setSlaves(List<Map<String, String>> slaves) { this.slaves = slaves; } }
步骤三:数据源切换上下文,其中使用了ThreadLocal保存当前线程的数据源
DataSourceContextHolder类
//数据源上下文 public class DataSourceContextHolder { public static final String MASTER = "MASTER"; public static final String SLAVE = "SLAVE"; private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>(); //默认写模式 public static String getDataSourceType() { return CONTEXT_HOLDER.get() == null ? MASTER : CONTEXT_HOLDER.get(); } public static void setDataSourceType(String dataSourceType) { if (dataSourceType == null) { log.error("dataSource为空"); throw new NullPointerException(); } log.info("设置dataSource:{}", dataSourceType); CONTEXT_HOLDER.set(dataSourceType); } public static void removeDataSourceType() { CONTEXT_HOLDER.remove(); } }
步骤四:继承AbstractRoutingDataSource类重写determineCurrentLookupKey方法实现数据源动态切换
DataSourceRouter类继承了SpringBoot的AbstractRoutingDataSource类,并且重写了determineCurrentLookupKey方法来实现数据源动态切换。
//动态主从数据源切换 public class DataSourceRouter extends AbstractRoutingDataSource { @Override protected Object determineCurrentLookupKey() { log.info("当前数据源为" + DataSourceContextHolder.getDataSourceType()); //返回选择的数据源 return DataSourceContextHolder.getDataSourceType(); } }
步骤五:创建读库的自定义注解
//自定义读库注解 //被这个注解的方法使用读库 @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD, ElementType.TYPE}) @Inherited public @interface ReadOnly { }
步骤六:切面类DynamicDataSourceAsepct
//数据源切面 @Aspect @Component public class DynamicDataSourceAspect implements Ordered { //在Service层方法获取DataSource对象之前,在切面中指定当前线程数据源Slave @Before(value = "execution(* *(..)) && @annotation(readOnly)") public void before(JoinPoint point, ReadOnly readOnly) { log.info(point.getSignature().getName() + "走从库"); DataSourceContextHolder.setDataSourceType(DataSourceContextHolder.SLAVE); } @After(value = "execution(* *(..)) && @annotation(readOnly)") public void restoreDataSource(JoinPoint point, ReadOnly readOnly) { log.info(point.getSignature().getName() + "清除数据源"); //方法执行完后清除数据源 DataSourceContextHolder.removeDataSourceType(); } @Override public int getOrder() { return 0; } }
步骤七:在需要走读库的业务方法上添加@ReadOnly注解
这样执行这些业务方法时就会被切面拦截修改数据源,从而走读库进行查询。
//用户订单服务实现 @Service public class UserOrderInfoServiceImpl implements UserOrderInfoService { ... //获取订单详情 @ReadOnly @Override public OrderDetailVO getOrderDetail(String orderNo) { return orderInfoRepository.getOrderDetail(orderNo); } ... }
11.千万级数据优化-水平拆分和垂直拆分
(1)什么时候考虑水平拆分和垂直拆分
(2)什么是垂直拆分
(3)什么是水平拆分
(1)什么时候考虑水平拆分和垂直拆分
当数据库的数据量越来越大时:首先可以从SQL入手进行优化,比如通过加入索引。然后可以使用缓存来进行优化,适合读多写少的场景。接着可以通过主从复制和读写分离来实现优化。此时增删改已全部走主库,查询都走从库,已经大大提升了读数据的能力。但是没有办法提升主库写数据能力,于是可以考虑对数据进行水平拆分和垂直拆分了。
(2)什么是垂直拆分
原来很多模块共用一个数据库资源,经过垂直分库后,商品模块、订单模块、用户模块等使用上自己单独的数据资源,于是各模块的资源竞争就不存在了。
垂直拆分的好处:
一.减轻了数据库的压力
二.每个数据库分摊数据,提高了查询的效率
三.每个数据库访问的CPU、内存、网络压力变小
四.业务更加清晰
五.解耦
六.系统扩展也变得容易
垂直拆分带来的不足:
一.系统的复杂性增加了
二.增加了多个数据库数据表联查的复杂性
三.事务处理变得麻烦
四.垂直拆分也解决不了单表数据量很大的问题
(3)什么是水平拆分
单表数据量很大可能会引起接口查询超时的问题。
履约系统会通过Dubbo发起RPC请求调用订单系统。假如设置的超时间为1s,此时订单模块的查询时间就已经快1s了,很容易超时。而一旦延长超时时间成5s,那么履约系统的的线程池就很容易被打满,导致资源迅速被耗尽,甚至会导致履约系统的服务雪崩。所以垂直拆分解决不了单表数据量很大的问题,需要水平拆分。
水平拆分时表的结构不会发生改变,水平拆分分为:水平拆表、水平分库、水平分库分表。
一.水平拆表如下
二.水平拆库如下
三.水平分库分表
水平拆分带来的不足:
一.水平拆分过程比较复杂
二.事务处理变得复杂
三.多库多表联查难度加大
四.水平拆分之后单表的数据会分散到不同的数据源中(多数据源管理问题)
12.分库分表多数据源实战
(1)首先配置好三个数据源
(2)然后创建一个数据库配置DataSourceConfig读入三个数据源的具体配置
(3)创建数据源上下文DataSourceContextHolder
(4)DataSourceRouter继承SpringBoot的AbstractRoutingDataSource实现动态切换数据源
(5)创建DataSources将数据源配置注入到DataSourceConfig对象中并交给SpringBoot管理
(6)在业务中设置当前线程的数据源类型
(1)首先配置好三个数据源
datasources.properties
spring.datasource01.url=jdbc:mysql://192.168.10.8:3307/order_db?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT spring.datasource01.username=root spring.datasource01.password=Sharding@Master#1202 spring.datasource01.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource02.url=jdbc:mysql://192.168.10.8:3307/order_db?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT spring.datasource02.username=root spring.datasource02.password=Sharding@Master#1202 spring.datasource02.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource03.url=jdbc:mysql://127.0.0.1/demo?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT spring.datasource03.username=root spring.datasource03.password=123456 spring.datasource03.driver-class-name=com.mysql.cj.jdbc.Driver
(2)然后创建一个数据库配置DataSourceConfig读入三个数据源的具体配置
//数据库配置 @Data public class DataSourceConfig { //driverClassName驱动名称 private String driverClassName; //url private String url; //用户名 private String username; //密码 private String password; }
(3)创建数据源上下文DataSourceContextHolder
写死各个数据源,并且为每个线程创建一个副本,当并发访问时,每个线程可以找到它对应的数据源。
//数据源上下文 public class DataSourceContextHolder { public static final String DATA_SOURCE_01 = "datasource01"; public static final String DATA_SOURCE_02 = "datasource02"; public static final String DATA_SOURCE_03 = "datasource03"; private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>(); public static void setDataSourceType(String dataSourceType) { if (dataSourceType == null) { log.error("dataSource为空"); throw new NullPointerException(); } log.info("设置dataSource:{}", dataSourceType); CONTEXT_HOLDER.set(dataSourceType); } public static String getDataSourceType() { return CONTEXT_HOLDER.get(); } //根据取模结果,设置当前线程的数据源类型 public static void setDataSourceType(Long userId) { Long result = userId % DataSources.getDataSourceSize(); if (result == 0) { DataSourceContextHolder.setDataSourceType(DataSourceContextHolder.DATA_SOURCE_01); } else if (result == 1) { DataSourceContextHolder.setDataSourceType(DataSourceContextHolder.DATA_SOURCE_02); } else { DataSourceContextHolder.setDataSourceType(DataSourceContextHolder.DATA_SOURCE_03); } } public static void removeDataSourceType() { CONTEXT_HOLDER.remove(); } }
(4)DataSourceRouter继承SpringBoot的AbstractRoutingDataSource实现动态切换数据源
当创建了数据源上下文DataSourceContextHolder,每个线程是怎么找到它对应的数据源呢?
首先肯定需要在请求开始的时候通过DataSourceContextHolder的setDataSourceType()方法设置线程的数据源。由于DataSourceRouter继承SpringBoot的AbstractRoutingDataSource重写了determineCurrentLookupKey()方法,而在AbstractRoutingDataSource中SpringBoot已经实现了动态切换数据源的路由,所以每个线程便能借助SpringBoot去找到它对应的数据源了。
(5)创建DataSources将数据源配置注入到DataSourceConfig对象中并交给SpringBoot管理
//多数据源配置 @Configuration @PropertySource("classpath:datasources.properties") @ConfigurationProperties(prefix = "spring") public class DataSources { //数据源集合 private static List<DataSource> dataSources = new ArrayList<DataSource>(); //数据源01信息 private DataSourceConfig datasource01; //数据源02信息 private DataSourceConfig datasource02; //数据源03信息 private DataSourceConfig datasource03; //获取数据源数量 public static int getDataSourceSize() { return dataSources.size(); } @Bean public List<DataSource> ds() throws Exception { dataSources.add(buildDruidDataSource(datasource01)); dataSources.add(buildDruidDataSource(datasource02)); dataSources.add(buildDruidDataSource(datasource03)); return dataSources; } //构建Druid数据源 public DruidDataSource buildDruidDataSource(DataSourceConfig dataSourceConfig) { return DataSourceBuilder.create() .type(DruidDataSource.class) .driverClassName(dataSourceConfig.getDriverClassName()) .url(dataSourceConfig.getUrl()) .username(dataSourceConfig.getUsername()) .password(dataSourceConfig.getPassword()) .build(); } @Bean @Primary public DataSourceRouter routingDataSource() throws Exception { Map<Object, Object> targetDataSources = new HashMap<>(); List<DataSource> dataSources = ds(); targetDataSources.put(DataSourceContextHolder.DATA_SOURCE_01, dataSources.get(0)); targetDataSources.put(DataSourceContextHolder.DATA_SOURCE_02, dataSources.get(1)); targetDataSources.put(DataSourceContextHolder.DATA_SOURCE_03, dataSources.get(2)); DataSourceRouter routingDataSource = new DataSourceRouter(); routingDataSource.setTargetDataSources(targetDataSources); routingDataSource.setDefaultTargetDataSource(dataSources.get(0)); return routingDataSource; } //设置事务,事务需要知道当前使用的是哪个数据源才能进行事务处理 @Bean public DataSourceTransactionManager dataSourceTransactionManager() throws Exception { return new DataSourceTransactionManager(routingDataSource()); } public DataSourceConfig getDatasource01() { return datasource01; } public void setDatasource01(DataSourceConfig datasource01) { this.datasource01 = datasource01; } public DataSourceConfig getDatasource02() { return datasource02; } public void setDatasource02(DataSourceConfig datasource02) { this.datasource02 = datasource02; } public DataSourceConfig getDatasource03() { return datasource03; } public void setDatasource03(DataSourceConfig datasource03) { this.datasource03 = datasource03; } }
(6)在业务中设置当前线程的数据源类型
@Repository public class OrderInfoRepository { ... //查询用户订单列表 public Page<OrderInfoVO> queryOrderInfoList(OrderInfoBaseQuery orderInfoBaseQuery) { //1.组装mybatis查询插件 Page page = new Page(); page.setCurrent(orderInfoBaseQuery.getPageNo()); page.setSize(orderInfoBaseQuery.getPageSize()); //2.设置当前线程的数据源类型 DataSourceContextHolder.setDataSourceType(orderInfoBaseQuery.getUserId()); //3.查询该用户订单信息 Page<OrderInfoVO> orderInfoVOPage = orderInfoMapper.queryOrderInfoList(page, orderInfoBaseQuery); return orderInfoVOPage; } ... }
13.分库分表后订单路由策略和全局ID
(1)为什么需要路由策略
(2)路由策略的设计
(3)全局ID的作用
(4)全局唯一ID的算法之雪花算法
(1)为什么需要路由策略
一.分库分表后必须要支持的场景
场景一:用户下单后需要查看订单信息了解订单状态
场景二:商家需要获取订单信息进行分析
二.分库分表后不同数据会分散在不同的库表
一个用户下单后,订单可能会落在不同的表中,他查询自己的订单列表时可能需要查多张表。同样,商家查询他的订单列表时也可能需要查多张表。
所以希望根据一定规则,让同一个用户下的单在同一张表,同一个商家的单在同一张表,这个规则就是路由策略。
(2)路由策略的设计
一.用户的路由策略之使用用户ID作为路由Key
单库多表的路由策略:用户ID的Hash值 % 3
用户下单时,把用户ID作为路由Key: 首先对用户ID取Hash值,然后对表的数量进行取模。
多库多表的路由策略:需要先找到库再找到表
找库:根据用户ID的Hash值对数据库的数量进行取模找到对应的数据库; 找表:根据用户ID的Hash值除以数据库的数量接着再对表的数量进行取模找到对应的表;
可以用2库3表来简单举1-10的ID例子分析理解
找库:用户ID的Hash值 % 2; 找表:用户ID的Hash值 / 2 % 3;
设计路由策略的要点:根据具体的业务场景、与用户信息关联度比较大的路由Key进行Hash值取模。
二.商家的路由策略之使用商家ID作为路由Key并单独为商家B端设计一张订单表
很明显不能使用用户ID作为商家的路由Key。我们会单独为商家B端设计了一张订单表,也就是C端和B端的订单表是相互独立的。
用户的角度查询订单,是以用户ID作为路由Key。商家的角度查询订单,是以商家ID作为路由Key。B端订单表的路由策略和C端订单表的路由策略是一致的,仅仅是路由Key不同。
那么如何为B端的订单表写入数据呢?当C端用户下单时,会把订单号发送到MQ。然后B端会消费这个MQ消息,根据订单号获取订单信息,然后写入到B端的订单表中。
(3)全局ID的作用
全局ID是为了解决订单主键重复的问题。可以引入一个全局唯一ID的服务,用来生成全局唯一ID。该服务每次生成的ID都不一样,可以保证主键的唯一性。
(4)全局唯一ID的算法之雪花算法
雪花算法生成的ID一共64个bit位 = 8字节。
第一部分:0表示正数。
第二部分:41位的时间戳,2^41 - 1 = 69年。
第三部分:10位的全局机器ID,其中5位的数据中心 + 5位机器ID。2 ^10 = 1024,表示在同一个时间戳下,最多可允许1024台机器得到全局唯一ID。
第四部分:12位的序列号 2^12 = 4096。
雪花算法的含义:
就是同一个时间戳下,可为1024台机器获取全局唯一ID,可为每台机器并发分配4096个ID。
雪花算法的ID生成类:
public class IdGenerater { //数据中心id:外部传入 private long datacenterId; //机器id:外部传入 private long workerId; //12位的序列号 private long sequence; //数据中心id的bit位数 private long datacenterIdBits = 5L; //工作id的bit位数 private long workerIdBits = 5L; //序列号bit位数 private long sequenceBits = 12L; //工作id向左移动位数:12位 private long workerIdShift = sequenceBits; //数据id向左移动位数:12 + 5 = 17位 private long datacenterIdShift = sequenceBits + workerIdBits; //时间戳需要左移位数 12 + 5 + 5 = 22位 private long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits; //序列号最大值 private long sequenceMask = -1L ^ (-1L << sequenceBits); //初始时间戳 private long initialTimestamp = 1288834974657L; //上次时间戳,初始值为负数是不想浪费 lastTimestamp + 1 = 0 这个序列号 private long lastTimestamp = -1L; public IdGenerater(long workerId, long datacenterId, long sequence) { this.workerId = workerId; this.datacenterId = datacenterId; this.sequence = sequence; } //通过雪花算法生成一个ID public synchronized long nextId() throws Exception { long timestamp = System.currentTimeMillis(); //获取当前时间戳:当前时间戳不能小于上次的获取的时间戳 if (timestamp < lastTimestamp) { throw new RuntimeException("当前时间戳小于上次的时间戳,可能时钟回拨了!"); } //当前时间戳,如果等于上次时间戳(同一毫秒内),则序列号加一 //否则序列号赋值为0,从0开始 if (timestamp == lastTimestamp) { sequence = (sequence + 1) & sequenceMask; //序列号超过12个bit位、也就是已经生成4096个ID了 //那这一毫秒内就不能再生成新的ID了,切换到下一毫秒内 if (sequence == 0) { timestamp = tilNextMillis(lastTimestamp); } } else { //第一个ID的序列号,从0开始 sequence = 0; } //将当前时间戳设置为最近的一次时间戳 lastTimestamp = timestamp; //返回结果: //(timestamp - initialTimestamp) << timestampLeftShift) 表示将时间戳减去初始时间戳,再左移相应位数 //(datacenterId << datacenterIdShift) 表示将数据id左移相应位数 //(workerId << workerIdShift) 表示将工作id左移相应位数 return ((timestamp - initialTimestamp) << timestampLeftShift) | (datacenterId << datacenterIdShift) | (workerId << workerIdShift) | sequence; } //获取时间戳,并与上次时间戳比较 private long tilNextMillis(long lastTimestamp) { long timestamp = System.currentTimeMillis(); while (timestamp <= lastTimestamp) { timestamp = System.currentTimeMillis(); } return timestamp; } }
雪花算法的测试类:
//测试雪花算法生成ID public class SnowFlakeTest { public static void main(String[] args) throws Exception { IdGenerater worker = new IdGenerater(1, 1, 1); for (int i = 0; i < 5; i++) { long id = worker.nextId(); System.out.println("id为:" + id); System.out.println("二进制id为:" + Long.toBinaryString(id)); } } }
需要注意雪花算法的时钟回拨问题:也就是存在的人为调整系统时间,导致生成同样的ID。
雪花算法的时钟回拨问题的解决方案:可以把上一次系统获取到的时间戳缓存起来,每次获取时间戳和上次的进行比较。如果小于上一次,就证明时钟回拨,这时候就可以取上次时间戳 + 1即可。
14.MyCat和ShardingSphere介绍
(1)MyCat简介
(2)ShardingSphere简介
(1)MyCat简介
如果没有MyCat,系统需要自己管理数据库的配置。
如果有MyCat,数据库的配置会由MyCat管理。同时MyCat可以帮我们完成:读写分离、数据分片路由、多数据源管理等功能。
具体的流程如下:
MyCat适用于中小型公司、创业公司,MyCat是需要单独部署一个服务。如果数据库特别多,则MyCat的压力会变得很大,从而导致性能降低。
(2)ShardingSphere简介
和MyCat不一样的是,ShardingSphere只是一个jar包,更加轻量级。只需要在项目中引入即可,无需独立部署服务。使用ShardingSphere时的系统和MySQL交互流程如下:
ShardingSphere是一款分布式的数据库生态系统,它可以将任意数据库转换为分布式数据库,它还可以通过数据分片、弹性伸缩、加密等能力对原有数据库增强。
ShardingSphere主要由ShardingSphere-JDBC和ShardingSphere-Proxy组成。ShardingSphere-JDBC定位为轻量级Java框架,在Java的JDBC层提供额外服务。ShardingSphere-Proxy定位为透明化的数据库代理端,通过实现数据库二进制协议,对异构语言提供支持。ShardingSphere-JDBC是基于已经有的库和表进行操作的,会进行数据分片(sharding)和SQL解析(parse)。
ShardingSphere的功能有:数据分片、分布式事务、读写分离、高可用、数据库网关、流量治理、数据迁移、数据加密、数据脱密。
15.创建库和表的小工具
(1)简单总结
(2)创建库和表的工具
(1)简单总结
从项目整体演进的角度:单体项目 -> 流量暴增 -> 查询变慢 -> SQL优化 + 索引 -> 引入缓存 -> 读写分离 -> 垂直拆分 + 水平拆分 -> 分库分表所需要支持的场景(B端和C端) -> 路由设计策略 -> 全局ID(分布式ID)的雪花算法 -> ShardingSphere引入
从数据流动方向的角度:请求数据 -> 业务逻辑处理 -> Mapper生成逻辑SQL -> 选择路由Key -> 选择路由策略 -> 选择具体的数据库 -> 选定具体的表 -> ShardingSphere生成实际的SQL
ShardingSphere并不会帮助我们先把库和表生成好,ShardingSphere是基于已经有的库和表进行操作的,会进行数据分片(sharding)和SQL解析(parse)。所以使用ShardingSphere之前,需要先创建好库和表。
(2)创建库和表的工具
步骤一:配置好分片的策略,两个数据源、8库8表、两个维度的表
也就是8个库会落在两个数据源上,每个库有8张用户维度的表和8张商家维度的表。
shardingstrategyConfig.properties
#数据源的配置,多个数据源参考下边的配置 #数据源1 shardingstrategy.datasources[0].hostName=192.168.10.8 shardingstrategy.datasources[0].port=3307 shardingstrategy.datasources[0].username=root shardingstrategy.datasources[0].password=Sharding@Master#1990 #数据源2 #shardingstrategy.datasources[1].hostName=xxx.xxx.xx.xxx #shardingstrategy.datasources[1].port=3306 #shardingstrategy.datasources[1].username=root #shardingstrategy.datasources[1].password=123456 #分库的配置 #分库分表前的库名 shardingstrategy.originDbName=order_db #要分的逻辑库总数 shardingstrategy.dbNum=8 #库名的后缀 shardingstrategy.dbSuffix=_ #分表的配置 #每个逻辑库中表的数量 shardingstrategy.tableNumPerDb=8 #表名后缀,一种分表分库维度对应一种后缀,比如C端维度:_by_user_id_ 商户端维度:_by_merchant_id_ shardingstrategy.tableSuffix=_sharded_by_user_id_,_sharded_by_merchant_id_
步骤二:加载配置类并交给Spring管理
将配置类shardingstrategyConfig.properties的信息加载到ShardingStrategyConfig类中。
@Data @Component @ConfigurationProperties(prefix = "shardingstrategy") @PropertySource("classpath:shardingstrategyConfig.properties") public class ShardingStrategyConfig { //原始库名,即分库分表前的库名 private String originDbName; //要分的逻辑库总数 private int dbNum; //每个逻辑库中表的数量 private int tableNumPerDb; //库名的后缀 private String dbSuffix; //表名后缀,一种分表分库维度对应一种后缀,比如C端维度:_by_user_id_ 商户端维度:_by_merchant_id_ private List<String> tableSuffix; //数据源 private List<DataSourceConfig> datasources; } @Data public class DataSourceConfig { //主机名 private String hostName; //端口号 private int port; //用户名 private String username; //密码 private String password; }
步骤三:创建ShardingUtil类生成具体的库和表
@Component public class ShardingUtil { private static final Logger logger = LoggerFactory.getLogger(ShardingUtil.class); @Autowired private ShardingStrategyConfig shardingStrategyConfig; //创建库和表 public void initTables(String sqlPath) { //1.校验sqlPath if (StringUtils.isBlank(sqlPath)) { throw new IllegalArgumentException("sqlPath cannot be empty"); } Optional.of(shardingStrategyConfig) .map(ShardingStrategyConfig::getDatasources) .ifPresent(dataSources -> { //逻辑库的索引 AtomicInteger dbIndex = new AtomicInteger(); //2.依次处理每个数据源(物理库) dataSources.stream().filter(Objects::nonNull).forEach(dataSourceConfig -> { //3.创建数据源,建库用临时DataSource MysqlDataSource dataSource = buildDataSource(dataSourceConfig); JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); //4.获取配置的数据库名称 String originDbName = shardingStrategyConfig.getOriginDbName(); logger.info("originDbName:[{}],prepare to process the data source:[{}]", originDbName, dataSource.getUrl()); //5.每个数据源需要创建多少个逻辑库 逻辑库总数/物理库总数 = 每个物理库上应该创建多少个逻辑库 int dbNumPerDataSource = shardingStrategyConfig.getDbNum() / dataSources.size(); //6.依次创建逻辑库 for (int i = 0; i < dbNumPerDataSource; i++) { //7.创建逻辑数据库 String targetDbName = createDatabase(dbIndex, jdbcTemplate, originDbName); //8.在指定的逻辑库中创建table createTables(sqlPath, dataSource, targetDbName); } }); }); } //在指定的逻辑db中创建table private void createTables(String sqlPath, MysqlDataSource dataSource, String targetDbName) { //1.指定建表的逻辑库 dataSource.setDatabaseName(targetDbName); //2.创建建表使用的jdbcTemplate JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); try { //3.拼接建表sql String finalTableSql = splicingFinalTableSql(sqlPath); //4.执行建表sql jdbcTemplate.execute(finalTableSql); logger.info("DB[{}] create table complete...", targetDbName); } catch (Exception e) { logger.error("createTables fail,e:[{}]", e.getMessage()); } } //创建逻辑库 private String createDatabase(AtomicInteger dbIndex, JdbcTemplate jdbcTemplate, String originDbName) { //1.拼接要创建的目标数据库名称 String targetDbName = originDbName + shardingStrategyConfig.getDbSuffix() + (dbIndex.getAndIncrement()); //2.创建逻辑库 jdbcTemplate.execute(String.format("CREATE DATABASE IF NOT EXISTS `%s` DEFAULT CHARACTER SET UTF8", targetDbName)); logger.info("DB[{}]create db complete...", targetDbName); return targetDbName; } //拼接建表sql private String splicingFinalTableSql(String sqlPath) throws IOException { StringBuilder finalSqlBuilder = new StringBuilder(); //1.使用流读取sqlPath文件中配置的sql语句 InputStream inputStream = ShardingUtil.class.getResourceAsStream(sqlPath); if (Objects.isNull(inputStream)) { throw new IOException("The specified sqlPath file does not exist"); } //2.将流转换成字符串 String sqlText = IOUtils.toString(inputStream, StandardCharsets.UTF_8); for (int i = 0; i < shardingStrategyConfig.getTableNumPerDb(); i++) { int tableIndex = i; //3.依次处理每条sql Optional.of(sqlText.split(";")) .ifPresent(sqls -> Arrays.stream(sqls) .filter(Objects::nonNull) .forEach(sql -> { //4.依次处理配置的表后缀,拼接出相应后缀名的sql shardingStrategyConfig.getTableSuffix().forEach(tableSuffix -> { //5.为sql中的表名拼接后缀 String finalTableSql = splicingTableSuffix(sql, tableSuffix + tableIndex); //6.将最终要执行的sql统一拼接起来 finalSqlBuilder.append(finalTableSql); }); })); } return finalSqlBuilder.toString(); } //为sql中的表名拼接后缀 private String splicingTableSuffix(String sql, String suffix) { //1.编写匹配表名的正则表达式,(?i)表示忽略大小写,(?s)表示开启单行模式,即多行sql就像在同一行一样,方便匹配 Pattern tablePattern = Pattern.compile("(?i)(?s)CREATE\\s+TABLE\\s+(\\S+)"); StringBuilder sb = new StringBuilder(sql); //2.使用正则表达式来匹配表名 Matcher matcher = tablePattern.matcher(sb); if (matcher.find()) { //3.在表名后边,拼接上指定的后缀名 sb.insert(matcher.end(), suffix) .append(";") .append("\n\n"); logger.debug("match to table:[{}]", matcher.group(1)); return sb.toString(); } return ""; } //创建数据源 private MysqlDataSource buildDataSource(DataSourceConfig dataSourceConfig) { MysqlDataSource dataSource = new MysqlDataSource(); dataSource.setServerName(dataSourceConfig.getHostName()); dataSource.setPort(dataSourceConfig.getPort()); dataSource.setUser(dataSourceConfig.getUsername()); dataSource.setPassword(dataSourceConfig.getPassword()); dataSource.setCharacterEncoding("utf-8"); //开启批处理 dataSource.setAllowMultiQueries(Boolean.TRUE); return dataSource; } }
步骤四:通过测试类执行创建库和表
@SpringBootTest @RunWith(SpringRunner.class) public class ShardingUtilTest { @Autowired private ShardingUtil shardingUtil; @Test public void test_initTables() { shardingUtil.initTables("/sql/init.sql"); } }
16.ShardingSphere数据分片核心原理
(1)ShardingSphere数据分片的流程
(2)ShardingSphere订单系统流程
由于这里使用的是版本4系列作为例子,所以可以参考下面的官网。
https://shardingsphere.apache.org/document/4.1.1/cn/overview/
(1)ShardingSphere数据分片的流程
一.SQL解析
分为词法解析和语法解析。先通过词法解析器将SQL拆分为一个个不可再分的单词,再使用语法解析器对SQL进行理解,并最终提炼出解析上下文。解析上下文包括表、选择项、排序项、分组项、聚合函数、分页信息、查询条件以及可能需要修改的占位符的标记。执行器优化合并和优化分片条件,如OR等。
二.SQL路由
根据解析上下文匹配用户配置的分片策略,并生成路由路径。目前支持分片路由和广播路由。
三.SQL改写
将SQL改写为在真实数据库中可以正确执行的语句,SQL改写分为正确性改写和优化改写。
四.SQL执行
通过多线程执行器异步执行。
五.结果归并
将多个执行结果集归并以便于通过统一的JDBC接口输出,结果归并包括流式归并、内存归并和使用装饰者模式的追加归并这几种方式。
(2)ShardingSphere订单系统流程
17.C端ShardingSphere分片策略实现
(1)在application.properties进行分库分表配置
(2)编写C端维度分库路由算法类
(3)编写C端维度分表路由算法类
(4)创建订单的具体实现
(1)在application.properties进行分库分表配置
以下是基于Spring Boot的规则配置:配置了8主8从,8个主库在数据源1—192.168.10.8:3307,8个从库在数据源2—192.168.10.8:3308。
//参考:https://shardingsphere.apache.org/document/4.1.1/cn/manual/sharding-jdbc/usage/sharding/ #分库分表配置 #打印执行的数据库以及语句 spring.shardingsphere.props.sql.show=true spring.shardingsphere.datasource.names=master0,slave0,master1,slave1,master2,slave2,master3,slave3,master4,slave4,master5,slave5,master6,slave6,master7,slave7 spring.shardingsphere.datasource.master0.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.master0.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.master0.url=jdbc:mysql://192.168.10.8:3307/order_db_0?characterEncoding=utf-8 spring.shardingsphere.datasource.master0.username=root spring.shardingsphere.datasource.master0.password=Sharding@Master#1990 spring.shardingsphere.datasource.slave0.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.slave0.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.slave0.url=jdbc:mysql://192.168.10.8:3308/order_db_0?characterEncoding=utf-8 spring.shardingsphere.datasource.slave0.username=root spring.shardingsphere.datasource.slave0.password=Sharding@Slave#1990 spring.shardingsphere.datasource.master1.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.master1.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.master1.url=jdbc:mysql://192.168.10.8:3307/order_db_1?characterEncoding=utf-8 spring.shardingsphere.datasource.master1.username=root spring.shardingsphere.datasource.master1.password=Sharding@Master#1990 spring.shardingsphere.datasource.slave1.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.slave1.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.slave1.url=jdbc:mysql://192.168.10.8:3308/order_db_1?characterEncoding=utf-8 spring.shardingsphere.datasource.slave1.username=root spring.shardingsphere.datasource.slave1.password=Sharding@Slave#1990 spring.shardingsphere.datasource.master2.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.master2.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.master2.url=jdbc:mysql://192.168.10.8:3307/order_db_2?characterEncoding=utf-8 spring.shardingsphere.datasource.master2.username=root spring.shardingsphere.datasource.master2.password=Sharding@Master#1990 spring.shardingsphere.datasource.slave2.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.slave2.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.slave2.url=jdbc:mysql://192.168.10.8:3308/order_db_2?characterEncoding=utf-8 spring.shardingsphere.datasource.slave2.username=root spring.shardingsphere.datasource.slave2.password=Sharding@Slave#1990 spring.shardingsphere.datasource.master3.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.master3.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.master3.url=jdbc:mysql://192.168.10.8:3307/order_db_3?characterEncoding=utf-8 spring.shardingsphere.datasource.master3.username=root spring.shardingsphere.datasource.master3.password=Sharding@Master#1990 spring.shardingsphere.datasource.slave3.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.slave3.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.slave3.url=jdbc:mysql://192.168.10.8:3308/order_db_3?characterEncoding=utf-8 spring.shardingsphere.datasource.slave3.username=root spring.shardingsphere.datasource.slave3.password=Sharding@Slave#1990 spring.shardingsphere.datasource.master4.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.master4.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.master4.url=jdbc:mysql://192.168.10.8:3307/order_db_4?characterEncoding=utf-8 spring.shardingsphere.datasource.master4.username=root spring.shardingsphere.datasource.master4.password=Sharding@Master#1990 spring.shardingsphere.datasource.slave4.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.slave4.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.slave4.url=jdbc:mysql://192.168.10.8:3308/order_db_4?characterEncoding=utf-8 spring.shardingsphere.datasource.slave4.username=root spring.shardingsphere.datasource.slave4.password=Sharding@Slave#1990 spring.shardingsphere.datasource.master5.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.master5.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.master5.url=jdbc:mysql://192.168.10.8:3307/order_db_5?characterEncoding=utf-8 spring.shardingsphere.datasource.master5.username=root spring.shardingsphere.datasource.master5.password=Sharding@Master#1990 spring.shardingsphere.datasource.slave5.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.slave5.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.slave5.url=jdbc:mysql://192.168.10.8:3308/order_db_5?characterEncoding=utf-8 spring.shardingsphere.datasource.slave5.username=root spring.shardingsphere.datasource.slave5.password=Sharding@Slave#1990 spring.shardingsphere.datasource.master6.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.master6.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.master6.url=jdbc:mysql://192.168.10.8:3307/order_db_6?characterEncoding=utf-8 spring.shardingsphere.datasource.master6.username=root spring.shardingsphere.datasource.master6.password=Sharding@Master#1990 spring.shardingsphere.datasource.slave6.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.slave6.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.slave6.url=jdbc:mysql://192.168.10.8:3308/order_db_6?characterEncoding=utf-8 spring.shardingsphere.datasource.slave6.username=root spring.shardingsphere.datasource.slave6.password=Sharding@Slave#1990 spring.shardingsphere.datasource.master7.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.master7.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.master7.url=jdbc:mysql://192.168.10.8:3307/order_db_7?characterEncoding=utf-8 spring.shardingsphere.datasource.master7.username=root spring.shardingsphere.datasource.master7.password=Sharding@Master#1990 spring.shardingsphere.datasource.slave7.type=com.alibaba.druid.pool.DruidDataSource spring.shardingsphere.datasource.slave7.driver-class-name=com.mysql.jdbc.Driver spring.shardingsphere.datasource.slave7.url=jdbc:mysql://192.168.10.8:3308/order_db_7?characterEncoding=utf-8 spring.shardingsphere.datasource.slave7.username=root spring.shardingsphere.datasource.slave7.password=Sharding@Slave#1990 #用于多分片键的复合分片场景 #C端order_info分表策略分片策略 #分片列名称,多个列以逗号分隔 spring.shardingsphere.sharding.tables.order_info_sharded_by_user_id_.database-strategy.complex.sharding-columns=order_no,user_id #复合分片算法类名称,该类需实现ComplexKeysShardingAlgorithm接口并提供无参数的构造器 spring.shardingsphere.sharding.tables.order_info_sharded_by_user_id_.database-strategy.complex.algorithm-class-name=com.demo.sharding.order.sharding.algorithm.OrderDbShardingByUserAlgorithm #C端分表策略 spring.shardingsphere.sharding.tables.order_info_sharded_by_user_id_.actual-data-nodes=master$->{0..7}.order_info_sharded_by_user_id_$->{0..7} #分片字段 spring.shardingsphere.sharding.tables.order_info_sharded_by_user_id_.table-strategy.complex.sharding-columns=order_no,user_id #order_info表分片策略类 spring.shardingsphere.sharding.tables.order_info_sharded_by_user_id_.table-strategy.complex.algorithm-class-name=com.demo.sharding.order.sharding.algorithm.OrderTableShardingByUserAlgorithm #主键名称,缺省表示不使用自增主键生成器 spring.shardingsphere.sharding.tables.order_info_sharded_by_user_id_.key-generator.column=id #主键生成器类型,内置类型:SNOWFLAKE/UUID spring.shardingsphere.sharding.tables.order_info_sharded_by_user_id_.key-generator.type=SNOWFLAKE #工作机器 id spring.shardingsphere.sharding.tables.order_info_sharded_by_user_id_.key-generator.props.worker.id=${workerId} #时钟回拨可以容忍的毫秒数,默认10ms spring.shardingsphere.sharding.tables.order_info_sharded_by_user_id_.key-generator.props.max.tolerate.time.difference.milliseconds=5 #C端order_item_detail分表策略分片策略 #分片列名称,多个列以逗号分隔 spring.shardingsphere.sharding.tables.order_item_detail_sharded_by_user_id_.database-strategy.complex.sharding-columns=order_no,user_id #复合分片算法类名称,该类需实现ComplexKeysShardingAlgorithm接口并提供无参数的构造器 spring.shardingsphere.sharding.tables.order_item_detail_sharded_by_user_id_.database-strategy.complex.algorithm-class-name=com.demo.sharding.order.sharding.algorithm.OrderDbShardingByUserAlgorithm #C端分表策略 spring.shardingsphere.sharding.tables.order_item_detail_sharded_by_user_id_.actual-data-nodes=master$->{0..7}.order_item_detail_sharded_by_user_id_$->{0..7} #分片字段 spring.shardingsphere.sharding.tables.order_item_detail_sharded_by_user_id_.table-strategy.complex.sharding-columns=order_no,user_id #order_item_detail表分片策略类 spring.shardingsphere.sharding.tables.order_item_detail_sharded_by_user_id_.table-strategy.complex.algorithm-class-name=com.demo.sharding.order.sharding.algorithm.OrderTableShardingByUserAlgorithm #主键名称,缺省表示不使用自增主键生成器 spring.shardingsphere.sharding.tables.order_item_detail_sharded_by_user_id_.key-generator.column=id #主键生成器类型,内置类型:SNOWFLAKE/UUID spring.shardingsphere.sharding.tables.order_item_detail_sharded_by_user_id_.key-generator.type=SNOWFLAKE #工作机器 id spring.shardingsphere.sharding.tables.order_item_detail_sharded_by_user_id_.key-generator.props.worker.id=${workerId} #时钟回拨可以容忍的毫秒数,默认10ms spring.shardingsphere.sharding.tables.order_item_detail_sharded_by_user_id_.key-generator.props.max.tolerate.time.difference.milliseconds=5
其中官网中关于数据分片的数据项配置说明如下:
//参考:https://shardingsphere.apache.org/document/4.1.1/cn/manual/sharding-jdbc/configuration/config-spring-boot/ spring.shardingsphere.datasource.names= #数据源名称,多数据源以逗号分隔 spring.shardingsphere.datasource.<data-source-name>.type= #数据库连接池类名称 spring.shardingsphere.datasource.<data-source-name>.driver-class-name= #数据库驱动类名 spring.shardingsphere.datasource.<data-source-name>.url= #数据库url连接 spring.shardingsphere.datasource.<data-source-name>.username= #数据库用户名 spring.shardingsphere.datasource.<data-source-name>.password= #数据库密码 spring.shardingsphere.datasource.<data-source-name>.xxx= #数据库连接池的其它属性 spring.shardingsphere.sharding.tables.<logic-table-name>.actual-data-nodes= #由数据源名 + 表名组成,以小数点分隔。多个表以逗号分隔,支持inline表达式。缺省表示使用已知数据源与逻辑表名称生成数据节点,用于广播表(即每个库中都需要一个同样的表用于关联查询,多为字典表)或只分库不分表且所有库的表结构完全一致的情况 #分库策略,缺省表示使用默认分库策略,以下的分片策略只能选其一 #用于单分片键的标准分片场景 spring.shardingsphere.sharding.tables.<logic-table-name>.database-strategy.standard.sharding-column= #分片列名称 spring.shardingsphere.sharding.tables.<logic-table-name>.database-strategy.standard.precise-algorithm-class-name= #精确分片算法类名称,用于=和IN。该类需实现PreciseShardingAlgorithm接口并提供无参数的构造器 spring.shardingsphere.sharding.tables.<logic-table-name>.database-strategy.standard.range-algorithm-class-name= #范围分片算法类名称,用于BETWEEN,可选。该类需实现RangeShardingAlgorithm接口并提供无参数的构造器 #用于多分片键的复合分片场景 spring.shardingsphere.sharding.tables.<logic-table-name>.database-strategy.complex.sharding-columns= #分片列名称,多个列以逗号分隔 spring.shardingsphere.sharding.tables.<logic-table-name>.database-strategy.complex.algorithm-class-name= #复合分片算法类名称。该类需实现ComplexKeysShardingAlgorithm接口并提供无参数的构造器 #行表达式分片策略 spring.shardingsphere.sharding.tables.<logic-table-name>.database-strategy.inline.sharding-column= #分片列名称 spring.shardingsphere.sharding.tables.<logic-table-name>.database-strategy.inline.algorithm-expression= #分片算法行表达式,需符合groovy语法 #Hint分片策略 spring.shardingsphere.sharding.tables.<logic-table-name>.database-strategy.hint.algorithm-class-name= #Hint分片算法类名称。该类需实现HintShardingAlgorithm接口并提供无参数的构造器 #分表策略,同分库策略 spring.shardingsphere.sharding.tables.<logic-table-name>.table-strategy.xxx= #省略 spring.shardingsphere.sharding.tables.<logic-table-name>.key-generator.column= #自增列名称,缺省表示不使用自增主键生成器 spring.shardingsphere.sharding.tables.<logic-table-name>.key-generator.type= #自增列值生成器类型,缺省表示使用默认自增列值生成器。可使用用户自定义的列值生成器或选择内置类型:SNOWFLAKE/UUID spring.shardingsphere.sharding.tables.<logic-table-name>.key-generator.props.<property-name>= #属性配置, 注意:使用SNOWFLAKE算法,需要配置worker.id与max.tolerate.time.difference.milliseconds属性。若使用此算法生成值作分片值,建议配置max.vibration.offset属性 spring.shardingsphere.sharding.binding-tables[0]= #绑定表规则列表 spring.shardingsphere.sharding.binding-tables[1]= #绑定表规则列表 spring.shardingsphere.sharding.binding-tables[x]= #绑定表规则列表 spring.shardingsphere.sharding.broadcast-tables[0]= #广播表规则列表 spring.shardingsphere.sharding.broadcast-tables[1]= #广播表规则列表 spring.shardingsphere.sharding.broadcast-tables[x]= #广播表规则列表 spring.shardingsphere.sharding.default-data-source-name= #未配置分片规则的表将通过默认数据源定位 spring.shardingsphere.sharding.default-database-strategy.xxx= #默认数据库分片策略,同分库策略 spring.shardingsphere.sharding.default-table-strategy.xxx= #默认表分片策略,同分表策略 spring.shardingsphere.sharding.default-key-generator.type= #默认自增列值生成器类型,缺省将使用org.apache.shardingsphere.core.keygen.generator.impl.SnowflakeKeyGenerator。可使用用户自定义的列值生成器或选择内置类型:SNOWFLAKE/UUID spring.shardingsphere.sharding.default-key-generator.props.<property-name>= #自增列值生成器属性配置, 比如SNOWFLAKE算法的worker.id与max.tolerate.time.difference.milliseconds spring.shardingsphere.sharding.master-slave-rules.<master-slave-data-source-name>.master-data-source-name= #详见读写分离部分 spring.shardingsphere.sharding.master-slave-rules.<master-slave-data-source-name>.slave-data-source-names[0]= #详见读写分离部分 spring.shardingsphere.sharding.master-slave-rules.<master-slave-data-source-name>.slave-data-source-names[1]= #详见读写分离部分 spring.shardingsphere.sharding.master-slave-rules.<master-slave-data-source-name>.slave-data-source-names[x]= #详见读写分离部分 spring.shardingsphere.sharding.master-slave-rules.<master-slave-data-source-name>.load-balance-algorithm-class-name= #详见读写分离部分 spring.shardingsphere.sharding.master-slave-rules.<master-slave-data-source-name>.load-balance-algorithm-type= #详见读写分离部分 spring.shardingsphere.props.sql.show= #是否开启SQL显示,默认值: false spring.shardingsphere.props.executor.size= #工作线程数量,默认值: CPU核数
(2)编写C端维度分库路由算法类
该算法类已配置在application.properties中。
//C端维度分库路由算法类 public class OrderDbShardingByUserAlgorithm implements ComplexKeysShardingAlgorithm<Comparable<?>> { @Override public Collection<String> doSharding(Collection<String> dbs, ComplexKeysShardingValue<Comparable<?>> shardingValue) { Collection<Comparable<?>> orderNos = shardingValue.getColumnNameAndShardingValuesMap().get("order_no"); Collection<Comparable<?>> userIds = shardingValue.getColumnNameAndShardingValuesMap().get("user_id"); Set<String> actualDbNames = null; if (CollectionUtils.isNotEmpty(orderNos)) { actualDbNames = orderNos.stream() .map(orderNo -> getActualDbName(String.valueOf(orderNo), dbs)) .collect(Collectors.toSet()); } else if (CollectionUtils.isNotEmpty(userIds)) { actualDbNames = userIds.stream() .map(userId -> getActualDbName(String.valueOf(userId), dbs)) .collect(Collectors.toSet()); } return actualDbNames; } public String getActualDbName(String shardingValue, Collection<String> dbs) { //获取userId后三位 String userIdSuffix = StringUtils.substring(shardingValue, shardingValue.length() - 3); //使用userId后三位进行路由 int dbSuffix = userIdSuffix.hashCode() % dbs.size(); for (String db : dbs) { if (db.endsWith(String.valueOf(dbSuffix))) { return db; } } return null; } }
(3)编写C端维度分表路由算法类
该算法类已配置在application.properties中。
//C端维度分表路由算法类 public class OrderTableShardingByUserAlgorithm implements ComplexKeysShardingAlgorithm<Comparable<?>> { @Override public Collection<String> doSharding(Collection<String> tables, ComplexKeysShardingValue<Comparable<?>> shardingValue) { Collection<Comparable<?>> orderNos = shardingValue.getColumnNameAndShardingValuesMap().get("order_no"); Collection<Comparable<?>> userIds = shardingValue.getColumnNameAndShardingValuesMap().get("user_id"); Set<String> actualTableNames = null; if (CollectionUtils.isNotEmpty(orderNos)) { actualTableNames = orderNos.stream() .map(orderNo -> getActualTableName(String.valueOf(orderNo), tables)) .collect(Collectors.toSet()); } else if (CollectionUtils.isNotEmpty(userIds)) { actualTableNames = userIds.stream() .map(userId -> getActualTableName(String.valueOf(userId), tables)) .collect(Collectors.toSet()); } return actualTableNames; } public String getActualTableName(String shardingValue, Collection<String> tables) { //获取userId后三位 String userIdSuffix = StringUtils.substring(shardingValue, shardingValue.length() - 3); //使用userId后三位进行路由 int tableSuffix = userIdSuffix.hashCode() / tables.size() % tables.size(); for (String table : tables) { if (table.endsWith(String.valueOf(tableSuffix))) { return table; } } return null; } }
(4)创建订单的具体实现
@RestController @RequestMapping(value = "/user/order") public class UserOrderController { @Autowired private UserOrderInfoService userOrderInfoService; ... //创建订单 @PostMapping("/generateOrder") public OperationResponse generateOrder(@RequestBody UserOrderInfoRequest userOrderInfoRequest) { try { long startTime = System.currentTimeMillis(); userOrderInfoService.generateOrder(userOrderInfoRequest); long endTime = System.currentTimeMillis(); log.info("创建用户订单耗时:[{}]", (endTime - startTime)); return OperationResponse.success(OrderCode.ADD_ORDER_SUCCESS.getDesc()); } catch (BaseException e) { return OperationResponse.error(e.getMessage()); } catch (Exception e) { log.error("generateOrder error: [{}]", e.getMessage(), e); return OperationResponse.error(OrderCode.ADD_ORDER_ERROR.getDesc()); } } ... } //用户订单服务实现 @Service public class UserOrderInfoServiceImpl implements UserOrderInfoService { @Autowired private UserOrderRepository userOrderRepository; ... @Override public void generateOrder(UserOrderInfoRequest userOrderInfoRequest) { //1.入参校验 OrderValidation.checkVerifyOrderRequest(userOrderInfoRequest); //2.添加订单信息 OrderInfo orderInfo = orderConvertor.dtoConvertOrderInfo(userOrderInfoRequest); //3.添加订单详情 List<OrderItemDetail> orderItemDetailList = userOrderInfoRequest.getOrderItemDetailList(); //4.生成订单号 String orderNo = GenerateOrderNoUtils.getOrderNo(orderInfo.getUserId(), orderInfo.getMerchantId()); //5.为订单和订单明细设置订单号 orderInfo.setOrderNo(orderNo); orderItemDetailList.forEach(orderItemDetail -> orderItemDetail.setOrderNo(orderInfo.getOrderNo())); //6.保存订单信息 userOrderRepository.generateOrderInfo(orderInfo, orderItemDetailList); ... } ... } public class GenerateOrderNoUtils { ... //生成订单单号编码(调用方法),网站中该用户唯一ID 防止重复 public static String getOrderNo(Long userId, Long merchantId) { return getOrderIdKey(String.valueOf(userId), String.valueOf(merchantId)); } private static String getOrderIdKey(String userId, String merchantId) { return "10" + getDateTimeKey() + getAutoNoKey() + toCode(userId, merchantId); } //生成时间戳 private static String getDateTimeKey() { return DateFormatUtils.format(new Date(), "yyMMdd"); } private static String getAutoNoKey() { long random = getRandom(6); return String.valueOf(genNo(random, 5)); } //根据ID进行加密 + 加随机数组成固定长度编码:3位随机数 + 商家ID后3位 + 用户ID后3位 private static String toCode(String userId, String merchantId) { String userIdSubstring = userId.substring(userId.length() - 3); String merchantIdSubstring = merchantId.substring(merchantId.length() - 3); long random = getRandom(3); return random + merchantIdSubstring + userIdSubstring; } ... } @Repository public class UserOrderRepository { ... @Transactional(rollbackFor = Exception.class) public void generateOrderInfo(OrderInfo orderInfo, List<OrderItemDetail> orderItemDetailList) { //1.保存订单信息 userOrderInfoMapper.insertSelective(orderInfo); //2.批量保存订单明细 userOrderItemDetailMapper.batchInsert(orderItemDetailList); } ... }
而在userOrderInfoMapper.xml和userOrderItemDetailMapper.xml中已变为逻辑SQL。这些逻辑SQL会交给ShardingSphere-JDBC进行处理:解析SQL、进行拦截路由拼装、生成具体的SQL。ShardingSphere-JDBC会最终定位到具体的哪个库和哪个表中执行具体的SQL。
####################################userOrderInfoMapper.xml#################################### <insert id="insertSelective" parameterType="com.demo.sharding.order.domain.entity.OrderInfo" keyProperty="orderInfo.id" useGeneratedKeys="true"> insert into order_info_sharded_by_user_id_ <trim prefix="(" suffix=")" suffixOverrides=","> ... </trim> </insert> ####################################userOrderItemDetailMapper.xml#################################### <insert id="batchInsert"> insert into order_item_detail_sharded_by_user_id_ (order_no, product_id,category_id,goods_num,goods_price,goods_amount,discount_amount,discount_id,create_user,update_user) values <foreach collection="records" item="record" separator=","> (#{record.orderNo,jdbcType=VARCHAR}, #{record.productId,jdbcType=BIGINT}, #{record.categoryId,jdbcType=BIGINT}, #{record.goodsNum,jdbcType=BIGINT}, #{record.goodsPrice,jdbcType=DECIMAL},#{record.goodsAmount,jdbcType=DECIMAL}, #{record.discountAmount,jdbcType=DECIMAL}, #{record.discountId,jdbcType=BIGINT}, #{record.createUser,jdbcType=BIGINT},#{record.updateUser,jdbcType=BIGINT}) </foreach> </insert>
18.C端ShardingSphere读写分离实现
只需在application.properties进行读写分离配置:
#主从配置-读写分离 spring.shardingsphere.sharding.master-slave-rules.master0.master-data-source-name=master0 spring.shardingsphere.sharding.master-slave-rules.master0.slave-data-source-names=slave0 spring.shardingsphere.sharding.master-slave-rules.master1.master-data-source-name=master1 spring.shardingsphere.sharding.master-slave-rules.master1.slave-data-source-names=slave1 spring.shardingsphere.sharding.master-slave-rules.master2.master-data-source-name=master2 spring.shardingsphere.sharding.master-slave-rules.master2.slave-data-source-names=slave2 spring.shardingsphere.sharding.master-slave-rules.master3.master-data-source-name=master3 spring.shardingsphere.sharding.master-slave-rules.master3.slave-data-source-names=slave3 spring.shardingsphere.sharding.master-slave-rules.master4.master-data-source-name=master4 spring.shardingsphere.sharding.master-slave-rules.master4.slave-data-source-names=slave4 spring.shardingsphere.sharding.master-slave-rules.master5.master-data-source-name=master5 spring.shardingsphere.sharding.master-slave-rules.master5.slave-data-source-names=slave5 spring.shardingsphere.sharding.master-slave-rules.master6.master-data-source-name=master6 spring.shardingsphere.sharding.master-slave-rules.master6.slave-data-source-names=slave6 spring.shardingsphere.sharding.master-slave-rules.master7.master-data-source-name=master7 spring.shardingsphere.sharding.master-slave-rules.master7.slave-data-source-names=slave7
其中官网中关于读写分离的数据项配置说明如下:
//参考:https://shardingsphere.apache.org/document/4.1.1/cn/manual/sharding-jdbc/configuration/config-spring-boot/ spring.shardingsphere.sharding.master-slave-rules.<master-slave-data-source-name>.master-data-source-name= #主库数据源名称 spring.shardingsphere.sharding.master-slave-rules.<master-slave-data-source-name>.slave-data-source-names[0]= #从库数据源名称列表 spring.shardingsphere.sharding.master-slave-rules.<master-slave-data-source-name>.slave-data-source-names[1]= #从库数据源名称列表 spring.shardingsphere.sharding.master-slave-rules.<master-slave-data-source-name>.slave-data-source-names[x]= #从库数据源名称列表 spring.shardingsphere.sharding.master-slave-rules.<master-slave-data-source-name>.load-balance-algorithm-class-name= #从库负载均衡算法类名称。该类需实现MasterSlaveLoadBalanceAlgorithm接口且提供无参数构造器 spring.shardingsphere.sharding.master-slave-rules.<master-slave-data-source-name>.load-balance-algorithm-type= #从库负载均衡算法类型,可选值:ROUND_ROBIN,RANDOM。若`load-balance-algorithm-class-name`存在则忽略该配置 spring.shardingsphere.props.sql.show= #是否开启SQL显示,默认值: false spring.shardingsphere.props.executor.size= #工作线程数量,默认值: CPU核数 spring.shardingsphere.props.check.table.metadata.enabled= #是否在启动时检查分表元数据一致性,默认值: false
19.异构订单实现
要实现异构订单,就需要在C端创建完订单后,发送消息到MQ,然后由B端进行消费。
@Service public class UserOrderInfoServiceImpl implements UserOrderInfoService { ... @Override public void generateOrder(UserOrderInfoRequest userOrderInfoRequest) { //1.入参校验 OrderValidation.checkVerifyOrderRequest(userOrderInfoRequest); //2.添加订单信息 OrderInfo orderInfo = orderConvertor.dtoConvertOrderInfo(userOrderInfoRequest); //3.添加订单详情 List<OrderItemDetail> orderItemDetailList = userOrderInfoRequest.getOrderItemDetailList(); //4.生成订单号 String orderNo = GenerateOrderNoUtils.getOrderNo(orderInfo.getUserId(), orderInfo.getMerchantId()); //5.为订单和订单明细设置订单号 orderInfo.setOrderNo(orderNo); orderItemDetailList.forEach(orderItemDetail -> orderItemDetail.setOrderNo(orderInfo.getOrderNo())); //6.保存订单信息 userOrderRepository.generateOrderInfo(orderInfo, orderItemDetailList); //7.构建消息体 OrderSyncMessage orderSyncMessage = OrderSyncMessage.builder() .requestSource(RequestSource.C) .orderOperateType(OrderOperateType.add) .orderNo(orderInfo.getOrderNo()) .build(); //8.发送mq消息 rocketMQProducer.reliablySend(topic, orderSyncMessage, orderSyncMessage.getOrderNo(), obj -> { OrderSyncMessage message = (OrderSyncMessage) obj; // 这里可以将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker log.info("generateOrder send mq message failed, store the mq message to db,orderNo:[{}],message:[{}]", message.getOrderNo(), message); }); } ... }
消费消息创建异构订单:
@Component @RocketMQMessageListener(consumerGroup = "${order.sync.consumer.group}", topic = "${order.topic}") public class OrderSyncConsumer implements RocketMQListener<OrderSyncMessage> { @Autowired private MerchantOrderRepository merchantOrderRepository; @Autowired private UserOrderRepository userOrderRepository; @Override public void onMessage(OrderSyncMessage orderSyncMessage) { log.info("received message,start processing,orderNo:[{}],message:[{}]", orderSyncMessage.getOrderNo(), orderSyncMessage); //1.check请求来源 if (Objects.isNull(orderSyncMessage.getRequestSource())) { log.error("requestSource can not be empty,orderNo:[{}]", orderSyncMessage.getOrderNo()); return; } //2.获取订单源数据 OrderDetailVO orderDetail = getOrderDetail(orderSyncMessage.getOrderNo(), orderSyncMessage.getRequestSource()); //3.如果订单为空,打印日志并抛出异常,从而等待rocketMQ下次重试 if (Objects.isNull(orderDetail)) { //打印错误日志 log.error("orderDetail not exist,orderNo:[{}]", orderSyncMessage.getOrderNo()); //抛出异常,等待rocketMQ下次重试 throw new NullPointerException("orderDetail is null,orderNo:[{}]" + orderSyncMessage.getOrderNo()); } //4.获取要处理的目标源 RequestSource target = RequestSource.C.equals(orderSyncMessage.getRequestSource()) ? RequestSource.B : RequestSource.C; //5.同步订单数据的处理,即异构一份订单数据到目标源 或 同步订单状态到目标源 if (OrderOperateType.add.equals(orderSyncMessage.getOrderOperateType())) { //异构一份订单数据到目标源 syncOrderInfo(orderSyncMessage, orderDetail, target); } else if (OrderOperateType.updateOrderStatus.equals(orderSyncMessage.getOrderOperateType())) { //同步订单状态到目标源 updateOrderStatus(orderDetail.getOrderNo(), orderDetail.getOrderStatus(), target); } } //异构一份订单数据到目标源 private void syncOrderInfo(OrderSyncMessage orderSyncMessage, OrderDetailVO orderDetail, RequestSource target) { //1.幂等处理,目标订单存在则跳过 OrderDetailVO targetOrder = getOrderDetail(orderSyncMessage.getOrderNo(), target); if (Objects.nonNull(targetOrder)) { log.info("targetOrder existed,orderNo:[{}]", orderSyncMessage.getOrderNo()); return; } //2.同步订单信息到目标源 syncOrderInfo(orderDetail, target); } //同步订单信息到目标源 private void syncOrderInfo(OrderDetailVO orderDetail, RequestSource target) { //1.拷贝数据 orderDetail -> orderInfo OrderInfo orderInfo = new OrderInfo(); BeanUtils.copyProperties(orderDetail, orderInfo); //2.转换类型 OrderItemDetailDto -> OrderItemDetail List<OrderItemDetail> orderItemDetailList = convertOrderItemDetails(orderDetail.getOrderItemDetails()); //3.异构订单到商家,目前只有从C端下单,异构到B端的业务 if (RequestSource.B.equals(target)) { //异构订单到商家 merchantOrderRepository.syncOrderInfo(orderInfo, orderItemDetailList); } } //获取订单详情 private OrderDetailVO getOrderDetail(String orderNo, RequestSource requestSource) { if (RequestSource.C.equals(requestSource)) { return userOrderRepository.getOrderDetail(orderNo); } else if (RequestSource.B.equals(requestSource)) { return merchantOrderRepository.getOrderDetail(orderNo); } return null; } //同步订单状态到目标源 private void updateOrderStatus(String orderNo, Integer orderStatus, RequestSource requestSource) { if (RequestSource.C.equals(requestSource)) { userOrderRepository.updateStatus(orderNo, orderStatus); } else if (RequestSource.B.equals(requestSource)) { merchantOrderRepository.updateStatus(orderNo, orderStatus); } } //将OrderItemDetailDto类型 转换为 OrderItemDetail类型 private List<OrderItemDetail> convertOrderItemDetails(List<OrderItemDetailDto> orderItemDetailDtos) { List<OrderItemDetail> orderItemDetailList = new ArrayList<>(); for (OrderItemDetailDto itemDetail : orderItemDetailDtos) { OrderItemDetail orderItemDetail = new OrderItemDetail(); BeanUtils.copyProperties(itemDetail, orderItemDetail); orderItemDetailList.add(orderItemDetail); } return orderItemDetailList; } }
20.单库亿级数据怎么迁移到多库多表上
(1)单库单表的亿级数据如何迁移到8库8表上
(2)全量同步的处理
(3)数据校验的处理
(4)增量同步的处理
(5)增量同步的问题
(1)单库单表的亿级数据如何迁移到8库8表上
数据迁移 -> 全量同步 数据校验的问题 断点续传的问题 数据迁移 -> 增量同步
(2)全量同步的处理
全量同步,指的就是将旧库的数据,通过ShardingShpere的路由,同步迁移到新库当中。单库的数据特别大,有上亿条数据,我们并不是一下子全部查出来,而是分批次去查询。
全量同步的处理流程如下:
每当需要进行一次全量同步时,就会创建一条迁移记录。这条迁移记录其实就代表着一次全量同步的任务,对线上的数据进行迁移可能会执行很多次这种全量同步的任务。在一次全量同步的任务中,会对全量数据进行循环式、分批次、每次尝试查询500条数据。如果能从单库单表查询到数据,就会添加一条该批次的迁移明细记录。接着可以对每次查询出来的500条数据,按照时间等条件进行过滤。然后将过滤出来的数据按表进行分组,批量插入到多库多表中。最后更新该批次的迁移明细记录的状态为同步状态。如果在对全量数据进行循环式、每次尝试查询500条数据时,终于查不到数据了,则更新本次全量同步任务对应的迁移记录的状态为同步状态。
(3)数据校验的处理
情况一:旧库有某数据而新库没有,直接在新库中插入数据,保证一致性。
情况二:旧库和新库中都有某数据,但旧库的更新时间更大,新库需要更新。
情况三:旧库和新库中都有某数据,但新库的更新时间更大,新库无需操作。
(4)增量同步的处理
当已经进行完全量同步将旧库的数据都同步到新库时,由于数据迁移和开发的过程是一个平滑的过程,旧库在全量同步时还在承担业务的访问处理,新的数据还会打到旧库中。一旦进行增删改操作,还是需要进行同步的,这就是增量同步。
增量同步的处理流程如下:
引入Canal监听旧库,一旦旧库有增删改操作,Canal就可以监听到并把binlog日志发送到MQ中。这时便可以通过定时任务去获取MQ的binlog日志消息,然后在新库中进行重做binlog日志,从而保证新库和旧库数据的一致性。
(5)增量同步的问题
问题一:如何保证MQ消息不丢失
问题二:数据的合并问题,上一秒新增了数据,下一秒修改了数据,此时需要数据合并