ReentrantReadWriteLock
其定义就是支持冲入的读写锁,本质上也就是基于 ReentrantLock 实现的
当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。
类似于数据库中的 select … from … lock in share mode
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
class DataContainer { private Object data; private ReentrantReadWriteLock rw = new ReentrantReadWriteLock(); private ReentrantReadWriteLock.ReadLock r = rw.readLock(); private ReentrantReadWriteLock.WriteLock w = rw.writeLock(); public Object read() { log.debug("获取读锁..."); r.lock(); try { log.debug("读取"); sleep(1); return data; } finally { log.debug("释放读锁..."); r.unlock(); } } public void write() { log.debug("获取写锁..."); w.lock(); try { log.debug("写入"); sleep(1); } finally { log.debug("释放写锁..."); w.unlock(); } } }
测试 读锁-读锁 可以并发
DataContainer dataContainer = new DataContainer(); new Thread(() -> { dataContainer.read(); }, "t1").start(); new Thread(() -> { dataContainer.read(); }, "t2").start();
输出结果,从这里可以看到 Thread-0 锁定期间,Thread-1 的读操作不受影响
14:05:14.341 c.DataContainer [t2] - 获取读锁... 14:05:14.341 c.DataContainer [t1] - 获取读锁... 14:05:14.345 c.DataContainer [t1] - 读取 14:05:14.345 c.DataContainer [t2] - 读取 14:05:15.365 c.DataContainer [t2] - 释放读锁... 14:05:15.386 c.DataContainer [t1] - 释放读锁...
测试 读锁-写锁 相互阻塞
DataContainer dataContainer = new DataContainer(); new Thread(() -> { dataContainer.read(); }, "t1").start(); Thread.sleep(100); new Thread(() -> { dataContainer.write(); }, "t2").start();
输出结果
14:04:21.838 c.DataContainer [t1] - 获取读锁... 14:04:21.838 c.DataContainer [t2] - 获取写锁... 14:04:21.841 c.DataContainer [t2] - 写入 14:04:22.843 c.DataContainer [t2] - 释放写锁... 14:04:22.843 c.DataContainer [t1] - 读取 14:04:23.843 c.DataContainer [t1] - 释放读锁...
写锁-写锁 也是相互阻塞的,这里就不测试了
注意事项
- 读锁不支持条件变量
ReentrantReadWriteLock 中的读锁不支持条件变量,主要是因为读锁在 ReentrantReadWriteLock 中是共享的,多个线程可以同时持有读锁来访问共享资源。条件变量通常用于在多线程环境下实现线程间的协调和通信,而读锁的共享特性可能导致条件变量的信号在多个线程之间产生歧义或不确定性。
- 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
r.lock(); try { // ... w.lock(); try { // ... } finally{ w.unlock(); } } finally{ r.unlock(); }
- 重入时降级支持:即持有写锁的情况下去获取读锁
// 下面以ReentrantReadWriteLock 的 CachedData 类来说明,这段代码主要是使用读写锁来实现对缓存数据的并发访问,以提高并发读取操作的性能。 class CachedData { Object data; // 是否有效,如果失效,需要重新计算 data volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { // 先加读锁,判断缓存是否失效,如果没有失效,那么可以直接返回即可。使用完了将读锁解开即可 rwl.readLock().lock(); if (!cacheValid) { // 如果失效了,释放读锁,然后获得写锁,重新对其进行计算 // 获取写锁前必须释放读锁 rwl.readLock().unlock(); rwl.writeLock().lock(); try { // 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新 /* 在上述代码中,两次检查 cacheValid 的作用是为了在获取写锁之前和获取写锁后再次确认缓存的有效性。让我来详细解释一下: 第一次检查 cacheValid 发生在首次获取写锁之前。这是为了避免出现竞态条件(race condition)的情况,即在当前线程释放读锁后,有可能其他线程已经获取了写锁并更新了缓存。如果没有进行第一次检查,当前线程获取写锁后可能会重复更新缓存,造成不必要的计算和数据更新。 第二次检查 cacheValid 发生在获取写锁之后。虽然在第一次检查时缓存无效,但在当前线程获取写锁之前,可能有其他线程已经更新了缓存并将 cacheValid 设置为有效。因此,在获取写锁后再次检查 cacheValid 可以避免重复更新缓存,确保只有一个线程更新缓存数据。 通过这样的双重检查机制,可以有效地避免多个线程同时更新缓存数据,确保在并发环境下对缓存的更新操作是正确且高效的。此外,结合读写锁的降级操作,可以使得在缓存有效时多个线程能够同时读取数据,从而提高系统的并发性能。 */ if (!cacheValid) { data = ... cacheValid = true; } /* 锁的降级,写锁释放开,但是我还想同时持有它的读锁,这是为了释放开的那个瞬间,其他线程的读取权限就ok了 加读锁的目的也是为了你在读的的时候不受其他的写的线程的干扰 */ // 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存 rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); } } // 自己用完数据, 释放读锁 try { use(data); } finally { rwl.readLock().unlock(); } } }
应用之缓存
缓存更新策略
更新时,是先清缓存还是先更新数据库
先清缓存
读操作的速度是大于写操作的
先更新数据库
补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询
这种情况的出现几率非常小
读写锁实现一致性缓存
使用读写锁实现一个简单的按需加载缓存(核心:写操作 加 写锁;读操作 加 读锁)
class GenericCachedDao<T> { // HashMap 作为缓存非线程安全, 需要保护 HashMap<SqlPair, T> map = new HashMap<>(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); GenericDao genericDao = new GenericDao(); public int update(String sql, Object... params) { SqlPair key = new SqlPair(sql, params); // 加写锁, 防止其它线程对缓存读取和更改 lock.writeLock().lock(); try { int rows = genericDao.update(sql, params); map.clear(); return rows; } finally { lock.writeLock().unlock(); } } public T queryOne(Class<T> beanClass, String sql, Object... params) { SqlPair key = new SqlPair(sql, params); // 加读锁, 防止其它线程对缓存更改 lock.readLock().lock(); try { T value = map.get(key); if (value != null) { return value; } } finally { lock.readLock().unlock(); } // 加写锁, 防止其它线程对缓存读取和更改 lock.writeLock().lock(); try { // get 方法上面部分是可能多个线程进来的, 可能已经向缓存填充了数据 // 为防止重复查询数据库, 再次验证 T value = map.get(key); if (value == null) { // 如果没有, 查询数据库 value = genericDao.queryOne(beanClass, sql, params); map.put(key, value); } return value; } finally { lock.writeLock().unlock(); } } // 作为 key 保证其是不可变的 class SqlPair { private String sql; private Object[] params; public SqlPair(String sql, Object[] params) { this.sql = sql; this.params = params; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } SqlPair sqlPair = (SqlPair) o; return sql.equals(sqlPair.sql) && Arrays.equals(params, sqlPair.params); } @Override public int hashCode() { int result = Objects.hash(sql); result = 31 * result + Arrays.hashCode(params); return result; } } }
注意
以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑:
- 适合读多写少,如果写操作比较频繁,以上实现性能低
- 没有考虑缓存容量
- 没有考虑缓存过期
- 只适合单机
- 并发性还是低,目前只会用一把锁
- 更新方法太过简单粗暴,清空了所有 key(考虑按类型分区或重新设计 key)
读写锁原理
读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个
t1 w.lock,t2 r.lock
其实该流程和 ReentrantLock 几乎是一样的,但是还是有一些区别的,比如state不太一样,因为state既要给读锁用,也要给写锁用,所以要将state分成两部分。
t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位。
其实t1肯定是能加上锁,接下来分析一下源码:
ctrl + f12 找到 writelock 里面的lock方法:
public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /* 首先会调用tryAcquire 尝试加锁,如果成功了那么后续的代码就不执行了,如果加锁失败了,才会进入这个队列 */
protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); // 首先拿到整个state状态 int c = getState(); int w = exclusiveCount(c); // 如果不等于0,意味着既有可能其他线程加了读锁,也有可能是其他线程加了写锁 // 因为高16位不等于0或者低16位不等于0都有可能导致 不等于0 if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) // w == 0 代表 加的读锁部分 而 往后执行代表着 可能加的写锁,但是这个写锁是不是自己加的呢?比如先加的写锁,发生了重入,又加了一次写锁 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 如果写锁部分 再加 1超过写锁的最大范围了 65535 2的16次方 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 可以理解为发生了可重入 setState(c + acquires); return true; } // 如果能往下走,说明c是等于0的,代表别的线程都没有加锁,首先判断写锁是否需要阻塞,其实就意味着公平非公平,如果是非公平锁 就 总会返回false,公平锁会检查这个队列。然后就接着往后面走 是否能compareAndSetState if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; // 将对应线程设置成 owner setExclusiveOwnerThread(current); return true; }
接下来看 加 读锁的lock方法
public void lock() { sync.acquireShared(1); } public final void acquireShared(int arg) { // 尝试去获取这个读锁 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败
tryAcquireShared 返回值表示
- -1 表示失败
- 0 表示成功,但后继节点不会继续唤醒(0或者1会在后面的信号量章节介绍)
- 正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 检查写锁部分是否不为0 此时t1已经将其变为1了,去检查加写锁的是不是当前线程呢? // 这种情况其实就是 t2 已经加了写锁,然后又加了读锁,这里是应该成功的,因为这是锁降级的过程 // 最终我们当前的情况来看,t2 其实就是返回-1了。 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
返回-1,这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态
private void doAcquireShared(int arg) { // 唤醒的时候,判断的逻辑稍有不同 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 死循环,去找t2 有没有前驱节点 final Node p = node.predecessor(); // 如果前驱节点是 head,那么说明其是 第二个节点,是有资格争抢锁的 if (p == head) { // 调用tryAcquireShared 返回-1表示失败,返回0或者1表示成功 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 如果返回-1,说明并没有释放锁,那么就会走到这个逻辑,和ReentrantLock逻辑一致,走park shouldParkAfterFailedAcquire将其前驱节点设置成-1,然后 重新for循环,设置为park if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
剑指JUC原理-16.读写锁(下):https://developer.aliyun.com/article/1413659