自定义 ForkJoinPool 提升并行流 ParallelStream 执行速度

简介: 简介在 java8 中 添加了流Stream,可以让你以一种声明的方式处理数据。使用起来非常简单优雅。ParallelStream 则是一个并行执行的流,采用 ForkJoinPool 并行执行任务,提高执行速度。

简介

在 java8 中 添加了流Stream,可以让你以一种声明的方式处理数据。使用起来非常简单优雅。ParallelStream 则是一个并行执行的流,采用 ForkJoinPool 并行执行任务,提高执行速度。
     
    下面我们看看2个简单的示例:

示例1 (list)

Arrays.asList(1,2,3,4,5,6)
    .parallelStream()
    .forEach((value) -> {
        String name = Thread.currentThread().getName();
        System.out.println("示例1 Thread:" + name + " value:" + value);
    });

示例2 (array)

Stream.of(1,2,3,4,5,6)
    .parallel()
    .forEach((value) -> {
        String name = Thread.currentThread().getName();
        System.out.println("示例2 Thread:" + name + " value:" + value);
    });

问题引出

笔者最近在做一些爬虫相关的业务,其核心工具已开源 mica-http:https://gitee.com/596392912/mica/tree/master/mica-http ,经过2个版本的迭代已经发展成了一个强大非账号爬虫利器,赶紧来试试吧。

image.png

image.png

    我们采集了大量的代理 ip 用来供爬虫使用,其中有个定时任务每 5 分钟去检测代理是否失效,代理 ip 检测比较费时,我们给每个检测的请求
设定了 2s 的超时,这样单线程的话 1000 个 ip 就得消耗半个多小时,当然笔者在校验的时候采用的 parallel Stream 简化开发。

    然后发现效果并不明显,代理 ip 数量上来之后 5 分钟完全检测不完,导致任务堆积。明明用了并发流为什么没有明显的提高执行速度呢?

001.png

    下面我们来看看刚刚的“示例”打印出的信息:

示例1 Thread:main value:4
示例1 Thread:ForkJoinPool.commonPool-worker-2 value:1
示例1 Thread:main value:6
示例1 Thread:ForkJoinPool.commonPool-worker-2 value:5
示例1 Thread:main value:3
示例1 Thread:ForkJoinPool.commonPool-worker-1 value:2
示例2 Thread:main value:4
示例2 Thread:ForkJoinPool.commonPool-worker-3 value:3
示例2 Thread:ForkJoinPool.commonPool-worker-2 value:5
示例2 Thread:ForkJoinPool.commonPool-worker-4 value:1
示例2 Thread:ForkJoinPool.commonPool-worker-5 value:2
示例2 Thread:ForkJoinPool.commonPool-worker-1 value:6

我们可以看到 Parallel Stream,默认采用的是一个 ForkJoinPool.commonPool 的线程池,这样我们就算使用了 Parallel Stream,
整个 jvm 共用一个 common pool 线程池,一不小心就任务堆积了,在校验代理 ip 的时候我们还有采集代理等其他的任务中也大量使用了并发流,
这样也就印证了为什么会任务堆积了。

解决问题

使用自定义 ForkJoinPool 执行速度。示例代码如下:

// 示例:自定义线程池
ForkJoinPool forkJoinPool = new ForkJoinPool(8);

// 这里是从数据库里查出来的一批代理 ip
List<ProxyList> records = new ArrayList<>();

// 找出失效的代理 ip
List<String> needDeleteList = forkJoinPool.submit(() -> records.parallelStream()
    .map(ProxyList::getIpPort)
    .filter(IProxyListTask::isFailed)
    .collect(Collectors.toList())
).fork().join();

// 删除失效的代理

    整个代码依然比较优雅,在使用自定义的 ForkJoin 线程池之后,执行速度有了明显的提升。以前 5 分钟执行不完的任务现在 2 分钟之内就能全部执行完毕。

结论

java8 的并发流在大批量数据处理时可简化多线程的使用,在遇到耗时业务或者重度使用并发流不妨根据业务情况采用自定义线程池来提示处理速度。

开源推荐

目录
相关文章
|
Java
java8中修改parallelStream默认并发数
java8中修改parallelStream默认并发数
2297 0
java8中修改parallelStream默认并发数
|
缓存 Java API
Java工具篇之Guava-retry重试组件
Guava 是一组来自 Google 的核心 Java 库,其中包括新的集合类型(例如 multimap 和 multiset)、不可变集合、图形库以及用于并发、I/O、散列、缓存、原语、字符串等的实用程序!它广泛用于 Google 内部的大多数 Java 项目,也被许多其他公司广泛使用。 API 非常的简单,我们可以非常轻松的使用,来封装成我们业务中自己的组件。
1143 0
|
11月前
|
Arthas 监控 数据可视化
类似arthas的工具还有其他的吗?
类似arthas的工具还有其他的吗?
464 6
|
存储 算法 Java
深入解析Java中的ForkJoinPool:分而治之,并行处理的利器
深入解析Java中的ForkJoinPool:分而治之,并行处理的利器
|
Java UED
基于SpringBoot自定义线程池实现多线程执行方法,以及多线程之间的协调和同步
这篇文章介绍了在SpringBoot项目中如何自定义线程池来实现多线程执行方法,并探讨了多线程之间的协调和同步问题,提供了相关的示例代码。
3496 0
|
消息中间件 存储 缓存
一文快速掌握高性能内存队列Disruptor
`Disruptor`是LMAX公司开源的高性能内存消息队列,单线程处理能力可达600w订单/秒。本文从使用和设计角度探讨这款Java消息队列。作者sharkChili是Java开发者,CSDN博客专家,Java Guide项目维护者。文章介绍了Disruptor的基础使用,包括前置步骤、消息模型、消息处理器配置、生产者实现,并展示了效果。同时,文章详细解析了Disruptor的工作流程和高效原因,如无锁操作、分支预测和缓存填充。最后,作者提供相关资源链接并邀请读者加入交流群。
2668 0
|
分布式计算 并行计算 算法
【高并发】什么是ForkJoin?看这一篇就够了!
在JDK中,提供了这样一种功能:它能够将复杂的逻辑拆分成一个个简单的逻辑来并行执行,待每个并行执行的逻辑执行完成后,再将各个结果进行汇总,得出最终的结果数据。有点像Hadoop中的MapReduce。 ForkJoin是由JDK1.7之后提供的多线程并发处理框架。ForkJoin框架的基本思想是分而治之。什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值分解成多个计算,然后将各个计算结果进行汇总。相应的,ForkJoin将复杂的计算当做一个任务,而分解的多个计算则是当做一个个子任务来并行执行。
6852 0
【高并发】什么是ForkJoin?看这一篇就够了!
|
监控 中间件 关系型数据库
MyCAT、ShardingSphere和Mocc这三个中间件的优缺点对比
MyCAT、ShardingSphere和Mocc这三个中间件的优缺点对比
1324 0
|
SQL 监控 druid
Spring Boot 整合 Druid 指南
Spring Boot 整合 Druid 指南
45448 3
|
分布式计算 并行计算 数据可视化
战斗到底:Java vs. Python - 用哪个更适合处理海量数据?
战斗到底:Java vs. Python - 用哪个更适合处理海量数据?
1759 0