并发设计模式实战系列(20):扇出/扇入模式(Fan-Out/Fan-In)(完结篇)

简介: 🌟 大家好,我是摘星!🌟今天为大家带来的是并发设计模式实战系列,第二十章,废话不多说直接开始~

 

image.gif 编辑

🌟 大家好,我是摘星! 🌟

今天为大家带来的是并发设计模式实战系列,第二十章扇出/扇入模式(Fan-Out/Fan-In),废话不多说直接开始~

目录

一、核心原理深度拆解

1. 数据流拓扑结构

2. 并发控制关键点

二、生活化类比:快餐店厨房

三、Java代码实现(生产级Demo)

1. 完整可运行代码

2. 关键配置说明

四、横向对比表格

1. 不同实现方案对比

2. 性能影响因素

五、高级优化技巧

1. 动态批次处理

2. 错误处理增强

3. 混合模式

4. 监控指标

六、扇出/扇入模式变体与扩展模式

1. 分阶段扇出(Pipeline Fan-Out)

2. 动态扇出(Dynamic Forking)

七、生产环境问题解决方案

1. 死锁预防矩阵

2. 性能调优检查表

八、与其他模式的组合应用

1. 反应式编程整合

2. 与生产者-消费者模式结合

九、可视化监控方案

1. Prometheus监控指标

2. 线程池监控看板

十、模式选择决策树


一、核心原理深度拆解

1. 数据流拓扑结构

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Producer   │ ──> │  Workers    │ ──> │  Collector  │
│ (Main Task)  │     │ (并行处理)   │     │ (结果聚合)   │
└─────────────┘     └─────────────┘     └─────────────┘
      │                   ▲                   ▲
      └───────────────────┴───────────────────┘
            Fan-Out                  Fan-In

image.gif

  • Fan-Out阶段:主任务将工作拆分为多个子任务,分发给Worker线程并行处理
  • Fan-In阶段:Worker处理结果通过通道返回,由Collector统一聚合

2. 并发控制关键点

  • 工作窃取(Work Stealing):使用ForkJoinPool实现负载均衡
  • 背压控制(Backpressure):通过有界队列防止内存溢出
  • 结果排序:若需保持顺序,需使用CompletionService按完成顺序获取

二、生活化类比:快餐店厨房

系统组件

现实类比

核心行为

Producer

订单接收台

将套餐拆解为汉堡、薯条等单品

Workers

多个厨师工作站

并行制作不同食物组件

Collector

装配台

将完成组件组合成完整套餐

  • 效率提升:3个厨师同时做汉堡/薯条/饮料(Fan-Out) → 装配员组合(Fan-In)
  • 异常处理:某个厨师生病时,其他厨师可分担其任务

三、Java代码实现(生产级Demo)

