1.问题说明
大数据量的List问题处理,多线程分批处理,需要解决的问题:
- 下标越界。
- 线程安全。
- 数据丢失。
private List<Map<String, Object>> dealDataByThreads(List<String> dataList) { int dataSize = dataList.size(); // 结果数据【1】 List<Map<String, Object>> dataMap = Collections.synchronizedList(new ArrayList<>(dataSize)); // 每批的记录数据并计算批次 int numberBatch = 2000; double number = dataSize * 1.0 / numberBatch; int n = ((Double) Math.ceil(number)).intValue(); // 根据参数开启线程 CountDownLatch countDownLatch = new CountDownLatch(n); ThreadPoolExecutor executor = ThreadManager.executor; // 分批处理数据 for (int i = 0; i < n; i++) { // 计算sub的toIndex int end = numberBatch * (i + 1); if (end > dataSize) { end = dataSize; } // 获取分批数据 List<String> dataListSub = dataList.subList(numberBatch * i, end); int finalI = i; int finalEnd = end; executor.submit(() -> { long startMillis = System.currentTimeMillis(); //【2】 dataMap.addAll(dealData(dataListSub)); countDownLatch.countDown(); long endMillis = System.currentTimeMillis(); log.info("当前线程[{}]处理{}-{}/{}耗时[{}]毫秒", Thread.currentThread().getName(), numberBatch * finalI, finalEnd, dataSize, endMillis - startMillis); }); } // 主线程等待 try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } return dataMap; }
2.代码说明
- 使用
dataSize
初始化 Map 对象,可以避免 Map 对象的重构导致的下标越界。
List<Map<String, Object>> dataMap = Collections.synchronizedList(new ArrayList<>(dataSize));
- 不要将
dataMap
对象给线程,dataMap.add(dataListSubAfterDeal) 方法可能会覆盖 List 内的对象。
// 正确方式 dataMap.addAll(dealData(dataListSub)); // 错误方式 dealData(dataListSub,dataMap); dataMap.add(dataListSubAfterDeal);
- 为什么不使用线程安全的
Vector
或CopyOnWriteArrayList
。
// 没有达到多线程期待的效果。
3.方案效率
3 > 2 >1
// 1.线程安全,性能较低,没有达到多线程期待的效果。 Vector<Map<String, Object>> vector = new Vector<>(dataSize); // 2.CopyOnWriteArrayList比Vector效率好一些前者synchronized后者lock List<Map<String, Object>> resultList = new CopyOnWriteArrayList<>(); // 3.Collections.synchronizedList将线程不安全的集合转成线程安全的集合 List<Map<String, Object>> dataMap = Collections.synchronizedList(new ArrayList<>(dataSize));