【精通函数式编程】(六) Stream高并发实战

简介: 本讲为Stream高并发(并发、并行、多线程)、ForkJoin线程池框架的实战

前言:

📫 作者简介:小明 java 问道之路,专注于研究计算机底层,就职于金融公司后端高级工程师,擅长交易领域的高安全/可用/并发/性能的设计和架构📫

🏆 Java 领域优质创作者、阿里云专家博主、华为云享专家🏆

🔥 如果此文还不错的话,还请👍关注、点赞、收藏三连支持👍一下博主哦

本文导读

我们上讲看Stream接口提供大量API可以方便的处理元素,这讲Stream高并发(并发、并行、多线程)、ForkJoin线程池框架的实战

一、并行流(parallelStream、parallel、sequential)

并行流就是把一个内容拆分成多个数据块来执行,用不同的线程分别处理每个数据块的流

parallelStream、.parallel()都可以将流转换成并行流,.parallel()的粒度更小。要注意的是顺序流( .stream) 调用parallel方法不意味本身有什么实际变化,它内部设置了一个boolean,表示调用parallel之后的操作都是并行的

对并行流调用后可以使用 .sequential() 变成顺序流。

List<String> collect10 = orderInfos.stream().parallel()
                .map(OrderInfo::getOrderId).collect(toList());
        BigDecimal totalSubOrderAmt1 = orderInfos.stream()
                .map(OrderInfo::getSubOrderInfoList)
                .flatMap(subOrderInfos -> subOrderInfos.stream())
                .filter(subOrderInfo -> null != subOrderInfo.getSubOrderAmt())
                .parallel()
                .map(SubOrderInfo::getSubOrderAmt)
                .sequential()
                .reduce(BigDecimal.ZERO, BigDecimal::add);
        List<String> collect11 = orderInfos.parallelStream()
                .map(OrderInfo::getOrderId).collect(toList());

并行流 不是一定并行,多线程中保证原子操作会有对象的可变状态,当多个线程共享对象时,共享的可变状态会不断被线程锁住,会影响并行流和并行计算

所以我们是否可以使用并行流需要避免共享变量,而且当较小的数据量的时候并行处理的开销不一定会小于计算开销,同时集合的数据结构也会对并行流有影响,ArrayList拆分的开销,要小于LinkedList(前者底层是数组可以直接在内存拆分,后者底层是链表,内存不连续需要遍历拆分,ArrayList、HashSet、TreeSet可拆分性好,LinkedList可拆分性极差)

二、Fork/Jion框架

并行流的背后的原理是java7里面的分支/合并框架,分支/合并框架的目的就是以递归的方式,将数据块(任务)并行的拆分成更小的数据块,然后将每个子任务合并成整体,ExecutorService 接口的实现,ExecutorService把子任务分配给线程池 ForkJionPool 中的工作线程。

要把任务提交到这个ForkJionPool 线程池,必须创建RecursiveTask的实现类,要定义RecursiveTask只需要实现他的抽象方法compute

网络异常,图片无法展示
|

创建一个 ForkJoinTask,并把对象传给我们自定义的ForkJoinCalculator,创建一个 ForkJoinPool 并把任务传递给他的调用方法invoke,返回值就是ForkJoinCalculator定义的结果

扩展RecursiveTask,穿件ForkJoin框架,创建起始位置和终止位置,实现compute方法,实现fork、join金额累加

public static void main(String[] args) {
        List<OrderInfo> orderInfos = Arrays.asList(new OrderInfo("123", BigDecimal.ONE),
                new OrderInfo("456", BigDecimal.TEN), new OrderInfo("789", BigDecimal.TEN));
        // 创建一个 ForkJoinTask,并把对象传给我们自定义的ForkJoinCalculator
        ForkJoinCalculator orderInfoForkJoinTask = new ForkJoinCalculator(orderInfos, 0, orderInfos.size());
        // 创建一个 ForkJoinPool 并把任务传递给他的调用方法,返回值就是ForkJoinCalculator定义的结果
        System.out.println(new ForkJoinPool().invoke(orderInfoForkJoinTask));
    }
    /**
     * 扩展RecursiveTask,穿件ForkJoin框架,创建起始位置和终止位置
     */
    static class ForkJoinCalculator extends RecursiveTask<BigDecimal> {
        List<OrderInfo> orderInfos;
        int start;
        int end;
        BigDecimal amt;
        public ForkJoinCalculator(List<OrderInfo> orderInfos, int start, int end) {
            this.orderInfos = orderInfos;
            this.amt = orderInfos.get(start).getOrderAmt();
            this.start = start;
            this.end = end;
        }
        @Override
        protected BigDecimal compute() {
            int length = end - start;
            // 如果大小等于阈值,返回结果
            if (length < 2)
                return amt; // 订单金额
            ForkJoinCalculator task = new ForkJoinCalculator(orderInfos, start, start + length / 2);
            task.fork();
            ForkJoinCalculator task1 = new ForkJoinCalculator(orderInfos, start + length / 2, end);
            BigDecimal compute = task1.compute();
            BigDecimal join = task.join();
            System.out.println("compute:" + compute + "  join:" + join);
            return compute.add(join); // 累加
        }
    }

