Java并发知识之ReentrantLock

简介: 本文深入剖析了Java中并发编程的核心概念,特别聚焦于锁的设计思想,通过分析AbstractQueuedSynchronizer(AQS)、ReentrantLock和ReentrantReadWriteLock的实现,揭示了锁的工作原理和高效并发控制策略。

并发编程是Java中非常关键的基本功,并发编程包括的知识点如下图,非常之多,锁是保证并发的一种手段之一,本文将围绕Aqs,ReentrantLock,ReentrantReadWriteLock的实现来分析锁设计的思想。

并发知识.png

Jdk为什么需要设计Lock?

使用了Sychronized之后,发现Sychronized提供的锁功能存在一些短板,比如不能取消,不可以等待,不可以中断,而Lock就弥补了Sychronized的这些缺陷,下面是区别,所以Lock有他存在的必要性。

Lock与Sychronized区别、特性

  • 功能方面 1、Synchronized Jvm关键字实现 互斥锁实现。monitorenter和monitorexit 2、Synchronized 使用范围包括操作方法,对象,同步代码块 3、Synchornized 可重入 不可中断 不可等待取消 非公平,而Lock是可重入,可中断,可等待取消,支持公平/非公平(默认) 4、Synchronized在线程发生异常时会自动释放锁,因此不会发生异常死锁。Lock异常时不会自动释放锁,所以需要在finally中实现释放锁。 5、Lock是可以中断锁,Synchronized是非中断锁,必须等待线程执行完成释放锁。 6、Lock可以使用读锁提高多线程读效率,如ReentrantLock。 7、在Lock中可以使用多组Condition实现多组线程多条件通信,而Sychronized使用wait/notify只能针对一组线程通信。

  • 在实现方面 1、都使用了cas+volatile+自旋。 2、排队抢锁的线程,sychronized使用了两个队列,而reentrantlock只使用了一个队列。 3、sychronized有自适应自旋,而Lock没有,Lock只有自旋和park.

在多线程竞争重入锁时,竞争失败的线程是如何实现阻塞以及被唤醒的呢?

排队+wait状态 通过前一个线程唤醒后一个线程

非公平锁的获取锁设计思想

1、一请求获取锁,就尝试获取锁 2、获取锁失败,继续尝试一定次数来获取锁 3、获取锁成功,通过cas实现state原子字段自增,exclusiveOwnerThread设置成当前变量 4、如果已经获取到了锁,state自增1,实现可重入特性,当前线程对应的节点设置成头节点。 5、如果尝试获取锁失败,将线程加入队列,并阻塞当前线程

非公平锁的释放锁设计思想:

1、unlock方法调用一次将state减少1, 2、如果state变成了0,则exclusiveOwnerThread设置成null 3、释放锁成功后,需要将等待获取锁的线程唤醒。

Condition设计思想:

类似于Object的wait方法,notify方法,实现线程直接的通信。 在生产者消费者模型的例子里: 生产者线程发现消息队列满了,则调用await方法阻塞, 生产者线程如果生产消息成功,则调用signal方法通知消费者线程进行消费 消费者线程发现消息队列空了,则调用await方法阻塞, 消费者线程消费消息成功,则调用signal方法通知生产者进行生产

Condition的await方法设计思想