1. 完整可运行代码

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class FanOutFanInDemo {
    // 模拟CPU密集型任务
    static int processItem(int item) {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(100));
        } catch (InterruptedException e) {}
        return item * item; // 平方计算
    }
    public static void main(String[] args) throws Exception {
        List<Integer> inputs = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
        
        // 方法1:使用CompletableFuture(自动Fan-In)
        System.out.println("=== CompletableFuture方案 ===");
        long start = System.currentTimeMillis();
        CompletableFuture<Void> future = CompletableFuture.allOf(
            inputs.stream()
                .map(item -> CompletableFuture.supplyAsync(() -> processItem(item)))
                .toArray(CompletableFuture[]::new)
        ).thenApply(ignored -> {
            System.out.println("所有任务完成");
            return null;
        });
        future.get();
        System.out.printf("耗时: %dms\n", System.currentTimeMillis() - start);
        // 方法2:使用ForkJoinPool(工作窃取)
        System.out.println("\n=== ForkJoinPool方案 ===");
        ForkJoinPool pool = new ForkJoinPool(4);
        start = System.currentTimeMillis();
        List<Integer> results = pool.submit(() -> 
            inputs.parallelStream()
                .map(FanOutFanInDemo::processItem)
                .collect(Collectors.toList())
        ).get();
        System.out.println("结果数量: " + results.size());
        System.out.printf("耗时: %dms\n", System.currentTimeMillis() - start);
        // 方法3:手动控制(精确管理)
        System.out.println("\n=== 手动控制方案 ===");
        ExecutorService workers = Executors.newFixedThreadPool(4);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(workers);
        start = System.currentTimeMillis();
        
        // Fan-Out阶段
        inputs.forEach(item -> completionService.submit(() -> processItem(item)));
        
        // Fan-In阶段
        List<Integer> orderedResults = new ArrayList<>(inputs.size());
        for (int i = 0; i < inputs.size(); i++) {
            try {
                orderedResults.add(completionService.take().get());
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        System.out.println("结果数量: " + orderedResults.size());
        System.out.printf("耗时: %dms\n", System.currentTimeMillis() - start);
        workers.shutdown();
    }
}

image.gif

2. 关键配置说明

// 1. CompletableFuture默认使用ForkJoinPool.commonPool()
// 可指定自定义线程池:
CompletableFuture.supplyAsync(() -> task, customPool);
// 2. ForkJoinPool参数选择:
// - 并行度通常设为CPU核心数
new ForkJoinPool(Runtime.getRuntime().availableProcessors());
// 3. 手动方案背压控制:
// 使用有界队列+CallerRunsPolicy
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(100), 
    new ThreadPoolExecutor.CallerRunsPolicy());

image.gif


四、横向对比表格

1. 不同实现方案对比

方案

优点

缺点

适用场景

CompletableFuture

声明式编程,自动聚合

难以精确控制线程

简单并行任务

ForkJoinPool

工作窃取算法效率高

适合CPU密集型

递归分治任务

手动控制

完全控制流程和资源

代码复杂度高

需要严格顺序/背压

2. 性能影响因素

因素

影响程度

优化建议

任务粒度

★★★★

每个任务10ms-100ms为宜

线程池大小

★★★

IO密集型:大线程池 CPU密集型:小线程池

结果传输开销

★★

避免在Worker和Collector间传大对象

任务倾斜

★★

使用工作窃取线程池


五、高级优化技巧

1. 动态批次处理

// 根据系统负载调整批次大小
int batchSize = Runtime.getRuntime().availableProcessors() * 2;
List<List<Integer>> batches = Lists.partition(inputs, batchSize);

image.gif

2. 错误处理增强

// 为CompletableFuture添加异常处理
future.exceptionally(ex -> {
    System.err.println("任务失败: " + ex.getMessage());
    return null;
});

image.gif

3. 混合模式

// IO密集型 + CPU密集型混合
ExecutorService ioPool = Executors.newCachedThreadPool();
ExecutorService cpuPool = Executors.newWorkStealingPool();
CompletableFuture.supplyAsync(() -> queryDB(), ioPool)
    .thenApplyAsync(data -> processData(data), cpuPool);

image.gif

4. 监控指标

// ForkJoinPool监控
ForkJoinPool pool = (ForkJoinPool) ForkJoinPool.commonPool();
System.out.println("活跃线程数: " + pool.getActiveThreadCount());
System.out.println("窃取任务数: " + pool.getStealCount());

image.gif



六、扇出/扇入模式变体与扩展模式

1. 分阶段扇出(Pipeline Fan-Out)

┌─────────┐     ┌─────────┐     ┌─────────┐  
│ Stage 1  │───> │ Stage 2  │───> │ Stage 3  │  
│ (Fan-Out)│<─── │ (Fan-Out)│<─── │ (Fan-In) │  
└─────────┘     └─────────┘     └─────────┘

image.gif

特点

  • 每个阶段既是上一阶段的消费者,又是下一阶段的生产者
  • 适用于多级处理流水线(如ETL场景)

