并发设计模式实战系列(20):扇出/扇入模式(Fan-Out/Fan-In)(完结篇)

简介: 🌟 大家好,我是摘星!🌟今天为大家带来的是并发设计模式实战系列,第二十章,废话不多说直接开始~

 

image.gif 编辑

🌟 大家好,我是摘星! 🌟

今天为大家带来的是并发设计模式实战系列,第二十章扇出/扇入模式(Fan-Out/Fan-In),废话不多说直接开始~

目录

一、核心原理深度拆解

1. 数据流拓扑结构

2. 并发控制关键点

二、生活化类比:快餐店厨房

三、Java代码实现(生产级Demo)

1. 完整可运行代码

2. 关键配置说明

四、横向对比表格

1. 不同实现方案对比

2. 性能影响因素

五、高级优化技巧

1. 动态批次处理

2. 错误处理增强

3. 混合模式

4. 监控指标

六、扇出/扇入模式变体与扩展模式

1. 分阶段扇出(Pipeline Fan-Out)

2. 动态扇出(Dynamic Forking)

七、生产环境问题解决方案

1. 死锁预防矩阵

2. 性能调优检查表

八、与其他模式的组合应用

1. 反应式编程整合

2. 与生产者-消费者模式结合

九、可视化监控方案

1. Prometheus监控指标

2. 线程池监控看板

十、模式选择决策树


一、核心原理深度拆解

1. 数据流拓扑结构

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Producer   │ ──> │  Workers    │ ──> │  Collector  │
│ (Main Task)  │     │ (并行处理)   │     │ (结果聚合)   │
└─────────────┘     └─────────────┘     └─────────────┘
      │                   ▲                   ▲
      └───────────────────┴───────────────────┘
            Fan-Out                  Fan-In

image.gif

  • Fan-Out阶段:主任务将工作拆分为多个子任务,分发给Worker线程并行处理
  • Fan-In阶段:Worker处理结果通过通道返回,由Collector统一聚合

2. 并发控制关键点

  • 工作窃取(Work Stealing):使用ForkJoinPool实现负载均衡
  • 背压控制(Backpressure):通过有界队列防止内存溢出
  • 结果排序:若需保持顺序,需使用CompletionService按完成顺序获取

二、生活化类比:快餐店厨房

系统组件

现实类比

核心行为

Producer

订单接收台

将套餐拆解为汉堡、薯条等单品

Workers

多个厨师工作站

并行制作不同食物组件

Collector

装配台

将完成组件组合成完整套餐

  • 效率提升:3个厨师同时做汉堡/薯条/饮料(Fan-Out) → 装配员组合(Fan-In)
  • 异常处理:某个厨师生病时,其他厨师可分担其任务

三、Java代码实现(生产级Demo)

1. 完整可运行代码

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class FanOutFanInDemo {
    // 模拟CPU密集型任务
    static int processItem(int item) {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(100));
        } catch (InterruptedException e) {}
        return item * item; // 平方计算
    }
    public static void main(String[] args) throws Exception {
        List<Integer> inputs = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
        
        // 方法1:使用CompletableFuture(自动Fan-In)
        System.out.println("=== CompletableFuture方案 ===");
        long start = System.currentTimeMillis();
        CompletableFuture<Void> future = CompletableFuture.allOf(
            inputs.stream()
                .map(item -> CompletableFuture.supplyAsync(() -> processItem(item)))
                .toArray(CompletableFuture[]::new)
        ).thenApply(ignored -> {
            System.out.println("所有任务完成");
            return null;
        });
        future.get();
        System.out.printf("耗时: %dms\n", System.currentTimeMillis() - start);
        // 方法2:使用ForkJoinPool(工作窃取)
        System.out.println("\n=== ForkJoinPool方案 ===");
        ForkJoinPool pool = new ForkJoinPool(4);
        start = System.currentTimeMillis();
        List<Integer> results = pool.submit(() -> 
            inputs.parallelStream()
                .map(FanOutFanInDemo::processItem)
                .collect(Collectors.toList())
        ).get();
        System.out.println("结果数量: " + results.size());
        System.out.printf("耗时: %dms\n", System.currentTimeMillis() - start);
        // 方法3:手动控制(精确管理)
        System.out.println("\n=== 手动控制方案 ===");
        ExecutorService workers = Executors.newFixedThreadPool(4);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(workers);
        start = System.currentTimeMillis();
        
        // Fan-Out阶段
        inputs.forEach(item -> completionService.submit(() -> processItem(item)));
        
        // Fan-In阶段
        List<Integer> orderedResults = new ArrayList<>(inputs.size());
        for (int i = 0; i < inputs.size(); i++) {
            try {
                orderedResults.add(completionService.take().get());
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        System.out.println("结果数量: " + orderedResults.size());
        System.out.printf("耗时: %dms\n", System.currentTimeMillis() - start);
        workers.shutdown();
    }
}

image.gif

2. 关键配置说明

// 1. CompletableFuture默认使用ForkJoinPool.commonPool()
// 可指定自定义线程池:
CompletableFuture.supplyAsync(() -> task, customPool);
// 2. ForkJoinPool参数选择:
// - 并行度通常设为CPU核心数
new ForkJoinPool(Runtime.getRuntime().availableProcessors());
// 3. 手动方案背压控制:
// 使用有界队列+CallerRunsPolicy
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(100), 
    new ThreadPoolExecutor.CallerRunsPolicy());

