并发设计模式实战系列(19):监视器(Monitor)

简介: 🌟 大家好,我是摘星!🌟今天为大家带来的是并发设计模式实战系列,第十九章,废话不多说直接开始~

 

image.gif 编辑

🌟 大家好,我是摘星! 🌟

今天为大家带来的是并发设计模式实战系列,第十九章监视器(Monitor),废话不多说直接开始~

目录

一、核心原理深度拆解

1. 监视器三要素模型

2. 线程调度机制

二、生活化类比:银行柜台服务系统

三、Java代码实现(生产级Demo)

1. 完整可运行代码

2. 关键配置说明

四、横向对比表格

1. 线程同步机制对比

2. 条件变量实现对比

五、高级优化技巧

1. 锁分段优化

2. 条件变量优化

3. 监控指标扩展

4. 自适应锁优化

5. 无锁化改造方案

六、异常处理与健壮性设计

1. 死锁检测与恢复

2. 线程泄漏防护

七、分布式环境扩展

1. 跨JVM的Monitor实现

2. 多节点协同方案

八、现代Java特性整合

1. 虚拟线程适配

2. Project Loom纤程支持

九、监控指标体系建设

1. Prometheus监控集成

2. 关键监控看板

十、经典场景最佳实践

1. 数据库连接池实现

2. 生产者-消费者增强版


一、核心原理深度拆解

1. 监视器三要素模型

┌──────────────────────┐
│      Monitor Object   │
│  ┌─────────────────┐  │
│  │  Shared Data     │  │
│  └────────┬─────────┘  │
│           │            │
│  ┌────────▼─────────┐  │
│  │  Sync Methods    │  │
│  │ (Entry Queue)    │  │
│  └────────┬─────────┘  │
│           │            │
│  ┌────────▼─────────┐  │
│  │  Wait Conditions │  │
│  │ (Condition Queue)│  │
│  └─────────────────┘  │
└──────────────────────┘

