@[TOC]
为什么要出现读写锁
因为ReentrantLock是互斥锁,如果有一个操作是读多写少,同时还需要保证线程安全,那么使用ReentrantLock会导致效率比较低。因为多个线程在对同一个数据进行读操作时,也不会造成线程安全问题。所以出现了ReentrantReadWriteLock锁:
- 读读操作是共享的。
- 写写操作是互斥的。
- 读写操作是互斥的。
- 写读操作是互斥的。
单个线程获取写锁后,再次获取读锁,可以拿到。(写读可重入)
单个线程获取读锁后,再次获取写锁,拿不到。(读写不可重入)
使用方式:
public class XxxTest {
// 读写锁
static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// 写锁
static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
// 读锁
static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
public static void main(String[] args) throws InterruptedException {
readLock.lock();
try {
System.out.println("拿到读锁!");
} finally {
readLock.unlock();
}
writeLock.lock();
try {
System.out.println("拿到写锁!");
} finally {
writeLock.unlock();
}
}
}
读写锁的核心思想
ReentrantReadWriteLock基于AQS实现的,很多功能的实现和ReentrantLock类似。还是基于AQS的state来确定当前线程是否拿到锁资源。将state的高16位作为读锁的标识,将state的低16位作为写锁的标识
锁重入问题:
- 写锁重入:因为写操作和其他操作是互斥的,代表同一时间,只有一个线程持有着写锁,只要锁重入,就对低位+1即可。
- 读锁重入:读锁的重入不能仿照写锁的方式,因为写锁属于互斥锁,同一时间只会有一个线程持有写锁,但是读锁是共享锁,同一时间会有多个线程持有读锁。所以每个获取到读锁的线程,记录锁重入的方式都是基于自己的ThreadLocal存储锁重入次数。
读锁重入修改state,只是记录当前线程锁重入的次数,需要基于ThreadLocal记录。
state二进制表示:00000000 00000000 00000000 00000000
将state的高16位作为读锁的标识,将state的低16位作为写锁的标识。
写锁:00000000 00000000 00000000 00000001
写锁重入,低16位+1:00000000 00000000 00000000 00000010
读锁:
00000000 00000001 00000000 00000000
读锁重入,高16位+1:
00000000 00000010 00000000 00000000
每个读操作的线程,在获取读锁时,都需要开辟一个ThreadLocal。读写锁为了优化这个事情,做了两手操作:
- 第一个拿到读锁的线程,不用ThreadLocal记录重入次数,在读写锁内有有一个firstRead记录重入次数
- 记录了最后一个拿到读锁的线程的重入次数,交给cachedHoldCounter属性标识,可以避免频繁的在锁重入时,从TL中获取。
写锁的操作
写锁加锁-acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire:尝试获取锁资源,能否以CAS的方式将state 从0 改为 1,改成功,拿锁成功。
addWaiter:将当前没按到锁资源的,封装成Node,排到AQS里。
acquireQueued:当前排队的能否竞争锁资源,不能挂起线程阻塞。
因为都是AQS的实现,主要看tryAcquire
// state,高16:读,低16:写
00000000 00000000 00000000 00000000
00000000 00000001 00000000 00000000 - SHARED_UNIT
00000000 00000000 11111111 11111111 - MAX_COUNT
00000000 00000000 11111111 11111111 - EXCLUSIVE_MASK
&
00000000 00000000 00000000 00000001
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 只拿到表示读锁的高16位。
static int sharedCount(int c) {
return c >>> SHARED_SHIFT; }
// 只拿到表示写锁的低16位。
static int exclusiveCount(int c) {
return c & EXCLUSIVE_MASK; }
// 读写锁的写锁,获取流程
protected final boolean tryAcquire(int acquires) {
// 拿到当前线程
Thread current = Thread.currentThread();
// 拿到state
int c = getState();
// 拿到了写锁的低16位标识w
int w = exclusiveCount(c);
// c != 0:要么有读操作拿着锁,要么有写操作拿着锁
if (c != 0) {
// 如果w == 0,代表没有写锁,拿不到
// 如果w != 0,代表有写锁,看一下拿占用写锁是不是当前线程,如果不是,拿不到
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 到这,说明肯定是写锁,并且是当前线程持有
// 判断对低位 + 1,是否会超过MAX_COUNT,超过抛Error
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 如果没超过锁重入次数, + 1,返回true,拿到锁资源。
setState(c + acquires);
return true;
}
// 到这,说明c == 0
// 读写锁也分为公平锁和非公平锁
// 公平:看下排队不,排队就不抢了
// 走hasQueuedPredecessors方法,有排队的返回true,没排队的返回false
// 非公平:直接抢!
// 方法实现直接返回false
if (writerShouldBlock() ||
// 以CAS的方式,将state从0修改为 1
!compareAndSetState(c, c + acquires))
// 要么不让抢,要么CAS操作失败,返回false
return false;
// 将当前持有互斥锁的线程,设置为自己
setExclusiveOwnerThread(current);
return true;
}
addWaiter和acquireQueued和ReentrantLock看的一样,都是AQS自身提供的方法。
写锁-释放锁操作
读写锁的释放操作,跟ReentrantLock一致,只是需要单独获取低16位,判断是否为0,为0就释放成功
// 写锁的释放锁
public final boolean release(int arg) {
// 只有tryRealse是读写锁重新实现的方法,其他的和ReentrantLock一致
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 读写锁的真正释放
protected final boolean tryRelease(int releases) {
// 判断释放锁的线程是不是持有锁的线程
if (!isHeldExclusively())
// 不是抛异常
throw new IllegalMonitorStateException();
// 对state - 1
int nextc = getState() - releases;
// 拿着next从获取低16位的值,判断是否为0
boolean free = exclusiveCount(nextc) == 0;
// 返回true
if (free)
// 将持有互斥锁的线程信息置位null
setExclusiveOwnerThread(null);
// 将-1之后的nextc复制给state
setState(nextc);
return free;
}
读锁的操作
读锁的加锁操作
// 读锁加锁操作
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared,尝试获取锁资源,获取到返回1,没获取到返回-1
doAcquireShared,前面没拿到锁,这边需要排队
// tryAcquireShared方法
protected final int tryAcquireShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 拿到state
int c = getState();
// 那写锁标识,如果 !=0,代表有写锁
if (exclusiveCount(c) != 0 &&
// 如果持有写锁的不是当前线程,排队去!
getExclusiveOwnerThread() != current)
// 排队!
return -1;
// 没有写锁!
// 获取读锁信息
int r = sharedCount(c);
// 公平锁: 有人排队,返回true,直接拜拜,没人排队,返回false
// 非公平锁:正常的逻辑是非公平直接抢,因为是读锁,每次抢占只要CAS成功,必然成功
// 这就会出现问题,写操作无法在读锁的情况抢占资源,导致写线程饥饿,一直阻塞
// 非公平锁会查看next是否是写锁的,如果是,返回true,如果不是返回false
if (!readerShouldBlock() &&
// 查看读锁是否已经达到了最大限制
r < MAX_COUNT &&
// 以CAS的方式,对state的高16位+1
compareAndSetState(c, c + SHARED_UNIT)) {
// 拿到锁资源成功!!!
if (r == 0) {
// 第一个拿到锁资源的线程,用first存储
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 锁重入,第一个拿到读锁的线程,直接对firstReaderHoldCount++记录重入的次数
firstReaderHoldCount++;
} else {
// 不是第一个拿到锁资源的
// 先拿到cachedHoldCounter,最后一个线程的重入次数
HoldCounter rh = cachedHoldCounter;
// rh == null: 第二个拿到读锁的!
// 或者发现之前有最后一个来的,但是不我,将我设置为最后一个。
if (rh == null || rh.tid != getThreadId(current))
// 获取自己的重入次数,并赋值给cachedHoldCounter
cachedHoldCounter = rh = readHolds.get();
// 之前拿过,现在如果为0,赋值给TL
else if (rh.count == 0)
readHolds.set(rh);
// 重入次数+1,
// 第一个:可能是第一次拿
// 第二个:可能是重入操作
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
// 通过tryAcquireShared没拿到锁资源,也没返回-1,就走这
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
// 拿state
int c = getState();
// 现在有互斥锁,不是自己,拜拜!
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// 公平:有排队的,进入逻辑。 没排队的,过!
// 非公平:head的next是写不,是,进入逻辑。 如果不是,过!
} else if (readerShouldBlock()) {
// 这里代码特别乱,因为这里的代码为了处理JDK1.5的内存泄漏问题,修改过~
// 这个逻辑里不会让你拿到锁,做被阻塞前的准备
if (firstReader == current) {
// 什么都不做
} else {
if (rh == null) {
// 获取最后一个拿到读锁资源的
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
// 拿到我自己的记录重入次数的。
rh = readHolds.get();
// 如果我的次数是0,绝对不是重入操作!
if (rh.count == 0)
// 将我的TL中的值移除掉,不移除会造成内存泄漏
readHolds.remove();
}
}
// 如果我的次数是0,绝对不是重入操作!
if (rh.count == 0)
// 返回-1,等待阻塞吧!
return -1;
}
}
// 超过读锁的最大值了没?
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 到这,就CAS竞争锁资源
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 跟tryAcquireShared一模一样
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh;
}
return 1;
}
}
}
加锁-扔到队列准备阻塞操作
// 没拿到锁,准备挂起
private void doAcquireShared(int arg) {
// 将当前线程封装为Node,当前Node为共享锁,并添加到队列的模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取上一个节点
final Node p = node.predecessor();
if (p == head) {
// 如果我的上一个是head,尝试再次获取锁资源
int r = tryAcquireShared(arg);
if (r >= 0) {
// 如果r大于等于0,代表获取锁资源成功
// 唤醒AQS中我后面的要获取读锁的线程(SHARED模式的Node)
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 能否挂起当前线程,需要保证我前面Node的状态为-1,才能执行后面操作
if (shouldParkAfterFailedAcquire(p, node) &&
//LockSupport.park挂起~~
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
总结
ReentrantReadWriteLock的使用可以提高并发性,特别适用于读操作远多于写操作的情况。它相对于排他锁,因为排他锁在同一时间只允许一个线程访问。ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程同时访问,也不允许写线程和写线程同时访问。