image.gif


四、横向对比表格

1. 不同实现方案对比

方案

优点

缺点

适用场景

CompletableFuture

声明式编程,自动聚合

难以精确控制线程

简单并行任务

ForkJoinPool

工作窃取算法效率高

适合CPU密集型

递归分治任务

手动控制

完全控制流程和资源

代码复杂度高

需要严格顺序/背压

2. 性能影响因素

因素

影响程度

优化建议

任务粒度

★★★★

每个任务10ms-100ms为宜

线程池大小

★★★

IO密集型:大线程池 CPU密集型:小线程池

结果传输开销

★★

避免在Worker和Collector间传大对象

任务倾斜

★★

使用工作窃取线程池


五、高级优化技巧

1. 动态批次处理

// 根据系统负载调整批次大小
int batchSize = Runtime.getRuntime().availableProcessors() * 2;
List<List<Integer>> batches = Lists.partition(inputs, batchSize);

image.gif

2. 错误处理增强

// 为CompletableFuture添加异常处理
future.exceptionally(ex -> {
    System.err.println("任务失败: " + ex.getMessage());
    return null;
});

image.gif

3. 混合模式

// IO密集型 + CPU密集型混合
ExecutorService ioPool = Executors.newCachedThreadPool();
ExecutorService cpuPool = Executors.newWorkStealingPool();
CompletableFuture.supplyAsync(() -> queryDB(), ioPool)
    .thenApplyAsync(data -> processData(data), cpuPool);

image.gif

4. 监控指标

// ForkJoinPool监控
ForkJoinPool pool = (ForkJoinPool) ForkJoinPool.commonPool();
System.out.println("活跃线程数: " + pool.getActiveThreadCount());
System.out.println("窃取任务数: " + pool.getStealCount());

image.gif



六、扇出/扇入模式变体与扩展模式

1. 分阶段扇出(Pipeline Fan-Out)

┌─────────┐     ┌─────────┐     ┌─────────┐  
│ Stage 1  │───> │ Stage 2  │───> │ Stage 3  │  
│ (Fan-Out)│<─── │ (Fan-Out)│<─── │ (Fan-In) │  
└─────────┘     └─────────┘     └─────────┘

image.gif

特点

  • 每个阶段既是上一阶段的消费者,又是下一阶段的生产者
  • 适用于多级处理流水线(如ETL场景)

Java实现

// 使用Phaser协调多阶段  
Phaser phaser = new Phaser(3);  
ExecutorService[] stagePools = {Executors.newFixedThreadPool(4), ...};  
// Stage 1  
stagePools[0].submit(() -> {  
    List<Data> stage1Results = dataList.parallelStream()  
        .map(d -> processStage1(d)).collect(Collectors.toList());  
    phaser.arriveAndAwaitAdvance();  
    return stage1Results;  
});  
// Stage 2同理...

image.gif

2. 动态扇出(Dynamic Forking)

场景:根据运行时条件决定分支数量

CompletableFuture<?>[] futures = inputList.stream()  
    .map(input -> {  
        if (needFork(input)) {  
            return CompletableFuture.allOf(  
                taskA(input),  
                taskB(input)  
            );  
        } else {  
            return taskC(input);  
        }  
    }).toArray(CompletableFuture[]::new);

image.gif


七、生产环境问题解决方案

1. 死锁预防矩阵

风险点

检测方法

解决方案

Fan-Out过度

监控线程池队列堆积

限制最大分片数量(如Guava的RateLimiter)

Fan-In阻塞

线程dump显示Collector阻塞

使用CompletionService.poll(timeout)

资源竞争

JFR显示高锁竞争

采用无锁队列(ConcurrentLinkedQueue)

结果顺序错乱

业务校验失败

使用AtomicInteger序号标记原始顺序

2. 性能调优检查表

  1. Fan-Out阶段
  • 分片大小是否大于CPU核心数×2
  • 避免在分片逻辑中执行阻塞IO
  1. Worker阶段
  • 每个任务耗时是否在50-500ms黄金区间
  • 是否避免修改共享状态
  1. Fan-In阶段
  • 聚合操作时间复杂度是否优于O(n²)
  • 是否使用并发集合(如ConcurrentHashMap)

八、与其他模式的组合应用

1. 反应式编程整合

// Reactor + Fan-Out  
Flux.fromIterable(inputList)  
    .parallel()  
    .runOn(Schedulers.parallel())  
    .map(item -> process(item))  // Fan-Out  
    .sequential()  
    .collectList()               // Fan-In  
    .subscribe(System.out::println);

image.gif

2. 与生产者-消费者模式结合

┌──────────────┐     ┌──────────────┐     ┌──────────────┐  
│ Global Queue  │───> │ Local Queues │───> │ Workers      │  
│ (优先级队列)    │<─── │ (工作窃取)    │<─── │ (弹性线程池)  │  
└──────────────┘     └──────────────┘     └──────────────┘

