如何实现百万级数据从 Excel 导入到数据库?

简介: 本文介绍百万级Excel数据导入数据库的优化方案,涵盖内存溢出、性能瓶颈与错误处理三大问题。通过EasyExcel流式读取避免内存溢出,结合多线程并发读取多个sheet,并利用生产者-消费者模式提升处理效率。采用批量插入与事务管理提高数据库写入性能,同时设计数据校验、重试机制与日志记录保障容错性,确保大规模数据导入稳定高效。

这个场景考察的方面其实挺多的,还是那句话,我们要学会拆解。我们先来看一下,百万级数据从 Excel 读取,并插入到数据库,可能会遇到哪些问题:

  1. 内存溢出问题
  • 百万级数据量的 Excel 文件会非常大,都加载到内存中可能会导致内存溢出。
  1. 性能问题
  • 百万级数据从 Excel 读取并插入到数据库,可能会很慢,需要考虑性能问题。
  1. 错误处理
  • 在文件的读取及导入过程中,可能会遇到各种各样的问题,我们需要妥善地处理好这些问题。




内存溢出问题

百万级数据量,一次性都读取到内存中,肯定是不现实的,那么好的办法就是基于流式读取的方式进行分批处理

在技术选型上,我们选择使用 EasyExcel,它特别针对大数据量和复杂 Excel 文件的处理进行了优化。在解析 Excel 时,EasyExcel 不会将 Excel 一次性全部加载到内存中,而是从磁盘上一行行读取数据,逐个解析。


性能问题

百万级数据的处理,如果用单线程的话肯定是很慢的,想要提升性能,那么就需要使用多线程

多线程的使用上涉及到两个场景:一个是用多线程进行文件的读取,另一个是用多线程实现数据的插入。这里就涉及到一个生产者-消费者的模式了,多个线程读取,然后多个线程插入,这样可以最大限度地提升整体的性能。

而数据的插入,我们除了借助多线程之外,还可以同时使用数据库的批量插入功能,这样就能更加地提升插入速度。


错误处理

在文件的读取和数据库写入过程中,会需要解决各种各样的问题,比如数据格式错误、数据不一致、有重复数据等。

所以我们需要分两步来,第一步就是先进行数据的检查,在开始插入之前就把数据的格式等问题提前检查好,然后在插入过程中,对异常进行处理。

处理方式有很多种,可以进行事务回滚、也可以进行日志记录。这个根据实际情况,一般来说不建议做回滚,直接做自动重试,重试几次之后还是不行的话,再记录日志然后后续重新插入即可。

并且在这个过程中,需要考虑一下数据重复的问题,需要在 Excel 中某几个字段设置成数据库唯一性约束,然后在遇到数据冲突的时候,进行处理,处理方式可以是覆盖、跳过以及报错。这个根据实际业务情况来,一般来说跳过 + 打印日志是相对合理的。

所以,整体方案就是

借助 EasyExcel 来实现 Excel 的读取,因为他并不会一次性把整个 Excel 都加载到内存中,而是逐行读取的。为了提升并发性能,我们再进一步将百万级数据分散到不同的 sheet 中,然后借助线程池,多线程同时读取不同的 sheet,在读取过程中,借助 EasyExcel 的 ReadListener 做数据处理。

在处理过程中,我们并不会每一条数据都操作数据库,这样对数据库来说压力太大了,我们会设定一个批次,比如 1000 条,我们会把从 Excel 中读取到的数据暂存在内存中,这里可以使用 List 实现,当读取了 1000 条之后,就执行一次数据的批量插入,批量插入可以借助 mybatis 就能简单的实现了。


具体实现

为了提升并发处理的能力,我们把百万级数据放到同一个 Excel 的不同的 sheet 中,然后通过使用 EasyExcel 并发地读取这些 sheet。

EasyExcel 提供了 ReadListener 接口,允许在读取每一批数据后进行自定义处理。我们可以基于它的这个功能来实现文件的分批读取。

先增加依赖:

<dependencies>
    <!-- EasyExcel -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>easypexcel</artifactId>
        <version>最新的版本号</version>
    </dependency>

    <!-- 数据库连接和线程池 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
</dependencies>

然后实现并发读取多个sheet的代码:

@Service
public class ExcelImporterService {

    @Autowired
    private MyDataService myDataService;

