数据导入任务并行化

简介: 数据导入任务并行化

当我们需要做入库操作的时候,一般采取导入的方式,而导入的方式中,一般采取excel导入的方式比较多,而当excel中数据量很大的时候,导入的时长就不受控制,所以我们需要考虑异步并行化处理。

如何实现异步

我们可以考虑在用户上传导入文件的时候,在文件上传完毕的时候,直接返回结果,并提示数据导入中。然后利用@Async注解实现异步。

以下面的方法为例:

@PutMapping("/vulns/add")
    @ResponseBody
    public Result importVulns(MultipartFile file, @BaseApiVisible VisibleUser visibleUser,
                              @Nullable @RequestParam("force_import") Boolean forceImport) {
   
   
        // TODO: 2022/9/7 校验文件是否正在使用
        String md5 = MD5Utils.compMd5(file);
        if (fileService.inProcessFileMd5s.contains(md5)) {
   
   
            if (Objects.isNull(forceImport) || (Objects.nonNull(forceImport) && !forceImport)) {
   
   
                return Result.failure(CommonResultStatus.REPEAT_FILE);
            }
        }
        fileService.inProcessFileMd5s.add(md5);
        List<ImportAddVulnQuery> queries = VulnPersonFormatUtils.getDataFromExcel(file);
        Long fileId = -1L;
        try {
   
   
            String name = minioUtil.uploadFile(file, "demo");
                    .build());
            Long hisId = vulnImportHistoryRepository.save(VulnImportHistory.builder()

                    .build());
            Long importDataProcessId = importDataProcessRepository.save(ImportDataProcess.builder()
                    .build());
            demo1(visibleUser, file, md5, hisId, importDataProcessId, fileId);
            return Result.success("导入文件上传成功,请到导入管理查看导入情况");
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }
        return Result.failure("导入文件上传失败");
    }

我们真正处理导入是在demo1这个函数中执行的,而我们只需要给该函数加入@Async注解,即可实现异步调用。

@Async
public void demo1(){
   
   

}

不过,想要使该注解生效,必须在启动类加上@EnableAsync的配置。

如何实现并行化

在实现并行化之前,我们需要了解一个方法或者一个导入流程中哪些情况比较耗时。我总结了以下串行化需要转变为并行化的场景:

1、数据库反复访问
2、反复调用外部接口
3、爬虫反复抓取外部页面
等等

一句话描述就是:需要反复进行网络传输的部分都需要考虑并行化。
即,原本我们需要反复执行n次的步骤,如下图:
1aee65714e3949f1a681f6fd1b6ad15.jpg
在局部并行化之后,我们就只需要执行n/并发数 次了。可以节省等待的时间。
8b24d1a93fe0527682a3b01c9ec8f90.jpg

那么代码怎么写呢?
假设原来的写法为

for (A a : as) {
   
   
    handle(a)
}

那么现在可以写为

ExecutorService updatePool = Executors.newFixedThreadPool(10);
List<Callable<Integer>> updateCallers = new ArrayList<>();
for (A a : as) {
   
   
    updateCallers.add(() -> {
   
   
        handle(a);
        return null;
    });
}
try {
   
   
    updatePool.invokeAll(callers);
    updatePool.shutdown();
} catch (InterruptedException e) {
   
   
    e.printStackTrace();
}

这样,就以10个并发进行执行了。
但是这里需要注意的点就是,要确保你的执行没有先后顺序,如果有,就不能这么改。

迭代建议

这个版本有一些问题

1、每一处导入都需要进行类似的修改和变化,会带来巨大的成本,不够简洁
2、没有形成框架化的导入模块

目录
相关文章
|
2月前
|
数据采集 存储 分布式计算
ClickHouse大规模数据导入优化:批处理与并行处理
【10月更文挑战第27天】在数据驱动的时代,高效的数据导入和处理能力是企业竞争力的重要组成部分。作为一位数据工程师,我在实际工作中经常遇到需要将大量数据导入ClickHouse的需求。ClickHouse是一款高性能的列式数据库系统,非常适合进行大规模数据的分析和查询。然而,如何优化ClickHouse的数据导入过程,提高导入的效率和速度,是我们面临的一个重要挑战。本文将从我个人的角度出发,详细介绍如何通过批处理、并行处理和数据预处理等技术优化ClickHouse的数据导入过程。
152 0
|
5月前
|
并行计算 算法 大数据
Dask 与图形处理:大规模图数据的并行分析
【8月更文第29天】在大数据时代,图数据结构因其能够高效表达实体之间的复杂关系而变得越来越重要。然而,处理大规模图数据集往往需要高效的并行计算框架。Dask 是一个灵活的并行计算库,它能够与 Python 的现有科学计算生态系统无缝集成。本文将介绍如何利用 Dask 来处理和分析大规模的图数据结构。
223 4
|
7月前
|
机器学习/深度学习 分布式计算 并行计算
MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现
MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现
104 0
|
分布式计算 Hadoop 大数据
MapReduce 案例之数据去重
MapReduce 案例之数据去重
323 0
|
8月前
|
分布式计算
MapReduce中的Shuffle过程是什么?为什么它在性能上很关键?
MapReduce中的Shuffle过程是什么?为什么它在性能上很关键?
270 0
|
数据库
聊聊StarRocks向量化执行引擎-过滤操作
聊聊StarRocks向量化执行引擎-过滤操作
291 0
|
数据库
聊聊Doris向量化执行引擎-过滤操作
聊聊Doris向量化执行引擎-过滤操作
338 0
|
分布式计算 并行计算 大数据
并行计算框架Polars、Dask的数据处理性能对比
在Pandas 2.0发布以后,我们发布过一些评测的文章,这次我们看看,除了Pandas以外,常用的两个都是为了大数据处理的并行数据框架的对比测试。
491 0
|
SQL 存储 自然语言处理
数据导入与预处理-第6章-01数据集成
数据导入与预处理-第6章-01数据集成 1 数据集成概述 1.1 数据集成需要关注的问题 2 基于Pandas实现数据集成
数据导入与预处理-第6章-01数据集成
|
分布式计算 Hadoop
Tez计算引擎,写入数据特别慢的原因?
hadoop集群使用tez作为计算引擎,但是计算结果最后写入hdfs时(orc文件),特别慢,谁知道原因吗?怎么解决这个问题。万分感谢!
1673 0