【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock

简介: 前言ReentrantLock是非常常用的锁,在前面【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue在我们了解到,LinkedBlockingQueue入队、出队都是依赖ReentrantLock进行锁同步和线程唤醒、等待的。

前言

ReentrantLock是非常常用的锁,在前面【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue在我们了解到,LinkedBlockingQueue入队、出队都是依赖ReentrantLock进行锁同步和线程唤醒、等待的。
本文来学习下ReentrantLock。

ReentrantLock

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

通过构造函数,我们可以看到可以根据参数fair,生成公平的同步和不公平的同步模式。
接下来需要看下FairSync和NonfairSync到底是何方神圣

Sync


Sync是一个抽象类,FairSync和NonfairSync都继承自Sync并实现了tryAcquire方法,tryAcquire是在AbstractQueuedSynchronizer(AQS)中声明的。
AbstractQueuedSynchronizer中的方法非常多,我们通过ReentrantLock中各方法的调用来逐步熟悉它。

ReentrantLock::lock

public void lock() {
    sync.acquire(1);
}

请求锁,如果加锁失败则一直等待。
ReentrantLock中加锁的方法非常简洁,直接调用sync的acquire方法
下面我们看下acquire的具体实现。

AbstractQueuedSynchronizer::acquire

/**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            //会一直等待,直到获取到锁为止
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}
  • 首先尝试获取锁(具体实现下面分析),获取成功函数结束
  • 获取失败,则加入等待队列一直自旋尝试获取锁直到获取成功或超时。
  • 如果获取失败,则抛出中断异常

NonfairSync::tryAcquire

/**
 * Sync object for non-fair locks
 */
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果state = 0 即当前没加锁,则尝试通过CAS的方式加锁,加锁后将持有锁的线程设置为当前线程
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果当前有锁,则判断是否是当前线程的锁,是的话state加一,即重入锁,不是的话 返回加锁失败
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
  • 如果没锁,则尝试获取锁
  • 如果有锁,判断是否是当前线程持有的
  • 是当前线程持有,则state值加1 返回加锁成功。即重入锁
  • 不是当前线程持有,则加锁失败

FairSync::tryAcquire