    public void doImport() {
        // Excel文件的路径
        String filePath = "users/clay/workspace/excel/test.xlsx";

        // 需要读取的sheet数量
        int numberOfSheets = 20;

        // 创建一个固定大小的线程池,大小与sheet数量相同
        ExecutorService executor = Executors.newFixedThreadPool(numberOfSheets);

        // 遍历所有sheets
        for (int sheetNo = 0; sheetNo < numberOfSheets; sheetNo++) {
            // 在Java lambda表达式中使用的变量需要是final
            int finalSheetNo = sheetNo;

            // 向线程池提交一个任务
            executor.submit(() -> {
                // 使用EasyExcel读取指定的sheet
                EasyExcel.read(filePath, MyDataModel.class, new MyDataModelListener(myDataService))
                         .sheet(finalSheetNo) // 指定sheet号
                         .doRead(); // 开始读取操作
            });
        }

        // 启动线程池的关闭序列
        executor.shutdown();

        // 等待所有任务完成,或者在等待超时前被中断
        try {
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            // 如果等待过程中线程被中断,打印异常信息
            e.printStackTrace();
        }
    }
}

这段代码通过创建一个固定大小的线程池来并发读取一个包含多个 sheets 的 Excel 文件。每个 sheet 的读取作为一个单独的任务提交给线程池。


我们在代码中用了一个 MyDataModelListener,这个类是 ReadListener 的一个实现类。当 EasyExcel 读取每一行数据时,它会自动调用我们传入的


这个 ReadListener 实例的 invoke 方法。在这个方法中,我们就可以定义如何处理这些数据。

MyDataModelListener 还包含 doAfterAllAnalysed 方法,这个方法在所有数据都读取完毕后被调用。这里可以执行一些清理工作,或处理剩余的数据。


接下来,我们来实现这个我们的 ReadListener

import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.read.listener.ReadListener;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.List;

// 自定义的ReadListener,用于处理从Excel读取的数据
public class MyDataModelListener implements ReadListener<MyDataModel> {

    // 设置批量处理的数据大小
    private static final int BATCH_SIZE = 1000;

    // 用于暂存读取的数据,直到达到批量大小
    private List<MyDataModel> batch = new ArrayList<>();

    private MyDataService myDataService;

    // 构造函数,注入MyBatis的Mapper
    public MyDataModelListener(MyDataService myDataService) {
        this.myDataService = myDataService;
    }

    // 每读取一行数据都会调用此方法
    @Override
    public void invoke(MyDataModel data, AnalysisContext context) {
        // 检查数据的合法性及有效性
        if (validateData(data)) {
            // 有效数据添加到list中
            batch.add(data);
        } else {
            // 处理无效数据,例如记录日志或跳过
        }

        // 当达到批量大小时,处理这批数据
        if (batch.size() >= BATCH_SIZE) {
            processBatch();
        }
    }

    // 所有数据读取完成后调用此方法
    @Override
    public void doAfterAllAnalysed(AnalysisContext context) {
        // 如果还有未处理的数据,进行处理
        if (!batch.isEmpty()) {
            processBatch();
        }
    }

    // 处理一批数据的方法
    private void processBatch() {
        int retryCount = 0;
        // 重试逻辑
        while (retryCount < 3) {
            try {
                // 尝试批量插入
                myDataService.batchInsert(batch);
                // 清空批量数据,以便下一次批量处理
                batch.clear();
                break;
            } catch (Exception e) {
                // 重试计数增加
                retryCount++;
                // 如果重试3次都失败,记录错误日志
                if (retryCount >= 3) {
                    logError(e, batch);
                }
            }
        }
    }

    // 记录错误日志的方法
    private void logError(Exception e, List<MyDataModel> failedBatch) {
        // 在这里实现错误日志记录逻辑
        // 可以记录异常信息和导致失败的数据
    }

    // 验证数据是否有效(例如:数据库中不存在重复数据)
    private boolean validateData(MyDataModel data) {
        // 调用 mapper 方法来检查数据库中是否已存在该数据
        int count = myDataService.countByColumn1(data.getColumn1());
        // 如果 count 为 0,表示数据不存在,返回 true;否则返回 false
        if (count == 0) {
            return true;
        }

        // 在这里实现数据验证逻辑
        return false;
    }
}
@Service
public class MyDataService {

    // MyBatis的Mapper,用于数据库操作
    @Autowired
    private MyDataMapper myDataMapper;

    // 使用Spring的事务管理进行批量插入
    @Transactional(rollbackFor = Exception.class)
    public void batchInsert(List<MyDataModel> batch) {
        // 使用MyBatis Mapper进行批量插入
        myDataMapper.batchInsert(batch);
    }

    public int countByColumn1(String column1) {
        return myDataMapper.countByColumn1(column1);
    }
}

通过自定义这个 MyDataModelListener,我们就可以在读取 Excel 文件的过程中处理数据。


每读取到一条数据之后会把它们放入一个 List,当 List 中积累到 1000 条之后,进行一次数据库的批量插入,插入时如果失败则重试,最后还是失败就打印日志。


