震撼!通过双重异步,Excel 10万行数据导入从191秒优化到2秒!

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 通过合理设计线程池和利用异步编程模型,本文展示了如何将 Excel 10万行数据的导入时间从191秒优化到2秒。文章详细介绍了使用 Spring Boot 的 `@Async` 注解、自定义线程池和 EasyExcel 进行大数据量的 Excel 解析和异步写入数据库的方法。通过分而治之的策略,减少了系统的响应时间,提高了并发处理能力。同时,还分析了如何根据 CPU 和 IO 密集型任务的特性,合理设置线程池的参数,以充分发挥硬件资源的性能。

震撼!通过双重异步,Excel 10万行数据导入从191秒优化到2秒!

在现代的企业级应用开发中,海量数据的处理效率和并发性能优化是一个非常重要的课题。无论是大规模数据导入、文件解析,还是在分布式系统中处理高并发任务,如何提升系统的处理速度、合理利用计算资源、减少线程上下文切换的开销,这些都是开发者必须面对的问题。在这一背景下,线程池技术以及异步编程逐渐成为提升系统性能的利器。

本文将深入探讨如何通过合理设计线程池和利用异步编程模型,有效优化大规模数据的处理性能。我们将结合 Spring Boot 框架中的 @Async 注解、自定义线程池、以及通过使用 EasyExcel 进行大数据量的 Excel 解析和异步写入数据库的场景,详细说明如何通过分而治之的策略,减少系统的响应时间、提高并发处理能力。同时,还将分析如何基于 CPU 和 IO 密集型任务的特性,来合理设置线程池的核心线程数、最大线程数等参数,以便在实际项目中能够充分发挥硬件资源的性能。

通常我是这样做的:

  1. 使用POI读取需要导入的Excel文件;
  2. 将文件名作为表名,列标题作为列名,并将数据拼接成SQL语句;
  3. 通过JDBC或Mybatis插入到数据库。

90c2a31361cc7a7429bccdb963c77cf.png

在操作中,如果文件数量多且数据量大,处理过程可能会非常缓慢。

访问后,感觉程序没有响应,但实际上,它正在读取并插入数据,只是速度很慢。

读取包含10万行的Excel文件竟然耗时191秒!

我以为程序卡住了!

private void readXls(String filePath, String filename) throws Exception {
    @SuppressWarnings("resource")
    XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath));
    // 读取第一个工作表
    XSSFSheet sheet = xssfWorkbook.getSheetAt(0);
    // 获取总行数
    int maxRow = sheet.getLastRowNum();
    StringBuilder insertBuilder = new StringBuilder();
    insertBuilder.append("insert into ").append(filename).append(" ( UUID,");
    XSSFRow row = sheet.getRow(0);
    for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) {
        insertBuilder.append(row.getCell(i)).append(",");
    }
    insertBuilder.deleteCharAt(insertBuilder.length() - 1);
    insertBuilder.append(" ) values ( ");
    StringBuilder stringBuilder = new StringBuilder();
    for (int i = 1; i <= maxRow; i++) {
        XSSFRow xssfRow = sheet.getRow(i);
        String id = "";
        String name = "";
        for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
            if (j == 0) {
                id = xssfRow.getCell(j) + "";
            } else if (j == 1) {
                name = xssfRow.getCell(j) + "";
            }
        }
        boolean flag = isExisted(id, name);
        if (!flag) {
            stringBuilder.append(insertBuilder);
            stringBuilder.append('\'').append(uuid()).append('\'').append(",");
            for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
                stringBuilder.append('\'').append(value).append('\'').append(",");
            }
            stringBuilder.deleteCharAt(stringBuilder.length() - 1);
            stringBuilder.append(" )").append("\n");
        }
    }
    List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList());
    int sum = JdbcUtil.executeDML(collect);
}
private static boolean isExisted(String id, String name) {
    String sql = "select count(1) as num from " + static_TABLE + " where ID = '" + id + "' and NAME = '" + name + "'";
    String num = JdbcUtil.executeSelect(sql, "num");
    return Integer.valueOf(num) > 0;
}
private static String uuid() {
    return UUID.randomUUID().toString().replace("-", "");
}

