数据导入任务并行化

简介: 数据导入任务并行化

当我们需要做入库操作的时候,一般采取导入的方式,而导入的方式中,一般采取excel导入的方式比较多,而当excel中数据量很大的时候,导入的时长就不受控制,所以我们需要考虑异步并行化处理。

如何实现异步

我们可以考虑在用户上传导入文件的时候,在文件上传完毕的时候,直接返回结果,并提示数据导入中。然后利用@Async注解实现异步。

以下面的方法为例:

@PutMapping("/vulns/add")
    @ResponseBody
    public Result importVulns(MultipartFile file, @BaseApiVisible VisibleUser visibleUser,
                              @Nullable @RequestParam("force_import") Boolean forceImport) {
   
   
        // TODO: 2022/9/7 校验文件是否正在使用
        String md5 = MD5Utils.compMd5(file);
        if (fileService.inProcessFileMd5s.contains(md5)) {
   
   
            if (Objects.isNull(forceImport) || (Objects.nonNull(forceImport) && !forceImport)) {
   
   
                return Result.failure(CommonResultStatus.REPEAT_FILE);
            }
        }
        fileService.inProcessFileMd5s.add(md5);
        List<ImportAddVulnQuery> queries = VulnPersonFormatUtils.getDataFromExcel(file);
        Long fileId = -1L;
        try {
   
   
            String name = minioUtil.uploadFile(file, "demo");
                    .build());
            Long hisId = vulnImportHistoryRepository.save(VulnImportHistory.builder()

                    .build());
            Long importDataProcessId = importDataProcessRepository.save(ImportDataProcess.builder()
                    .build());
            demo1(visibleUser, file, md5, hisId, importDataProcessId, fileId);
            return Result.success("导入文件上传成功,请到导入管理查看导入情况");
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }
        return Result.failure("导入文件上传失败");
    }

我们真正处理导入是在demo1这个函数中执行的,而我们只需要给该函数加入@Async注解,即可实现异步调用。

@Async
public void demo1(){
   
   

}

不过,想要使该注解生效,必须在启动类加上@EnableAsync的配置。

如何实现并行化

在实现并行化之前,我们需要了解一个方法或者一个导入流程中哪些情况比较耗时。我总结了以下串行化需要转变为并行化的场景:

1、数据库反复访问
2、反复调用外部接口
3、爬虫反复抓取外部页面
等等

一句话描述就是:需要反复进行网络传输的部分都需要考虑并行化。
即,原本我们需要反复执行n次的步骤,如下图:
1aee65714e3949f1a681f6fd1b6ad15.jpg
在局部并行化之后,我们就只需要执行n/并发数 次了。可以节省等待的时间。
8b24d1a93fe0527682a3b01c9ec8f90.jpg

那么代码怎么写呢?
假设原来的写法为

for (A a : as) {
   
   
    handle(a)
}

那么现在可以写为

ExecutorService updatePool = Executors.newFixedThreadPool(10);
List<Callable<Integer>> updateCallers = new ArrayList<>();
for (A a : as) {
   
   
    updateCallers.add(() -> {
   
   
        handle(a);
        return null;
    });
}
try {
   
   
    updatePool.invokeAll(callers);
    updatePool.shutdown();
} catch (InterruptedException e) {
   
   
    e.printStackTrace();
}

这样,就以10个并发进行执行了。
但是这里需要注意的点就是,要确保你的执行没有先后顺序,如果有,就不能这么改。

迭代建议

这个版本有一些问题

1、每一处导入都需要进行类似的修改和变化,会带来巨大的成本,不够简洁
2、没有形成框架化的导入模块

目录
相关文章
|
数据库
聊聊Doris向量化执行引擎-过滤操作
聊聊Doris向量化执行引擎-过滤操作
300 0
|
数据库
聊聊StarRocks向量化执行引擎-过滤操作
聊聊StarRocks向量化执行引擎-过滤操作
251 0
|
监控 Shell 调度
使用EHPC实现“完美并行”的高效批处理方案
在高性能计算场景中,用户一次业务计算可以划分为大量的任务,每个任务的处理逻辑相同,但是输入文件、参数设置和输出文件不同。在此,给出了基于阿里云弹性高性能计算场景的数组作业解决方案——利用E-HPC集成的作业调度系统,将用户的批处理任务自动分配到数组作业,实现在云超算集群上高并发执行。
1907 0
|
Java
什么是批处理
什么是批处理:批处理就是多个dos命令组成的,双击可执行里面的命令。(微软系统) 批处理:桌面文件以双击就能打开,而java一双击是打不开的因为java是一个class文件他需要虚拟机得运行才能打开。
2272 0
|
算法 流计算
Flink批处理优化器之范围分区重写采用算法
采样算法 上一篇我们分析了RangePartitionRewriter的数据处理分支,接下来我们开始分析采样分支,采样分支的核心在于采样算法。因为范围分区输入端每个分区的数据量无从得知,也就是说我们无法得出采样比例。
1653 0
|
流计算 容器
Flink批处理优化器之数据属性
在一段时间之前我们已介绍过IP(Interesting Property)对于优化器的意义以及它将对优化器的优化决策产生的影响。本篇我们将介绍Flink的批处理优化器中涉及到的所有的IP,我们将其统称为数据属性。
1232 0
|
算法 流计算
浅谈Flink批处理优化器之Join优化
跟传统的关系型数据库类似,Flink提供了优化器“hint”(提示)以告诉优化器选择一些执行策略。目前优化提示主要针对批处理中的连接(join)。在批处理中共有三个跟连接有关的转换函数: join:默认为等值连接(Equi-join),维基百科将其归类为内连接(inner join)的一种 https://en.
2619 0
|
Go 数据安全/隐私保护
|
流计算 算法
Flink批处理优化器之成本估算
成本估算 在基于成本的优化器中,成本估算非常重要,它直接影响着候选计划的生成。在Flink中成本估算依赖于每个不同的运算符所提供的自己的“预算”,本篇我们将分析什么是成本、运算符如何提供自己的预算以及如何基于预算估算成本。
1612 0
|
流计算
Flink批处理中的增量迭代
对某些迭代而言并不是单次迭代产生的下一次工作集中的每个元素都需要重新参与下一轮迭代,有时只需要重新计算部分数据同时选择性地更新解集,这种形式的迭代就是增量迭代。增量迭代能够使得一些算法执行得更高效,它可以让算法专注于工作集中的“热点”数据部分,这导致工作集中的绝大部分数据冷却得非常快,因此随后的迭代面对的数据规模将会大幅缩小。
2641 0