这里批量插入,用到了 MyBatis 的批量插入,代码实现如下:

import org.apache.ibatis.annotations.Mapper;
import java.util.List;

@Mapper
public interface MyDataMapper {
    void batchInsert(List<MyDataModel> dataList);

    int countByColumn1(String column1);
}
<insert id="batchInsert" parameterType="list">
    INSERT INTO test_table_name (column1, column2, ...)
    VALUES
    <foreach collection="list" item="item" index="index" separator=",">
        (#{item.column1}, #{item.column2}, ...)
    </foreach>
</insert>

<select id="countByColumn1" resultType="int">
    SELECT COUNT(*) FROM your_table WHERE column1 = #{column1}
</select>


目录
相关文章
|
3月前
|
存储 算法 关系型数据库
【Java架构师体系课 | MySQL篇】② 深入理解MySQL索引底层数据结构与算法
InnoDB索引为何采用B+树?本文由浅入深解析二叉树、红黑树、B树的缺陷,详解B+树的结构优势:非叶子节点不存数据、叶子节点有序且双向链接,支持高效范围查询与磁盘预读,三层即可存储两千多万数据,极大提升查询性能。
247 7
|
SQL XML 关系型数据库
Mybatis-Plus通过SQL注入器实现真正的批量插入
Mybatis-Plus通过SQL注入器实现真正的批量插入
7617 0
Mybatis-Plus通过SQL注入器实现真正的批量插入
|
3月前
|
消息中间件 架构师 Kafka
【架构师】如何做技术选型?
技术选型无绝对优劣,关键在于“更合适”。需综合评估功能满足度、可扩展性、安全性、性能等非功能性需求,同时考量使用人数、社区活跃度、迭代速度、学习与维护成本,以及与现有技术体系的匹配度,权衡利弊后做出最优选择。
174 4
|
3月前
|
架构师 微服务
【架构师】微服务的拆分有哪些原则?
微服务拆分需遵循七大原则:职责单一、围绕业务、中台化公共模块、按系统保障级别分离、技术栈解耦、避免循环依赖,并遵循康威定律使架构与组织匹配,提升可维护性与协作效率。
327 4
|
5月前
|
缓存 监控 Java
SpringBoot @Scheduled 注解详解
使用`@Scheduled`注解实现方法周期性执行,支持固定间隔、延迟或Cron表达式触发,基于Spring Task,适用于日志清理、数据同步等定时任务场景。需启用`@EnableScheduling`,注意线程阻塞与分布式重复问题,推荐结合`@Async`异步处理,提升任务调度效率。
858 128
|
存储 Java easyexcel
招行面试:100万级别数据的Excel,如何秒级导入到数据库?
本文由40岁老架构师尼恩撰写,分享了应对招商银行Java后端面试绝命12题的经验。文章详细介绍了如何通过系统化准备,在面试中展示强大的技术实力。针对百万级数据的Excel导入难题,尼恩推荐使用阿里巴巴开源的EasyExcel框架,并结合高性能分片读取、Disruptor队列缓冲和高并发批量写入的架构方案,实现高效的数据处理。此外,文章还提供了完整的代码示例和配置说明,帮助读者快速掌握相关技能。建议读者参考《尼恩Java面试宝典PDF》进行系统化刷题,提升面试竞争力。关注公众号【技术自由圈】可获取更多技术资源和指导。
|
3月前
|
SQL 存储 关系型数据库
MySQL中到底什么是覆盖索引、索引下推?
覆盖索引指查询只需通过索引即可获取数据,无需回表,提升查询效率。索引下推则在索引遍历时提前过滤条件,减少回表次数,尤其适用于联合索引中部分字段无法使用的情况,二者均能显著降低I/O开销,提高查询性能。(238字)
430 1
|
3月前
|
人工智能 安全 Java
Spring AI 核心架构解析:构建企业级 AI 应用的 Java 新范式
Spring AI 为 Java 开发者提供企业级 AI 应用新范式,通过分层架构、统一抽象(如 ChatClient、PromptTemplate)与 Spring 生态深度集成,支持 RAG、函数调用、流式响应等核心功能,实现安全、可观测、可维护的智能系统构建。
1007 8
|
3月前
|
关系型数据库 MySQL Java
【Java架构师体系课 | MySQL篇】⑦ 深入理解MySQL事务隔离级别与锁机制
本文深入讲解数据库事务隔离级别与锁机制,涵盖ACID特性、并发问题(脏读、不可重复读、幻读)、四种隔离级别对比及MVCC原理,分析表锁、行锁、间隙锁、临键锁等机制,并结合实例演示死锁处理与优化策略,帮助理解数据库并发控制核心原理。
400 4