sharding jdbc多数据源同表结果查询问题

简介: 以往项目中,客户规定,各个部门的数据不能放在同一个数据库,导致办件服务存储不同部门的数据库在不同的数据源中,引发的多数据源查询的一系列问题

一、背景

      以往项目客户规定,各个部门的数据不能放在同一个数据库,导致办件服务存储不同部门的数据库在不同的数据源中,引发的多数据源查询的一系列问题

二、sharding jdbc 简介

     Sharding-JDBC是谷歌的一个开源的框架,提供标准化的数据分片、分布式事务和数据库治理功能,定位为轻量级Java框架,在Java的JDBC层提供的额外服务。 它使用客户端直连数据库,以jar包形式提供服务,无需额外部署和依赖,可理解为增强版的JDBC驱动,完全兼容JDBC和各种ORM框架。
适用于任何基于Java的ORM框架,如:JPA, Hibernate, Mybatis, Spring JDBC Template或直接使用JDBC。
基于任何第三方的数据库连接池,如:DBCP, C3P0, BoneCP, Druid, HikariCP等。
支持任意实现JDBC规范的数据库。目前支持MySQL,Oracle,SQLServer和PostgreSQL。

   三、项目融合sharding jdbc

     办件服务现有的是动态切换对应的数据源,key是部门的id,并不满足我们对一条sql语句就可以查询所有的部门数据的问题,所以要对现有有的逻辑进行改造

    首先我们要获取到所有的数据源的信息,初始化一份数据,如下图,这个是现有的存数据源信息的表dic_database_info

   

  然后我们要通过这张表获取所有的部门的数据源,并且初始化到线程池中,代码如下:

/*** @Description: 通过jdbc查询出数据连接信息  * @return* @date 2020-09-11 12:25* @throws*/privateMap<String, DataSource>getDepartmentDataSources (DataSourcerdsDataSource){
Stringsql="select id, department_id as departmentId, access_url as accessUrl,\n"+"        access_key as accessKey, access_value as accessValue\n"+"        from dic_database_info where status=1 order by id ";
Map<String, DataSource>dataSourceMap=newHashMap<>();
try {
PreparedStatementpreparedStatement=rdsDataSource.getConnection().prepareStatement(sql);
ResultSetresultSet=preparedStatement.executeQuery();
while (resultSet.next()){
StringdepartmentId=String.valueOf(resultSet.getLong("departmentId"));
StringaccessUrl=resultSet.getString("accessUrl");
StringaccessKey=resultSet.getString("accessKey");
StringaccessValue=resultSet.getString("accessValue");
accessValue=JasyptUtils.decrypt(accessValue,salt);
DataSourcedataSource=DataSourceBuilder.create().driverClassName(driverClassName)
                        .url(accessUrl)
                        .username(accessKey)
                        .password(accessValue)
                        .build();
StringserialNum=shardingSphereDataSourceSerialRecord.addDataSourceAlias(departmentId);
dataSourceMap.put(serialNum,dataSource);
            }
        }catch (SQLExceptione){
LOGGER.error("通过JDBC查询各局数据库信息出错:{}",e.getMessage());
        }
returndataSourceMap;
    }


   然后在maven引入包

<dependency><groupId>org.apache.shardingsphere</groupId><artifactId>sharding-jdbc-core</artifactId><version>4.1.1</version></dependency>


由于办件服务只需要查询某一些表,所以我们把查询的表的表名配置在配置文件中

spring.shardingsphere.sharding.ext.tables=agent_materials,agents,applicant_materials,applicants,application_result_info,current_node,fee_unit_info,group_data_search,mark_results,node_result,paid_info,result_receive_info,service_application,service_attachment,service_correction,service_info_apply_nullify,service_info_nullify,tab_accept_affair_request,tab_apply_invalid_service_application,tab_apply_revoke_service_application,tab_get_node_info,tab_service_application_accepted,tab_service_application_result,tab_service_application_result_info


    然后在配置文件中获取

@ConfigurationProperties(prefix="spring.shardingsphere.sharding.ext")
publicstaticclassShardingJdbcProperties {
privateList<String>tables=Collections.emptyList();
publicList<String>getTables() {
returntables;
        }
publicvoidsetTables(List<String>tables) {
this.tables=tables;
        }
    }



     初始化配置文件

@OverridepublicvoidafterPropertiesSet() throwsException {
ShardingJdbcPropertiesshardingJdbcProperties=beanFactory.getBean(ShardingJdbcProperties.class);
if (shardingJdbcProperties!=null){
logicTables=newHashSet<>(shardingJdbcProperties.getTables());
initialized=true;
lock.lock();
try{
condition.signalAll();
            }finally {
lock.unlock();
            }
        }
    }
@OverridepublicvoidsetBeanFactory(BeanFactorybeanFactory) throwsBeansException {
this.beanFactory=beanFactory;
    }

 

  最后初始化sharding jdbc,并且保存数据源,表信息

@Bean(name="apiDataSource")
@ConfigurationProperties(prefix="spring.datasource.api")
publicDataSourcerdsDataSource() {
returnDataSourceBuilder.create().build();
    }
