剑指JUC原理-16.读写锁(下)

简介: 剑指JUC原理-16.读写锁

剑指JUC原理-16.读写锁(上):https://developer.aliyun.com/article/1413658


t3 r.lock,t4 w.lock


这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子

t2 t3都是读锁,所以状态都是 SHARED,而t4是写锁,所以状态是EXCLUSIVE


t1 w.unlock


这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子

public void unlock() {
            sync.release(1);
        }
public final boolean release(int arg) {
    // 如果return true呢,就会执行后续的逻辑
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
    // 首先在原来的基础上减1
            int nextc = getState() - releases;
    // 然后去查看写锁部分是不是减成0了
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
    // 如果没有减成0 代表着这事一次锁的重入
            setState(nextc);
            return free;
        }

接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内

parkAndCheckInterrupt() 处恢复运行


这回再来一次 for (;😉 执行 tryAcquireShared 成功则让读锁计数加一

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 此时进入到这个条件中
                        // 替换头结点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 此时这里唤醒了,就可以继续运行了,然后再循环
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
// 返回-1表示失败,返回0或者1表示成功
protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
    // 返回-1的之前分析过了,就不再做分析了
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
    // 获取读锁,也就是高16位
            int r = sharedCount(c);
    // 不应该被阻塞住
            if (!readerShouldBlock() &&
                // 没有超过最大基数
                r < MAX_COUNT &&
                // 对于高位来讲,不是加1,是加了65536
                compareAndSetState(c, c + SHARED_UNIT)) {
                // 如果都符合,代表后续读锁加锁成功了
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

// 替换头结点
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 拿到当前节点的下一个 
            Node s = node.next;
            // 如果节点的状态是 shared的话,
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用

doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内parkAndCheckInterrupt() 处恢复运行

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // 在这里会将节点的状态从 -1 改成 0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 对头结点的后继结点又要去唤醒,这就接上了 此时就唤醒t3的park
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    // 又再次来到了 tryAcquireShared方法,和前面的流程是一样的
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 此时t3 被唤醒了,和之前的流程是一样的
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
    // 返回-1的之前分析过了,就不再做分析了
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
    // 获取读锁,也就是高16位
            int r = sharedCount(c);
    // 不应该被阻塞住
            if (!readerShouldBlock() &&
                // 没有超过最大基数
                r < MAX_COUNT &&
                // 对于高位来讲,不是加1,是加了65536
                compareAndSetState(c, c + SHARED_UNIT)) {
                // 如果都符合,代表后续读锁加锁成功了
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

这回再来一次 for (;😉 执行 tryAcquireShared 成功则让读锁计数加一

这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

// 替换头结点
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 拿到当前节点的下一个 
            Node s = node.next;
            // 如果节点的状态是 shared的话,
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点


t2 r.unlock,t3 r.unlock


t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零

public void unlock() {
            sync.releaseShared(1);
        }
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                // 获取状态,减去高位的1
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入

doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 本质上唤醒t4
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;😉 这次自己是老二,并且没有其他竞争,tryAcquire(1) 成功,修改头结点,流程结束


StampedLock


该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用


这里对优化性能部分做了专门的解读:


之前学读写锁,其实已经看到了读读可以并发,已经很快了,但是还不够快,读读并发的时候,底层还是用cas的方式去修改它的状态,读锁的高16位去修改它的状态,它还是性能上比不上不加锁,如果希望读取的性能达到这种极致,那么就可以使用StampedLock。


加解读锁

long stamp = lock.readLock();
lock.unlockRead(stamp);

加解写锁

long stamp = lock.writeLock();
lock.unlockWrite(stamp);

乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

// 这里面没有加任何的锁
long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
  // 锁升级
}

提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

class DataContainerStamped {
    private int data;
    private final StampedLock lock = new StampedLock();
    public DataContainerStamped(int data) {
        this.data = data;
    }
    public int read(int readTime) {
        long stamp = lock.tryOptimisticRead();
        log.debug("optimistic read locking...{}", stamp);
        sleep(readTime);
        if (lock.validate(stamp)) {
            log.debug("read finish...{}, data:{}", stamp, data);
            return data;
        }
        // 锁升级 - 读锁
        log.debug("updating to read lock... {}", stamp);
        try {
            stamp = lock.readLock();
            log.debug("read lock {}", stamp);
            sleep(readTime);
            log.debug("read finish...{}, data:{}", stamp, data);
            return data;
        } finally {
            log.debug("read unlock {}", stamp);
            lock.unlockRead(stamp);
        }
    }
    public void write(int newData) {
        long stamp = lock.writeLock();
        log.debug("write lock {}", stamp);
        try {
            sleep(2);
            this.data = newData;
        } finally {
            log.debug("write unlock {}", stamp);
            lock.unlockWrite(stamp);
        }
    }
}

测试 读-读 可以优化

public static void main(String[] args) {
        DataContainerStamped dataContainer = new DataContainerStamped(1);
        new Thread(() -> {
            dataContainer.read(1);
        }, "t1").start();
        sleep(0.5);
        new Thread(() -> {
            dataContainer.read(0);
        }, "t2").start();
    }

输出结果,可以看到实际没有加读锁

15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...256 
15:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...256 
15:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:1 
15:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1

测试 读-写 时优化读补加读锁

public static void main(String[] args) {
        DataContainerStamped dataContainer = new DataContainerStamped(1);
        new Thread(() -> {
            dataContainer.read(1);
        }, "t1").start();
        sleep(0.5);
        new Thread(() -> {
            dataContainer.write(100);
        }, "t2").start();
    }

输出结果

15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...256 
15:57:00.717 c.DataContainerStamped [t2] - write lock 384 
15:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 256 
15:57:02.719 c.DataContainerStamped [t2] - write unlock 384 
15:57:02.719 c.DataContainerStamped [t1] - read lock 513 
15:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:1000 
15:57:03.719 c.DataContainerStamped [t1] - read unlock 513 

注意


  • StampedLock 不支持条件变量
  • StampedLock 不支持可重入
目录
相关文章
|
6月前
|
Java
剑指JUC原理-14.ReentrantLock原理(下)
剑指JUC原理-14.ReentrantLock原理
38 1
|
6月前
|
存储 安全 Java
剑指JUC原理-4.共享资源和线程安全性(上)
剑指JUC原理-4.共享资源和线程安全性
82 1
|
6月前
|
SQL Java 数据库连接
剑指JUC原理-15.ThreadLocal(上)
剑指JUC原理-15.ThreadLocal
78 1
|
6月前
|
安全 Java 程序员
剑指JUC原理-14.ReentrantLock原理(上)
剑指JUC原理-14.ReentrantLock原理
45 0
|
6月前
|
Java Linux API
剑指JUC原理-2.线程
剑指JUC原理-2.线程
62 0
|
6月前
|
存储 算法 安全
剑指JUC原理-5.synchronized底层原理(上)
剑指JUC原理-5.synchronized底层原理
57 0
|
6月前
|
Java Linux 调度
剑指JUC原理-7.线程状态与ReentrantLock(中)
剑指JUC原理-7.线程状态与ReentrantLock
58 0
|
6月前
|
存储 Java 数据安全/隐私保护
剑指JUC原理-15.ThreadLocal(中)
剑指JUC原理-15.ThreadLocal
50 0
|
6月前
|
存储 Java 编译器
剑指JUC原理-5.synchronized底层原理(下)
剑指JUC原理-5.synchronized底层原理
52 0
|
6月前
|
Java 编译器 测试技术
剑指JUC原理-8.Java内存模型(中)
剑指JUC原理-8.Java内存模型
57 0