image.gif

  • 共享数据:被保护的临界资源(如计数器、连接池)
  • 同步方法:互斥访问入口(Java的synchronized方法/块)
  • 条件变量:线程协作机制(Object.wait()/notify()

2. 线程调度机制

  • Entry Set:竞争锁的线程队列(JVM管理)
  • Wait Set:调用wait()的线程等待区
  • 优先级控制:非公平锁(默认)vs 公平锁(按入队顺序)

二、生活化类比:银行柜台服务系统

监视器组件

银行类比

运行机制

共享数据

柜台现金

所有柜员共享同一保险箱

同步方法

柜台窗口

每次仅允许一个柜员操作现金

条件变量

客户等待区

现金不足时柜员进入等待状态

Entry Set

排队叫号机

客户按顺序获取服务资格

Wait Set

VIP休息室

特殊需求客户暂时离开主队列

  • 异常处理:柜员突发离职(线程中断)→ 系统自动唤醒下个柜员

三、Java代码实现(生产级Demo)

1. 完整可运行代码

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class MonitorPatternDemo {
    // 共享资源:有限容量队列
    private final LinkedList<String> messageQueue = new LinkedList<>();
    private final int MAX_CAPACITY = 10;
    // 显式锁(比synchronized更灵活)
    private final ReentrantLock lock = new ReentrantLock(true); // 公平锁
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    // 监视器方法:生产消息
    public void produce(String message) throws InterruptedException {
        lock.lock();
        try {
            while (messageQueue.size() == MAX_CAPACITY) {
                System.out.println("[Producer] 队列已满,等待消费...");
                notFull.await(); // 释放锁并进入等待
            }
            messageQueue.addLast(message);
            System.out.println("[Producer] 添加消息: " + message + " | 队列大小: " + messageQueue.size());
            notEmpty.signal(); // 唤醒等待的消费者
        } finally {
            lock.unlock();
        }
    }
    // 监视器方法:消费消息
    public String consume() throws InterruptedException {
        lock.lock();
        try {
            while (messageQueue.isEmpty()) {
                System.out.println("[Consumer] 队列为空,等待生产...");
                notEmpty.await(); // 释放锁并进入等待
            }
            String message = messageQueue.removeFirst();
            System.out.println("[Consumer] 处理消息: " + message + " | 剩余: " + messageQueue.size());
            notFull.signal(); // 唤醒等待的生产者
            return message;
        } finally {
            lock.unlock();
        }
    }
    // 监控线程
    public void startMonitorThread() {
        new Thread(() -> {
            while (true) {
                try {
                    lock.lock();
                    try {
                        System.out.println("[Monitor] === 当前状态 ===");
                        System.out.println("队列大小: " + messageQueue.size());
                        System.out.println("等待生产者: " + lock.getWaitQueueLength(notFull));
                        System.out.println("等待消费者: " + lock.getWaitQueueLength(notEmpty));
                        System.out.println("=== === === === ===");
                    } finally {
                        lock.unlock();
                    }
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }, "Monitor-Thread").start();
    }
    public static void main(String[] args) {
        MonitorPatternDemo monitor = new MonitorPatternDemo();
        monitor.startMonitorThread();
        // 模拟生产者
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        monitor.produce("Msg-" + System.currentTimeMillis());
                        TimeUnit.MILLISECONDS.sleep(500);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Producer-" + i).start();
        }
        // 模拟消费者
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        monitor.consume();
                        TimeUnit.SECONDS.sleep(1);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Consumer-" + i).start();
        }
    }
}

image.gif

2. 关键配置说明

// 锁类型选择
new ReentrantLock(true); // 公平锁(按入队顺序获取锁)
new ReentrantLock();     // 非公平锁(默认,吞吐量更高)
// 条件变量分离
private final Condition notFull = lock.newCondition();  // 队列未满条件
private final Condition notEmpty = lock.newCondition(); // 队列非空条件
// 监控接口
lock.getWaitQueueLength(condition); // 获取等待特定条件的线程数

image.gif


四、横向对比表格

1. 线程同步机制对比

机制

互斥能力

条件等待

可中断

公平性

适用场景

synchronized

单一条件

不可

非公平

简单同步场景

ReentrantLock

多条件

可中断

可配置

复杂同步逻辑

Semaphore

可中断

可配置

资源池控制

ReadWriteLock

多条件

可中断

可配置

读多写少场景

2. 条件变量实现对比

实现方式

通知精度

批量唤醒

超时支持

使用复杂度

Object.wait()

全部

Condition.await()

指定条件

BlockingQueue

内置

自动


五、高级优化技巧

1. 锁分段优化

// 降低锁粒度(如ConcurrentHashMap的分段锁思想)
private final ReentrantLock[] segmentLocks = new ReentrantLock[16];
{
    for (int i = 0; i < segmentLocks.length; i++) {
        segmentLocks[i] = new ReentrantLock();
    }
}
public void put(String key, String value) {
    int segment = Math.abs(key.hashCode() % segmentLocks.length);
    segmentLocks[segment].lock();
    try {
        // 操作对应分段的共享数据
    } finally {
        segmentLocks[segment].unlock();
    }
}

image.gif

2. 条件变量优化

// 使用带超时的等待(避免死锁)
if (!notEmpty.await(5, TimeUnit.SECONDS)) {
    throw new TimeoutException("等待消息超时");
}
// 使用signalAll()谨慎(可能引起"惊群效应")
notEmpty.signal(); // 优先使用精准通知

image.gif

3. 监控指标扩展

// 添加JMX监控(示例)
public class MonitorMetrics implements MonitorMetricsMBean {
    private final ReentrantLock lock;
    public int getWaitThreadCount() {
        return lock.getQueueLength(); // 获取等待锁的线程数
    }
    public int getActiveThreadCount() {
        return lock.getHoldCount(); // 获取锁重入次数
    }
}
// 注册MBean
ManagementFactory.getPlatformMBeanServer().registerMBean(
    new MonitorMetrics(lock), 
    new ObjectName("com.example:type=MonitorMetrics")
);

image.gif

4. 自适应锁优化

// 根据竞争情况动态切换锁类型
public class AdaptiveLock {
    private volatile boolean highContention = false;
    private final ReentrantLock fairLock = new ReentrantLock(true);
    private final ReentrantLock unfairLock = new ReentrantLock();
    public void lock() {
        if (highContention) {
            fairLock.lock(); // 高竞争时用公平锁
        } else {
            unfairLock.lock(); // 默认非公平锁
        }
    }
    // 监控线程竞争情况
    public void monitor() {
        new Thread(() -> {
            while (true) {
                int waiters = fairLock.getQueueLength() + unfairLock.getQueueLength();
                highContention = waiters > 5; // 阈值可配置
                try { Thread.sleep(1000); } catch (InterruptedException e) { break; }
            }
        }).start();
    }
}

image.gif

5. 无锁化改造方案

// 对读多写少场景使用原子变量
private final AtomicReference<Map<String, String>> cache = 
    new AtomicReference<>(new ConcurrentHashMap<>());
public void updateCache(String key, String value) {
    while (true) {
        Map<String, String> oldMap = cache.get();
        Map<String, String> newMap = new ConcurrentHashMap<>(oldMap);
        newMap.put(key, value);
        if (cache.compareAndSet(oldMap, newMap)) break;
    }
}

image.gif


六、异常处理与健壮性设计

1. 死锁检测与恢复

// 使用ThreadMXBean检测死锁
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
long[] threadIds = bean.findDeadlockedThreads();
if (threadIds != null) {
    ThreadInfo[] infos = bean.getThreadInfo(threadIds);
    for (ThreadInfo info : infos) {
        System.err.println("死锁线程: " + info.getThreadName());
        // 强制中断受害线程(生产环境需谨慎)
        Thread thread = findThreadById(info.getThreadId());
        if (thread != null) thread.interrupt();
    }
}

image.gif

2. 线程泄漏防护

// 封装安全的线程池
public class SafeExecutor extends ThreadPoolExecutor {
    private final ConcurrentMap<Worker, Boolean> workers = new ConcurrentHashMap<>();
    protected void beforeExecute(Thread t, Runnable r) {
        workers.put((Worker) t, true);
    }
    protected void afterExecute(Runnable r, Throwable t) {
        workers.remove(Thread.currentThread());
    }
    public List<Thread> getStuckThreads(long timeoutMs) {
        return workers.keySet().stream()
            .filter(w -> w.getActiveTime() > timeoutMs)
            .collect(Collectors.toList());
    }
}

image.gif


七、分布式环境扩展

1. 跨JVM的Monitor实现

// 基于Redis的分布式锁
public class DistributedMonitor {
    private final Jedis jedis;
    private final String lockKey;
    public boolean tryLock(long timeoutMs) {
        String result = jedis.set(lockKey, "locked", 
            "NX", "PX", timeoutMs);
        return "OK".equals(result);
    }
    public void unlock() {
        jedis.del(lockKey);
    }
    // 使用Redisson的看门狗机制实现续期
    public void startWatchdog() {
        new Thread(() -> {
            while (locked) {
                jedis.expire(lockKey, 30);
                try { Thread.sleep(10000); } 
                catch (InterruptedException e) { break; }
            }
        }).start();
    }
}

image.gif

2. 多节点协同方案

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Node 1    │    │   Node 2    │    │   Node 3    │
│ ┌─────────┐ │    │ ┌─────────┐ │    │ ┌─────────┐ │
│ │ Monitor │───ZK─▶│ Monitor │───ZK─▶│ Monitor │ │
│ └─────────┘ │    │ └─────────┘ │    │ └─────────┘ │
└─────────────┘    └─────────────┘    └─────────────┘

image.gif

  • ZooKeeper协调:通过临时节点实现Leader选举
  • 状态同步:使用Watcher机制通知条件变更

八、现代Java特性整合

1. 虚拟线程适配

// JDK21+ 虚拟线程优化
ExecutorService vThreadPool = Executors.newVirtualThreadPerTaskExecutor();
public void virtualThreadMonitor() {
    try (var executor = vThreadPool) {
        executor.submit(() -> {
            synchronized(this) { // 兼容传统synchronized
                while (conditionNotMet()) {
                    wait(); // 虚拟线程挂起时不占用OS线程
                }
                // 处理共享数据
            }
        });
    }
}

image.gif

2. Project Loom纤程支持

// 使用Fiber替代线程(实验性)
new Fiber<Void>(() -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    synchronized(lock) { // 百万级纤程共享Monitor
        // 业务逻辑
    }
}).start();

