震撼!通过双重异步,Excel 10万行数据导入从191秒优化到2秒!
在现代的企业级应用开发中,海量数据的处理效率和并发性能优化是一个非常重要的课题。无论是大规模数据导入、文件解析,还是在分布式系统中处理高并发任务,如何提升系统的处理速度、合理利用计算资源、减少线程上下文切换的开销,这些都是开发者必须面对的问题。在这一背景下,线程池技术以及异步编程逐渐成为提升系统性能的利器。
本文将深入探讨如何通过合理设计线程池和利用异步编程模型,有效优化大规模数据的处理性能。我们将结合 Spring Boot 框架中的 @Async 注解、自定义线程池、以及通过使用 EasyExcel 进行大数据量的 Excel 解析和异步写入数据库的场景,详细说明如何通过分而治之的策略,减少系统的响应时间、提高并发处理能力。同时,还将分析如何基于 CPU 和 IO 密集型任务的特性,来合理设置线程池的核心线程数、最大线程数等参数,以便在实际项目中能够充分发挥硬件资源的性能。
通常我是这样做的:
- 使用POI读取需要导入的Excel文件;
- 将文件名作为表名,列标题作为列名,并将数据拼接成SQL语句;
- 通过JDBC或Mybatis插入到数据库。
在操作中,如果文件数量多且数据量大,处理过程可能会非常缓慢。
访问后,感觉程序没有响应,但实际上,它正在读取并插入数据,只是速度很慢。
读取包含10万行的Excel文件竟然耗时191秒!
我以为程序卡住了!
private void readXls(String filePath, String filename) throws Exception { "resource") ( XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath)); // 读取第一个工作表 XSSFSheet sheet = xssfWorkbook.getSheetAt(0); // 获取总行数 int maxRow = sheet.getLastRowNum(); StringBuilder insertBuilder = new StringBuilder(); insertBuilder.append("insert into ").append(filename).append(" ( UUID,"); XSSFRow row = sheet.getRow(0); for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) { insertBuilder.append(row.getCell(i)).append(","); } insertBuilder.deleteCharAt(insertBuilder.length() - 1); insertBuilder.append(" ) values ( "); StringBuilder stringBuilder = new StringBuilder(); for (int i = 1; i <= maxRow; i++) { XSSFRow xssfRow = sheet.getRow(i); String id = ""; String name = ""; for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) { if (j == 0) { id = xssfRow.getCell(j) + ""; } else if (j == 1) { name = xssfRow.getCell(j) + ""; } } boolean flag = isExisted(id, name); if (!flag) { stringBuilder.append(insertBuilder); stringBuilder.append('\'').append(uuid()).append('\'').append(","); for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) { stringBuilder.append('\'').append(value).append('\'').append(","); } stringBuilder.deleteCharAt(stringBuilder.length() - 1); stringBuilder.append(" )").append("\n"); } } List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList()); int sum = JdbcUtil.executeDML(collect); } private static boolean isExisted(String id, String name) { String sql = "select count(1) as num from " + static_TABLE + " where ID = '" + id + "' and NAME = '" + name + "'"; String num = JdbcUtil.executeSelect(sql, "num"); return Integer.valueOf(num) > 0; } private static String uuid() { return UUID.randomUUID().toString().replace("-", ""); }
如何优化?
优化1:首先,查询所有数据,将其缓存到map中,然后在插入前做决策。这样可以大大提高速度。
优化2:如果单个Excel文件太大,可以考虑使用异步和多线程,分批读取多行并插入数据库。
优化3:如果文件太多,可以为每个Excel文件使用一个异步进程,实现双重异步读取和插入。
使用双重异步处理后,从191秒优化到了2秒,你能相信吗?
以下是异步读取Excel文件和批量读取大Excel文件的关键代码。
异步读取缓存的Excel Controller类
value = "/readExcelCacheAsync", method = RequestMethod.POST) ( public String readExcelCacheAsync() { String path = "G:\\Test\\data\\"; try { // 读取Excel之前,缓存所有数据 USER_INFO_SET = getUserInfo(); File file = new File(path); String[] xlsxArr = file.list(); for (int i = 0; i < xlsxArr.length; i++) { File fileTemp = new File(path + "\\" + xlsxArr[i]); String filename = fileTemp.getName().replace(".xlsx", ""); readExcelCacheAsyncService.readXls(path + filename + ".xlsx", filename); } } catch (Exception e) { logger.error("|#ReadDBCsv|#Exception: ", e); return "error"; } return "success"; }
批量读取超大Excel文件
"async-executor") (public void readXls(String filePath, String filename) throws Exception { "resource") ( XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath)); // 读取第一个工作表 XSSFSheet sheet = xssfWorkbook.getSheetAt(0); // 总行数 int maxRow = sheet.getLastRowNum(); logger.info(filename + ".xlsx,共 " + maxRow + " 行数据!"); StringBuilder insertBuilder = new StringBuilder(); insertBuilder.append("insert into ").append(filename).append(" ( UUID,"); XSSFRow row = sheet.getRow(0); for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) { insertBuilder.append(row.getCell(i)).append(","); } insertBuilder.deleteCharAt(insertBuilder.length() - 1); insertBuilder.append(" ) values ( "); int times = maxRow / STEP + 1; for (int time = 0; time < times; time++) { int start = STEP * time + 1; int end = STEP * time + STEP; if (time == times - 1) { end = maxRow; } if (end + 1 - start > 0) { readExcelDataAsyncService.readXlsCacheAsyncMybatis(sheet, row, start, end, insertBuilder); } } }
异步批量插入数据库
"async-executor") (public void readXlsCacheAsync(XSSFSheet sheet, XSSFRow row, int start, int end, StringBuilder insertBuilder) { StringBuilder stringBuilder = new StringBuilder(); for (int i = start; i <= end; i++) { XSSFRow xssfRow = sheet.getRow(i); String id = ""; String name = ""; for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) { if (j == 0) { id = xssfRow.getCell(j) + ""; } else if (j == 1) { name = xssfRow.getCell(j) + ""; } } // 在读取Excel之前,先缓存所有数据,然后做决策 boolean flag = isExisted(id, name); if (!flag) { stringBuilder.append(insertBuilder); stringBuilder.append('\'').append(uuid()).append('\'').append(","); for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) { stringBuilder.append('\'').append(value).append('\'').append(","); } stringBuilder.deleteCharAt(stringBuilder.length() - 1); stringBuilder.append(" )").append("\n"); } } List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList()); if (collect != null && collect.size() > 0) { int sum = JdbcUtil.executeDML(collect); } } private boolean isExisted(String id, String name) { return ReadExcelCacheAsyncController.USER_INFO_SET.contains(id + "," + name); }
异步线程池工具类
@Async 的目的是异步处理任务。
- 在方法上添加 @Async 表明该方法是异步的。
- 在类上添加 @Async 表示该类中的所有方法都是异步的。
- 使用此注解的类必须由 Spring 管理。
- 必须在启动类或配置类中添加 @EnableAsync 注解,@Async 才能生效。
在使用 @Async 时,如果不指定线程池的名称,即不自定义线程池,默认会使用一个线程池。这个默认线程池是 Spring 的 SimpleAsyncTaskExecutor。
默认线程池的默认配置如下:
- 默认核心线程数:8。
- 最大线程数:Integer.MAX_VALUE。
- 队列类型:LinkedBlockingQueue。
- 容量:Integer.MAX_VALUE。
- 空闲线程保留时间:60秒。
- 线程池拒绝策略:AbortPolicy。
从最大线程数可以看出,在并发情况下,线程会无限制地创建。
你也可以通过 yml 文件重新配置:
spring: task: execution: pool: max-size: 10 core-size: 5 keep-alive: 3s queue-capacity: 1000 thread-name-prefix: my-executor
你也可以自定义线程池。以下是使用 @Async 自定义线程池的简单代码实现:
// 支持异步操作 public class AsyncTaskConfig { /** * 来自 com.google.guava 的线程池 * @return */ "my-executor") ( public Executor firstExecutor() { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-executor").build(); // 获取 CPU 处理器数量 int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(curSystemThreads, 100, 200, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory); threadPool.allowsCoreThreadTimeOut(); return threadPool; } /** * Spring 的线程池 * @return */ "async-executor") ( public Executor asyncExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); // 核心线程数 taskExecutor.setCorePoolSize(24); // 线程池维护的最大线程数,超出核心线程数的线程仅当缓冲队列满时才会创建 taskExecutor.setMaxPoolSize(200); // 缓冲队列 taskExecutor.setQueueCapacity(50); // 超出核心线程数的线程空闲时间,超时后将被销毁 taskExecutor.setKeepAliveSeconds(200); // 异步方法内部线程名 taskExecutor.setThreadNamePrefix("async-executor-"); /** * 当线程池的任务缓存队列已满,且线程池中的线程数量已达到最大值时,如果还有任务到来,将采用任务拒绝策略。 * 通常有以下四种策略: * ThreadPoolExecutor.AbortPolicy:抛弃任务并抛出 RejectedExecutionException 异常。 * ThreadPoolExecutor.DiscardPolicy:抛弃任务,但不抛出异常。 * ThreadPoolExecutor.DiscardOldestPolicy:抛弃队列最前面的任务,然后尝试执行当前任务(重复此过程)。 * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前任务,自动调用执行方法,直到成功。 */ taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.initialize(); return taskExecutor; } }
异步失效的原因
- 被 @Async 注解的方法不是 public 的;
- 被 @Async 注解的方法的返回值类型只能是 void 或 Future;
- 被 @Async 注解的方法如果是静态的也会失效;
- 未添加 @EnableAsync 注解;
- 调用者和被 @Async 注解的方法不能在同一个类中;
- 对异步方法使用 @Transactional 是无效的,但对异步方法内调用的方法加上 @Transactional 是有效的。
线程池中设置核心线程数的问题
我尚未有时间详细探讨:在线程池中设置 CorePoolSize 和 MaxPoolSize 的最适宜和最高效的数量是多少。
借此机会进行了一些测试。
我记得有个关于 CPU 处理器数量的说法
将 CorePoolSize 设置为 CPU 处理器的数量时,效率最高吗?
// 获取 CPU 处理器数量 int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;
Runtime.getRuntime().availableProcessors()
会获取 CPU 核心线程数,代表计算资源。
- 对于 CPU 密集型任务,线程池的大小设置为 N,与 CPU 线程数一致,这可以最大限度地减少线程间的上下文切换。但在实际开发中,一般设置为 N+1,以防止线程由于不可预见的情况而阻塞。如果发生阻塞,多出来的线程可以继续执行任务,保证 CPU 的高效利用。
- 对于 IO 密集型任务,线程池的大小设置为 2N。这个数值是根据业务压力测试得出的,或者在不涉及业务时使用推荐值。
实际中,线程池的具体大小需要根据压力测试以及机器的当前状态进行调整。
如果线程池过大,会导致 CPU 持续切换,系统整体性能并不会有显著提高,反而可能会变慢。
我电脑的 CPU 处理器数量为 24。
那么一次读取多少行效率最高呢?
测试中,Excel 文件包含 10 万行数据。10 万 / 24 = 4166,因此我设置为 4200。这是最有效的设置吗?
测试过程中似乎的确如此。
我记得大家习惯性地将核心线程数(CorePoolSize)和最大线程数(MaxPoolSize)设置为相同的数值,通常是 200。
这只是随机选择,还是基于经验的?
测试发现,当 CorePoolSize 和 MaxPoolSize 都设置为 200 时,最初同时开启了 150 个线程工作。
为什么会这样呢?
经过数十次测试后
- 发现核心线程数并没有太大区别;
- 关键是每次读取和存储的行数,不能太多,存储速度会逐渐减慢;
- 也不能太少,如果少于 150 个线程,会导致线程阻塞,反而减慢进程。
IV.使用 EasyExcel 读取并插入数据库
我不会写 EasyExcel 的双异步优化。大家要记住避免掉进低级勤奋的陷阱。
ReadEasyExcelController
value = "/readEasyExcel", method = RequestMethod.POST) ( public String readEasyExcel() { try { String path = "G:\\Test\\data\\"; String[] xlsxArr = new File(path).list(); for (int i = 0; i < xlsxArr.length; i++) { String filePath = path + xlsxArr[i]; File fileTemp = new File(path + xlsxArr[i]); String fileName = fileTemp.getName().replace(".xlsx", ""); List<UserInfo> list = new ArrayList<>(); EasyExcel.read(filePath, UserInfo.class, new ReadEasyExeclAsyncListener(readEasyExeclService, fileName, batchCount, list)).sheet().doRead(); } }catch (Exception e){ logger.error("readEasyExcel Exception:",e); return "error"; } return "success"; }
ReadEasyExeclAsyncListener
public ReadEasyExeclService readEasyExeclService; // 表名 public String TABLE_NAME; // 批量插入阈值 private int BATCH_COUNT; // 数据收集 private List<UserInfo> LIST; public ReadEasyExeclAsyncListener(ReadEasyExeclService readEasyExeclService, String tableName, int batchCount, List<UserInfo> list) { this.readEasyExeclService = readEasyExeclService; this.TABLE_NAME = tableName; this.BATCH_COUNT = batchCount; this.LIST = list; } public void invoke(UserInfo data, AnalysisContext analysisContext) { data.setUuid(uuid()); data.setTableName(TABLE_NAME); LIST.add(data); if (LIST.size() >= BATCH_COUNT) { // 批量入库 readEasyExeclService.saveDataBatch(LIST); } } public void doAfterAllAnalysed(AnalysisContext analysisContext) { if (LIST.size() > 0) { // 最后一批入库 readEasyExeclService.saveDataBatch(LIST); } } public static String uuid() { return UUID.randomUUID().toString().replace("-", ""); }
ReadEasyExeclServiceImpl
public class ReadEasyExeclServiceImpl implements ReadEasyExeclService { private ReadEasyExeclMapper readEasyExeclMapper; public void saveDataBatch(List<UserInfo> list) { // Insert into the database via mybatis readEasyExeclMapper.saveDataBatch(list); // Insert into the database via JDBC // insertByJdbc(list); list.clear(); } private void insertByJdbc(List<UserInfo> list){ List<String> sqlList = new ArrayList<>(); for (UserInfo u : list){ StringBuilder sqlBuilder = new StringBuilder(); sqlBuilder.append("insert into ").append(u.getTableName()).append(" ( UUID,ID,NAME,AGE,ADDRESS,PHONE,OP_TIME ) values ( "); sqlBuilder.append("'").append(ReadEasyExeclAsyncListener.uuid()).append("',") .append("'").append(u.getId()).append("',") .append("'").append(u.getName()).append("',") .append("'").append(u.getAge()).append("',") .append("'").append(u.getAddress()).append("',") .append("'").append(u.getPhone()).append("',") .append("sysdate )"); sqlList.add(sqlBuilder.toString()); } JdbcUtil.executeDML(sqlList); } }
UserInfo
public class UserInfo { private String tableName; private String uuid; value = "ID") ( private String id; value = "NAME") ( private String name; value = "AGE") ( private String age; value = "ADDRESS") ( private String address; value = "PHONE") ( private String phone; }
结语
在处理高并发、大数据导入等场景时,异步编程和线程池技术提供了一种极具效率的解决方案。通过合理配置线程池的核心线程数、最大线程数、队列长度等参数,能够在确保系统稳定性的前提下,大幅提升并发处理能力。而通过异步编程,我们可以有效避免线程阻塞、减少资源浪费,并让系统在面对大量请求时依然能够保持较高的响应速度。
本文的示例通过 Spring Boot 的 @Async 注解和自定义线程池,在实际的 EasyExcel 大数据导入场景下,验证了这种技术组合的高效性和实用性。此外,通过对 CPU 密集型任务和 IO 密集型任务的深入分析,开发者能够根据自身项目的特点,选择合适的线程池配置策略,最大化资源利用率和性能表现。
在实际应用中,线程池和异步编程不仅适用于大数据导入,还可以推广到包括文件处理、网络请求、日志处理等各类需要并发处理的场景中。因此,掌握并灵活运用这些技术,将为我们的系统性能优化提供坚实的基础,使我们能够应对更复杂、更苛刻的业务需求。