高效读取大文件,再也不用担心 OOM 了!(中)

简介: 最近阿粉接到一个需求,需要从文件读取数据,然后经过业务处理之后存储到数据库中。这个需求,说实话不是很难,阿粉很快完成了第一个版本。

逐行读取

逐行读取的方式比较多,这里阿粉主要介绍两种方式:

  • BufferReader
  • Apache Commons IO
  • Java8 stream

BufferReader

我们可以使用 BufferReader#readLine 逐行读取数据。

try (BufferedReader fileBufferReader = new BufferedReader(new FileReader("temp/test.txt"))) {
    String fileLineContent;
    while ((fileLineContent = fileBufferReader.readLine()) != null) {
        // process the line.
    }
} catch (FileNotFoundException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
}

Apache Commons IO

Common-IO 中有一个方法 FileUtils#lineIterator可以实现逐行读取方式,使用代码如下:

Stopwatch stopwatch = Stopwatch.createStarted();
LineIterator fileContents = FileUtils.lineIterator(new File("temp/test.txt"), StandardCharsets.UTF_8.name());
while (fileContents.hasNext()) {
    fileContents.nextLine();
    //  pass
}
logMemory();
fileContents.close();
stopwatch.stop();
System.out.println("read all lines spend " + stopwatch.elapsed(TimeUnit.SECONDS) + " s");

这个方法返回一个迭代器,每次我们都可以获取的一行数据。

其实我们查看代码,其实可以发现 FileUtils#lineIterator,其实用的就是 BufferReader,感兴趣的同学可以自己查看一下源码。

由于公号内无法插入外链,关注『Java极客技术』,回复『20200610』 获取源码

Java8 stream

Java8 Files 类新增了一个 lines,可以返回 Stream我们可以逐行处理数据。

Stopwatch stopwatch = Stopwatch.createStarted();
// lines(Path path, Charset cs)
try (Stream<String> inputStream = Files.lines(Paths.get("temp/test.txt"), StandardCharsets.UTF_8)) {
    inputStream
            .filter(str -> str.length() > 5)// 过滤数据
            .forEach(o -> {
                // pass do sample logic
            });
}
logMemory();
stopwatch.stop();
System.out.println("read all lines spend " + stopwatch.elapsed(TimeUnit.SECONDS) + " s");

使用这个方法有个好处在于,我们可以方便使用 Stream 链式操作,做一些过滤操作。

注意:这里我们使用 try-with-resources 方式,可以安全的确保读取结束,流可以被安全的关闭。

并发读取

逐行的读取的方式,解决我们 OOM 的问题。不过如果数据很多,我们这样一行行处理,需要花费很多时间。

上述的方式,只有一个线程在处理数据,那其实我们可以多来几个线程,增加并行度。

下面在上面的基础上,阿粉就抛砖引玉,介绍下阿粉自己比较常用两种并行处理方式。

逐行批次打包

第一种方式,先逐行读取数据,加载到内存中,等到积累一定数据之后,然后再交给线程池异步处理。