image.gif

优势

  • 全局队列解决跨节点负载均衡
  • 本地队列减少锁竞争

九、可视化监控方案

1. Prometheus监控指标

// Fan-Out速率  
Counter.Builder("fanout_requests_total")  
    .labelNames("source")  
    .register(registry);  
// Fan-In延迟直方图  
Histogram.Builder("fanin_duration_seconds")  
    .buckets(0.1, 0.5, 1)  
    .register(registry);

image.gif

2. 线程池监控看板

指标

健康阈值

Grafana表达式

活跃线程数

≤核心线程数×1.5

thread_pool_active_threads{}

任务积压量

≤队列容量50%

thread_pool_queue_size{}

工作窃取次数

≥100/分钟

forkjoin_steals_total


十、模式选择决策树

image.gif 编辑

graph TD  
    A[需要严格顺序?] -->|是| B[选择手动控制方案]  
    A -->|否| C{任务类型?}  
    C -->|CPU密集型| D[ForkJoinPool]  
    C -->|IO密集型| E[CompletableFuture+自定义线程池]  
    C -->|混合型| F[分阶段差异化处理]

image.gif

相关文章
|
2月前
|
设计模式 缓存 安全
【JUC】(6)带你了解共享模型之 享元和不可变 模型并初步带你了解并发工具 线程池Pool,文章内还有饥饿问题、设计模式之工作线程的解决于实现
JUC专栏第六篇,本文带你了解两个共享模型:享元和不可变 模型,并初步带你了解并发工具 线程池Pool,文章中还有解决饥饿问题、设计模式之工作线程的实现
155 2
|
3月前
|
设计模式 人工智能 算法
基于多设计模式的状态扭转设计:策略模式与责任链模式的实战应用
接下来,我会结合实战案例,聊聊如何用「策略模式 + 责任链模式」构建灵活可扩展的状态引擎,让抽奖系统的状态管理从「混乱战场」变成「有序流水线」。
|
7月前
|
设计模式 负载均衡 监控
并发设计模式实战系列(2):领导者/追随者模式
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第二章领导者/追随者(Leader/Followers)模式,废话不多说直接开始~
214 0
|
7月前
|
设计模式 监控 Java
并发设计模式实战系列(1):半同步/半异步模式
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第一章半同步/半异步(Half-Sync/Half-Async)模式,废话不多说直接开始~
196 0
|
5月前
|
设计模式 C++
【实战指南】设计模式 - 工厂模式
工厂模式是一种面向对象设计模式,通过定义“工厂”来创建具体产品实例。它包含简单工厂、工厂方法和抽象工厂三种形式,分别适用于不同复杂度的场景。简单工厂便于理解但扩展性差;工厂方法符合开闭原则,适合单一类型产品创建;抽象工厂支持多类型产品创建,但不便于新增产品种类。三者各有优缺点,适用于不同设计需求。
180 52
|
7月前
|
设计模式 算法 Java
设计模式觉醒系列(04)策略模式|简单工厂模式的升级版
本文介绍了简单工厂模式与策略模式的概念及其融合实践。简单工厂模式用于对象创建,通过隐藏实现细节简化代码;策略模式关注行为封装与切换,支持动态替换算法,增强灵活性。两者结合形成“策略工厂”,既简化对象创建又保持低耦合。文章通过支付案例演示了模式的应用,并强调实际开发中应根据需求选择合适的设计模式,避免生搬硬套。最后推荐了JVM调优、并发编程等技术专题,助力开发者提升技能。
|
7月前
|
设计模式 Java 数据库连接
【设计模式】【创建型模式】工厂方法模式(Factory Methods)
一、入门 什么是工厂方法模式? 工厂方法模式(Factory Method Pattern)是一种创建型设计模式,它定义了一个用于创建对象的接口,但由子类决定实例化哪个类。工厂方法模式使类的实例化延迟
205 16
|
7月前
|
设计模式 安全 Java
并发设计模式实战系列(12):不变模式(Immutable Object)
🌟 大家好,我是摘星!🌟今天为大家带来的是并发设计模式实战系列,第十二章,废话不多说直接开始~
168 0
|
12月前
|
设计模式 前端开发 搜索推荐
前端必须掌握的设计模式——模板模式
模板模式(Template Pattern)是一种行为型设计模式,父类定义固定流程和步骤顺序,子类通过继承并重写特定方法实现具体步骤。适用于具有固定结构或流程的场景,如组装汽车、包装礼物等。举例来说,公司年会节目征集时,蜘蛛侠定义了歌曲的四个步骤:前奏、主歌、副歌、结尾。金刚狼和绿巨人根据此模板设计各自的表演内容。通过抽象类定义通用逻辑,子类实现个性化行为,从而减少重复代码。模板模式还支持钩子方法,允许跳过某些步骤,增加灵活性。
655 11
|
设计模式 安全 Java
Kotlin教程笔记(51) - 改良设计模式 - 构建者模式
Kotlin教程笔记(51) - 改良设计模式 - 构建者模式

热门文章

最新文章