一、背景
以往项目中,客户规定各个部门的数据不能放在同一个数据库,因此办件服务需要把不同部门的数据存储在不同的数据源中,由此引发了多数据源查询的一系列问题。
二、sharding jdbc 简介
Sharding-JDBC最初由当当网开源,现为Apache ShardingSphere项目的一部分。它提供标准化的数据分片、分布式事务和数据库治理功能,定位为轻量级Java框架,在Java的JDBC层提供额外服务。它使用客户端直连数据库,以jar包形式提供服务,无需额外部署和依赖,可理解为增强版的JDBC驱动,完全兼容JDBC和各种ORM框架。
适用于任何基于Java的ORM框架,如:JPA, Hibernate, Mybatis, Spring JDBC Template或直接使用JDBC。
基于任何第三方的数据库连接池,如:DBCP, C3P0, BoneCP, Druid, HikariCP等。
支持任意实现JDBC规范的数据库。目前支持MySQL,Oracle,SQLServer和PostgreSQL。
三、项目融合sharding jdbc
办件服务现有的做法是动态切换对应的数据源,key是部门的id,并不能满足我们用一条sql语句查询所有部门数据的需求,所以要对现有的逻辑进行改造
首先我们要获取到所有的数据源的信息,初始化一份数据,如下图,这个是现有的存数据源信息的表dic_database_info
然后我们要通过这张表获取所有部门的数据源信息,并且为每个部门初始化一个数据源(连接池),代码如下:
/*** @Description: 通过jdbc查询出数据连接信息 * @return* @date 2020-09-11 12:25* @throws*/privateMap<String, DataSource>getDepartmentDataSources (DataSourcerdsDataSource){ Stringsql="select id, department_id as departmentId, access_url as accessUrl,\n"+" access_key as accessKey, access_value as accessValue\n"+" from dic_database_info where status=1 order by id "; Map<String, DataSource>dataSourceMap=newHashMap<>(); try { PreparedStatementpreparedStatement=rdsDataSource.getConnection().prepareStatement(sql); ResultSetresultSet=preparedStatement.executeQuery(); while (resultSet.next()){ StringdepartmentId=String.valueOf(resultSet.getLong("departmentId")); StringaccessUrl=resultSet.getString("accessUrl"); StringaccessKey=resultSet.getString("accessKey"); StringaccessValue=resultSet.getString("accessValue"); accessValue=JasyptUtils.decrypt(accessValue,salt); DataSourcedataSource=DataSourceBuilder.create().driverClassName(driverClassName) .url(accessUrl) .username(accessKey) .password(accessValue) .build(); StringserialNum=shardingSphereDataSourceSerialRecord.addDataSourceAlias(departmentId); dataSourceMap.put(serialNum,dataSource); } }catch (SQLExceptione){ LOGGER.error("通过JDBC查询各局数据库信息出错:{}",e.getMessage()); } returndataSourceMap; }
然后在maven引入包
<!-- Sharding-JDBC core library (groupId org.apache.shardingsphere), version 4.1.1 -->
<dependency><groupId>org.apache.shardingsphere</groupId><artifactId>sharding-jdbc-core</artifactId><version>4.1.1</version></dependency>
由于办件服务只需要查询某一些表,所以我们把查询的表的表名配置在配置文件中
spring.shardingsphere.sharding.ext.tables=agent_materials,agents,applicant_materials,applicants,application_result_info,current_node,fee_unit_info,group_data_search,mark_results,node_result,paid_info,result_receive_info,service_application,service_attachment,service_correction,service_info_apply_nullify,service_info_nullify,tab_accept_affair_request,tab_apply_invalid_service_application,tab_apply_revoke_service_application,tab_get_node_info,tab_service_application_accepted,tab_service_application_result,tab_service_application_result_info
然后在配置文件中获取
prefix="spring.shardingsphere.sharding.ext") (publicstaticclassShardingJdbcProperties { privateList<String>tables=Collections.emptyList(); publicList<String>getTables() { returntables; } publicvoidsetTables(List<String>tables) { this.tables=tables; } }
初始化配置文件
publicvoidafterPropertiesSet() throwsException { ShardingJdbcPropertiesshardingJdbcProperties=beanFactory.getBean(ShardingJdbcProperties.class); if (shardingJdbcProperties!=null){ logicTables=newHashSet<>(shardingJdbcProperties.getTables()); initialized=true; lock.lock(); try{ condition.signalAll(); }finally { lock.unlock(); } } } publicvoidsetBeanFactory(BeanFactorybeanFactory) throwsBeansException { this.beanFactory=beanFactory; }
最后初始化sharding jdbc,并且保存数据源,表信息
name="apiDataSource") (prefix="spring.datasource.api") (publicDataSourcerdsDataSource() { returnDataSourceBuilder.create().build(); } publicDataSourceshardingSphereDataSource(ShardingJdbcPropertiesshardingJdbcProperties, ("apiDataSource") DataSourcedataSource) throwsSQLException { finalStringMASTER_DATASOURCE_ALIAS="master"; ShardingRuleConfigurationshardingRuleConfig=newShardingRuleConfiguration(); shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig(newHintShardingStrategyConfiguration( newDepartmentDataSourceHintShardingAlgorithm())); shardingRuleConfig.setDefaultTableShardingStrategyConfig(newHintShardingStrategyConfiguration( newDepartmentDataSourceHintShardingAlgorithm.DepartmentTableHintShardingAlgorithm())); Map<String, DataSource>fullDataSourceMap=newHashMap<>(); fullDataSourceMap.put(MASTER_DATASOURCE_ALIAS, dataSource); Map<String, DataSource>departmentDatasourceMap=getDepartmentDataSources(dataSource); if (departmentDatasourceMap!=null&&departmentDatasourceMap.size() >0 ){ fullDataSourceMap.putAll(departmentDatasourceMap); } shardingRuleConfig.setDefaultDataSourceName(MASTER_DATASOURCE_ALIAS); for (Stringtable : shardingJdbcProperties.getTables()){ TableRuleConfigurationtableRuleConfiguration=getDepartmentTableRuleConfiguration(table); if (tableRuleConfiguration!=null){ shardingRuleConfig.getTableRuleConfigs().add(tableRuleConfiguration); shardingRuleConfig.getBindingTableGroups().add(table); } } Propertiesproperties=newProperties(); properties.put("sql.show", sqlShow); returnShardingDataSourceFactory.createDataSource(fullDataSourceMap, shardingRuleConfig, properties); } privateTableRuleConfigurationgetDepartmentTableRuleConfiguration (Stringtable) { finalStringDS_TABLE_EXPRESSION="%s${0..%d}.%s"; if (StringUtils.isNotEmpty(table)){ Stringexpression=String.format(DS_TABLE_EXPRESSION, shardingSphereDataSourceSerialRecord.getDataSourcePrefix(), shardingSphereDataSourceSerialRecord.getMaxSerialNum(), table); returnnewTableRuleConfiguration(table, 
expression); } returnnull; } name="apiSqlSessionFactory") (publicSqlSessionFactoryrdsSqlSessionFactory( ("shardingSphereDataSource")DataSourcedynamicDataSource) throwsException { SqlSessionFactoryBeansqlSessionFactory=newSqlSessionFactoryBean(); sqlSessionFactory.setDataSource(dynamicDataSource); sqlSessionFactory.setConfigLocation(newClassPathResource("mybatis-config.xml")); sqlSessionFactory.setMapperLocations(newPathMatchingResourcePatternResolver().getResources("classpath:mapper/**/*.xml")); returnsqlSessionFactory.getObject(); } name="apiTransactionManager") (publicDataSourceTransactionManagerrdsTransactionManager( ("shardingSphereDataSource") DataSourcedynamicDataSource){ returnnewDataSourceTransactionManager(dynamicDataSource); } name="apiSqlSessionTemplate") (publicSqlSessionTemplaterdsSqlSessionTemplate( ("apiSqlSessionFactory") SqlSessionFactoryrdsSqlSessionFactory){ returnnewSqlSessionTemplate(rdsSqlSessionFactory); } name="apiNamedParameterJdbcTemplate") (publicNamedParameterJdbcTemplaterdsNamedParameterJdbcTemplate( ("shardingSphereDataSource") DataSourcedynamicDataSource){ returnnewNamedParameterJdbcTemplate(dynamicDataSource); } publicstaticvoidcheckInitialize() throwsInterruptedException { if (!initialized) { lock.lock(); try{ condition.await(5000, TimeUnit.MILLISECONDS); }finally { lock.unlock(); } } } publicstaticbooleancontainLogicTable(StringtableName){ try { checkInitialize(); } catch (InterruptedExceptione) { } returnlogicTables==null?false : logicTables.contains(tableName); } publicstaticSet<String>getLogicTables () { try { checkInitialize(); } catch (InterruptedExceptione) { } returnlogicTables==null?Collections.emptySet() : logicTables; }
然后,封装一个类,对外提供切换数据源的接口
importlombok.extern.slf4j.Slf4j; importorg.apache.commons.collections4.CollectionUtils; importorg.apache.commons.lang3.ArrayUtils; importorg.apache.commons.lang3.StringUtils; importorg.apache.shardingsphere.api.hint.HintManager; importjava.util.*; publicclassDynamicDataSourceContextHolder { privatestaticfinalThreadLocal<String>CONTEXT_HOLDER=newThreadLocal<String>() { /*** 将 master 数据源的 key作为默认数据源的 key*/protectedStringinitialValue() { return"master"; } }; /*** 数据源的 key集合,用于切换时判断数据源是否存在*/publicstaticList<Object>dataSourceKeys=newArrayList<>(); /*** 切换数据源* @param keys*/publicstaticvoidsetDataSourceKey(String ... keys) { if (ArrayUtils.isNotEmpty(keys)){ log.info("调用数据源部门id为:"+Arrays.toString(keys)); Set<String>logicTables=ShardingJdbcConfig.getLogicTables(); HintManagerhintManager=HintManager.getInstance(); ShardingSphereDataSourceSerialRecordshardingSphereDataSourceSerialRecord=ShardingSphereDataSourceSerialRecord.getInstance(); if (CollectionUtils.isNotEmpty(logicTables)){ for (StringlogicTable : logicTables){ for (Stringkey : keys){ if (shardingSphereDataSourceSerialRecord.containDataSourceAliase(key)){ hintManager.addDatabaseShardingValue(logicTable, key); } } } } CONTEXT_HOLDER.set(keys[0]); }else{ log.info("调用master数据源"); } } publicstaticvoidsetDataSourceKeyByDepartmentIds (StringtableName, String ... 
departmentIds){ if (ArrayUtils.isEmpty(departmentIds) ||StringUtils.isEmpty(tableName)){ return; } HintManagerhintManager=HintManager.getInstance(); ShardingSphereDataSourceSerialRecordshardingSphereDataSourceSerialRecord=ShardingSphereDataSourceSerialRecord.getInstance(); for (StringdepartmentId : departmentIds){ if (!shardingSphereDataSourceSerialRecord.containDataSourceAliase(departmentId)){ continue; } StringserialNum=shardingSphereDataSourceSerialRecord.getDataSourceSerialNum(departmentId); hintManager.addDatabaseShardingValue(tableName, serialNum); } } /*** 获取数据源* @return*/publicstaticStringgetDataSourceKey() { returnCONTEXT_HOLDER.get(); } /*** 重置数据源*/publicstaticvoidclearDataSourceKey() { HintManager.clear(); CONTEXT_HOLDER.remove(); } /*** 判断是否包含数据源* @param key 数据源key* @return*/publicstaticbooleancontainDataSourceKey(Stringkey) { returndataSourceKeys.contains(key); } /*** 添加数据源keys* @param keys* @return*/publicstaticbooleanaddDataSourceKeys(Collection<?extendsObject>keys) { returndataSourceKeys.addAll(keys); } }
然后就可以通过DynamicDataSourceContextHolder.setDataSourceKey()切换数据源了,对于不同数据源中结构相同的表,一条sql就可以完成查询了