场景说明
现有一个 10G 文件的数据,里面包含了 18-70 之间的整数,分别表示 18-70 岁的人群数量统计,假设年龄范围分布均匀,分别表示系统中所有用户的年龄数,找出重复次数最多的那个数,现有一台内存为 4G、2 核 CPU 的电脑,请写一个算法实现。
23,31,42,19,60,30,36,........
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
模拟数据
Java 中一个整数占 4 个字节,模拟 10G 为 30 亿左右个数据, 采用追加模式写入 10G 数据到硬盘里。每 100 万个记录写一行,大概 4M 一行,10G 大概 2500 行数据。
package bigdata; import java.io.*; import java.util.Random; /** * @Desc: * @Author: bingbing * @Date: 2022/5/4 0004 19:05 */ public class GenerateData { private static Random random = new Random(); public static int generateRandomData(int start, int end) { return random.nextInt(end - start + 1) + start; } /** * 产生10G的 1-1000的数据在D盘 */ public void generateData() throws IOException { File file = new File("D:\ User.dat"); if (!file.exists()) { try { file.createNewFile(); } catch (IOException e) { e.printStackTrace(); } } int start = 18; int end = 70; long startTime = System.currentTimeMillis(); BufferedWriter bos = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, true))); for (long i = 1; i < Integer.MAX_VALUE * 1.7; i++) { String data = generateRandomData(start, end) + ","; bos.write(data); // 每100万条记录成一行,100万条数据大概4M if (i % 1000000 == 0) { bos.write("\n"); } } System.out.println("写入完成! 共花费时间:" + (System.currentTimeMillis() - startTime) / 1000 + " s"); bos.close(); } public static void main(String[] args) { GenerateData generateData = new GenerateData(); try { generateData.generateData(); } catch (IOException e) { e.printStackTrace(); } } }
上述代码调整参数执行 2 次,凑 10 个 G 的数据在 D 盘的 User.dat 文件里。
准备好 10G 数据后,接着写如何处理这些数据。
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
场景分析
10G 的数据比当前拥有的运行内存大的多,不能全量加载到内存中读取,如果采用全量加载,那么内存会直接爆掉,只能按行读取,Java 中的 bufferedReader 的 readLine() 按行读取文件里的内容。
读取数据
首先我们写一个方法单线程读完这 30E 数据需要多少时间,每读 100 行打印一次:
private static void readData() throws IOException { BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(FILE_NAME), "utf-8")); String line; long start = System.currentTimeMillis(); int count = 1; while ((line = br.readLine()) != null) { // 按行读取 // SplitData.splitLine(line); if (count % 100 == 0) { System.out.println("读取100行,总耗时间: " + (System.currentTimeMillis() - start) / 1000 + " s"); System.gc(); } count++; } running = false; br.close(); }
按行读完 10G 的数据大概 20 秒,基本每 100 行,1E 多数据花 1S,速度还挺快:
处理数据
思路一:通过单线程处理
通过单线程处理,初始化一个 countMap,key 为年龄,value 为出现的次数,将每行读取到的数据按照 "," 进行分割,然后获取到的每一项进行保存到 countMap 里,如果存在,那么值 key 的 value+1。
for (int i = start; i <= end; i++) { try { File subFile = new File(dir + "\" + i + ".dat"); if (!file.exists()) { subFile.createNewFile(); } countMap.computeIfAbsent(i + "", integer -> new AtomicInteger(0)); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }
单线程读取并统计 countMap:
public static void splitLine(String lineData) { String[] arr = lineData.split(","); for (String str : arr) { if (StringUtils.isEmpty(str)) { continue; } countMap.computeIfAbsent(str, s -> new AtomicInteger(0)).getAndIncrement(); } }
通过比较找出年龄数最多的年龄并打印出来:
private static void findMostAge() { Integer targetValue = 0; String targetKey = null; Iterator<Map.Entry<String, AtomicInteger>> entrySetIterator = countMap.entrySet().iterator(); while (entrySetIterator.hasNext()) { Map.Entry<String, AtomicInteger> entry = entrySetIterator.next(); Integer value = entry.getValue().get(); String key = entry.getKey(); if (value > targetValue) { targetValue = value; targetKey = key; } } System.out.println("数量最多的年龄为:" + targetKey + "数量为:" + targetValue); }
完整代码:
package bigdata; import org.apache.commons.lang3.StringUtils; import java.io.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; /** * @Desc: * @Author: bingbing * @Date: 2022/5/4 0004 19:19 * 单线程处理 */ public class HandleMaxRepeatProblem_v0 { public static final int start = 18; public static final int end = 70; public static final String dir = "D:\dataDir"; public static final String FILE_NAME = "D:\ User.dat"; /** * 统计数量 */ private static Map<String, AtomicInteger> countMap = new ConcurrentHashMap<>(); /** * 开启消费的标志 */ private static volatile boolean startConsumer = false; /** * 消费者运行保证 */ private static volatile boolean consumerRunning = true; /** * 按照 "," 分割数据,并写入到countMap里 */ static class SplitData { public static void splitLine(String lineData) { String[] arr = lineData.split(","); for (String str : arr) { if (StringUtils.isEmpty(str)) { continue; } countMap.computeIfAbsent(str, s -> new AtomicInteger(0)).getAndIncrement(); } } } /** * init map */ static { File file = new File(dir); if (!file.exists()) { file.mkdir(); } for (int i = start; i <= end; i++) { try { File subFile = new File(dir + "\" + i + ".dat"); if (!file.exists()) { subFile.createNewFile(); } countMap.computeIfAbsent(i + "", integer -> new AtomicInteger(0)); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) { new Thread(() -> { try { readData(); } catch (IOException e) { e.printStackTrace(); } }).start(); } private static void readData() throws IOException { BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(FILE_NAME), "utf-8")); String line; long start = System.currentTimeMillis(); int count = 1; while ((line = br.readLine()) != null) { // 按行读取,并向map里写入数据 SplitData.splitLine(line); if (count % 100 == 0) { System.out.println("读取100行,总耗时间: " + (System.currentTimeMillis() - start) / 1000 + " s"); try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } count++; } findMostAge(); br.close(); } private static void findMostAge() { Integer targetValue = 0; String targetKey = null; Iterator<Map.Entry<String, AtomicInteger>> entrySetIterator = countMap.entrySet().iterator(); while (entrySetIterator.hasNext()) { Map.Entry<String, AtomicInteger> entry = entrySetIterator.next(); Integer value = entry.getValue().get(); String key = entry.getKey(); if (value > targetValue) { targetValue = value; targetKey = key; } } System.out.println("数量最多的年龄为:" + targetKey + "数量为:" + targetValue); } private static void clearTask() { // 清理,同时找出出现字符最大的数 findMostAge(); System.exit(-1); } }
测试结果: 总共花了 3 分钟读取完并统计完所有数据。
内存消耗为 2G-2.5G,CPU 利用率太低,只向上浮动了 20%-25% 之间:
要想提高 CPU 的利用率,那么可以使用多线程去处理。下面我们使用多线程去解决这个 CPU 利用率低的问题。