编辑
🌟 大家好,我是摘星! 🌟
今天为大家带来的是并发设计模式实战系列,第二十章扇出/扇入模式(Fan-Out/Fan-In),废话不多说直接开始~
目录
一、核心原理深度拆解
1. 数据流拓扑结构
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Producer │ ──> │ Workers │ ──> │ Collector │ │ (Main Task) │ │ (并行处理) │ │ (结果聚合) │ └─────────────┘ └─────────────┘ └─────────────┘ │ ▲ ▲ └───────────────────┴───────────────────┘ Fan-Out Fan-In
- Fan-Out阶段:主任务将工作拆分为多个子任务,分发给Worker线程并行处理
- Fan-In阶段:Worker处理结果通过通道返回,由Collector统一聚合
2. 并发控制关键点
- 工作窃取(Work Stealing):使用
ForkJoinPool
实现负载均衡 - 背压控制(Backpressure):通过有界队列防止内存溢出
- 结果排序:若需保持顺序,需使用
CompletionService
按完成顺序获取
二、生活化类比:快餐店厨房
系统组件 |
现实类比 |
核心行为 |
Producer |
订单接收台 |
将套餐拆解为汉堡、薯条等单品 |
Workers |
多个厨师工作站 |
并行制作不同食物组件 |
Collector |
装配台 |
将完成组件组合成完整套餐 |
- 效率提升:3个厨师同时做汉堡/薯条/饮料(Fan-Out) → 装配员组合(Fan-In)
- 异常处理:某个厨师生病时,其他厨师可分担其任务
三、Java代码实现(生产级Demo)
1. 完整可运行代码
import java.util.*; import java.util.concurrent.*; import java.util.stream.*; public class FanOutFanInDemo { // 模拟CPU密集型任务 static int processItem(int item) { try { Thread.sleep(ThreadLocalRandom.current().nextInt(100)); } catch (InterruptedException e) {} return item * item; // 平方计算 } public static void main(String[] args) throws Exception { List<Integer> inputs = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList()); // 方法1:使用CompletableFuture(自动Fan-In) System.out.println("=== CompletableFuture方案 ==="); long start = System.currentTimeMillis(); CompletableFuture<Void> future = CompletableFuture.allOf( inputs.stream() .map(item -> CompletableFuture.supplyAsync(() -> processItem(item))) .toArray(CompletableFuture[]::new) ).thenApply(ignored -> { System.out.println("所有任务完成"); return null; }); future.get(); System.out.printf("耗时: %dms\n", System.currentTimeMillis() - start); // 方法2:使用ForkJoinPool(工作窃取) System.out.println("\n=== ForkJoinPool方案 ==="); ForkJoinPool pool = new ForkJoinPool(4); start = System.currentTimeMillis(); List<Integer> results = pool.submit(() -> inputs.parallelStream() .map(FanOutFanInDemo::processItem) .collect(Collectors.toList()) ).get(); System.out.println("结果数量: " + results.size()); System.out.printf("耗时: %dms\n", System.currentTimeMillis() - start); // 方法3:手动控制(精确管理) System.out.println("\n=== 手动控制方案 ==="); ExecutorService workers = Executors.newFixedThreadPool(4); CompletionService<Integer> completionService = new ExecutorCompletionService<>(workers); start = System.currentTimeMillis(); // Fan-Out阶段 inputs.forEach(item -> completionService.submit(() -> processItem(item))); // Fan-In阶段 List<Integer> orderedResults = new ArrayList<>(inputs.size()); for (int i = 0; i < inputs.size(); i++) { try { orderedResults.add(completionService.take().get()); } catch (ExecutionException e) { e.printStackTrace(); } } System.out.println("结果数量: " + orderedResults.size()); System.out.printf("耗时: %dms\n", System.currentTimeMillis() - start); workers.shutdown(); } }
2. 关键配置说明
// 1. CompletableFuture默认使用ForkJoinPool.commonPool() // 可指定自定义线程池: CompletableFuture.supplyAsync(() -> task, customPool); // 2. ForkJoinPool参数选择: // - 并行度通常设为CPU核心数 new ForkJoinPool(Runtime.getRuntime().availableProcessors()); // 3. 手动方案背压控制: // 使用有界队列+CallerRunsPolicy new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
四、横向对比表格
1. 不同实现方案对比
方案 |
优点 |
缺点 |
适用场景 |
CompletableFuture |
声明式编程,自动聚合 |
难以精确控制线程 |
简单并行任务 |
ForkJoinPool |
工作窃取算法效率高 |
适合CPU密集型 |
递归分治任务 |
手动控制 |
完全控制流程和资源 |
代码复杂度高 |
需要严格顺序/背压 |
2. 性能影响因素
因素 |
影响程度 |
优化建议 |
任务粒度 |
★★★★ |
每个任务10ms-100ms为宜 |
线程池大小 |
★★★ |
IO密集型:大线程池 CPU密集型:小线程池 |
结果传输开销 |
★★ |
避免在Worker和Collector间传大对象 |
任务倾斜 |
★★ |
使用工作窃取线程池 |
五、高级优化技巧
1. 动态批次处理
// 根据系统负载调整批次大小 int batchSize = Runtime.getRuntime().availableProcessors() * 2; List<List<Integer>> batches = Lists.partition(inputs, batchSize);
2. 错误处理增强
// 为CompletableFuture添加异常处理 future.exceptionally(ex -> { System.err.println("任务失败: " + ex.getMessage()); return null; });
3. 混合模式
// IO密集型 + CPU密集型混合 ExecutorService ioPool = Executors.newCachedThreadPool(); ExecutorService cpuPool = Executors.newWorkStealingPool(); CompletableFuture.supplyAsync(() -> queryDB(), ioPool) .thenApplyAsync(data -> processData(data), cpuPool);
4. 监控指标
// ForkJoinPool监控 ForkJoinPool pool = (ForkJoinPool) ForkJoinPool.commonPool(); System.out.println("活跃线程数: " + pool.getActiveThreadCount()); System.out.println("窃取任务数: " + pool.getStealCount());
六、扇出/扇入模式变体与扩展模式
1. 分阶段扇出(Pipeline Fan-Out)
┌─────────┐ ┌─────────┐ ┌─────────┐ │ Stage 1 │───> │ Stage 2 │───> │ Stage 3 │ │ (Fan-Out)│<─── │ (Fan-Out)│<─── │ (Fan-In) │ └─────────┘ └─────────┘ └─────────┘
特点:
- 每个阶段既是上一阶段的消费者,又是下一阶段的生产者
- 适用于多级处理流水线(如ETL场景)
Java实现:
// 使用Phaser协调多阶段 Phaser phaser = new Phaser(3); ExecutorService[] stagePools = {Executors.newFixedThreadPool(4), ...}; // Stage 1 stagePools[0].submit(() -> { List<Data> stage1Results = dataList.parallelStream() .map(d -> processStage1(d)).collect(Collectors.toList()); phaser.arriveAndAwaitAdvance(); return stage1Results; }); // Stage 2同理...
2. 动态扇出(Dynamic Forking)
场景:根据运行时条件决定分支数量
CompletableFuture<?>[] futures = inputList.stream() .map(input -> { if (needFork(input)) { return CompletableFuture.allOf( taskA(input), taskB(input) ); } else { return taskC(input); } }).toArray(CompletableFuture[]::new);
七、生产环境问题解决方案
1. 死锁预防矩阵
风险点 |
检测方法 |
解决方案 |
Fan-Out过度 |
监控线程池队列堆积 |
限制最大分片数量(如Guava的RateLimiter) |
Fan-In阻塞 |
线程dump显示Collector阻塞 |
使用 |
资源竞争 |
JFR显示高锁竞争 |
采用无锁队列(ConcurrentLinkedQueue) |
结果顺序错乱 |
业务校验失败 |
使用 |
2. 性能调优检查表
- Fan-Out阶段
- 分片大小是否大于CPU核心数×2
- 避免在分片逻辑中执行阻塞IO
- Worker阶段
- 每个任务耗时是否在50-500ms黄金区间
- 是否避免修改共享状态
- Fan-In阶段
- 聚合操作时间复杂度是否优于O(n²)
- 是否使用并发集合(如ConcurrentHashMap)
八、与其他模式的组合应用
1. 反应式编程整合
// Reactor + Fan-Out Flux.fromIterable(inputList) .parallel() .runOn(Schedulers.parallel()) .map(item -> process(item)) // Fan-Out .sequential() .collectList() // Fan-In .subscribe(System.out::println);
2. 与生产者-消费者模式结合
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ Global Queue │───> │ Local Queues │───> │ Workers │ │ (优先级队列) │<─── │ (工作窃取) │<─── │ (弹性线程池) │ └──────────────┘ └──────────────┘ └──────────────┘
优势:
- 全局队列解决跨节点负载均衡
- 本地队列减少锁竞争
九、可视化监控方案
1. Prometheus监控指标
// Fan-Out速率 Counter.Builder("fanout_requests_total") .labelNames("source") .register(registry); // Fan-In延迟直方图 Histogram.Builder("fanin_duration_seconds") .buckets(0.1, 0.5, 1) .register(registry);
2. 线程池监控看板
指标 |
健康阈值 |
Grafana表达式 |
活跃线程数 |
≤核心线程数×1.5 |
|
任务积压量 |
≤队列容量50% |
|
工作窃取次数 |
≥100/分钟 |
|
十、模式选择决策树
编辑
graph TD A[需要严格顺序?] -->|是| B[选择手动控制方案] A -->|否| C{任务类型?} C -->|CPU密集型| D[ForkJoinPool] C -->|IO密集型| E[CompletableFuture+自定义线程池] C -->|混合型| F[分阶段差异化处理]