/**
 * Fair version of tryAcquire.  Don't grant access unless
 * recursive call or no waiters or is first.
 */
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果没锁
    if (c == 0) {
        //如果队列中没有比当前线程等待更久的线程,则尝试通过CAS的方式获取锁
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果当前有锁,则判断是否是当前线程的锁,是的话state加一,即重入锁,不是的话 返回加锁失败
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

/**
 * Queries whether any threads have been waiting to acquire longer
 * than the current thread.
 *
 * <p>An invocation of this method is equivalent to (but may be
 * more efficient than):
 * <pre> {@code
 * getFirstQueuedThread() != Thread.currentThread()
 *   && hasQueuedThreads()}</pre>
 *
 * <p>Note that because cancellations due to interrupts and
 * timeouts may occur at any time, a {@code true} return does not
 * guarantee that some other thread will acquire before the current
 * thread.  Likewise, it is possible for another thread to win a
 * race to enqueue after this method has returned {@code false},
 * due to the queue being empty.
 *
 * <p>This method is designed to be used by a fair synchronizer to
 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
 * Such a synchronizer's {@link #tryAcquire} method should return
 * {@code false}, and its {@link #tryAcquireShared} method should
 * return a negative value, if this method returns {@code true}
 * (unless this is a reentrant acquire).  For example, the {@code
 * tryAcquire} method for a fair, reentrant, exclusive mode
 * synchronizer might look like this:
 *
 * <pre> {@code
 * protected boolean tryAcquire(int arg) {
 *   if (isHeldExclusively()) {
 *     // A reentrant acquire; increment hold count
 *     return true;
 *   } else if (hasQueuedPredecessors()) {
 *     return false;
 *   } else {
 *     // try to acquire normally
 *   }
 * }}</pre>
 *
 * @return {@code true} if there is a queued thread preceding the
 *         current thread, and {@code false} if the current thread
 *         is at the head of the queue or the queue is empty
 * @since 1.7
 */
public final boolean hasQueuedPredecessors() {
    Node h, s;
    //如果队列不为空
    if ((h = head) != null) {
        //看当前线程是不是在队列的队首,即排队时间最长的队列
        if ((s = h.next) == null || s.waitStatus > 0) {
            s = null; // traverse in case of concurrent cancellation
            //这里为啥从队列尾部开始向前遍历???可能是因为队列头部可能会有大量超时的节点,从后往前遍历更快?
            for (Node p = tail; p != h && p != null; p = p.prev) {
                if (p.waitStatus <= 0)
                    s = p;
            }
        }
        if (s != null && s.thread != Thread.currentThread())
            return true;
    }
    return false;
}

ReentrantLock::lockInterruptibly

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

/**
 * Acquires in exclusive mode, aborting if interrupted.
 * Implemented by first checking interrupt status, then invoking
 * at least once {@link #tryAcquire}, returning on
 * success.  Otherwise the thread is queued, possibly repeatedly
 * blocking and unblocking, invoking {@link #tryAcquire}
 * until success or the thread is interrupted.  This method can be
 * used to implement method {@link Lock#lockInterruptibly}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @throws InterruptedException if the current thread is interrupted
 */
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    //如果线程被重点,抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //如果获取锁失败 
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

/**
 * Acquires in exclusive interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    //new一个新节点
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        //轮询直到获取到锁或者线程被中断
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

请求锁,如果失败则一直阻塞等待 直到获取锁或线程中断

ReentrantLock::tryLock

public boolean tryLock() {
    //尝试获取锁,获取失败的话 直接返回false,不会再等待
    return sync.nonfairTryAcquire(1);
}

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    //尝试获取锁,如果失败的话,等待timeout时间后返回false,如果被中断则抛出异常
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

请求锁,如果请求失败,则返回false

ReentrantLock::unlock

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    //释放锁 state数减releases
    int c = getState() - releases;
    //如果当前线程没有持有锁,则抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //当c=0时,锁完全释放,ownerThread设为null。
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

释放锁,直到state=0完全释放时,线程owner设置为null

ReentrantLock::newCondition

public Condition newCondition() {
    return sync.newCondition();
}

final ConditionObject newCondition() {
    return new ConditionObject();
}

ConditionObject::await

线程释放锁,阻塞挂起,直到被signal唤醒,则继续尝试获取锁

public final void await() throws InterruptedException {
    //如果当前线程被中断、则抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //新创建个节点,将当前线程加入等待队列
    Node node = addConditionWaiter();
    //完全释放锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //阻塞直到node被唤醒
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        //如果被中断,则直接break
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //尝试获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

ConditionObject::awaitNanos

线程释放锁,阻塞挂起一段时间,直到被signal唤醒或超时,则继续尝试获取锁

public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // We don't check for nanosTimeout <= 0L here, to allow
    // awaitNanos(0) as a way to "yield the lock".
    final long deadline = System.nanoTime() + nanosTimeout;
    long initialNanos = nanosTimeout;
    //加入等待队列
    Node node = addConditionWaiter();
    //释放所有锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        //等待超时
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    //尝试获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    long remaining = deadline - System.nanoTime(); // avoid overflow
    return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
}

ConditionObject::awaitUntil

线程释放锁,阻塞挂起一段时间,直到被signal唤醒或到指定时间,则继续尝试获取锁

public final boolean awaitUntil(Date deadline)
        throws InterruptedException {
    long abstime = deadline.getTime();
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (System.currentTimeMillis() >= abstime) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        LockSupport.parkUntil(this, abstime);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

ConditionObject::signal

把首节点的status设置为Node.SIGNAL 则阻塞的线程循环判断发现statue状态变了,则唤醒继续执行。如果设置status失败,则在此线程中调用LockSupport.unpark唤醒阻塞的线程

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    //唤醒一个节点,把statue设置为Node.SIGNAL。如果设置失败了,则自己调用LockSupport.unpark唤醒线程
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

ConditionObject::signalAll

唤醒所有的节点。

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

总结

【从入门到放弃-Java】并发编程-锁-synchronized中,我们学习内置锁synchronized,与ReentrantLock对比

  • 两者都是可重入的互斥锁。
  • synchronized是隐式的加解锁,不需要手动解锁。而ReentrantLock需要显式的lock和unlock。lock加锁多少次,对应的就需要unlock多少次。因此一般都会在finally中unlock。避免因异常等情况导致锁无法释放
  • ReentrantLock通过AQS(volatile state + CAS + CLH队列实现)加解锁。synchronized是通过monitor实现(存在偏向锁、轻量级锁、重量级锁等锁升级)。
  • ReentrantLock可以使用lockInterruptibly响应中断,synchronized只能傻等、等到死
  • ReentrantLock可以使用非公平锁和公平锁模式,可以通过非公平性减少CAS的竞争,提升性能。也可以通过公平锁减少线程饥饿情况发生
  • ReentrantLock可以创造多个Condition,来实现线程等待通知机制(阻塞、唤醒)

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

目录
相关文章
|
1月前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
34 0
|
3月前
|
存储 安全 Java
从入门到精通:Java Map全攻略,一篇文章就够了!
【10月更文挑战第17天】本文详细介绍了Java编程中Map的使用,涵盖Map的基本概念、创建、访问与修改、遍历方法、常用实现类(如HashMap、TreeMap、LinkedHashMap)及其特点,以及Map在多线程环境下的并发处理和性能优化技巧,适合初学者和进阶者学习。
89 3
|
9天前
|
自然语言处理 Java
Java中的字符集编码入门-增补字符(转载)
本文探讨Java对Unicode的支持及其发展历程。文章详细解析了Unicode字符集的结构,包括基本多语言面(BMP)和增补字符的表示方法,以及UTF-16编码中surrogate pair的使用。同时介绍了代码点和代码单元的概念,并解释了UTF-8的编码规则及其兼容性。
81 60
|
1月前
|
Java 开发者 微服务
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
67 6
Spring Boot 入门:简化 Java Web 开发的强大工具
|
1月前
|
监控 架构师 Java
Java虚拟机调优的艺术:从入门到精通####
本文作为一篇深入浅出的技术指南,旨在为Java开发者揭示JVM调优的神秘面纱,通过剖析其背后的原理、分享实战经验与最佳实践,引领读者踏上从调优新手到高手的进阶之路。不同于传统的摘要概述,本文将以一场虚拟的对话形式,模拟一位经验丰富的架构师向初学者传授JVM调优的心法,激发学习兴趣,同时概括性地介绍文章将探讨的核心议题——性能监控、垃圾回收优化、内存管理及常见问题解决策略。 ####
|
2月前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
192 6
|
2月前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
2月前
|
Java 大数据 API
14天Java基础学习——第1天:Java入门和环境搭建
本文介绍了Java的基础知识,包括Java的简介、历史和应用领域。详细讲解了如何安装JDK并配置环境变量,以及如何使用IntelliJ IDEA创建和运行Java项目。通过示例代码“HelloWorld.java”,展示了从编写到运行的全过程。适合初学者快速入门Java编程。
|
2月前
|
存储 缓存 安全
Java内存模型(JMM):深入理解并发编程的基石####
【10月更文挑战第29天】 本文作为一篇技术性文章,旨在深入探讨Java内存模型(JMM)的核心概念、工作原理及其在并发编程中的应用。我们将从JMM的基本定义出发,逐步剖析其如何通过happens-before原则、volatile关键字、synchronized关键字等机制,解决多线程环境下的数据可见性、原子性和有序性问题。不同于常规摘要的简述方式,本摘要将直接概述文章的核心内容,为读者提供一个清晰的学习路径。 ####
51 2
|
2月前
|
存储 安全 Java
🌟Java零基础-反序列化:从入门到精通
【10月更文挑战第21天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
93 5