image.gif


九、监控指标体系建设

1. Prometheus监控集成

// 暴露锁竞争指标
Gauge contentionGauge = Gauge.build()
    .name("monitor_lock_contention")
    .help("Current lock waiters count")
    .register();
public void recordMetrics() {
    new ScheduledThreadPoolExecutor(1)
        .scheduleAtFixedRate(() -> {
            contentionGauge.set(lock.getQueueLength());
        }, 0, 5, TimeUnit.SECONDS);
}

image.gif

2. 关键监控看板

指标名称

计算方式

健康阈值

锁等待时间

历史平均等待时间

< 50ms

条件变量等待数

notEmpty.getWaitQueueLength

< CPU核心数×2

死锁检测次数

ThreadMXBean统计

= 0

线程活跃度

活跃线程数/最大线程数

60%~80%


十、经典场景最佳实践

1. 数据库连接池实现

public class ConnectionPool {
    private final LinkedList<Connection> pool = new LinkedList<>();
    private final int maxSize;
    private final Object monitor = new Object();
    public Connection borrow() throws InterruptedException {
        synchronized (monitor) {
            while (pool.isEmpty()) {
                monitor.wait();
            }
            return pool.removeFirst();
        }
    }
    public void release(Connection conn) {
        synchronized (monitor) {
            pool.addLast(conn);
            monitor.notify();
        }
    }
}

