当我们需要做入库操作的时候,一般采取导入的方式,而导入的方式中,一般采取excel导入的方式比较多,而当excel中数据量很大的时候,导入的时长就不受控制,所以我们需要考虑异步并行化处理。
如何实现异步
我们可以考虑在用户上传导入文件的时候,在文件上传完毕的时候,直接返回结果,并提示数据导入中。然后利用@Async注解实现异步。
以下面的方法为例:
@PutMapping("/vulns/add")
@ResponseBody
public Result importVulns(MultipartFile file, @BaseApiVisible VisibleUser visibleUser,
@Nullable @RequestParam("force_import") Boolean forceImport) {
// TODO: 2022/9/7 校验文件是否正在使用
String md5 = MD5Utils.compMd5(file);
if (fileService.inProcessFileMd5s.contains(md5)) {
if (Objects.isNull(forceImport) || (Objects.nonNull(forceImport) && !forceImport)) {
return Result.failure(CommonResultStatus.REPEAT_FILE);
}
}
fileService.inProcessFileMd5s.add(md5);
List<ImportAddVulnQuery> queries = VulnPersonFormatUtils.getDataFromExcel(file);
Long fileId = -1L;
try {
String name = minioUtil.uploadFile(file, "demo");
.build());
Long hisId = vulnImportHistoryRepository.save(VulnImportHistory.builder()
.build());
Long importDataProcessId = importDataProcessRepository.save(ImportDataProcess.builder()
.build());
demo1(visibleUser, file, md5, hisId, importDataProcessId, fileId);
return Result.success("导入文件上传成功,请到导入管理查看导入情况");
} catch (Exception e) {
e.printStackTrace();
}
return Result.failure("导入文件上传失败");
}
我们真正处理导入是在demo1这个函数中执行的,而我们只需要给该函数加入@Async注解,即可实现异步调用。
即
@Async
public void demo1(){
}
不过,想要使该注解生效,必须在启动类加上@EnableAsync的配置。
如何实现并行化
在实现并行化之前,我们需要了解一个方法或者一个导入流程中哪些情况比较耗时。我总结了以下串行化需要转变为并行化的场景:
1、数据库反复访问
2、反复调用外部接口
3、爬虫反复抓取外部页面
等等
一句话描述就是:需要反复进行网络传输的部分都需要考虑并行化。
即,原本我们需要反复执行n次的步骤,如下图:
在局部并行化之后,我们就只需要执行n/并发数 次了。可以节省等待的时间。
那么代码怎么写呢?
假设原来的写法为
for (A a : as) {
handle(a)
}
那么现在可以写为
ExecutorService updatePool = Executors.newFixedThreadPool(10);
List<Callable<Integer>> updateCallers = new ArrayList<>();
for (A a : as) {
updateCallers.add(() -> {
handle(a);
return null;
});
}
try {
updatePool.invokeAll(callers);
updatePool.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
这样,就以10个并发进行执行了。
但是这里需要注意的点就是,要确保你的执行没有先后顺序,如果有,就不能这么改。
迭代建议
这个版本有一些问题
1、每一处导入都需要进行类似的修改和变化,会带来巨大的成本,不够简洁
2、没有形成框架化的导入模块