使用SparkSQL实现多线程分页查询并写入文件

简介: 一、由于具有多张宽表且字段较多,每个宽表数据大概为4000万条,根据业务逻辑拼接别名,并每张宽表的固定字段进行left join 拼接SQL。这样就能根据每个宽表的主列,根据每个宽表的不同字段关联出一张新的集合。

一、由于具有多张宽表且字段较多,每个宽表数据大概为4000万条,根据业务逻辑拼接别名,并每张宽表的固定字段进行left join 拼接SQL。这样就能根据每个宽表的主列,根据每个宽表的不同字段关联出一张新的集合。由于下来要进行分页查询,如果要使用SparkSQL进行分页查询,需要增加序号列,那么就在刚才的Sql之前增加一句 create table tableName as SELECT  ROW_NUMBER() OVER() as id,* from (拼接的SQL) 就可创建一张带自增序列的,业务需要字段的几张宽表的关联集合,方便下来分页。

for(int i=0;i<ColumnNames.size();i++){
			SiCustomerLabelInfoModel Column = ColumnNames.get(i);
			List<CiMdaSysTable> ciMdaSysTable = ciCustomerJDao.getMdaSysTableName(Column.getColumnName());
			
			String alias = "t_" + ciMdaSysTable.get(0).getTableId();
			String aliasColumn = alias + "." + Column.getColumnName();
			String aliasTable = ciMdaSysTable.get(0).getTableName() +" "+ alias;
			
			if(mainTable == null){
				mainTable = aliasTable;
			}
			if(ciMdaSysTable.get(0).getUpdateCycle() == 1){
				mainTable = aliasTable;
			}
			
			ColumnNameList.add(aliasColumn);
			tableNameList.add(aliasTable);
		}
		String[] keyAlias = mainTable.split(" ");
		String mainKeyColumn = keyAlias[1] + "." + keyColumn;
		selectResult.append("select ").append(mainKeyColumn);
		if(StringUtil.isNotEmpty(mainTable)){
			fromTableName.append(" from ").append(mainTable);
		}
		Iterator<String> table = tableNameList.iterator();
		while(table.hasNext()){
			String tableName = table.next();
			String[] tableAlias = tableName.split(" ");
			String[] mainAlias = mainTable.split(" ");
			String alias = tableAlias[1];
			String mAlias = mainAlias[1];
			
			if(!mainTable.equals(tableName)){
			fromTableName.append(" left join ").append(tableName).append(" on ").append(mAlias).append(".").append(keyColumn)
			.append(" = ").append(alias).append(".").append(keyColumn).append(" ");
			}
		}
        

          fromTableName.append(" ) a");
          Iterator<String> column = ColumnNameList.iterator();
          while(column.hasNext()){
          String columnName = column.next();
          selectResult.append(",").append(columnName);
          }
          selectResult.append(fromTableName);
          Createtable.append("create table ").append(cocDwName).append(" as SELECT ROW_NUMBER() OVER() as id,* from").append(" (").append(selectResult);


二、由于业务场景,需要将4000万条数据最终写入10个文件,这里通过声明线程池pool,使用多线程的方法执行,有些人会担心那不会数据错乱吗,不会。因为后面要用分页sql,根据循环传入的 i 的值进行处理。

	private ExecutorService pools = Executors.newFixedThreadPool(15);
if(result = true){
			String queryCount = "select count(*) from "+cocDwName;
			int count = ciCustomerJDao.getDwTotolCount(queryCount);
			log.info(""+keyColumn);
				try {
					for(int i=0;i<10;i++){
						CreateDwFileThread jd = new CreateDwFileThread(jndiName,keyColumn,num,cocDwName,count,sysId,i);
						Future fu = pools.submit(jd);
						fus.add(fu);
					}
					long start = System.currentTimeMillis();
					while (true) {
						boolean done = true;
						for (Future f : fus) {
							if (!f.isDone()) {
								done = false;
								break;
							}
						}
						if (!done) {
							try {
								Thread.sleep(1000 * 10);
							} catch (InterruptedException e) {
								log.error("sleep error", e);
								e.printStackTrace();
							}
							continue;
						} else {
							break;
						}
					}
					log.debug("wait tasks finish cost:" + (System.currentTimeMillis() - start));
					
					}catch(Exception e){
						result = false;
						log.error("error", e);
					}
				}