image.gif

2. 生产者-消费者增强版

// 支持优先级和批量处理
public class EnhancedBlockingQueue {
    private final PriorityBlockingQueue<Item> queue;
    private final Semaphore available;
    public void putBatch(List<Item> items) {
        queue.addAll(items);
        available.release(items.size());
    }
    public Item take() throws InterruptedException {
        available.acquire();
        return queue.poll();
    }
}

image.gif

目录
相关文章
|
30天前
|
设计模式 消息中间件 监控
并发设计模式实战系列(5):生产者/消费者
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第五章,废话不多说直接开始~
77 1
|
30天前
|
设计模式 负载均衡 监控
并发设计模式实战系列(2):领导者/追随者模式
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第二章领导者/追随者(Leader/Followers)模式,废话不多说直接开始~
50 0
|
30天前
|
设计模式 监控 Java
并发设计模式实战系列(1):半同步/半异步模式
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第一章半同步/半异步(Half-Sync/Half-Async)模式,废话不多说直接开始~
37 0
|
30天前
|
设计模式 运维 监控
并发设计模式实战系列(4):线程池
需要建立持续的性能剖析(Profiling)和调优机制。通过以上十二个维度的系统化扩展,构建了一个从。设置合理队列容量/拒绝策略。动态扩容/优化任务处理速度。检查线程栈定位热点代码。调整最大用户进程数限制。CPU占用率100%
145 0
|
30天前
|
设计模式 存储 安全
并发设计模式实战系列(7):Thread Local Storage (TLS)
🌟 大家好,我是摘星! 🌟今天为大家带来的是并发设计模式实战系列,第七章Thread Local Storage (TLS),废话不多说直接开始~
66 0
|
30天前
|
设计模式 消息中间件 监控
并发设计模式实战系列(3):工作队列
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第三章,废话不多说直接开始~
33 0
|
30天前
|
设计模式 监控 Java
并发设计模式实战系列(6):读写锁
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第六章,废话不多说直接开始~
32 0
|
1月前
|
设计模式 Java 数据库连接
【设计模式】【创建型模式】工厂方法模式(Factory Methods)
一、入门 什么是工厂方法模式? 工厂方法模式(Factory Method Pattern)是一种创建型设计模式,它定义了一个用于创建对象的接口,但由子类决定实例化哪个类。工厂方法模式使类的实例化延迟
79 16
|
30天前
|
设计模式 安全 Java
并发设计模式实战系列(12):不变模式(Immutable Object)
🌟 大家好,我是摘星!🌟今天为大家带来的是并发设计模式实战系列,第十二章,废话不多说直接开始~
32 0
|
20天前
|
设计模式 算法 Java
设计模式觉醒系列(04)策略模式|简单工厂模式的升级版
本文介绍了简单工厂模式与策略模式的概念及其融合实践。简单工厂模式用于对象创建,通过隐藏实现细节简化代码;策略模式关注行为封装与切换,支持动态替换算法,增强灵活性。两者结合形成“策略工厂”,既简化对象创建又保持低耦合。文章通过支付案例演示了模式的应用,并强调实际开发中应根据需求选择合适的设计模式,避免生搬硬套。最后推荐了JVM调优、并发编程等技术专题,助力开发者提升技能。

热门文章

最新文章