Java实现

// 使用Phaser协调多阶段  
Phaser phaser = new Phaser(3);  
ExecutorService[] stagePools = {Executors.newFixedThreadPool(4), ...};  
// Stage 1  
stagePools[0].submit(() -> {  
    List<Data> stage1Results = dataList.parallelStream()  
        .map(d -> processStage1(d)).collect(Collectors.toList());  
    phaser.arriveAndAwaitAdvance();  
    return stage1Results;  
});  
// Stage 2同理...

image.gif

2. 动态扇出(Dynamic Forking)

场景:根据运行时条件决定分支数量

CompletableFuture<?>[] futures = inputList.stream()  
    .map(input -> {  
        if (needFork(input)) {  
            return CompletableFuture.allOf(  
                taskA(input),  
                taskB(input)  
            );  
        } else {  
            return taskC(input);  
        }  
    }).toArray(CompletableFuture[]::new);

image.gif


七、生产环境问题解决方案

1. 死锁预防矩阵

风险点

检测方法

解决方案

Fan-Out过度

监控线程池队列堆积

限制最大分片数量(如Guava的RateLimiter)

Fan-In阻塞

线程dump显示Collector阻塞

使用CompletionService.poll(timeout)

资源竞争

JFR显示高锁竞争

采用无锁队列(ConcurrentLinkedQueue)

结果顺序错乱

业务校验失败

使用AtomicInteger序号标记原始顺序

2. 性能调优检查表

  1. Fan-Out阶段
  • 分片大小是否大于CPU核心数×2
  • 避免在分片逻辑中执行阻塞IO
  1. Worker阶段
  • 每个任务耗时是否在50-500ms黄金区间
  • 是否避免修改共享状态
  1. Fan-In阶段
  • 聚合操作时间复杂度是否优于O(n²)
  • 是否使用并发集合(如ConcurrentHashMap)

八、与其他模式的组合应用

1. 反应式编程整合

// Reactor + Fan-Out  
Flux.fromIterable(inputList)  
    .parallel()  
    .runOn(Schedulers.parallel())  
    .map(item -> process(item))  // Fan-Out  
    .sequential()  
    .collectList()               // Fan-In  
    .subscribe(System.out::println);

image.gif

2. 与生产者-消费者模式结合

┌──────────────┐     ┌──────────────┐     ┌──────────────┐  
│ Global Queue  │───> │ Local Queues │───> │ Workers      │  
│ (优先级队列)    │<─── │ (工作窃取)    │<─── │ (弹性线程池)  │  
└──────────────┘     └──────────────┘     └──────────────┘

image.gif

优势

  • 全局队列解决跨节点负载均衡
  • 本地队列减少锁竞争

九、可视化监控方案

1. Prometheus监控指标

// Fan-Out速率  
Counter.Builder("fanout_requests_total")  
    .labelNames("source")  
    .register(registry);  
// Fan-In延迟直方图  
Histogram.Builder("fanin_duration_seconds")  
    .buckets(0.1, 0.5, 1)  
    .register(registry);

image.gif

2. 线程池监控看板

指标

健康阈值

Grafana表达式

活跃线程数

≤核心线程数×1.5

thread_pool_active_threads{}

任务积压量

≤队列容量50%

thread_pool_queue_size{}

工作窃取次数

≥100/分钟

forkjoin_steals_total


十、模式选择决策树

image.gif 编辑

graph TD  
    A[需要严格顺序?] -->|是| B[选择手动控制方案]  
    A -->|否| C{任务类型?}  
    C -->|CPU密集型| D[ForkJoinPool]  
    C -->|IO密集型| E[CompletableFuture+自定义线程池]  
    C -->|混合型| F[分阶段差异化处理]

image.gif