当前线程需要通过LockSupport.park方法阻塞,释放cpu资源,并且通过调用release方法释放锁,让其他线程可以获取到锁,并且把当前线程存起来,让其他线程唤醒,在阻塞的方法后面,如果被唤醒,需要重新获取锁资源

        /**
         * Implements interruptible condition wait.
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //当前线程加入等待队列
            Node node = addConditionWaiter();
            //释放锁资源,让其他线程有机会获得锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //阻塞,如果被唤醒,会继续执行
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //被唤醒后再次获取锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

Conditon的signal方法设计思想

需要将第一个等待需要唤醒的线程进行唤醒,并且加入同步队列

       /**
         * 获取一个等待条件最久的线程,
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //获得第一个等待被唤醒的线程节点
            Node first = firstWaiter;
            if (first != null)
            //执行唤醒操作
                doSignal(first);
        }
        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
   
   
            do {
   
   
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
    /**
     * 移动节点从条件队列到同步队列
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        //将需要被唤醒的节点加入同步队列
        Node p = enq(node);
        int ws = p.waitStatus;
        //实现唤醒操作
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

aqs(AbstractQueuedSynchronizer)基础组件提供的能力

1、内部维护了Node类型的双向链表,用来存储等待线程的,以及等待线程的状态,提供线程排队基础。 Node 有 5 中状态,分别是:

  • CANCELLED(1): 在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该 Node 的结点,其结点的 waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化
  • SIGNAL(-1): 只要前置节点释放锁,就会通知标识为SIGNAL状态的后续节点的线程抢占锁
  • CONDITION(-2): 和 Condition 有关系
  • PROPAGATE(-3):共享模式下,PROPAGATE状态的线程处于可运行状态
  • 5、0:初始状态

2、一个volatile int state字段,用来表示锁的状态和锁的重入次数 0无锁,>0加锁,提供了get/set/compareAndSwapInt方法来操作state状态。 3、等待线程入队。 4、唤醒后继节点。 5、加锁算法定义。 6、释放独占锁,共享锁算法定义,提供子类重写方法,实现不同的同步组件 7、提供了一些监控锁的方法,等待线程队列长度,排队中的线程, 8、condition辅助类

aqs是对锁的通用功能进行抽象,子类通过继承aqs实现同步的功能。 其中一个实现是ReentrantLock。

ReentrantLock

他是基于aqs实现的一种独占锁,非共享锁。 类结构图如下:

image.png 支持公平锁和非公平锁。默认是非公平锁

image.png

非公平锁设计思想

1、一请求就尝试获取锁 2、获取锁失败,继续尝试一定次数来获取锁 3、获取锁成功,通过cas实现state字段值原子自增,exclusiveOwnerThread设置成当前线程。 4、如果已经获取到了锁,state自增1,实现可重入特性,当前线程对应的节点设置成头节点。 5、如果尝试获取锁失败,将线程加入队列,并阻塞当前线程 如下是加锁成功的代码。

image.png

非公平锁获取锁分四个阶段,详细流程如下

第一阶段,快速占用锁阶段

  • 这个阶段就是一调用lock方法,就直接通过cas设置一次state字段加锁,不管有前面没有线程获取锁,这就是体现了非公平的地方。

image.png

抢锁入口:

static final class NonfairSync extends Sync {
   
   
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         加锁
         */
        final void lock() {
   
   
        //通过cas快速加锁
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
            //进入正常加锁流程
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
   
   
            return nonfairTryAcquire(acquires);
        }
    }

第二阶段,正常的加锁阶段

  • 这个阶段和第一阶段类似,也是使用cas尝试加一次锁,多了重入锁的判断,如果当前锁已经被当前线程占有了,state自增,体现了重入锁特性。

image.png

