并发设计模式实战系列(16):屏障(Barrier)

简介: 🌟 大家好,我是摘星!🌟今天为大家带来的是并发设计模式实战系列,第十六章,废话不多说直接开始~

 

image.gif 编辑

🌟 大家好,我是摘星! 🌟

今天为大家带来的是并发设计模式实战系列,第十六章屏障(Barrier),废话不多说直接开始~

目录

一、核心原理深度拆解

1. 屏障的同步机制

2. 关键参数

二、生活化类比:团队登山

三、Java代码实现(生产级Demo)

1. 完整可运行代码

2. 关键方法说明

四、横向对比表格

1. 同步工具对比

2. 屏障参数配置对比

五、高级应用技巧

1. 多阶段任务控制

2. 动态线程管理

3. 性能监控

六、工程实践中的陷阱与解决方案

1. 死锁风险场景

2. 屏障断裂处理

七、性能优化技巧

1. 分层屏障设计

2. 与ForkJoinPool结合

八、分布式屏障扩展(ZooKeeper实现)

1. 核心原理

2. Java代码片段

九、监控与调试方案

1. 关键监控指标

2. Arthas诊断命令

十、与其他模式的组合应用

1. 屏障 + 生产者消费者模式

2. 代码示例


一、核心原理深度拆解

1. 屏障的同步机制

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  线程1       │    │  线程2       │    │  线程N       │
│ 执行阶段1    │    │ 执行阶段1    │    │ 执行阶段1    │
└──────┬──────┘    └──────┬──────┘    └──────┬──────┘
       │                  │                  │
       └─────────┬────────┴────────┬─────────┘
                 │   屏障点         │
                 ▼                  ▼
        ┌───────────────────┐
        │   所有线程到达后    │
        │   才继续执行阶段2   │
        └───────────────────┘

image.gif

  • 协调机制:强制多个线程在某个点等待,直到所有参与线程都到达该点
  • 重置特性:CyclicBarrier 可重复使用(自动重置),CountDownLatch 不可重置

2. 关键参数

  • parties:需要等待的线程数量
  • barrierAction:所有线程到达后触发的回调(可选)

二、生活化类比:团队登山

系统组件

现实类比

核心行为

线程

登山队员

各自以不同速度攀登

屏障点

集合点

必须所有队员到齐才能继续前进

barrierAction

领队

检查装备、宣布下一阶段路线

  • 异常处理:若有队员受伤(线程中断),其他队员需要决定是否继续等待

三、Java代码实现(生产级Demo)

1. 完整可运行代码

import java.util.concurrent.*;
import java.util.Random;
public class BarrierPatternDemo {
    // 登山模拟
    static class MountainClimbing {
        private final CyclicBarrier barrier;
        private final Random rand = new Random();
        
        public MountainClimbing(int teamSize) {
            // 屏障点设置:队伍到齐后执行领队指令
            this.barrier = new CyclicBarrier(teamSize, () -> {
                System.out.println("\n=== 所有队员已到达集合点 ===");
                System.out.println("领队:检查装备完毕,向下一营地前进!");
            });
        }
        
        public void climb(String name) {
            try {
                // 第一阶段攀登
                int time = rand.nextInt(3000);
                System.out.printf("%s 正在攀登第一段(预计%dms)...\n", name, time);
                Thread.sleep(time);
                
                System.out.printf("[%s] 到达第一集合点,等待队友...\n", name);
                barrier.await();  // 等待所有队员
                
                // 第二阶段(屏障解除后)
                time = rand.nextInt(4000);
                System.out.printf("%s 向顶峰冲刺(预计%dms)...\n", name, time);
                Thread.sleep(time);
                System.out.printf("[%s] 成功登顶!\n", name);
                
            } catch (InterruptedException | BrokenBarrierException e) {
                System.out.printf("[%s] 登山中断: %s\n", name, e.getMessage());
            }
        }
    }
    public static void main(String[] args) {
        final int TEAM_SIZE = 3;
        MountainClimbing expedition = new MountainClimbing(TEAM_SIZE);
        
        // 创建登山线程
        ExecutorService pool = Executors.newFixedThreadPool(TEAM_SIZE);
        for (int i = 1; i <= TEAM_SIZE; i++) {
            String name = "队员-" + i;
            pool.execute(() -> expedition.climb(name));
        }
        
        pool.shutdown();
    }
}

image.gif

2. 关键方法说明