如何优化?

优化1:首先,查询所有数据,将其缓存到map中,然后在插入前做决策。这样可以大大提高速度。

优化2:如果单个Excel文件太大,可以考虑使用异步和多线程,分批读取多行并插入数据库。

3a9e3c46f1756d8d2d6352b735b1f59.png

优化3:如果文件太多,可以为每个Excel文件使用一个异步进程,实现双重异步读取和插入。

b1b2ade1e7f9eb31bf2e8bf1036068b.png

使用双重异步处理后,从191秒优化到了2秒,你能相信吗?

以下是异步读取Excel文件和批量读取大Excel文件的关键代码。

异步读取缓存的Excel Controller类

@RequestMapping(value = "/readExcelCacheAsync", method = RequestMethod.POST)
@ResponseBody
public String readExcelCacheAsync() {
    String path = "G:\\Test\\data\\";
    try {
        // 读取Excel之前,缓存所有数据
        USER_INFO_SET = getUserInfo();
        File file = new File(path);
        String[] xlsxArr = file.list();
        for (int i = 0; i < xlsxArr.length; i++) {
            File fileTemp = new File(path + "\\" + xlsxArr[i]);
            String filename = fileTemp.getName().replace(".xlsx", "");
            readExcelCacheAsyncService.readXls(path + filename + ".xlsx", filename);
        }
    } catch (Exception e) {
        logger.error("|#ReadDBCsv|#Exception: ", e);
        return "error";
    }
    return "success";
}

批量读取超大Excel文件

@Async("async-executor")
public void readXls(String filePath, String filename) throws Exception {
    @SuppressWarnings("resource")
    XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath));
    // 读取第一个工作表
    XSSFSheet sheet = xssfWorkbook.getSheetAt(0);
    // 总行数
    int maxRow = sheet.getLastRowNum();
    logger.info(filename + ".xlsx,共 " + maxRow + " 行数据!");
    StringBuilder insertBuilder = new StringBuilder();
    insertBuilder.append("insert into ").append(filename).append(" ( UUID,");
    XSSFRow row = sheet.getRow(0);
    for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) {
        insertBuilder.append(row.getCell(i)).append(",");
    }
    insertBuilder.deleteCharAt(insertBuilder.length() - 1);
    insertBuilder.append(" ) values ( ");
    int times = maxRow / STEP + 1;
    for (int time = 0; time < times; time++) {
        int start = STEP * time + 1;
        int end = STEP * time + STEP;
        if (time == times - 1) {
            end = maxRow;
        }
        if (end + 1 - start > 0) {
            readExcelDataAsyncService.readXlsCacheAsyncMybatis(sheet, row, start, end, insertBuilder);
        }
    }
}

异步批量插入数据库

@Async("async-executor")
public void readXlsCacheAsync(XSSFSheet sheet, XSSFRow row, int start, int end, StringBuilder insertBuilder) {
    StringBuilder stringBuilder = new StringBuilder();
    for (int i = start; i <= end; i++) {
        XSSFRow xssfRow = sheet.getRow(i);
        String id = "";
        String name = "";
        for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
            if (j == 0) {
                id = xssfRow.getCell(j) + "";
            } else if (j == 1) {
                name = xssfRow.getCell(j) + "";
            }
        }
        // 在读取Excel之前,先缓存所有数据,然后做决策
        boolean flag = isExisted(id, name);
        if (!flag) {
            stringBuilder.append(insertBuilder);
            stringBuilder.append('\'').append(uuid()).append('\'').append(",");
            for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
                stringBuilder.append('\'').append(value).append('\'').append(",");
            }
            stringBuilder.deleteCharAt(stringBuilder.length() - 1);
            stringBuilder.append(" )").append("\n");
        }
    }
    List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList());
    if (collect != null && collect.size() > 0) {
        int sum = JdbcUtil.executeDML(collect);
    }
}
private boolean isExisted(String id, String name) {
    return ReadExcelCacheAsyncController.USER_INFO_SET.contains(id + "," + name);
}

异步线程池工具类

