从Excel批量导入数据说到ForkJoin的原理

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 前面我们介绍了EasyPOI,用其进行Excel导入导出,非常的方便,非常的简单。但是4.0.0 版本以及其之前的版本都是通过单线程的方式对Excel中的数据进行解析的。效率比较差。今天我将做一个测试,5000条数据,分别使用EasyPOI的方式和自己手写ForkJoin的方式(多任务)的方式来导入,来比较这两者的性能差异。

前言

前面我们介绍了EasyPOI,用其进行Excel导入导出,非常的方便,非常的简单。但是4.0.0 版本以及其之前的版本都是通过单线程的方式对Excel中的数据进行解析的。效率比较差。

今天我将做一个测试,5000条数据,分别使用EasyPOI的方式和自己手写ForkJoin的方式(多任务)的方式来导入,来比较这两者的性能差异。

测试前准备

1. 首先创建一个测试项目

首先我们需要创建一个测试项目,我这里新建了一个SpringBoot项目。

然后引入easypoi的依赖,本次引入的easyPOI的版本是4.0.0版本。

<!--easypoi-->
  <dependency>
    <groupId>cn.afterturn</groupId>
    <artifactId>easypoi-spring-boot-starter</artifactId>
    <version>4.0.0</version>
  </dependency>
  <!--easypoi-->

2. 分别用两种方式实现导入

2.1:使用EasyPOI的方式

@Override
    public String batchUploadStudent_easyPOI(MultipartFile file) throws Exception {
        long startTime = System.currentTimeMillis();
        List<Student> studentList = ExcelImportUtil.importExcel(file.getInputStream(), Student.class, new ImportParams());
        log.info("********通过EasyPOI读取文件总耗时是={},读取到的数据总条数是={}", (System.currentTimeMillis() - startTime) + "毫秒", studentList.size());
        return null;
    }

使用EasyPOI实现导入非常的简单,只需要调用importExcel方法即可。再此不在赘述。

2.2:自己手写Fork-Join的方式

接下来,我们自己手写Fork-Join的方式来实现文件的解析。

1.解析单元格的方法,本demo是直接挨个读取每个单元格的,当然也可以通过注解的方式来实现。代码如下:

private List<Student> getData(Sheet sheet, int start, int end) {
        List<Student> mapList = new ArrayList<>();
        for (int i = start; i <= end; i++) {
            Student student = null;
            try {
                Row row = sheet.getRow(i);
                student = new Student();
                student.setClassName(ExcelUtil.getKeyValue(row.getCell(0)));
                student.setStudentName(ExcelUtil.getKeyValue(row.getCell(1)));
                student.setStudentMobile(ExcelUtil.getKeyValue(row.getCell(2)));               student.setIdCard(ExcelUtil.getKeyValue(row.getCell(3)));
                student.setStudentNo(ExcelUtil.getKeyValue(row.getCell(4)));
                student.setIdCard(ExcelUtil.getKeyValue(row.getCell(5)));
            } catch (Exception e) {
                log.info("***************税号={},文件名={},数据解析出现异常={}", e);
                continue;
            }
            mapList.add(student);
        }
        return mapList;
    }
}

这个方法也是很简单,就是读取开始行到结束行之间的所有数据。每个单元格的读取,严格按照Excel的字段顺序来读。

2. 定义RecursiveTask类。

class JoinTask extends RecursiveTask<List<Student>> {
  //开始解读的行
        private int start;
  //结束解读的行
        private int end;
  //分页
        private Sheet sheet;
  //总的行数
        private int total;
        public JoinTask(int start, int end, Sheet sheet) {
            this.start = start;
            this.end = end;
            this.sheet = sheet;
            this.total = sheet.getLastRowNum();
        }
        @Override
        protected List<Student> compute() {
    //数据异常
            if (start > end || total < end) {
                return new ArrayList<>(1);
            }
    //每200行一个解析
            if (end - start <= 200) {
                return getData(sheet, start, end).stream().filter(DistinctUtil.distinctByKey(Student::getStudentNo)).collect(Collectors.toList());
            } else {
    //二分法,将数据平均分成两块
                int mid = (start + end) / 2;
    //递归调用,左边是序号小的那一块
                JoinTask rightTask = new JoinTask(start, mid, sheet);
    //递归调用,右边是数据大的那一块
                JoinTask leftTask = new JoinTask(mid + 1, end, sheet);
                //写法一
                rightTask.fork();
                List<Student> leftList =  leftTask.compute();
                List<Student> rightList = rightTask.join();
                //写法二
                //invokeAll(rightTask, leftTask);
                //List<Student> leftList = leftTask.join();
                //List<Student> rightList = rightTask.join();
    //将左边和右边的数据合并
                leftList.addAll(rightList);
                return leftList;
            }
        }
    }

