编辑
🌟 大家好,我是摘星! 🌟
今天为大家带来的是并发设计模式实战系列,第十九章监视器(Monitor),废话不多说直接开始~
目录
一、核心原理深度拆解
1. 监视器三要素模型
┌──────────────────────┐ │ Monitor Object │ │ ┌─────────────────┐ │ │ │ Shared Data │ │ │ └────────┬─────────┘ │ │ │ │ │ ┌────────▼─────────┐ │ │ │ Sync Methods │ │ │ │ (Entry Queue) │ │ │ └────────┬─────────┘ │ │ │ │ │ ┌────────▼─────────┐ │ │ │ Wait Conditions │ │ │ │ (Condition Queue)│ │ │ └─────────────────┘ │ └──────────────────────┘
- 共享数据:被保护的临界资源(如计数器、连接池)
- 同步方法:互斥访问入口(Java的
synchronized
方法/块) - 条件变量:线程协作机制(
Object.wait()
/notify()
)
2. 线程调度机制
- Entry Set:竞争锁的线程队列(JVM管理)
- Wait Set:调用
wait()
的线程等待区 - 优先级控制:非公平锁(默认)vs 公平锁(按入队顺序)
二、生活化类比:银行柜台服务系统
监视器组件 |
银行类比 |
运行机制 |
共享数据 |
柜台现金 |
所有柜员共享同一保险箱 |
同步方法 |
柜台窗口 |
每次仅允许一个柜员操作现金 |
条件变量 |
客户等待区 |
现金不足时柜员进入等待状态 |
Entry Set |
排队叫号机 |
客户按顺序获取服务资格 |
Wait Set |
VIP休息室 |
特殊需求客户暂时离开主队列 |
- 异常处理:柜员突发离职(线程中断)→ 系统自动唤醒下个柜员
三、Java代码实现(生产级Demo)
1. 完整可运行代码
import java.util.LinkedList; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class MonitorPatternDemo { // 共享资源:有限容量队列 private final LinkedList<String> messageQueue = new LinkedList<>(); private final int MAX_CAPACITY = 10; // 显式锁(比synchronized更灵活) private final ReentrantLock lock = new ReentrantLock(true); // 公平锁 private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); // 监视器方法:生产消息 public void produce(String message) throws InterruptedException { lock.lock(); try { while (messageQueue.size() == MAX_CAPACITY) { System.out.println("[Producer] 队列已满,等待消费..."); notFull.await(); // 释放锁并进入等待 } messageQueue.addLast(message); System.out.println("[Producer] 添加消息: " + message + " | 队列大小: " + messageQueue.size()); notEmpty.signal(); // 唤醒等待的消费者 } finally { lock.unlock(); } } // 监视器方法:消费消息 public String consume() throws InterruptedException { lock.lock(); try { while (messageQueue.isEmpty()) { System.out.println("[Consumer] 队列为空,等待生产..."); notEmpty.await(); // 释放锁并进入等待 } String message = messageQueue.removeFirst(); System.out.println("[Consumer] 处理消息: " + message + " | 剩余: " + messageQueue.size()); notFull.signal(); // 唤醒等待的生产者 return message; } finally { lock.unlock(); } } // 监控线程 public void startMonitorThread() { new Thread(() -> { while (true) { try { lock.lock(); try { System.out.println("[Monitor] === 当前状态 ==="); System.out.println("队列大小: " + messageQueue.size()); System.out.println("等待生产者: " + lock.getWaitQueueLength(notFull)); System.out.println("等待消费者: " + lock.getWaitQueueLength(notEmpty)); System.out.println("=== === === === ==="); } finally { lock.unlock(); } TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } }, "Monitor-Thread").start(); } public static void main(String[] args) { MonitorPatternDemo monitor = new MonitorPatternDemo(); monitor.startMonitorThread(); // 模拟生产者 for (int i = 0; i < 3; i++) { new Thread(() -> { try { while (!Thread.currentThread().isInterrupted()) { monitor.produce("Msg-" + System.currentTimeMillis()); TimeUnit.MILLISECONDS.sleep(500); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, "Producer-" + i).start(); } // 模拟消费者 for (int i = 0; i < 2; i++) { new Thread(() -> { try { while (!Thread.currentThread().isInterrupted()) { monitor.consume(); TimeUnit.SECONDS.sleep(1); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, "Consumer-" + i).start(); } } }
2. 关键配置说明
// 锁类型选择 new ReentrantLock(true); // 公平锁(按入队顺序获取锁) new ReentrantLock(); // 非公平锁(默认,吞吐量更高) // 条件变量分离 private final Condition notFull = lock.newCondition(); // 队列未满条件 private final Condition notEmpty = lock.newCondition(); // 队列非空条件 // 监控接口 lock.getWaitQueueLength(condition); // 获取等待特定条件的线程数
四、横向对比表格
1. 线程同步机制对比
机制 |
互斥能力 |
条件等待 |
可中断 |
公平性 |
适用场景 |
synchronized |
有 |
单一条件 |
不可 |
非公平 |
简单同步场景 |
ReentrantLock |
有 |
多条件 |
可中断 |
可配置 |
复杂同步逻辑 |
Semaphore |
无 |
无 |
可中断 |
可配置 |
资源池控制 |
ReadWriteLock |
有 |
多条件 |
可中断 |
可配置 |
读多写少场景 |
2. 条件变量实现对比
实现方式 |
通知精度 |
批量唤醒 |
超时支持 |
使用复杂度 |
Object.wait() |
全部 |
是 |
有 |
低 |
Condition.await() |
指定条件 |
否 |
有 |
中 |
BlockingQueue |
内置 |
自动 |
有 |
低 |
五、高级优化技巧
1. 锁分段优化
// 降低锁粒度(如ConcurrentHashMap的分段锁思想) private final ReentrantLock[] segmentLocks = new ReentrantLock[16]; { for (int i = 0; i < segmentLocks.length; i++) { segmentLocks[i] = new ReentrantLock(); } } public void put(String key, String value) { int segment = Math.abs(key.hashCode() % segmentLocks.length); segmentLocks[segment].lock(); try { // 操作对应分段的共享数据 } finally { segmentLocks[segment].unlock(); } }
2. 条件变量优化
// 使用带超时的等待(避免死锁) if (!notEmpty.await(5, TimeUnit.SECONDS)) { throw new TimeoutException("等待消息超时"); } // 使用signalAll()谨慎(可能引起"惊群效应") notEmpty.signal(); // 优先使用精准通知
3. 监控指标扩展
// 添加JMX监控(示例) public class MonitorMetrics implements MonitorMetricsMBean { private final ReentrantLock lock; public int getWaitThreadCount() { return lock.getQueueLength(); // 获取等待锁的线程数 } public int getActiveThreadCount() { return lock.getHoldCount(); // 获取锁重入次数 } } // 注册MBean ManagementFactory.getPlatformMBeanServer().registerMBean( new MonitorMetrics(lock), new ObjectName("com.example:type=MonitorMetrics") );
4. 自适应锁优化
// 根据竞争情况动态切换锁类型 public class AdaptiveLock { private volatile boolean highContention = false; private final ReentrantLock fairLock = new ReentrantLock(true); private final ReentrantLock unfairLock = new ReentrantLock(); public void lock() { if (highContention) { fairLock.lock(); // 高竞争时用公平锁 } else { unfairLock.lock(); // 默认非公平锁 } } // 监控线程竞争情况 public void monitor() { new Thread(() -> { while (true) { int waiters = fairLock.getQueueLength() + unfairLock.getQueueLength(); highContention = waiters > 5; // 阈值可配置 try { Thread.sleep(1000); } catch (InterruptedException e) { break; } } }).start(); } }
5. 无锁化改造方案
// 对读多写少场景使用原子变量 private final AtomicReference<Map<String, String>> cache = new AtomicReference<>(new ConcurrentHashMap<>()); public void updateCache(String key, String value) { while (true) { Map<String, String> oldMap = cache.get(); Map<String, String> newMap = new ConcurrentHashMap<>(oldMap); newMap.put(key, value); if (cache.compareAndSet(oldMap, newMap)) break; } }
六、异常处理与健壮性设计
1. 死锁检测与恢复
// 使用ThreadMXBean检测死锁 ThreadMXBean bean = ManagementFactory.getThreadMXBean(); long[] threadIds = bean.findDeadlockedThreads(); if (threadIds != null) { ThreadInfo[] infos = bean.getThreadInfo(threadIds); for (ThreadInfo info : infos) { System.err.println("死锁线程: " + info.getThreadName()); // 强制中断受害线程(生产环境需谨慎) Thread thread = findThreadById(info.getThreadId()); if (thread != null) thread.interrupt(); } }
2. 线程泄漏防护
// 封装安全的线程池 public class SafeExecutor extends ThreadPoolExecutor { private final ConcurrentMap<Worker, Boolean> workers = new ConcurrentHashMap<>(); protected void beforeExecute(Thread t, Runnable r) { workers.put((Worker) t, true); } protected void afterExecute(Runnable r, Throwable t) { workers.remove(Thread.currentThread()); } public List<Thread> getStuckThreads(long timeoutMs) { return workers.keySet().stream() .filter(w -> w.getActiveTime() > timeoutMs) .collect(Collectors.toList()); } }
七、分布式环境扩展
1. 跨JVM的Monitor实现
// 基于Redis的分布式锁 public class DistributedMonitor { private final Jedis jedis; private final String lockKey; public boolean tryLock(long timeoutMs) { String result = jedis.set(lockKey, "locked", "NX", "PX", timeoutMs); return "OK".equals(result); } public void unlock() { jedis.del(lockKey); } // 使用Redisson的看门狗机制实现续期 public void startWatchdog() { new Thread(() -> { while (locked) { jedis.expire(lockKey, 30); try { Thread.sleep(10000); } catch (InterruptedException e) { break; } } }).start(); } }
2. 多节点协同方案
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Node 1 │ │ Node 2 │ │ Node 3 │ │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │ │ Monitor │───ZK─▶│ Monitor │───ZK─▶│ Monitor │ │ │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │ └─────────────┘ └─────────────┘ └─────────────┘
- ZooKeeper协调:通过临时节点实现Leader选举
- 状态同步:使用Watcher机制通知条件变更
八、现代Java特性整合
1. 虚拟线程适配
// JDK21+ 虚拟线程优化 ExecutorService vThreadPool = Executors.newVirtualThreadPerTaskExecutor(); public void virtualThreadMonitor() { try (var executor = vThreadPool) { executor.submit(() -> { synchronized(this) { // 兼容传统synchronized while (conditionNotMet()) { wait(); // 虚拟线程挂起时不占用OS线程 } // 处理共享数据 } }); } }
2. Project Loom纤程支持
// 使用Fiber替代线程(实验性) new Fiber<Void>(() -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); synchronized(lock) { // 百万级纤程共享Monitor // 业务逻辑 } }).start();
九、监控指标体系建设
1. Prometheus监控集成
// 暴露锁竞争指标 Gauge contentionGauge = Gauge.build() .name("monitor_lock_contention") .help("Current lock waiters count") .register(); public void recordMetrics() { new ScheduledThreadPoolExecutor(1) .scheduleAtFixedRate(() -> { contentionGauge.set(lock.getQueueLength()); }, 0, 5, TimeUnit.SECONDS); }
2. 关键监控看板
指标名称 |
计算方式 |
健康阈值 |
锁等待时间 |
历史平均等待时间 |
< 50ms |
条件变量等待数 |
notEmpty.getWaitQueueLength |
< CPU核心数×2 |
死锁检测次数 |
ThreadMXBean统计 |
= 0 |
线程活跃度 |
活跃线程数/最大线程数 |
60%~80% |
十、经典场景最佳实践
1. 数据库连接池实现
public class ConnectionPool { private final LinkedList<Connection> pool = new LinkedList<>(); private final int maxSize; private final Object monitor = new Object(); public Connection borrow() throws InterruptedException { synchronized (monitor) { while (pool.isEmpty()) { monitor.wait(); } return pool.removeFirst(); } } public void release(Connection conn) { synchronized (monitor) { pool.addLast(conn); monitor.notify(); } } }
2. 生产者-消费者增强版
// 支持优先级和批量处理 public class EnhancedBlockingQueue { private final PriorityBlockingQueue<Item> queue; private final Semaphore available; public void putBatch(List<Item> items) { queue.addAll(items); available.release(items.size()); } public Item take() throws InterruptedException { available.acquire(); return queue.poll(); } }