Fork/Jion框架还有几个框架需要注意的,对于一个任务调用join方法,会阻塞调用方,直到出任务结果;不应该在RecursiveTask里面实现ForkJoinPool 应该使用compute或fork方法;compute实现需要中断方法并且这里面实现比较困难需要多加练习。

网络异常,图片无法展示
|

工作窃取,在实际工作中任务差不多被平均分配到ForkJoinPool的所有线程,每个线程都为分配给线程的任务保存一个 双向LinkedQueue(双向链式队列),每个人物完成就会从队列的头取一个在进行执行,但是某些线程执行可能过快,此时这个线程会从其他线程的 队列尾,取走一个任务执行,这也是Fork/Jion框架能有高性能的原因

总结

本文讲解Stream高并发(并发、并行、多线程)、ForkJoin线程池框架的实战

相关文章
|
存储 缓存 安全
高并发内存池实战:用C++构建高性能服务器(下)
高并发内存池实战:用C++构建高性能服务器
高并发内存池实战:用C++构建高性能服务器(下)
|
人工智能 JSON 前端开发
【Spring boot实战】Springboot+对话ai模型整体框架+高并发线程机制处理优化+提示词工程效果展示(按照框架自己修改可对接市面上百分之99的模型)
【Spring boot实战】Springboot+对话ai模型整体框架+高并发线程机制处理优化+提示词工程效果展示(按照框架自己修改可对接市面上百分之99的模型)
|
5月前
|
缓存 监控 Cloud Native
Java Solon v3.2.0 高并发与低内存实战指南之解决方案优化
本文深入解析了Java Solon v3.2.0框架的实战应用,聚焦高并发与低内存消耗场景。通过响应式编程、云原生支持、内存优化等特性,结合API网关、数据库操作及分布式缓存实例,展示其在秒杀系统中的性能优势。文章还提供了Docker部署、监控方案及实际效果数据,助力开发者构建高效稳定的应用系统。代码示例详尽,适合希望提升系统性能的Java开发者参考。
268 4
Java Solon v3.2.0 高并发与低内存实战指南之解决方案优化
|
5月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
1553 7
|
4月前
|
数据采集 监控 网络协议
基于aiohttp的高并发爬虫实战:从原理到代码的完整指南
在数据驱动时代,传统同步爬虫效率低下,而基于Python的aiohttp库可构建高并发异步爬虫。本文通过实战案例解析aiohttp的核心组件与优化策略,包括信号量控制、连接池复用、异常处理等,并探讨代理集成、分布式架构及反爬应对方案,助你打造高性能、稳定可靠的网络爬虫系统。
306 0
|
7月前
|
SQL 安全 测试技术
2025接口测试全攻略:高并发、安全防护与六大工具实战指南
本文探讨高并发稳定性验证、安全防护实战及六大工具(Postman、RunnerGo、Apipost、JMeter、SoapUI、Fiddler)选型指南,助力构建未来接口测试体系。接口测试旨在验证数据传输、参数合法性、错误处理能力及性能安全性,其重要性体现在早期发现问题、保障系统稳定和支撑持续集成。常用方法包括功能、性能、安全性及兼容性测试,典型场景涵盖前后端分离开发、第三方服务集成与数据一致性检查。选择合适的工具需综合考虑需求与团队协作等因素。
1052 24
|
缓存 NoSQL Java
Java高并发实战:利用线程池和Redis实现高效数据入库
Java高并发实战:利用线程池和Redis实现高效数据入库
966 0
|
11月前
|
缓存 NoSQL Java
高并发场景秒杀抢购超卖Bug实战重现
在电商平台的秒杀活动中,高并发场景下的抢购超卖Bug是一个常见且棘手的问题。一旦处理不当,不仅会引发用户投诉,还会对商家的信誉和利益造成严重损害。本文将详细介绍秒杀抢购超卖Bug的背景历史、业务场景、底层原理以及Java代码实现,旨在帮助开发者更好地理解和解决这一问题。
366 12
|
存储 监控 Java
近亿级用户体量高并发实战:大促前压测干崩近百个服务引起的深度反思!
几年前,数百个服务,将堆内存从28GB升配到36GB,引发系统全面OOM的事件。
406 12
|
存储 缓存 运维
优化高并发环境下的数据库查询性能:实战经验与技巧
在高并发环境下,数据库性能往往成为系统瓶颈。本文将深入探讨在高并发场景下优化数据库查询性能的策略与实践,包括索引优化、查询优化、数据库架构设计以及缓存机制的应用。通过对具体案例的分析,读者将能够掌握提升数据库性能的关键技术,从而在面对大规模用户请求时提高系统的响应速度和稳定性。

热门文章

最新文章

下一篇
oss云网关配置