RecursiveTask类是ForkJoin方式的核心类,后面会介绍这个类的作用。

3. 调用的入口

public List<Student> importExcel(Workbook workbook) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);;
        Sheet sheet = workbook.getSheetAt(0);
  //开启任务
        JoinTask joinTask = new JoinTask(1, sheet.getLastRowNum(), sheet);
        List<Student> importVOList = forkJoinPool.invoke(joinTask);
        //excel内部去重
        List<Student> noRepeatImportVOList = importVOList.stream().filter(DistinctUtil.distinctByKey(Student::getStudentNo)).collect(Collectors.toList());
        return noRepeatImportVOList;
    }

测试类:

@Override
    public String batchUploadStudent_forkjoin(Workbook workbook) {
        long startTime = System.currentTimeMillis();
        List<Student> studentList = studentExcelImportWrapper.importExcel(workbook);
        log.info("********通过Fork-Join的方式读取文件总耗时是={},读取到的数据条数是={}", (System.currentTimeMillis() - startTime) + "毫秒", studentList.size());
        return null;
    }

3. 测试结果

上传同样的一个5000条数据的Excel,上传后的测试结果如下:

cfce68d1fca0a1c39f296849b2b9267f_20200418175249461.jpg

从上测试结果,我们可以明显看出,性能差别还是挺大的,这主要是由于EasyPOI使用的是单线程的方式来读取Excel。数据量越大性能差别越明显。既然这个ForkJoin这么好用,那么就让我们来认识一下它吧。

ForkJoin初识

什么是ForkJoin框架

ForkJoin框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。其中Fork就是将大任务拆分成若干个可以并发执行的小任务。Join就是合并所有小任务的执行结果。其执行流程如下图所示:

5c0b69a887b305abb7851815457f51f5_watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTQ1MzQ4MDg=,size_16,color_FFFFFF,t_70.png

任务分割和结果合并说明

ForkJoinTask 就是最基本的任务,使用ForkJoin 框架必须创建的对象,它提供fork,join操作。一般而言,我们不需要直接继承它,只需要继承它的子类。它有两个子类。

RecursiveAction: 用于无结果返回的任务。

RecursiveTask: 用于有返回结果的任务。

它的fork方法就是让task异步执行,join,就是让task同步执行,并获取返回值。

异常处理

ForkJoinTask在执行的时候可能会抛出异常,但是我们没有办法在主线程中直接捕获异常,所以ForkJoinTask提供了isCompleteAbnormally()方法来检查任务是否已经跑出异常或者已经被取消了。

我们可以通过getException()方法获取异常信息,这个返回返回Throwable,如果任务被取消则返回CancellationException,如果任务正常执行完或者没有抛出异常,就返回null。

if (rightTask.isCompletedAbnormally()) {
                    System.out.println(rightTask.getException());
                }

ForkJoin框架的实现原理

ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交到ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。

fork方法

public final ForkJoinTask<V> fork() {
        Thread t;
  //如果当前线程是ForkJoinWorkerThread线程
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

如上代码:调用fork方法,这个方法的逻辑是,如果当前线程是ForkJoinWorkerThread线程则调用ForkJoinWorkerThread的push方法异步执行这个任务,然后立即返回结果。如果不是的话,则调用ForkJoinPool.common.externalPush(this);异步执行这个任务。

push方法

push方法是把当前任务存放在ForkJoinTask数组的queue里,然后再调用ForkJoinPool的signalWork()方法来唤醒或者创建一个工作线程来执行任务。

final void push(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; ForkJoinPool p;
            int b = base, s = top, n;
            if ((a = array) != null) {    // ignore if queue removed
                int m = a.length - 1;     // fenced write for task visibility
    //将当前任务存放在ForkJoinTask数组中
                U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
                U.putOrderedInt(this, QTOP, s + 1);
                if ((n = s - b) <= 1) {
                    if ((p = pool) != null)
      //调用signalWork方法来唤醒或者创建一个工作线程来执行任务
                        p.signalWork(p.workQueues, this);
                }
                else if (n >= m)
                    growArray();
            }
        }

说完了fork()方法,接下来,让我们来看看join方法。

Join方法

ForkJoinTask的join方法的主要作用就是阻塞当前线程并等待执行结果

public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
     private void reportException(int s) {
        if (s == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL)
            rethrow(getThrowableException());
    }

如上,join()方法首先会调用doJoin()方法获取当前任务的执行状态来判断返回什么结果,任务的状态有四种:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)。

