自定义 ForkJoinPool 提升并行流 ParallelStream 执行速度

简介: 简介在 java8 中 添加了流Stream,可以让你以一种声明的方式处理数据。使用起来非常简单优雅。ParallelStream 则是一个并行执行的流,采用 ForkJoinPool 并行执行任务,提高执行速度。

简介

在 java8 中 添加了流Stream,可以让你以一种声明的方式处理数据。使用起来非常简单优雅。ParallelStream 则是一个并行执行的流,采用 ForkJoinPool 并行执行任务,提高执行速度。
     
    下面我们看看2个简单的示例:

示例1 (list)

Arrays.asList(1,2,3,4,5,6)
    .parallelStream()
    .forEach((value) -> {
        String name = Thread.currentThread().getName();
        System.out.println("示例1 Thread:" + name + " value:" + value);
    });

示例2 (array)

Stream.of(1,2,3,4,5,6)
    .parallel()
    .forEach((value) -> {
        String name = Thread.currentThread().getName();
        System.out.println("示例2 Thread:" + name + " value:" + value);
    });

问题引出

笔者最近在做一些爬虫相关的业务,其核心工具已开源 mica-http:https://gitee.com/596392912/mica/tree/master/mica-http ,经过2个版本的迭代已经发展成了一个强大非账号爬虫利器,赶紧来试试吧。

image.png

image.png

    我们采集了大量的代理 ip 用来供爬虫使用,其中有个定时任务每 5 分钟去检测代理是否失效,代理 ip 检测比较费时,我们给每个检测的请求
设定了 2s 的超时,这样单线程的话 1000 个 ip 就得消耗半个多小时,当然笔者在校验的时候采用的 parallel Stream 简化开发。

    然后发现效果并不明显,代理 ip 数量上来之后 5 分钟完全检测不完,导致任务堆积。明明用了并发流为什么没有明显的提高执行速度呢?

001.png

    下面我们来看看刚刚的“示例”打印出的信息:

示例1 Thread:main value:4
示例1 Thread:ForkJoinPool.commonPool-worker-2 value:1
示例1 Thread:main value:6
示例1 Thread:ForkJoinPool.commonPool-worker-2 value:5
示例1 Thread:main value:3
示例1 Thread:ForkJoinPool.commonPool-worker-1 value:2
示例2 Thread:main value:4
示例2 Thread:ForkJoinPool.commonPool-worker-3 value:3
示例2 Thread:ForkJoinPool.commonPool-worker-2 value:5
示例2 Thread:ForkJoinPool.commonPool-worker-4 value:1
示例2 Thread:ForkJoinPool.commonPool-worker-5 value:2
示例2 Thread:ForkJoinPool.commonPool-worker-1 value:6

我们可以看到 Parallel Stream,默认采用的是一个 ForkJoinPool.commonPool 的线程池,这样我们就算使用了 Parallel Stream,
整个 jvm 共用一个 common pool 线程池,一不小心就任务堆积了,在校验代理 ip 的时候我们还有采集代理等其他的任务中也大量使用了并发流,
这样也就印证了为什么会任务堆积了。

解决问题

使用自定义 ForkJoinPool 执行速度。示例代码如下:

// 示例:自定义线程池
ForkJoinPool forkJoinPool = new ForkJoinPool(8);

// 这里是从数据库里查出来的一批代理 ip
List<ProxyList> records = new ArrayList<>();

// 找出失效的代理 ip
List<String> needDeleteList = forkJoinPool.submit(() -> records.parallelStream()
    .map(ProxyList::getIpPort)
    .filter(IProxyListTask::isFailed)
    .collect(Collectors.toList())
).fork().join();

// 删除失效的代理

    整个代码依然比较优雅,在使用自定义的 ForkJoin 线程池之后,执行速度有了明显的提升。以前 5 分钟执行不完的任务现在 2 分钟之内就能全部执行完毕。

结论

java8 的并发流在大批量数据处理时可简化多线程的使用,在遇到耗时业务或者重度使用并发流不妨根据业务情况采用自定义线程池来提示处理速度。

开源推荐

目录
相关文章
|
Java
java8中修改parallelStream默认并发数
java8中修改parallelStream默认并发数
2948 0
java8中修改parallelStream默认并发数
|
canal 消息中间件 存储
手把手告诉你如何监听 MySQL binlog 实现数据变化后的实时通知!
Hello 大家好,我是阿粉。不知道大家在日常的工作中有没有遇到这样的场景,很多时候业务数据有变更需要及时加载到缓存、ES 或者发送到消息队列中通知下游服务。
7778 0
手把手告诉你如何监听 MySQL binlog 实现数据变化后的实时通知!
|
算法 Linux 调度
Docker的资源限制实战篇
本文详细介绍了如何利用Docker对容器的资源进行限制,包括内存和CPU的使用。文章首先概述了资源限制的重要性及其在Linux系统中的实现原理,并强调了不当设置可能导致的风险。接着,通过一系列实战案例展示了如何具体设置容器的内存限制,包括硬性限制、动态调整以及软限制等。最后,文章还提供了限制容器CPU访问的具体方法和示例,如指定容器使用的CPU核心数和基于`--cpu-shares`参数对CPU资源进行分配。通过这些实践,读者可以更好地理解和掌握Docker资源管理技巧。
1505 14
Docker的资源限制实战篇
|
XML Java 程序员
保姆级教程,手把手教你实现SpringBoot自定义starter
保姆级教程,手把手教你实现SpringBoot自定义starter
14820 2
保姆级教程,手把手教你实现SpringBoot自定义starter
|
算法 前端开发 Java
支撑每秒数百万订单无压力,SpringBoot + Disruptor 太猛了!
本文详细介绍如何通过 Spring Boot 集成 Disruptor 实现每秒处理数百万订单的高性能系统。Disruptor 是一种无锁并发框架,采用环形缓冲区和无锁算法,提供极低延迟和高吞吐量。文章涵盖 Maven 配置、事件工厂、处理器及生产者实现,并通过 REST API 和 Thymeleaf 展示订单创建流程。Disruptor 在高并发场景下表现出色,是解决高性能并发处理的理想方案。
|
JSON Java fastjson
JMH - Java 代码性能测试的终极利器、必须掌握
JMH - Java 代码性能测试的终极利器、必须掌握
4856 1
|
Java Spring
在Spring Boot中,可以通过控制`@PostConstruct`注解方法的执行顺序来实现初始化时的顺序控制
在Spring Boot中,可以通过控制`@PostConstruct`注解方法的执行顺序来实现初始化时的顺序控制
1583 1
|
存储 算法 Java
深入解析Java中的ForkJoinPool:分而治之,并行处理的利器
深入解析Java中的ForkJoinPool:分而治之,并行处理的利器
|
数据库 测试技术 Java
阿里技术专家详解DDD系列 第二弹 - 应用架构
应用架构,指软件系统中固定不变的代码结构、设计模式、规范和组件间的通信方式。在应用开发中架构之所以是最重要的第一步,因为一个好的架构能让系统安全、稳定、快速迭代。但是今天我们在做业务研发时,更多会关注一些宏观的架构,而忽略了应用内部的架构设计,希望能通过案例分析和重构,推演出一套高质量的DDD架构。
60359 25
阿里技术专家详解DDD系列 第二弹 - 应用架构

热门文章

最新文章