多线程模式
在我本地一共就几十篇博客的条件下执行一次还是很快的,但如果我们的文件是几万、几十万甚至上百万呢。
虽然功能可以实现,但可以想象这样的耗时绝对是成倍的增加。
这时多线程就发挥优势了,由多个线程分别去读取文件最后汇总结果即可。
这样实现的过程就变为:
- 读取某个目录下的所有文件。
- 将文件路径交由不同的线程自行处理。
- 最终汇总结果。
多线程带来的问题
也不是使用多线程就万事大吉了,先来看看第一个问题:共享资源。
简单来说就是怎么保证多线程和单线程统计的总字数是一致的。
基于我本地的环境先看看单线程运行的结果:
总计为:414142 字。
接下来换为多线程的方式:
List<String> allFile = scannerFile.getAllFile(strings[0]); logger.info("allFile size=[{}]",allFile.size()); for (String msg : allFile) { executorService.execute(new ScanNumTask(msg,filterProcessManager)); } public class ScanNumTask implements Runnable { private static Logger logger = LoggerFactory.getLogger(ScanNumTask.class); private String path; private FilterProcessManager filterProcessManager; public ScanNumTask(String path, FilterProcessManager filterProcessManager) { this.path = path; this.filterProcessManager = filterProcessManager; } @Override public void run() { Stream<String> stringStream = null; try { stringStream = Files.lines(Paths.get(path), StandardCharsets.UTF_8); } catch (Exception e) { logger.error("IOException", e); } List<String> collect = stringStream.collect(Collectors.toList()); for (String msg : collect) { filterProcessManager.process(msg); } } }
使用线程池管理线程,更多线程池相关的内容请看这里:《如何优雅的使用和理解线程池》
执行结果:
我们会发现无论执行多少次,这个值都会小于我们的预期值。
来看看统计那里是怎么实现的。
@Component public class TotalWords { private long sum = 0 ; public void sum(int count){ sum += count; } public long total(){ return sum; } }
可以看到就是对一个基本类型进行累加而已。那导致这个值比预期小的原因是什么呢?
我想大部分人都会说:多线程运行时会导致有些线程把其他线程运算的值覆盖。
但其实这只是导致这个问题的表象,根本原因还是没有讲清楚。
内存可见性
核心原因其实是由 Java 内存模型(JMM
)的规定导致的。
这里引用一段之前写的《你应该知道的 volatile 关键字》一段解释:
由于
Java
内存模型(JMM
)规定,所有的变量都存放在主内存中,而每个线程都有着自己的工作内存(高速缓存)。
线程在工作时,需要将主内存中的数据拷贝到工作内存中。这样对数据的任何操作都是基于工作内存(效率提高),并且不能直接操作主内存以及其他线程工作内存中的数据,之后再将更新之后的数据刷新到主内存中。
这里所提到的主内存可以简单认为是堆内存,而工作内存则可以认为是栈内存。
如下图所示:
所以在并发运行时可能会出现线程 B 所读取到的数据是线程 A 更新之前的数据。
更多相关内容就不再展开了,感兴趣的朋友可以翻翻以前的博文。
直接来说如何解决这个问题吧,JDK 其实已经帮我们想到了这些问题。
在 java.util.concurrent
并发包下有许多你可能会使用到的并发工具。
这里就非常适合 AtomicLong
,它可以原子性的对数据进行修改。
来看看修改后的实现:
@Component public class TotalWords { private AtomicLong sum = new AtomicLong() ; public void sum(int count){ sum.addAndGet(count) ; } public long total(){ return sum.get() ; } }
只是使用了它的两个 API
而已。再来运行下程序会发现结果居然还是不对。
甚至为 0 了。
线程间通信
这时又出现了一个新的问题,来看看获取总计数据是怎么实现的。
List<String> allFile = scannerFile.getAllFile(strings[0]); logger.info("allFile size=[{}]",allFile.size()); for (String msg : allFile) { executorService.execute(new ScanNumTask(msg,filterProcessManager)); } executorService.shutdown(); long total = totalWords.total(); long end = System.currentTimeMillis(); logger.info("total sum=[{}],[{}] ms",total,end-start);
不知道大家看出问题没有,其实是在最后打印总数时并不知道其他线程是否已经执行完毕了。
因为 executorService.execute()
会直接返回,所以当打印获取数据时还没有一个线程执行完毕,也就导致了这样的结果。
关于线程间通信之前我也写过相关的内容:《深入理解线程通信》
大概的方式有以下几种:
这里我们使用线程池的方式:
在停用线程池后加上一个判断条件即可:
executorService.shutdown(); while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) { logger.info("worker running"); } long total = totalWords.total(); long end = System.currentTimeMillis(); logger.info("total sum=[{}],[{}] ms",total,end-start);
这样我们再次尝试,发现无论多少次结果都是正确的了:
效率提升
可能还会有朋友问,这样的方式也没见提升多少效率啊。
这其实是由于我本地文件少,加上一个文件处理的耗时也比较短导致的。
甚至线程数开的够多导致频繁的上下文切换还是让执行效率降低。
为了模拟效率的提升,每处理一个文件我都让当前线程休眠 100 毫秒来模拟执行耗时。
先看单线程运行需要耗时多久。
总共耗时:[8404] ms
接着在线程池大小为 4 的情况下耗时:
总共耗时:[2350] ms
可见效率提升还是非常明显的。
更多思考
这只是多线程其中的一个用法,相信看到这里的朋友应该多它的理解更进一步了。
再给大家留个阅后练习,场景也是类似的:
在 Redis 或者其他存储介质中存放有上千万的手机号码数据,每个号码都是唯一的,需要在最快的时间内把这些号码全部都遍历一遍。
有想法感兴趣的朋友欢迎在文末留言参与讨论🤔🤨。
总结
希望看完的朋友心中能对文初的几个问题能有自己的答案:
- 为什么需要多线程?
- 怎么实现一个多线程程序?
- 多线程带来的问题及解决方案?
文中的代码都在此处。