1.如果任务状态不是已完成,则调用reportException方法,这个方法的逻辑是

如果任务是已取消,则抛出CancellationException异常

如果任务是出现异常,则抛出封装重抛异常。

2.如果任务是已完成,则返回结果。

接下来,让我们来看看doJoin()方法。

doJoin方法

private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }

在doJoin()方法里,首先通过查看任务的状态,看任务是否执行完了,如果执行完了,则直接返回任务状态,如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成了,则设置任务的状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为EXCEPTIONAL。

参考资料

JDK1.8 源码

http://ifeve.com/fork-join-5/

源码地址

https://github.com/XWxiaowei/JavaCode/tree/master/spring-poi-demo

相关文章
|
20天前
|
数据采集 数据可视化 数据挖掘
利用Python自动化处理Excel数据:从基础到进阶####
本文旨在为读者提供一个全面的指南,通过Python编程语言实现Excel数据的自动化处理。无论你是初学者还是有经验的开发者,本文都将帮助你掌握Pandas和openpyxl这两个强大的库,从而提升数据处理的效率和准确性。我们将从环境设置开始,逐步深入到数据读取、清洗、分析和可视化等各个环节,最终实现一个实际的自动化项目案例。 ####
|
2月前
|
数据采集 存储 JavaScript
自动化数据处理:使用Selenium与Excel打造的数据爬取管道
本文介绍了一种使用Selenium和Excel结合代理IP技术从WIPO品牌数据库(branddb.wipo.int)自动化爬取专利信息的方法。通过Selenium模拟用户操作,处理JavaScript动态加载页面,利用代理IP避免IP封禁,确保数据爬取稳定性和隐私性。爬取的数据将存储在Excel中,便于后续分析。此外,文章还详细介绍了Selenium的基本设置、代理IP配置及使用技巧,并探讨了未来可能采用的更多防反爬策略,以提升爬虫效率和稳定性。
146 4
|
4月前
|
关系型数据库 MySQL Shell
不通过navicat工具怎么把查询数据导出到excel表中
不通过navicat工具怎么把查询数据导出到excel表中
51 0
|
2月前
|
数据处理 Python
Python实用记录(十):获取excel数据并通过列表的形式保存为txt文档、xlsx文档、csv文档
这篇文章介绍了如何使用Python读取Excel文件中的数据,处理后将其保存为txt、xlsx和csv格式的文件。
86 3
Python实用记录(十):获取excel数据并通过列表的形式保存为txt文档、xlsx文档、csv文档
|
2月前
|
easyexcel Java UED
SpringBoot中大量数据导出方案:使用EasyExcel并行导出多个excel文件并压缩zip后下载
在SpringBoot环境中,为了优化大量数据的Excel导出体验,可采用异步方式处理。具体做法是将数据拆分后利用`CompletableFuture`与`ThreadPoolTaskExecutor`并行导出,并使用EasyExcel生成多个Excel文件,最终将其压缩成ZIP文件供下载。此方案提升了导出效率,改善了用户体验。代码示例展示了如何实现这一过程,包括多线程处理、模板导出及资源清理等关键步骤。
|
3月前
|
数据采集 存储 数据挖掘
使用Python读取Excel数据
本文介绍了如何使用Python的`pandas`库读取和操作Excel文件。首先,需要安装`pandas`和`openpyxl`库。接着,通过`read_excel`函数读取Excel数据,并展示了读取特定工作表、查看数据以及计算平均值等操作。此外,还介绍了选择特定列、筛选数据和数据清洗等常用操作。`pandas`是一个强大且易用的工具,适用于日常数据处理工作。
|
4月前
|
SQL JSON 关系型数据库
n种方式教你用python读写excel等数据文件
n种方式教你用python读写excel等数据文件
|
4月前
|
存储 Java Apache
|
4月前
|
数据可视化 Python
我是如何把python获取到的数据写入Excel的?
我是如何把python获取到的数据写入Excel的?
59 2
|
4月前
|
索引 Python
Python基于Excel多列长度不定的数据怎么绘制折线图?
本文档详述了如何运用Python从CSV格式的Excel文件中读取特定范围的数据,并基于这些数据绘制多条折线图。文件的第一列代表循环增长的时间序列,后续各列包含不同属性的数据。通过指定起始与结束行数,可选取一个完整的时间循环周期内的数据进行绘图。每列数据以不同颜色和线型表示,并且图片长度会根据时间序列的长度动态调整,确保图表清晰易读。最终生成的图表将保存至指定文件夹。