@SneakyThrows
public static void readInApacheIOWithThreadPool() {
    // 创建一个 最大线程数为 10,队列最大数为 100 的线程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60l, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100));
    // 使用 Apache 的方式逐行读取数据
    LineIterator fileContents = FileUtils.lineIterator(new File("temp/test.txt"), StandardCharsets.UTF_8.name());
    List<String> lines = Lists.newArrayList();
    while (fileContents.hasNext()) {
        String nextLine = fileContents.nextLine();
        lines.add(nextLine);
        // 读取到十万的时候
        if (lines.size() == 100000) {
            // 拆分成两个 50000 ,交给异步线程处理
            List<List<String>> partition = Lists.partition(lines, 50000);
            List<Future> futureList = Lists.newArrayList();
            for (List<String> strings : partition) {
                Future<?> future = threadPoolExecutor.submit(() -> {
                    processTask(strings);
                });
                futureList.add(future);
            }
            // 等待两个线程将任务执行结束之后,再次读取数据。这样的目的防止,任务过多,加载的数据过多,导致 OOM
            for (Future future : futureList) {
                // 等待执行结束
                future.get();
            }
            // 清除内容
            lines.clear();
        }
    }
    // lines 若还有剩余,继续执行结束
    if (!lines.isEmpty()) {
        // 继续执行
        processTask(lines);
    }
  threadPoolExecutor.shutdown();
}
    private static void processTask(List<String> strings) {
        for (String line : strings) {
            // 模拟业务执行
            try {
                TimeUnit.MILLISECONDS.sleep(10L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

上述方法,等到内存的数据到达 10000 的时候,拆封两个任务交给异步线程执行,每个任务分别处理 50000 行数据。

后续使用  future#get(),等待异步线程执行完成之后,主线程才能继续读取数据。

之所以这么做,主要原因是因为,线程池的任务过多,再次导致 OOM 的问题。

大文件拆分成小文件

第二种方式,首先我们将一个大文件拆分成几个小文件,然后使用多个异步线程分别逐行处理数据。

public static void splitFileAndRead() throws Exception {
    // 先将大文件拆分成小文件
    List<File> fileList = splitLargeFile("temp/test.txt");
    // 创建一个 最大线程数为 10,队列最大数为 100 的线程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60l, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100));
    List<Future> futureList = Lists.newArrayList();
    for (File file : fileList) {
        Future<?> future = threadPoolExecutor.submit(() -> {
            try (Stream inputStream = Files.lines(file.toPath(), StandardCharsets.UTF_8)) {
                inputStream.forEach(o -> {
                    // 模拟执行业务
                    try {
                        TimeUnit.MILLISECONDS.sleep(10L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        futureList.add(future);
    }
    for (Future future : futureList) {
        // 等待所有任务执行结束
        future.get();
    }
    threadPoolExecutor.shutdown();
}
private static List<File> splitLargeFile(String largeFileName) throws IOException {
    LineIterator fileContents = FileUtils.lineIterator(new File(largeFileName), StandardCharsets.UTF_8.name());
    List<String> lines = Lists.newArrayList();
    // 文件序号
    int num = 1;
    List<File> files = Lists.newArrayList();
    while (fileContents.hasNext()) {
        String nextLine = fileContents.nextLine();
        lines.add(nextLine);
        // 每个文件 10w 行数据
        if (lines.size() == 100000) {
            createSmallFile(lines, num, files);
            num++;
        }
    }
    // lines 若还有剩余,继续执行结束
    if (!lines.isEmpty()) {
        // 继续执行
        createSmallFile(lines, num, files);
    }
    return files;
}
相关文章
|
11月前
|
C++
《C++避坑神器·七》二进制读写自定义类型导致崩溃或数据读写不全问题
《C++避坑神器·七》二进制读写自定义类型导致崩溃或数据读写不全问题
84 0
|
5月前
|
监控 Linux
【专栏】在 Linux 中,掌握检查内存使用情况至关重要,因为内存问题可能导致系统性能下降甚至崩溃。这 5 个命令堪称绝了!
【4月更文挑战第28天】在 Linux 中,掌握检查内存使用情况至关重要,因为内存问题可能导致系统性能下降甚至崩溃。本文介绍了 5 个常用的检查内存命令:1) `free` 提供内存和交换区的详细信息;2) `top` 显示进程信息及内存使用;3) `vmstat` 输出系统综合信息,包括内存动态变化;4) `pidstat` 监控特定进程的内存使用;5) `/proc/meminfo` 文件提供系统内存详细数据。了解和使用这些命令能帮助用户及时发现并解决内存相关问题,确保系统稳定运行。
73 0
|
SQL 缓存 监控
掌握了这些优化技巧,再也不用担心接口性能上不去了!
优化接口性能对每个后端开发同学来说见惯不惯了,也是一项必备的技能,因为我们平时开发中都会对外提供接口,性能差的话,功能多少会有影响。
|
Java
项目实战典型案例20——内存长期占用导致系统慢
项目实战典型案例20——内存长期占用导致系统慢
81 0
【项目实战典型案例】20.内存长期占用导致系统慢
【项目实战典型案例】20.内存长期占用导致系统慢
|
Python
谈一谈|如何随意的对文件进行读写?
谈一谈|如何随意的对文件进行读写?
102 0
|
存储 缓存 文件存储
「系统」占用太多硬盘空间?试试用这些方法将它找回来
「系统」占用太多硬盘空间?试试用这些方法将它找回来
321 0
|
存储 消息中间件 Linux
看完这篇文章,我再也不用担心线上出现 CPU 性能问题了(上)
生产环境上出现 CPU 性能问题是非常典型的一类问题,往往这个时候就比较考验相关人员排查问题的能力
|
IDE Linux 调度
看完这篇文章,我再也不用担心线上出现 CPU 性能问题了(下)
在上一篇文章中咸鱼给大家介绍了 CPU 常见的性能指标,当生产环境出现 CPU 性能瓶颈的时候,优先观察这些指标有没有什么异常的地方,能解决大部分情况
|
存储 Apache 数据库
高效读取大文件,再也不用担心 OOM 了!(上)
最近阿粉接到一个需求,需要从文件读取数据,然后经过业务处理之后存储到数据库中。这个需求,说实话不是很难,阿粉很快完成了第一个版本。
高效读取大文件,再也不用担心 OOM 了!(上)