@Async 的目的是异步处理任务。

  1. 在方法上添加 @Async 表明该方法是异步的。
  2. 在类上添加 @Async 表示该类中的所有方法都是异步的。
  3. 使用此注解的类必须由 Spring 管理。
  4. 必须在启动类或配置类中添加 @EnableAsync 注解,@Async 才能生效。

在使用 @Async 时,如果不指定线程池的名称,即不自定义线程池,默认会使用一个线程池。这个默认线程池是 Spring 的 SimpleAsyncTaskExecutor。

默认线程池的默认配置如下:

  1. 默认核心线程数:8。
  2. 最大线程数:Integer.MAX_VALUE。
  3. 队列类型:LinkedBlockingQueue。
  4. 容量:Integer.MAX_VALUE。
  5. 空闲线程保留时间:60秒。
  6. 线程池拒绝策略:AbortPolicy。

从最大线程数可以看出,在并发情况下,线程会无限制地创建。

你也可以通过 yml 文件重新配置:

spring:
  task:
    execution:
      pool:
        max-size: 10
        core-size: 5
        keep-alive: 3s
        queue-capacity: 1000
        thread-name-prefix: my-executor

你也可以自定义线程池。以下是使用 @Async 自定义线程池的简单代码实现:

@EnableAsync // 支持异步操作
@Configuration
public class AsyncTaskConfig {
    /**
     * 来自 com.google.guava 的线程池
     * @return
     */
    @Bean("my-executor")
    public Executor firstExecutor() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-executor").build();
        // 获取 CPU 处理器数量
        int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(curSystemThreads, 100,
                200, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), threadFactory);
        threadPool.allowsCoreThreadTimeOut();
        return threadPool;
    }
    /**
     * Spring 的线程池
     * @return
     */
    @Bean("async-executor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // 核心线程数
        taskExecutor.setCorePoolSize(24);
        // 线程池维护的最大线程数,超出核心线程数的线程仅当缓冲队列满时才会创建
        taskExecutor.setMaxPoolSize(200);
        // 缓冲队列
        taskExecutor.setQueueCapacity(50);
        // 超出核心线程数的线程空闲时间,超时后将被销毁
        taskExecutor.setKeepAliveSeconds(200);
        // 异步方法内部线程名
        taskExecutor.setThreadNamePrefix("async-executor-");
        /**
         * 当线程池的任务缓存队列已满,且线程池中的线程数量已达到最大值时,如果还有任务到来,将采用任务拒绝策略。
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:抛弃任务并抛出 RejectedExecutionException 异常。
         * ThreadPoolExecutor.DiscardPolicy:抛弃任务,但不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:抛弃队列最前面的任务,然后尝试执行当前任务(重复此过程)。
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前任务,自动调用执行方法,直到成功。
         */
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }
}

异步失效的原因

  1. 被 @Async 注解的方法不是 public 的;
  2. 被 @Async 注解的方法的返回值类型只能是 void 或 Future;
  3. 被 @Async 注解的方法如果是静态的也会失效;
  4. 未添加 @EnableAsync 注解;
  5. 调用者和被 @Async 注解的方法不能在同一个类中;
  6. 对异步方法使用 @Transactional 是无效的,但对异步方法内调用的方法加上 @Transactional 是有效的。

线程池中设置核心线程数的问题

我尚未有时间详细探讨:在线程池中设置 CorePoolSize 和 MaxPoolSize 的最适宜和最高效的数量是多少。

借此机会进行了一些测试。

我记得有个关于 CPU 处理器数量的说法

将 CorePoolSize 设置为 CPU 处理器的数量时,效率最高吗?

// 获取 CPU 处理器数量
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;

Runtime.getRuntime().availableProcessors() 会获取 CPU 核心线程数,代表计算资源。

  • 对于 CPU 密集型任务,线程池的大小设置为 N,与 CPU 线程数一致,这可以最大限度地减少线程间的上下文切换。但在实际开发中,一般设置为 N+1,以防止线程由于不可预见的情况而阻塞。如果发生阻塞,多出来的线程可以继续执行任务,保证 CPU 的高效利用。
  • 对于 IO 密集型任务,线程池的大小设置为 2N。这个数值是根据业务压力测试得出的,或者在不涉及业务时使用推荐值。

