编辑
🌟 大家好,我是摘星! 🌟
今天为大家带来的是并发设计模式实战系列,第六章读写锁模式,废话不多说直接开始~
目录
一、核心原理深度拆解
1. 读写锁三维模型
编辑
2. 关键实现原理
- 线程饥饿预防:公平模式下,等待时间最长的线程优先获取锁
- 锁状态追踪:
int readCount; // 当前持有读锁的线程数 int writeCount; // 写锁持有标记(0/1) Thread writerThread; // 写锁持有者
二、生活化类比:图书馆管理系统
系统组件 |
现实类比 |
核心规则 |
读锁 |
读者借阅 |
多人可同时阅读,但禁止修改书籍 |
写锁 |
图书管理员维护 |
维护时禁止所有借阅和修改 |
锁降级 |
管理员先停止维护 |
转为普通读者身份继续阅读 |
- 异常场景:如果读者正在阅读时管理员直接修改书籍(无锁保护),会导致数据不一致
三、Java代码实现(生产级Demo)
1. 完整可运行代码
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.Lock; public class ReadWriteLockDemo { private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); // 公平模式 private final Lock readLock = rwLock.readLock(); private final Lock writeLock = rwLock.writeLock(); private String sharedData = "原始数据"; // 读操作 public String readData() { readLock.lock(); try { System.out.println(Thread.currentThread().getName() + " 读取数据"); Thread.sleep(500); // 模拟读取耗时 return sharedData; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } finally { readLock.unlock(); } } // 写操作 public void writeData(String newData) { writeLock.lock(); try { System.out.println(Thread.currentThread().getName() + " 开始写入"); Thread.sleep(1000); // 模拟写入耗时 sharedData = newData; System.out.println("更新后数据: " + sharedData); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { writeLock.unlock(); } } // 锁降级演示 public void lockDowngrade() { writeLock.lock(); try { System.out.println("== 执行锁降级 =="); sharedData = "临时数据"; // 1. 先修改数据 readLock.lock(); // 2. 获取读锁(降级开始) System.out.println("降级中当前数据: " + sharedData); } finally { writeLock.unlock(); // 3. 释放写锁(降级完成) } try { // 仍持有读锁,可继续读取 System.out.println("降级后读取: " + sharedData); } finally { readLock.unlock(); } } public static void main(String[] args) throws InterruptedException { ReadWriteLockDemo demo = new ReadWriteLockDemo(); // 模拟并发读写 new Thread(() -> demo.writeData("新数据1"), "写线程1").start(); new Thread(() -> System.out.println(demo.readData()), "读线程1").start(); new Thread(() -> demo.writeData("新数据2"), "写线程2").start(); new Thread(() -> System.out.println(demo.readData()), "读线程2").start(); // 锁降级演示 Thread.sleep(2000); new Thread(demo::lockDowngrade, "降级线程").start(); } }
2. 关键配置说明
// 创建公平锁(防止线程饥饿) new ReentrantReadWriteLock(true); // 非公平锁(更高吞吐量) new ReentrantReadWriteLock(false); // 锁降级必须按此顺序: // 1. 获取写锁 → 2. 获取读锁 → 3. 释放写锁 → 4. 释放读锁
四、横向对比表格
1. 不同锁机制对比
锁类型 |
并发度 |
适用场景 |
死锁风险 |
synchronized |
低 |
简单同步场景 |
有 |
ReentrantLock |
中 |
需要条件变量的场景 |
有 |
ReadWriteLock |
高 |
读多写少场景 |
有 |
StampedLock |
极高 |
乐观读控制 |
无 |
2. 读写锁实现对比
实现类 |
特性 |
适用场景 |
ReentrantReadWriteLock |
支持公平/非公平模式、可重入 |
通用场景 |
StampedLock |
支持乐观读、锁降级优化 |
超高并发读取 |
CopyOnWriteArrayList |
写时复制 |
读远多于写的集合场景 |
五、高级优化技巧
1. 锁升级陷阱规避
// 错误示例(会导致死锁): readLock.lock(); try { writeLock.lock(); // 阻塞等待所有读锁释放 } finally { readLock.unlock(); } // 正确做法:直接获取写锁 writeLock.lock();
2. 统计监控实现
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); // 获取等待线程数 int readerQueueLength = rwLock.getQueueLength(); // 判断是否有写锁等待 boolean hasWriterWaiting = rwLock.hasQueuedThreads();
3. StampedLock优化方案
StampedLock stampLock = new StampedLock(); // 乐观读(不阻塞写操作) long stamp = stampLock.tryOptimisticRead(); if (!stampLock.validate(stamp)) { stamp = stampLock.readLock(); // 升级为悲观读 try { /* 读取数据 */ } finally { stampLock.unlockRead(stamp); } }
六、高级优化技巧扩展
1. 异步层性能提升(增强版)
// 使用Epoll替代Selector(Linux系统优化) SelectorProvider provider = SelectorProvider.provider(); Selector epollSelector = provider.openSelector(); // 配合内存映射提升IO效率 FileChannel channel = FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE); MappedByteBuffer buf = channel.map( FileChannel.MapMode.READ_WRITE, 0, 1024);
2. 同步层动态扩缩容(智能版)
// 基于CPU使用率自动调整线程池 ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1); monitor.scheduleAtFixedRate(() -> { double load = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage(); int newSize = load > 2.0 ? pool.getCorePoolSize() * 2 : pool.getMaximumPoolSize() / 2; pool.setCorePoolSize(Math.max(4, Math.min(32, newSize))); }, 5, 5, TimeUnit.SECONDS);
3. 监控关键指标(企业级)
// 集成Micrometer监控 Metrics.addRegistry(new SimpleMeterRegistry()); Gauge.builder("task.queue.size", taskQueue::size) .tag("module", "async") .register(Metrics.globalRegistry); Counter.builder("task.rejected") .tag("reason", "queue_full") .register(Metrics.globalRegistry);
七、生产环境最佳实践
1. 熔断保护机制
// 使用Resilience4j实现熔断 CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("async-circuit"); Supplier<String> decoratedSupplier = CircuitBreaker .decorateSupplier(circuitBreaker, () -> { if (taskQueue.size() > 800) { throw new IllegalStateException("Queue overload"); } return processTask(); });
2. 分布式扩展方案
编辑
- 技术选型:
- Redis Stream(轻量级)
- Kafka(高吞吐)
- Pulsar(多协议支持)
3. 混沌工程测试用例
// 使用ChaosBlade注入故障 @ChaosTest public void testQueueOverflow() { // 模拟队列积压 for (int i = 0; i < 2000; i++) { taskQueue.offer(() -> {}); } assertThat(taskQueue.size()).isGreaterThan(1000); assertThat(pool.getActiveCount()).isEqualTo(pool.getMaximumPoolSize()); }
八、性能压测数据(新增)
1. 不同队列实现对比
队列类型 |
吞吐量(ops/s) |
99%延迟(ms) |
CPU占用 |
LinkedBlockingQueue |
125,000 |
12 |
85% |
ArrayBlockingQueue |
138,000 |
9 |
78% |
ConcurrentLinkedQueue |
152,000 |
5 |
92% |
Disruptor |
210,000 |
2 |
65% |
2. 线程池配置优化
// 最优配置公式(适用于IO密集型) int optimalThreads = Runtime.getRuntime().availableProcessors() * (1 + (平均等待时间 / 平均处理时间)); // 示例:4核CPU,等待时间50ms,处理时间20ms 4 * (1 + (50/20)) ≈ 14 threads
九、现代替代方案(新增)
1. 协程方案(Kotlin)
// 使用协程替代线程池 val scope = CoroutineScope(Dispatchers.IO.limitedParallelism(16)) scope.launch { val result = withContext(Dispatchers.Default) { processTask() // 挂起函数 } sendResult(result) }
2. 虚拟线程(Java19+)
// 使用虚拟线程处理阻塞任务 ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); Future<String> future = executor.submit(() -> { Thread.sleep(1000); // 不占用OS线程 return "Result"; });
3. Reactive模式
// 使用Project Reactor Flux.fromIterable(taskQueue) .parallel() .runOn(Schedulers.boundedElastic()) .flatMap(this::processTask) .subscribe(result -> { // 处理结果 });