@Bean@PrimarypublicDataSourceshardingSphereDataSource(ShardingJdbcPropertiesshardingJdbcProperties, @Qualifier("apiDataSource") DataSourcedataSource) throwsSQLException {
finalStringMASTER_DATASOURCE_ALIAS="master";
ShardingRuleConfigurationshardingRuleConfig=newShardingRuleConfiguration();
shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig(newHintShardingStrategyConfiguration(
newDepartmentDataSourceHintShardingAlgorithm()));
shardingRuleConfig.setDefaultTableShardingStrategyConfig(newHintShardingStrategyConfiguration(
newDepartmentDataSourceHintShardingAlgorithm.DepartmentTableHintShardingAlgorithm()));
Map<String, DataSource>fullDataSourceMap=newHashMap<>();
fullDataSourceMap.put(MASTER_DATASOURCE_ALIAS, dataSource);
Map<String, DataSource>departmentDatasourceMap=getDepartmentDataSources(dataSource);
if (departmentDatasourceMap!=null&&departmentDatasourceMap.size() >0 ){
fullDataSourceMap.putAll(departmentDatasourceMap);
        }
shardingRuleConfig.setDefaultDataSourceName(MASTER_DATASOURCE_ALIAS);
for (Stringtable : shardingJdbcProperties.getTables()){
TableRuleConfigurationtableRuleConfiguration=getDepartmentTableRuleConfiguration(table);
if (tableRuleConfiguration!=null){
shardingRuleConfig.getTableRuleConfigs().add(tableRuleConfiguration);
shardingRuleConfig.getBindingTableGroups().add(table);
            }
        }
Propertiesproperties=newProperties();
properties.put("sql.show", sqlShow);
returnShardingDataSourceFactory.createDataSource(fullDataSourceMap, shardingRuleConfig, properties);
    }
privateTableRuleConfigurationgetDepartmentTableRuleConfiguration (Stringtable) {
finalStringDS_TABLE_EXPRESSION="%s${0..%d}.%s";
if (StringUtils.isNotEmpty(table)){
Stringexpression=String.format(DS_TABLE_EXPRESSION, shardingSphereDataSourceSerialRecord.getDataSourcePrefix(),
shardingSphereDataSourceSerialRecord.getMaxSerialNum(), table);
returnnewTableRuleConfiguration(table, expression);
        }
returnnull;
    }
@Bean(name="apiSqlSessionFactory")
@PrimarypublicSqlSessionFactoryrdsSqlSessionFactory(@Qualifier("shardingSphereDataSource")DataSourcedynamicDataSource) throwsException {
SqlSessionFactoryBeansqlSessionFactory=newSqlSessionFactoryBean();
sqlSessionFactory.setDataSource(dynamicDataSource);
sqlSessionFactory.setConfigLocation(newClassPathResource("mybatis-config.xml"));
sqlSessionFactory.setMapperLocations(newPathMatchingResourcePatternResolver().getResources("classpath:mapper/**/*.xml"));
returnsqlSessionFactory.getObject();
    }
@Bean(name="apiTransactionManager")
@PrimarypublicDataSourceTransactionManagerrdsTransactionManager(@Qualifier("shardingSphereDataSource") DataSourcedynamicDataSource){
returnnewDataSourceTransactionManager(dynamicDataSource);
    }
@Bean(name="apiSqlSessionTemplate")
@PrimarypublicSqlSessionTemplaterdsSqlSessionTemplate(@Qualifier("apiSqlSessionFactory") SqlSessionFactoryrdsSqlSessionFactory){
returnnewSqlSessionTemplate(rdsSqlSessionFactory);
    }
@Bean(name="apiNamedParameterJdbcTemplate")
@PrimarypublicNamedParameterJdbcTemplaterdsNamedParameterJdbcTemplate(@Qualifier("shardingSphereDataSource") DataSourcedynamicDataSource){
returnnewNamedParameterJdbcTemplate(dynamicDataSource);
    }
publicstaticvoidcheckInitialize() throwsInterruptedException {
if (!initialized) {
lock.lock();
try{
condition.await(5000, TimeUnit.MILLISECONDS);
            }finally {
lock.unlock();
            }
        }
    }
publicstaticbooleancontainLogicTable(StringtableName){
try {
checkInitialize();
        } catch (InterruptedExceptione) {
        }
returnlogicTables==null?false : logicTables.contains(tableName);
    }
publicstaticSet<String>getLogicTables () {
try {
checkInitialize();
        } catch (InterruptedExceptione) {
        }
returnlogicTables==null?Collections.emptySet() : logicTables;
    }


 然后,封装一个类,对外提供切换数据源的接口