// 1. 屏障等待(可设置超时)
barrier.await(5, TimeUnit.SECONDS);
// 2. 检查屏障状态
barrier.isBroken();  // 是否有线程被中断
barrier.getNumberWaiting();  // 当前等待的线程数
// 3. 重置屏障(CyclicBarrier特有)
barrier.reset();

image.gif


四、横向对比表格

1. 同步工具对比

工具

可重用性

可中断

额外功能

适用场景

CyclicBarrier

支持回调(barrierAction)

多阶段任务同步

CountDownLatch

一次性

主线程等待多个子线程完成

Phaser

动态注册/注销

复杂分阶段任务

Exchanger

数据交换

线程间数据传递

2. 屏障参数配置对比

配置项

CyclicBarrier

CountDownLatch

初始化参数

等待线程数

需要countDown的次数

重用方式

自动重置

需重新创建实例

异常处理

BrokenBarrierException

无特殊异常


五、高级应用技巧

1. 多阶段任务控制

// 使用多个屏障实现多阶段同步
CyclicBarrier phase1 = new CyclicBarrier(3);
CyclicBarrier phase2 = new CyclicBarrier(3, ()->System.out.println("阶段2完成"));
// 线程中按顺序等待
phase1.await();
// 执行阶段1任务...
phase2.await();

image.gif

2. 动态线程管理

// 使用Phaser替代(JDK7+)
Phaser phaser = new Phaser(1); // 注册主线程
for (int i = 0; i < 3; i++) {
    phaser.register(); // 动态注册任务线程
    new Thread(() -> {
        doWork();
        phaser.arriveAndDeregister(); // 完成任务后注销
    }).start();
}
phaser.arriveAndAwaitAdvance(); // 主线程等待

image.gif

3. 性能监控

// 监控屏障等待情况
System.out.println("当前等待线程数: " + barrier.getNumberWaiting());
if (barrier.isBroken()) {
    System.out.println("警告:屏障已被破坏!");
}

image.gif

六、工程实践中的陷阱与解决方案

1. 死锁风险场景

// 错误示例:线程池大小 < 屏障要求的parties数
ExecutorService pool = Executors.newFixedThreadPool(2);
CyclicBarrier barrier = new CyclicBarrier(3); // 要求3个线程
pool.submit(() -> barrier.await()); // 永远阻塞
pool.submit(() -> barrier.await());

image.gif

解决方案

  • 确保线程池大小 ≥ parties数
  • 添加超时机制:
barrier.await(10, TimeUnit.SECONDS);

image.gif

2. 屏障断裂处理

当某个等待线程被中断或超时,会触发BrokenBarrierException,此时需要:

try {
    barrier.await();
} catch (BrokenBarrierException e) {
    // 1. 记录断裂原因
    // 2. 重置屏障或终止任务
    barrier.reset(); // 仅CyclicBarrier有效
}

image.gif


七、性能优化技巧

1. 分层屏障设计

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  子任务组1   │    │  子任务组2   │    │  子任务组N   │
│  (屏障A)     │    │  (屏障A)     │    │  (屏障A)     │
└──────┬──────┘    └──────┬──────┘    └──────┬──────┘
       │                  │                  │
       └─────────┬────────┴────────┬─────────┘
                 │   全局屏障B      │
                 ▼                  ▼
        ┌───────────────────┐
        │   最终聚合处理     │
        └───────────────────┘

image.gif

适用场景:大数据分片处理(如MapReduce)

2. 与ForkJoinPool结合

ForkJoinPool pool = new ForkJoinPool(4);
CyclicBarrier barrier = new CyclicBarrier(4);
pool.execute(() -> {
    // 分治任务1
    barrier.await();
    // 合并结果...
});

image.gif


八、分布式屏障扩展(ZooKeeper实现)

1. 核心原理

┌─────────────┐    ┌─────────────┐
│  节点1       │    │  节点2       │
│ 创建临时节点 │───>│ 监听节点变化 │
└─────────────┘    └─────────────┘
        │               ▲
        └───────┬───────┘
        ┌─────────────┐
        │  ZooKeeper   │
        │  /barrier    │
        └─────────────┘

image.gif

2. Java代码片段

public class DistributedBarrier {
    private final ZooKeeper zk;
    private final String barrierPath;
    
    public void await() throws Exception {
        zk.create(barrierPath + "/node", 
                 null, 
                 ZooDefs.Ids.OPEN_ACL_UNSAFE, 
                 CreateMode.EPHEMERAL_SEQUENTIAL);
        
        while (true) {
            List<String> nodes = zk.getChildren(barrierPath, false);
            if (nodes.size() >= REQUIRED_NODES) {
                break; // 所有节点就绪
            }
            Thread.sleep(100);
        }
    }
}

