逐行读取
逐行读取的方式比较多,这里阿粉主要介绍两种方式:
- BufferReader
- Apache Commons IO
- Java8 stream
BufferReader
我们可以使用 BufferReader#readLine
逐行读取数据。
try (BufferedReader fileBufferReader = new BufferedReader(new FileReader("temp/test.txt"))) { String fileLineContent; while ((fileLineContent = fileBufferReader.readLine()) != null) { // process the line. } } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }
Apache Commons IO
Common-IO 中有一个方法 FileUtils#lineIterator
可以实现逐行读取方式,使用代码如下:
Stopwatch stopwatch = Stopwatch.createStarted(); LineIterator fileContents = FileUtils.lineIterator(new File("temp/test.txt"), StandardCharsets.UTF_8.name()); while (fileContents.hasNext()) { fileContents.nextLine(); // pass } logMemory(); fileContents.close(); stopwatch.stop(); System.out.println("read all lines spend " + stopwatch.elapsed(TimeUnit.SECONDS) + " s");
这个方法返回一个迭代器,每次我们都可以获取的一行数据。
其实我们查看代码,其实可以发现 FileUtils#lineIterator
,其实用的就是 BufferReader
,感兴趣的同学可以自己查看一下源码。
由于公号内无法插入外链,关注『Java极客技术』,回复『20200610』 获取源码
Java8 stream
Java8 Files
类新增了一个 lines
,可以返回 Stream
我们可以逐行处理数据。
Stopwatch stopwatch = Stopwatch.createStarted(); // lines(Path path, Charset cs) try (Stream<String> inputStream = Files.lines(Paths.get("temp/test.txt"), StandardCharsets.UTF_8)) { inputStream .filter(str -> str.length() > 5)// 过滤数据 .forEach(o -> { // pass do sample logic }); } logMemory(); stopwatch.stop(); System.out.println("read all lines spend " + stopwatch.elapsed(TimeUnit.SECONDS) + " s");
使用这个方法有个好处在于,我们可以方便使用 Stream
链式操作,做一些过滤操作。
注意:这里我们使用
try-with-resources
方式,可以安全的确保读取结束,流可以被安全的关闭。
并发读取
逐行的读取的方式,解决我们 OOM 的问题。不过如果数据很多,我们这样一行行处理,需要花费很多时间。
上述的方式,只有一个线程在处理数据,那其实我们可以多来几个线程,增加并行度。
下面在上面的基础上,阿粉就抛砖引玉,介绍下阿粉自己比较常用两种并行处理方式。
逐行批次打包
第一种方式,先逐行读取数据,加载到内存中,等到积累一定数据之后,然后再交给线程池异步处理。
@SneakyThrows public static void readInApacheIOWithThreadPool() { // 创建一个 最大线程数为 10,队列最大数为 100 的线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60l, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100)); // 使用 Apache 的方式逐行读取数据 LineIterator fileContents = FileUtils.lineIterator(new File("temp/test.txt"), StandardCharsets.UTF_8.name()); List<String> lines = Lists.newArrayList(); while (fileContents.hasNext()) { String nextLine = fileContents.nextLine(); lines.add(nextLine); // 读取到十万的时候 if (lines.size() == 100000) { // 拆分成两个 50000 ,交给异步线程处理 List<List<String>> partition = Lists.partition(lines, 50000); List<Future> futureList = Lists.newArrayList(); for (List<String> strings : partition) { Future<?> future = threadPoolExecutor.submit(() -> { processTask(strings); }); futureList.add(future); } // 等待两个线程将任务执行结束之后,再次读取数据。这样的目的防止,任务过多,加载的数据过多,导致 OOM for (Future future : futureList) { // 等待执行结束 future.get(); } // 清除内容 lines.clear(); } } // lines 若还有剩余,继续执行结束 if (!lines.isEmpty()) { // 继续执行 processTask(lines); } threadPoolExecutor.shutdown(); } private static void processTask(List<String> strings) { for (String line : strings) { // 模拟业务执行 try { TimeUnit.MILLISECONDS.sleep(10L); } catch (InterruptedException e) { e.printStackTrace(); } } }
上述方法,等到内存的数据到达 10000 的时候,拆封两个任务交给异步线程执行,每个任务分别处理 50000 行数据。
后续使用 future#get()
,等待异步线程执行完成之后,主线程才能继续读取数据。
之所以这么做,主要原因是因为,线程池的任务过多,再次导致 OOM 的问题。
大文件拆分成小文件
第二种方式,首先我们将一个大文件拆分成几个小文件,然后使用多个异步线程分别逐行处理数据。
public static void splitFileAndRead() throws Exception { // 先将大文件拆分成小文件 List<File> fileList = splitLargeFile("temp/test.txt"); // 创建一个 最大线程数为 10,队列最大数为 100 的线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60l, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100)); List<Future> futureList = Lists.newArrayList(); for (File file : fileList) { Future<?> future = threadPoolExecutor.submit(() -> { try (Stream inputStream = Files.lines(file.toPath(), StandardCharsets.UTF_8)) { inputStream.forEach(o -> { // 模拟执行业务 try { TimeUnit.MILLISECONDS.sleep(10L); } catch (InterruptedException e) { e.printStackTrace(); } }); } catch (IOException e) { e.printStackTrace(); } }); futureList.add(future); } for (Future future : futureList) { // 等待所有任务执行结束 future.get(); } threadPoolExecutor.shutdown(); } private static List<File> splitLargeFile(String largeFileName) throws IOException { LineIterator fileContents = FileUtils.lineIterator(new File(largeFileName), StandardCharsets.UTF_8.name()); List<String> lines = Lists.newArrayList(); // 文件序号 int num = 1; List<File> files = Lists.newArrayList(); while (fileContents.hasNext()) { String nextLine = fileContents.nextLine(); lines.add(nextLine); // 每个文件 10w 行数据 if (lines.size() == 100000) { createSmallFile(lines, num, files); num++; } } // lines 若还有剩余,继续执行结束 if (!lines.isEmpty()) { // 继续执行 createSmallFile(lines, num, files); } return files; }