三、根据第一步创建的表中的自增序列ID进行分页,由于要多线程并发执行,所以不能使用传统分页的begin与end,根据步骤二中传入的 i (这里参数为partNumber)进行处理,根据循环,每条线程执行的开始数据必定以上条数据结束的条数为开始,每次将查询出来的结果集通过list2File写入文件。这里还有个while循环,因为分成10份还是有400万条数据啊,还是觉得大,于是就又分成了10次~就是说每次查询出40万条写入文件,直到新加入400万条flag返回true退出循环。

	while(flag == false){
			pager.setPageSize(bufferedRowSize);
			pager.setPageNum(pageNumber);
			
			int begin = (pager.getPageNum() - 1) * pager.getPageSize()+createFileCount*partNumber;
			int end = begin + pager.getPageSize();
			if(end >= createFileCount*(partNumber+1)){
				end = createFileCount*(partNumber+1);
			}
			StringBuffer sql = new StringBuffer() ;
			sql.append(" select ").append(columns).append(" from ").append(cocDwName).append(" where id > ").append(begin).append(" and ").append(" id < ").append(end+1);
			JdbcBaseDao jdbcBaseDao = (JdbcBaseDao) SystemServiceLocator.getInstance().getService("jdbcBaseDao");
			String BackjndiName = PropertiesUtils.getProperties("JNDI_CI_BACK");
			final String file = fileLocalPath + File.separator + dwName+ "_" + String.valueOf(partNumber)+ ".csv";
			Log.info("---------sql;:"+ sql + "-------fileName:"+file);

              

List<Map<String, Object>> dataList = jdbcBaseDao.getBackSimpleJdbcTemplate().queryForList(sql.toString());
              if (dataList.size() > 0) {
              list2File(dataList, title, columns, file, encode, null, null);
              pageNumber++;
              }
              if(end == createFileCount * partNumber + createFileCount){
              flag = true;
              }

  

有人会问你为啥不用ResultSet 直接放入400万条数据 为啥还要分开每40万条数据再分页写~ 我想说 我就是想这么干~ 啊哈哈。。。不过程序中貌似是有问题的 没有考虑到的情景,所以还在推敲。。(Resultset 查出来400万条不还是放在内存中,还是有可能内存溢出的,分页写大不了通过thriftserver多连接几次spark嘛~ 不过代码写的很烂,还在提高哈~)

 

目录
相关文章
|
缓存 测试技术 API
API的封装步骤流程
API封装流程是一个系统化的过程,旨在将内部功能转化为可复用的接口供外部调用。流程包括明确需求、设计接口、选择技术和工具、编写代码、测试、文档编写及部署维护。具体步骤为确定业务功能、数据来源;设计URL、请求方式、参数及响应格式;选择开发语言、框架和数据库技术;实现数据连接、业务逻辑、错误处理;进行功能、性能测试;编写详细文档;部署并持续维护。通过这些步骤,确保API稳定可靠,提高性能。
|
9月前
|
关系型数据库 测试技术 分布式数据库
刷新世界纪录!阿里云PolarDB凭借创新的「三层解耦」架构刷新TPC-C基准测试世界纪录
刷新世界纪录!阿里云PolarDB凭借创新的「三层解耦」架构刷新TPC-C基准测试世界纪录
|
Oracle 关系型数据库 分布式数据库
PolarDB产品使用问题之如何设置或修改密码
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
缓存 开发框架 .NET
看看 Asp.net core Webapi 项目如何优雅地使用内存缓存
看看 Asp.net core Webapi 项目如何优雅地使用内存缓存
294 1
|
安全 关系型数据库 MySQL
【MySQL】Orchestrator最简单的 mysql 高可用方案最细细细细~
【MySQL】Orchestrator最简单的 mysql 高可用方案最细细细细~
|
SQL 分布式计算 数据库
离线数仓--大数据技术之DolphinScheduler
离线数仓--大数据技术之DolphinScheduler
1226 2
|
存储 SQL 缓存
聊聊数据湖的11个参考架构
数据湖是传统数据仓库概念在源类型、处理类型和用于业务分析解决方案的结构方面的高级版本。
聊聊数据湖的11个参考架构
|
编译器 C语言 Windows
Qt 使用MinGW编译器调用MinGW编译生成的dll
Qt 使用MinGW编译器调用MinGW编译生成的dll
657 0