importlombok.extern.slf4j.Slf4j;
importorg.apache.commons.collections4.CollectionUtils;
importorg.apache.commons.lang3.ArrayUtils;
importorg.apache.commons.lang3.StringUtils;
importorg.apache.shardingsphere.api.hint.HintManager;
importjava.util.*;
@Slf4jpublicclassDynamicDataSourceContextHolder {
privatestaticfinalThreadLocal<String>CONTEXT_HOLDER=newThreadLocal<String>() {
/*** 将 master 数据源的 key作为默认数据源的 key*/@OverrideprotectedStringinitialValue() {
return"master";
        }
    };
/*** 数据源的 key集合,用于切换时判断数据源是否存在*/publicstaticList<Object>dataSourceKeys=newArrayList<>();
/*** 切换数据源* @param keys*/publicstaticvoidsetDataSourceKey(String ... keys) {
if (ArrayUtils.isNotEmpty(keys)){
log.info("调用数据源部门id为:"+Arrays.toString(keys));
Set<String>logicTables=ShardingJdbcConfig.getLogicTables();
HintManagerhintManager=HintManager.getInstance();
ShardingSphereDataSourceSerialRecordshardingSphereDataSourceSerialRecord=ShardingSphereDataSourceSerialRecord.getInstance();
if (CollectionUtils.isNotEmpty(logicTables)){
for (StringlogicTable : logicTables){
for (Stringkey : keys){
if (shardingSphereDataSourceSerialRecord.containDataSourceAliase(key)){
hintManager.addDatabaseShardingValue(logicTable, key);
                        }
                    }
                }
            }
CONTEXT_HOLDER.set(keys[0]);
        }else{
log.info("调用master数据源");
        }
    }
publicstaticvoidsetDataSourceKeyByDepartmentIds (StringtableName, String ... departmentIds){
if (ArrayUtils.isEmpty(departmentIds) ||StringUtils.isEmpty(tableName)){
return;
        }
HintManagerhintManager=HintManager.getInstance();
ShardingSphereDataSourceSerialRecordshardingSphereDataSourceSerialRecord=ShardingSphereDataSourceSerialRecord.getInstance();
for (StringdepartmentId : departmentIds){
if (!shardingSphereDataSourceSerialRecord.containDataSourceAliase(departmentId)){
continue;
            }
StringserialNum=shardingSphereDataSourceSerialRecord.getDataSourceSerialNum(departmentId);
hintManager.addDatabaseShardingValue(tableName, serialNum);
        }
    }
/*** 获取数据源* @return*/publicstaticStringgetDataSourceKey() {
returnCONTEXT_HOLDER.get();
    }
/*** 重置数据源*/publicstaticvoidclearDataSourceKey() {
HintManager.clear();
CONTEXT_HOLDER.remove();
    }
/*** 判断是否包含数据源* @param key 数据源key* @return*/publicstaticbooleancontainDataSourceKey(Stringkey) {
returndataSourceKeys.contains(key);
    }
/*** 添加数据源keys* @param keys* @return*/publicstaticbooleanaddDataSourceKeys(Collection<?extendsObject>keys) {
returndataSourceKeys.addAll(keys);
    }
}



      然后就可以通过DynamicDataSourceContextHolder.setDataSourceKey()切换数据源了,不同的数据源相同的表结果一条sql就可以搞定了

相关文章
|
6天前
|
SQL Java 数据库连接
JDBC连接SQL Server2008 完成增加、删除、查询、修改等基本信息基本格式及示例代码
这篇文章提供了使用JDBC连接SQL Server 2008数据库进行增加、删除、查询和修改操作的基本步骤和示例代码。
|
3月前
|
Java 关系型数据库 MySQL
②⑩② 【读写分离】Sharding - JDBC 实现 MySQL读写分离[SpringBoot框架]
②⑩② 【读写分离】Sharding - JDBC 实现 MySQL读写分离[SpringBoot框架]
60 0
|
3月前
|
SQL Java 关系型数据库
JDBC技术【SQL注入、JDBC批量添加数据、JDBC事务处理、其他查询方式】(三)-全面详解(学习总结---从入门到深化)
JDBC技术【SQL注入、JDBC批量添加数据、JDBC事务处理、其他查询方式】(三)-全面详解(学习总结---从入门到深化)
51 0
|
8月前
|
Java 关系型数据库 MySQL
|
10月前
|
Java 关系型数据库 MySQL
JSP 空教室查询管理系统yeclipse开发mysql数据库bs框架java编程jdbc
JSP 空教室查询管理系统是一套完善的web设计系统,对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,开发环境为TOMCAT7.0,Myeclipse8.5开发,数据库为Mysql5.0,使用java语言开发,系统主要采用B/S模式开发。
282 2
|
SQL 算法 Java
springboot中sharding jdbc绑定表配置实战
springboot中sharding jdbc绑定表配置实战
|
SQL 算法 Java
基于springboot的sharding jdbc广播表配置实战
基于springboot的sharding jdbc广播表配置实战
|
算法 Java 数据库连接
springboot整合sharding jdbc【分库】
springboot整合sharding jdbc【分库】
|
算法 Java 数据库连接
springboot整合sharding jdbc【分表】
springboot整合sharding jdbc【分表】
|
JavaScript 小程序 Oracle
Sharding JDBC 实战:分布式事务处理
Sharding JDBC 实战:分布式事务处理