final boolean nonfairTryAcquire(int acquires) {
   
   
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
   
   
            //尝试快速加一次锁
                if (compareAndSetState(0, acquires)) {
   
   
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //重入锁判断和获取
            else if (current == getExclusiveOwnerThread()) {
   
   
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

第三阶段,当前线程加入队列

这个阶段,是线程入队列,之前尝试两次都没抢到锁,需要排队抢锁

image.png

//线程加入等待队列
private Node addWaiter(Node mode) {
   
   
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        //尝试快速加入队列尾部
        if (pred != null) {
   
   
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
   
   
                pred.next = node;
                return node;
            }
        }
        //正常入队列
        enq(node);
        return node;
    }

    private Node enq(final Node node) {
   
   
        for (;;) {
   
   
            Node t = tail;
            if (t == null) {
   
    
            // Must initialize 初始化队头(空节点)
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
   
   
            //加入队尾 首先新节点 前置节点执行老尾节点
                node.prev = t;
                //新的尾节点 设置成 新节点
                if (compareAndSetTail(t, node)) {
   
   
                //老尾节点 指向 新节点
                    t.next = node;
                    return t;
                }
            }
        }
    }

第四阶段,自旋抢站锁阶段

自旋,根据上一个节点状态来判断是否可以抢占锁,还是park阻塞

image.png

    final boolean acquireQueued(final Node node, int arg) {
   
   
        boolean failed = true;
        try {
   
   
            boolean interrupted = false;
            //自旋
            for (;;) {
   
   
                final Node p = node.predecessor();
                //如果前驱节点是头节点,则具备抢锁资格
                if (p == head && tryAcquire(arg)) {
   
   
                    //抢锁成功了,自己作为头节点
                    setHead(node);
                    //帮助老的头节点回收
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //是否需要park阻塞,前一个节点状态是SIGNAL就要park, 
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) //park
                    interrupted = true;
            }
        } finally {
   
   
            if (failed)
                cancelAcquire(node);
        }
    }

如果达到了park条件,则可能进入park,

private final boolean parkAndCheckInterrupt() {
   
   
//调用unsafe类的park方法 阻塞在这里,等待获取锁的线程释放锁并唤醒 我 然后继续执行
        LockSupport.park(this);
        return Thread.interrupted();
    }

公平/非公平锁释放锁流程

image.png

非公平锁的释放锁设计思想:

1、unlock方法调用一次将state减少1, 2、如果state变成了0,则exclusiveOwnerThread设置成null 3、释放锁成功后,需要将等待获取锁的线程唤醒。

protected final boolean tryRelease(int releases) {
   
   
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
   
   
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

如果头节点状态不是0,表示需要唤醒头节点的下一个节点

image.png

public final boolean release(int arg) {
   
   
        if (tryRelease(arg)) {
   
   
            Node h = head;
            //唤醒下一个节点
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }


private void unparkSuccessor(Node node) {
   
   

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);


        Node s = node.next;
        //下一个节点是空的,`从尾节点往前找`,最接近head的状态小于0的线程节点
        if (s == null || s.waitStatus > 0) {
   
   
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //指向unpark
        if (s != null)
            LockSupport.unpark(s.thread);
    }

上面为啥head.next是空的,还有从尾巴节点遍历找需要唤醒的节点 由于入队操作,先构建尾节点,再将老尾节点指向新节点,next链比prev链延迟构建

private Node enq(final Node node) {
   
   
 for (;;) {
   
   
 Node t = tail;
 if (t == null) {
   
    // Must initialize
 if (compareAndSetHead(new Node()))
 tail = head;
 } else {
   
   
 //和尾巴节点
 node.prev = t;
 if (compareAndSetTail(t, node)) {
   
   
 //最后一步才和尾节点关联
 t.next = node;
 return t;
 }
 }
 }
}

以上分析的是非公平锁,他们释放锁流程一致,

公平锁和非公平锁加锁流程有什么不同?

final void lock() {
   
   
            acquire(1);
        }

少了非公平锁的快速抢占锁阶段,在正常获取锁之前,加入hasQueuedPredecessors判断,是否有前驱节点,其它步骤一样, 经过排队抢锁。

protected final boolean tryAcquire(int acquires) {
   
   
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
   
   
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
   
   
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
   
   
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

//查询是否有任何线程在等待获取比当前线程更长的时间

public final boolean hasQueuedPredecessors() {
   
   

        Node t = tail; 
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

Condition的设计思想

Condition设计思想:

类似于Object的wait方法,notify方法,实现线程之间的通信。 在生产者消费者模型的例子里: 生产者线程发现消息队列满了,则调用await方法阻塞, 生产者线程如果生产消息成功,则调用signal方法通知消费者线程进行消费 消费者线程发现消息队列空了,则调用await方法阻塞, 消费者线程消费消息成功,则调用signal方法通知生产者进行生产

//ConditionObject 设计了两个节点 头节点 尾节点 维护了一个单向链表,维护等待条件的线程链路。

    public class ConditionObject implements Condition {
   
   
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

Condition的await方法设计思想

当前线程需要通过LockSupport.park方法阻塞,释放cpu资源,并且通过调用release方法释放锁,让其他线程可以获取到锁,并且把当前线程存起来,让其他线程唤醒,在阻塞的方法后面,如果被唤醒,需要重新获取锁资源

/**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
   
   
            if (Thread.interrupted())
                throw new InterruptedException();
//当前线程加入等待队列
            Node node = addConditionWaiter();
//释放锁资源,让其他线程有机会获得锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
//阻塞
            while (!isOnSyncQueue(node)) {
   
   
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
//被唤醒后再次获取锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

Conditon的signal方法设计思想 需要将第一个等待需要唤醒的线程进行唤醒,并且加入同步队列

       /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
   
   
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
//获得第一个等待被唤醒的线程节点
            Node first = firstWaiter;
            if (first != null)
//执行唤醒操作
                doSignal(first);
        }
 /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
   
   
            do {
   
   
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
/**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
   
   
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
//将需要被唤醒的节点加入同步队列
        Node p = enq(node);
        int ws = p.waitStatus;
//实现唤醒操作
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

针对中断获取锁 直接抛出中断一次

public final void acquireInterruptibly(int arg)
            throws InterruptedException {
   
   
            //中断状态抛异常
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
   
   
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
   
   
            for (;;) {
   
   
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
   
   
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //中断状态抛异常
                    throw new InterruptedException();
            }
        } finally {
   
   
            if (failed)
                cancelAcquire(node);
        }
    }

取消获取锁的线程

private void cancelAcquire(Node node) {
   
   
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;

        // 设置为取消状态
        node.waitStatus = Node.CANCELLED;

        // 节点在尾处 直接去除 置null
        if (node == tail && compareAndSetTail(node, pred)) {
   
   
            compareAndSetNext(pred, predNext, null);
        } else {
   
   
            // 节点去除
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
   
   
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
   
   
            //唤醒下一个节点
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

Lock和Sychronized在非公平锁的加锁解锁的实现上差异?

  • 快速加锁阶段,lock通过cas,设置state变量,再设置独占线程 而sychronized是通过cas设置对象头上线程id.(偏向锁阶段)
  • lock通过 cas一次设置state变量 而sychronized设置锁记录指针到对象头(轻量级别锁)
  • lock 先排队,通过自旋 + cas + park 设置state字段,再设置线程对象。而sychronized通过 自旋 + cas + 排队 + park 设置线程id(重量级锁),lock中只有一个队列用户抢锁排队,而sychronized设计了两个队列,2q算法,减少对一个队列的并发处理。
  • wait/notify和signal/await的设计思想类型,都使用了一个队列存储等待的线程。结合加解锁api来实现线程通信。

ReentrantReadWriteLock可重入读写锁的实现解析

基于ReentrantLock主要改造点

  • 读写锁的标志,通过32位整形变量,高16位存读锁标志,低16位存写锁标志。
  • ReentrantReadWriteLock有读和写两把锁,写锁和ReentrantLock实现方式类似,因为他们都是独占锁,而读锁,ReentrantReadWriteLock使用了ThreadLocal对不同线程获取读锁的次数进行隔离,加锁操作,是否锁操作互不影响。

写锁获取源码,和ReentrantLock流程类似

protected final boolean tryAcquire(int acquires) {
   
   
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            //锁状态不为0,已经加了读锁或写锁
            if (c != 0) {
   
   
                //写锁为0或者线程不是当前线程,其它线程获取了读锁,则本次获取写锁失败
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                //锁次数是否会超过上限    
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                    //已经获取了写锁,重入次数自增
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            //写锁是否要阻塞,如果是非公平不需要阻塞,公平要阻塞
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

//与运算


static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//     0000 0000 0000 0001 左移16位,再减1
//0001 0000 0000 0000 0000
//0000 1111 1111 1111 1111 ==写锁的标记
//

//计算写锁的数量
static int exclusiveCount(int c) {
   
   
    return c & EXCLUSIVE_MASK; 
}

写锁释放

和Reentrantlock的释放逻辑一致 修改状态,唤醒下一个线程

读锁获取


//最后一个线程 获取读锁 的计数器
private transient HoldCounter cachedHoldCounter;

protected final int tryAcquireShared(int unused) {
   
   
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            //写锁被其它线程占有了,失败
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            //是否要阻塞
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
   
   
                //第一个读锁线程,专有字段进行记录
                if (r == 0) {
   
   
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
   
   
                //第一个获取读锁线程,重入
                    firstReaderHoldCount++;
                } else {
   
   
                //不是第一个获取读锁线程,不是当前线程
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

读锁释放


protected final boolean tryReleaseShared(int unused) {
   
   
            Thread current = Thread.currentThread();
            //对获取第1个的线程进行判断,释放锁,缓存第一个获取锁的线程和计数,加快释放锁的速度
            if (firstReader == current) {
   
   
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
   
   
            //最后一个获取锁的计数器,缓存,也是为了最后一个获取锁的线程进行快速释放锁
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
   
   
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            //释放锁,最后需要修改state状态
            for (;;) {
   
   
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }
相关文章
|
7月前
|
Java
Java中ReentrantLock中部分加锁取消节点源码分析
Java中ReentrantLock中部分加锁取消节点源码分析
63 13
|
7月前
|
Java
Java中ReentrantLock释放锁代码解析
Java中ReentrantLock释放锁代码解析
51 8
|
7月前
|
Java
Java中ReentrantLock中tryLock()方法加锁分析
Java中ReentrantLock中tryLock()方法加锁分析
39 0
|
6月前
|
安全 Java 开发者
Java并发编程:深入理解synchronized和ReentrantLock
在Java并发编程中,正确使用同步机制是确保线程安全的关键。本文将深入探讨Java内置的两种同步机制——synchronized关键字和ReentrantLock类。我们将通过权威数据、经典理论和实际案例,对比分析它们的性能、用法和适用场景,帮助开发者做出明智的选择。
44 0
|
4月前
|
小程序 Java 开发工具
【Java】@Transactional事务套着ReentrantLock锁,锁竟然失效超卖了
本文通过一个生动的例子,探讨了Java中加锁仍可能出现超卖问题的原因及解决方案。作者“JavaDog程序狗”通过模拟空调租赁场景,详细解析了超卖现象及其背后的多线程并发问题。文章介绍了四种解决超卖的方法:乐观锁、悲观锁、分布式锁以及代码级锁,并重点讨论了ReentrantLock的使用。此外,还分析了事务套锁失效的原因及解决办法,强调了事务边界的重要性。
127 2
【Java】@Transactional事务套着ReentrantLock锁,锁竟然失效超卖了
|
4月前
|
Java 开发者
Java多线程教程:使用ReentrantLock实现高级锁功能
Java多线程教程:使用ReentrantLock实现高级锁功能
48 1
|
4月前
|
Java
Java 并发编程:理解并应用 ReentrantLock
【7月更文挑战第56天】 在多线程环境下,为了保证数据一致性和程序正确性,我们需要对共享资源进行同步访问。Java提供了多种并发工具来帮助我们实现这一目标,其中ReentrantLock是一个功能强大且灵活的同步机制。本文将深入探讨ReentrantLock的基本原理、使用方法以及与synchronized关键字的区别,帮助读者更好地理解和应用这一重要的并发编程工具。
|
3月前
|
Java
JAVA并发编程ReentrantLock核心原理剖析
本文介绍了Java并发编程中ReentrantLock的重要性和优势,详细解析了其原理及源码实现。ReentrantLock作为一种可重入锁,弥补了synchronized的不足,如支持公平锁与非公平锁、响应中断等。文章通过源码分析,展示了ReentrantLock如何基于AQS实现公平锁和非公平锁,并解释了两者的具体实现过程。
|
4月前
|
传感器 C# 监控
硬件交互新体验:WPF与传感器的完美结合——从初始化串行端口到读取温度数据,一步步教你打造实时监控的智能应用
【8月更文挑战第31天】本文通过详细教程,指导Windows Presentation Foundation (WPF) 开发者如何读取并处理温度传感器数据,增强应用程序的功能性和用户体验。首先,通过`.NET Framework`的`Serial Port`类实现与传感器的串行通信;接着,创建WPF界面显示实时数据;最后,提供示例代码说明如何初始化串行端口及读取数据。无论哪种传感器,只要支持串行通信,均可采用类似方法集成到WPF应用中。适合希望掌握硬件交互技术的WPF开发者参考。
85 0
|
4月前
|
安全 Java
Java并发编程实战:使用synchronized和ReentrantLock实现线程安全
【8月更文挑战第31天】在Java并发编程中,保证线程安全是至关重要的。本文将通过对比synchronized和ReentrantLock两种锁机制,深入探讨它们在实现线程安全方面的优缺点,并通过代码示例展示如何使用这两种锁来保护共享资源。