目录
打赏
0
0
0
0
18
分享
相关文章
并发设计模式实战系列(2):领导者/追随者模式
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第二章领导者/追随者(Leader/Followers)模式,废话不多说直接开始~
73 0
并发设计模式实战系列(1):半同步/半异步模式
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第一章半同步/半异步(Half-Sync/Half-Async)模式,废话不多说直接开始~
63 0
并发设计模式实战系列(4):线程池
需要建立持续的性能剖析(Profiling)和调优机制。通过以上十二个维度的系统化扩展,构建了一个从。设置合理队列容量/拒绝策略。动态扩容/优化任务处理速度。检查线程栈定位热点代码。调整最大用户进程数限制。CPU占用率100%
182 0
【实战指南】设计模式 - 工厂模式
工厂模式是一种面向对象设计模式,通过定义“工厂”来创建具体产品实例。它包含简单工厂、工厂方法和抽象工厂三种形式,分别适用于不同复杂度的场景。简单工厂便于理解但扩展性差;工厂方法符合开闭原则,适合单一类型产品创建;抽象工厂支持多类型产品创建,但不便于新增产品种类。三者各有优缺点,适用于不同设计需求。
设计模式觉醒系列(04)策略模式|简单工厂模式的升级版
本文介绍了简单工厂模式与策略模式的概念及其融合实践。简单工厂模式用于对象创建,通过隐藏实现细节简化代码;策略模式关注行为封装与切换,支持动态替换算法,增强灵活性。两者结合形成“策略工厂”,既简化对象创建又保持低耦合。文章通过支付案例演示了模式的应用,并强调实际开发中应根据需求选择合适的设计模式,避免生搬硬套。最后推荐了JVM调优、并发编程等技术专题,助力开发者提升技能。
【设计模式】【创建型模式】工厂方法模式(Factory Methods)
一、入门 什么是工厂方法模式? 工厂方法模式(Factory Method Pattern)是一种创建型设计模式,它定义了一个用于创建对象的接口,但由子类决定实例化哪个类。工厂方法模式使类的实例化延迟
91 16
并发设计模式实战系列(12):不变模式(Immutable Object)
🌟 大家好,我是摘星!🌟今天为大家带来的是并发设计模式实战系列,第十二章,废话不多说直接开始~
57 0
前端必须掌握的设计模式——模板模式
模板模式(Template Pattern)是一种行为型设计模式,父类定义固定流程和步骤顺序,子类通过继承并重写特定方法实现具体步骤。适用于具有固定结构或流程的场景,如组装汽车、包装礼物等。举例来说,公司年会节目征集时,蜘蛛侠定义了歌曲的四个步骤:前奏、主歌、副歌、结尾。金刚狼和绿巨人根据此模板设计各自的表演内容。通过抽象类定义通用逻辑,子类实现个性化行为,从而减少重复代码。模板模式还支持钩子方法,允许跳过某些步骤,增加灵活性。
351 11
Kotlin教程笔记(51) - 改良设计模式 - 构建者模式
Kotlin教程笔记(51) - 改良设计模式 - 构建者模式
PHP中的设计模式:提升代码的可维护性与扩展性在软件开发过程中,设计模式是开发者们经常用到的工具之一。它们提供了经过验证的解决方案,可以帮助我们解决常见的软件设计问题。本文将介绍PHP中常用的设计模式,以及如何利用这些模式来提高代码的可维护性和扩展性。我们将从基础的设计模式入手,逐步深入到更复杂的应用场景。通过实际案例分析,读者可以更好地理解如何在PHP开发中应用这些设计模式,从而写出更加高效、灵活和易于维护的代码。
本文探讨了PHP中常用的设计模式及其在实际项目中的应用。内容涵盖设计模式的基本概念、分类和具体使用场景,重点介绍了单例模式、工厂模式和观察者模式等常见模式。通过具体的代码示例,展示了如何在PHP项目中有效利用设计模式来提升代码的可维护性和扩展性。文章还讨论了设计模式的选择原则和注意事项,帮助开发者在不同情境下做出最佳决策。
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问