image.gif


九、监控与调试方案

1. 关键监控指标

指标

采集方式

健康阈值

平均等待时间

Barrier日志打点 + Prometheus

< 任务超时时间的20%

屏障断裂次数

异常捕获统计

每小时 < 3次

线程阻塞比例

ThreadMXBean监控

< 线程数的30%

2. Arthas诊断命令

# 查看屏障状态
watch java.util.concurrent.CyclicBarrier getParties returnObj
# 监控等待线程
thread -b | grep 'await'

image.gif


十、与其他模式的组合应用

1. 屏障 + 生产者消费者模式

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  生产者线程  │    │  生产者线程  │    │  屏障        │
│  生产数据    │───>│ 提交到队列   │───>│ 等待所有生产 │
└─────────────┘    └─────────────┘    └──────┬──────┘
┌─────────────┐    ┌─────────────┐    ┌──────▼──────┐
│  消费者线程  │    │  消费者线程  │    │  屏障释放   │
│  开始消费    │<───│ 从队列获取   │<───│ 触发消费信号│
└─────────────┘    └─────────────┘    └─────────────┘

image.gif

2. 代码示例

BlockingQueue<Data> queue = new LinkedBlockingQueue<>();
CyclicBarrier producerBarrier = new CyclicBarrier(3, () -> {
    System.out.println("所有生产者完成,启动消费者");
    startConsumers();
});
// 生产者线程
void produce() {
    queue.put(generateData());
    producerBarrier.await();
}

image.gif

目录
相关文章
|
4月前
|
设计模式 消息中间件 监控
并发设计模式实战系列(5):生产者/消费者
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第五章,废话不多说直接开始~
124 1
|
4月前
|
设计模式 负载均衡 监控
并发设计模式实战系列(2):领导者/追随者模式
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第二章领导者/追随者(Leader/Followers)模式,废话不多说直接开始~
112 0
|
4月前
|
设计模式 监控 Java
并发设计模式实战系列(1):半同步/半异步模式
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第一章半同步/半异步(Half-Sync/Half-Async)模式,废话不多说直接开始~
90 0
|
4月前
|
设计模式 运维 监控
并发设计模式实战系列(4):线程池
需要建立持续的性能剖析(Profiling)和调优机制。通过以上十二个维度的系统化扩展,构建了一个从。设置合理队列容量/拒绝策略。动态扩容/优化任务处理速度。检查线程栈定位热点代码。调整最大用户进程数限制。CPU占用率100%
264 0
|
4月前
|
设计模式 消息中间件 监控
并发设计模式实战系列(3):工作队列
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第三章,废话不多说直接开始~
71 0
|
2月前
|
设计模式 C++
【实战指南】设计模式 - 工厂模式
工厂模式是一种面向对象设计模式,通过定义“工厂”来创建具体产品实例。它包含简单工厂、工厂方法和抽象工厂三种形式,分别适用于不同复杂度的场景。简单工厂便于理解但扩展性差;工厂方法符合开闭原则,适合单一类型产品创建;抽象工厂支持多类型产品创建,但不便于新增产品种类。三者各有优缺点,适用于不同设计需求。
93 21
|
4月前
|
设计模式 监控 Java
并发设计模式实战系列(6):读写锁
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第六章,废话不多说直接开始~
62 0
|
4月前
|
设计模式 Java 数据库连接
【设计模式】【创建型模式】工厂方法模式(Factory Methods)
一、入门 什么是工厂方法模式? 工厂方法模式(Factory Method Pattern)是一种创建型设计模式,它定义了一个用于创建对象的接口,但由子类决定实例化哪个类。工厂方法模式使类的实例化延迟
110 16
|
4月前
|
设计模式 安全 Java
并发设计模式实战系列(12):不变模式(Immutable Object)
🌟 大家好,我是摘星!🌟今天为大家带来的是并发设计模式实战系列,第十二章,废话不多说直接开始~
80 0
|
4月前
|
设计模式 算法 Java
设计模式觉醒系列(04)策略模式|简单工厂模式的升级版
本文介绍了简单工厂模式与策略模式的概念及其融合实践。简单工厂模式用于对象创建,通过隐藏实现细节简化代码;策略模式关注行为封装与切换,支持动态替换算法,增强灵活性。两者结合形成“策略工厂”,既简化对象创建又保持低耦合。文章通过支付案例演示了模式的应用,并强调实际开发中应根据需求选择合适的设计模式,避免生搬硬套。最后推荐了JVM调优、并发编程等技术专题,助力开发者提升技能。