实际中,线程池的具体大小需要根据压力测试以及机器的当前状态进行调整。

如果线程池过大,会导致 CPU 持续切换,系统整体性能并不会有显著提高,反而可能会变慢。

我电脑的 CPU 处理器数量为 24。

那么一次读取多少行效率最高呢?

测试中,Excel 文件包含 10 万行数据。10 万 / 24 = 4166,因此我设置为 4200。这是最有效的设置吗?

测试过程中似乎的确如此。

我记得大家习惯性地将核心线程数(CorePoolSize)和最大线程数(MaxPoolSize)设置为相同的数值,通常是 200。

这只是随机选择,还是基于经验的?

测试发现,当 CorePoolSize 和 MaxPoolSize 都设置为 200 时,最初同时开启了 150 个线程工作。

为什么会这样呢?

经过数十次测试后

  1. 发现核心线程数并没有太大区别;
  2. 关键是每次读取和存储的行数,不能太多,存储速度会逐渐减慢;
  3. 也不能太少,如果少于 150 个线程,会导致线程阻塞,反而减慢进程。

IV.使用 EasyExcel 读取并插入数据库

我不会写 EasyExcel 的双异步优化。大家要记住避免掉进低级勤奋的陷阱。

ReadEasyExcelController

@RequestMapping(value = "/readEasyExcel", method = RequestMethod.POST)
@ResponseBody
public String readEasyExcel() {
    try {
        String path = "G:\\Test\\data\\";
        String[] xlsxArr = new File(path).list();
        for (int i = 0; i < xlsxArr.length; i++) {
            String filePath = path + xlsxArr[i];
            File fileTemp = new File(path + xlsxArr[i]);
            String fileName = fileTemp.getName().replace(".xlsx", "");
            List<UserInfo> list = new ArrayList<>();
            EasyExcel.read(filePath, UserInfo.class, new ReadEasyExeclAsyncListener(readEasyExeclService, fileName, batchCount, list)).sheet().doRead();
        }
    }catch (Exception e){
        logger.error("readEasyExcel Exception:",e);
        return "error";
    }
    return "success";
}

ReadEasyExeclAsyncListener

public ReadEasyExeclService readEasyExeclService;
// 表名
public String TABLE_NAME;
// 批量插入阈值
private int BATCH_COUNT;
// 数据收集
private List<UserInfo> LIST;
public ReadEasyExeclAsyncListener(ReadEasyExeclService readEasyExeclService, String tableName, int batchCount, List<UserInfo> list) {
    this.readEasyExeclService = readEasyExeclService;
    this.TABLE_NAME = tableName;
    this.BATCH_COUNT = batchCount;
    this.LIST = list;
}
@Override
public void invoke(UserInfo data, AnalysisContext analysisContext) {
    data.setUuid(uuid());
    data.setTableName(TABLE_NAME);
    LIST.add(data);
    if (LIST.size() >= BATCH_COUNT) {
        // 批量入库
        readEasyExeclService.saveDataBatch(LIST);
    }
}
@Override
public void doAfterAllAnalysed(AnalysisContext analysisContext) {
    if (LIST.size() > 0) {
        // 最后一批入库
        readEasyExeclService.saveDataBatch(LIST);
    }
}
public static String uuid() {
    return UUID.randomUUID().toString().replace("-", "");
}

ReadEasyExeclServiceImpl

@Service
public class ReadEasyExeclServiceImpl implements ReadEasyExeclService {
    @Resource
    private ReadEasyExeclMapper readEasyExeclMapper;
    @Override
    public void saveDataBatch(List<UserInfo> list) {
        // Insert into the database via mybatis
        readEasyExeclMapper.saveDataBatch(list);
      // Insert into the database via JDBC
        // insertByJdbc(list);
        list.clear();
    }
    
    private void insertByJdbc(List<UserInfo> list){
        List<String> sqlList = new ArrayList<>();
        for (UserInfo u : list){
            StringBuilder sqlBuilder = new StringBuilder();
            sqlBuilder.append("insert into ").append(u.getTableName()).append(" ( UUID,ID,NAME,AGE,ADDRESS,PHONE,OP_TIME ) values ( ");
            sqlBuilder.append("'").append(ReadEasyExeclAsyncListener.uuid()).append("',")
                            .append("'").append(u.getId()).append("',")
                            .append("'").append(u.getName()).append("',")
                            .append("'").append(u.getAge()).append("',")
                            .append("'").append(u.getAddress()).append("',")
                            .append("'").append(u.getPhone()).append("',")
                            .append("sysdate )");
            sqlList.add(sqlBuilder.toString());
        }
        JdbcUtil.executeDML(sqlList);
    }
}

UserInfo

@Data
public class UserInfo {
    private String tableName;
    private String uuid;
    @ExcelProperty(value = "ID")
    private String id;
    @ExcelProperty(value = "NAME")
    private String name;
    @ExcelProperty(value = "AGE")
    private String age;
    @ExcelProperty(value = "ADDRESS")
    private String address;
    @ExcelProperty(value = "PHONE")
    private String phone;
}

结语

在处理高并发、大数据导入等场景时,异步编程和线程池技术提供了一种极具效率的解决方案。通过合理配置线程池的核心线程数、最大线程数、队列长度等参数,能够在确保系统稳定性的前提下,大幅提升并发处理能力。而通过异步编程,我们可以有效避免线程阻塞、减少资源浪费,并让系统在面对大量请求时依然能够保持较高的响应速度。

本文的示例通过 Spring Boot 的 @Async 注解和自定义线程池,在实际的 EasyExcel 大数据导入场景下,验证了这种技术组合的高效性和实用性。此外,通过对 CPU 密集型任务和 IO 密集型任务的深入分析,开发者能够根据自身项目的特点,选择合适的线程池配置策略,最大化资源利用率和性能表现。

在实际应用中,线程池和异步编程不仅适用于大数据导入,还可以推广到包括文件处理、网络请求、日志处理等各类需要并发处理的场景中。因此,掌握并灵活运用这些技术,将为我们的系统性能优化提供坚实的基础,使我们能够应对更复杂、更苛刻的业务需求。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
7月前
|
SQL 数据库连接 数据库
【SQL Server】2. 将数据导入导出到Excel表格当中
【SQL Server】2. 将数据导入导出到Excel表格当中
169 0
|
7月前
|
关系型数据库 MySQL 区块链
将excel表格数据导入Mysql新建表中
将excel表格数据导入Mysql新建表中
|
4月前
|
关系型数据库 MySQL Windows
MySQL数据导入:MySQL 导入 Excel 文件.md
MySQL数据导入:MySQL 导入 Excel 文件.md
|
7月前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之在 DataWorks 中将本地数据导入至 Excel 电子表格中如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
132 0
|
7月前
|
存储 SQL C#
C#实现Excel合并单元格数据导入数据集
C#实现Excel合并单元格数据导入数据集
|
Java easyexcel 数据库连接
多个sheet Excel 数据 导入数据库 如何实现?
多个sheet Excel 数据 导入数据库 如何实现?
267 0
|
测试技术
python+requests+excel+unittest+ddt接口自动化数据驱动并生成html报告(优化版)
python+requests+excel+unittest+ddt接口自动化数据驱动并生成html报告(优化版)
318 0
python+requests+excel+unittest+ddt接口自动化数据驱动并生成html报告(优化版)
|
数据可视化
excel点数据导入arcgis
在最近的工作中,需要将坐标点(Excel格式)导入到ArcGIS中,用来做地理可视化和数据统计,例如下面的坐标点数据。
281 0
|
SQL 搜索推荐 关系型数据库
kettle操作--excel数据导入mysql数据库
kettle操作--excel数据导入mysql数据库
kettle操作--excel数据导入mysql数据库
|
SQL 算法 Java
百万级别数据Excel导出优化
这篇文章不是标题党,下文会通过一个仿真例子分析如何优化百万级别数据Excel导出。
961 0
百万级别数据Excel导出优化