【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock

简介: 前言ReentrantLock是非常常用的锁,在前面【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue在我们了解到,LinkedBlockingQueue入队、出队都是依赖ReentrantLock进行锁同步和线程唤醒、等待的。

前言

ReentrantLock是非常常用的锁,在前面【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue在我们了解到,LinkedBlockingQueue入队、出队都是依赖ReentrantLock进行锁同步和线程唤醒、等待的。
本文来学习下ReentrantLock。

ReentrantLock

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

通过构造函数,我们可以看到可以根据参数fair,生成公平的同步和不公平的同步模式。
接下来需要看下FairSync和NonfairSync到底是何方神圣

Sync


Sync是一个抽象类,FairSync和NonfairSync都继承自Sync并实现了tryAcquire方法,tryAcquire是在AbstractQueuedSynchronizer(AQS)中声明的。
AbstractQueuedSynchronizer中的方法非常多,我们通过ReentrantLock中各方法的调用来逐步熟悉它。

ReentrantLock::lock

public void lock() {
    sync.acquire(1);
}

请求锁,如果加锁失败则一直等待。
ReentrantLock中加锁的方法非常简洁,直接调用sync的acquire方法
下面我们看下acquire的具体实现。

AbstractQueuedSynchronizer::acquire

/**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            //会一直等待,直到获取到锁为止
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}
  • 首先尝试获取锁(具体实现下面分析),获取成功函数结束
  • 获取失败,则加入等待队列一直自旋尝试获取锁直到获取成功或超时。
  • 如果获取失败,则抛出中断异常

NonfairSync::tryAcquire

/**
 * Sync object for non-fair locks
 */
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果state = 0 即当前没加锁,则尝试通过CAS的方式加锁,加锁后将持有锁的线程设置为当前线程
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果当前有锁,则判断是否是当前线程的锁,是的话state加一,即重入锁,不是的话 返回加锁失败
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
  • 如果没锁,则尝试获取锁
  • 如果有锁,判断是否是当前线程持有的
  • 是当前线程持有,则state值加1 返回加锁成功。即重入锁
  • 不是当前线程持有,则加锁失败

FairSync::tryAcquire

/**
 * Fair version of tryAcquire.  Don't grant access unless
 * recursive call or no waiters or is first.
 */
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果没锁
    if (c == 0) {
        //如果队列中没有比当前线程等待更久的线程,则尝试通过CAS的方式获取锁
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果当前有锁,则判断是否是当前线程的锁,是的话state加一,即重入锁,不是的话 返回加锁失败
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

/**
 * Queries whether any threads have been waiting to acquire longer
 * than the current thread.
 *
 * <p>An invocation of this method is equivalent to (but may be
 * more efficient than):
 * <pre> {@code
 * getFirstQueuedThread() != Thread.currentThread()
 *   && hasQueuedThreads()}</pre>
 *
 * <p>Note that because cancellations due to interrupts and
 * timeouts may occur at any time, a {@code true} return does not
 * guarantee that some other thread will acquire before the current
 * thread.  Likewise, it is possible for another thread to win a
 * race to enqueue after this method has returned {@code false},
 * due to the queue being empty.
 *
 * <p>This method is designed to be used by a fair synchronizer to
 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
 * Such a synchronizer's {@link #tryAcquire} method should return
 * {@code false}, and its {@link #tryAcquireShared} method should
 * return a negative value, if this method returns {@code true}
 * (unless this is a reentrant acquire).  For example, the {@code
 * tryAcquire} method for a fair, reentrant, exclusive mode
 * synchronizer might look like this:
 *
 * <pre> {@code
 * protected boolean tryAcquire(int arg) {
 *   if (isHeldExclusively()) {
 *     // A reentrant acquire; increment hold count
 *     return true;
 *   } else if (hasQueuedPredecessors()) {
 *     return false;
 *   } else {
 *     // try to acquire normally
 *   }
 * }}</pre>
 *
 * @return {@code true} if there is a queued thread preceding the
 *         current thread, and {@code false} if the current thread
 *         is at the head of the queue or the queue is empty
 * @since 1.7
 */
public final boolean hasQueuedPredecessors() {
    Node h, s;
    //如果队列不为空
    if ((h = head) != null) {
        //看当前线程是不是在队列的队首,即排队时间最长的队列
        if ((s = h.next) == null || s.waitStatus > 0) {
            s = null; // traverse in case of concurrent cancellation
            //这里为啥从队列尾部开始向前遍历???可能是因为队列头部可能会有大量超时的节点,从后往前遍历更快?
            for (Node p = tail; p != h && p != null; p = p.prev) {
                if (p.waitStatus <= 0)
                    s = p;
            }
        }
        if (s != null && s.thread != Thread.currentThread())
            return true;
    }
    return false;
}

ReentrantLock::lockInterruptibly

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

/**
 * Acquires in exclusive mode, aborting if interrupted.
 * Implemented by first checking interrupt status, then invoking
 * at least once {@link #tryAcquire}, returning on
 * success.  Otherwise the thread is queued, possibly repeatedly
 * blocking and unblocking, invoking {@link #tryAcquire}
 * until success or the thread is interrupted.  This method can be
 * used to implement method {@link Lock#lockInterruptibly}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @throws InterruptedException if the current thread is interrupted
 */
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    //如果线程被重点,抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //如果获取锁失败 
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

/**
 * Acquires in exclusive interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    //new一个新节点
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        //轮询直到获取到锁或者线程被中断
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

请求锁,如果失败则一直阻塞等待 直到获取锁或线程中断

ReentrantLock::tryLock

public boolean tryLock() {
    //尝试获取锁,获取失败的话 直接返回false,不会再等待
    return sync.nonfairTryAcquire(1);
}

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    //尝试获取锁,如果失败的话,等待timeout时间后返回false,如果被中断则抛出异常
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

请求锁,如果请求失败,则返回false

ReentrantLock::unlock

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    //释放锁 state数减releases
    int c = getState() - releases;
    //如果当前线程没有持有锁,则抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //当c=0时,锁完全释放,ownerThread设为null。
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

释放锁,直到state=0完全释放时,线程owner设置为null

ReentrantLock::newCondition

public Condition newCondition() {
    return sync.newCondition();
}

final ConditionObject newCondition() {
    return new ConditionObject();
}

ConditionObject::await

线程释放锁,阻塞挂起,直到被signal唤醒,则继续尝试获取锁

public final void await() throws InterruptedException {
    //如果当前线程被中断、则抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //新创建个节点,将当前线程加入等待队列
    Node node = addConditionWaiter();
    //完全释放锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //阻塞直到node被唤醒
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        //如果被中断,则直接break
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //尝试获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

ConditionObject::awaitNanos

线程释放锁,阻塞挂起一段时间,直到被signal唤醒或超时,则继续尝试获取锁

public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // We don't check for nanosTimeout <= 0L here, to allow
    // awaitNanos(0) as a way to "yield the lock".
    final long deadline = System.nanoTime() + nanosTimeout;
    long initialNanos = nanosTimeout;
    //加入等待队列
    Node node = addConditionWaiter();
    //释放所有锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        //等待超时
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    //尝试获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    long remaining = deadline - System.nanoTime(); // avoid overflow
    return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
}

ConditionObject::awaitUntil

线程释放锁,阻塞挂起一段时间,直到被signal唤醒或到指定时间,则继续尝试获取锁

public final boolean awaitUntil(Date deadline)
        throws InterruptedException {
    long abstime = deadline.getTime();
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (System.currentTimeMillis() >= abstime) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        LockSupport.parkUntil(this, abstime);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

ConditionObject::signal

把首节点的status设置为Node.SIGNAL 则阻塞的线程循环判断发现statue状态变了,则唤醒继续执行。如果设置status失败,则在此线程中调用LockSupport.unpark唤醒阻塞的线程

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    //唤醒一个节点,把statue设置为Node.SIGNAL。如果设置失败了,则自己调用LockSupport.unpark唤醒线程
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

ConditionObject::signalAll

唤醒所有的节点。

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

总结

【从入门到放弃-Java】并发编程-锁-synchronized中,我们学习内置锁synchronized,与ReentrantLock对比

  • 两者都是可重入的互斥锁。
  • synchronized是隐式的加解锁,不需要手动解锁。而ReentrantLock需要显式的lock和unlock。lock加锁多少次,对应的就需要unlock多少次。因此一般都会在finally中unlock。避免因异常等情况导致锁无法释放
  • ReentrantLock通过AQS(volatile state + CAS + CLH队列实现)加解锁。synchronized是通过monitor实现(存在偏向锁、轻量级锁、重量级锁等锁升级)。
  • ReentrantLock可以使用lockInterruptibly响应中断,synchronized只能傻等、等到死
  • ReentrantLock可以使用非公平锁和公平锁模式,可以通过非公平性减少CAS的竞争,提升性能。也可以通过公平锁减少线程饥饿情况发生
  • ReentrantLock可以创造多个Condition,来实现线程等待通知机制(阻塞、唤醒)

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

目录
相关文章
|
16天前
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
142 60
【Java并发】【线程池】带你从0-1入门线程池
|
3天前
|
缓存 安全 Java
【Java并发】【synchronized】适合初学者体质入门的synchronized
欢迎来到我的Java线程同步入门指南!我不是外包员工,梦想是写高端CRUD。2025年我正在沉淀中,博客更新速度加快,欢迎点赞、收藏、关注。 本文介绍Java中的`synchronized`关键字,适合初学者。`synchronized`用于确保多个线程访问共享资源时不会发生冲突,避免竞态条件、保证内存可见性、防止原子性破坏及协调多线程有序访问。
45 8
【Java并发】【synchronized】适合初学者体质入门的synchronized
|
3天前
|
存储 监控 Java
《从头开始学java,一天一个知识点》之:数组入门:一维数组的定义与遍历
**你是否也经历过这些崩溃瞬间?** - 看了三天教程,连`i++`和`++i`的区别都说不清 - 面试时被追问&quot;`a==b`和`equals()`的区别&quot;,大脑突然空白 - 写出的代码总是莫名报NPE,却不知道问题出在哪个运算符 这个系列就是为你打造的Java「速效救心丸」!我们承诺:每天1分钟,地铁通勤、午休间隙即可完成学习;直击痛点,只讲高频考点和实际开发中的「坑位」;拒绝臃肿,没有冗长概念堆砌,每篇都有可运行的代码标本。明日预告:《多维数组与常见操作》。 通过实例讲解数组的核心认知、趣味场景应用、企业级开发规范及优化技巧,帮助你快速掌握Java数组的精髓。
52 23
|
3月前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
60 0
|
2月前
|
自然语言处理 Java
Java中的字符集编码入门-增补字符(转载)
本文探讨Java对Unicode的支持及其发展历程。文章详细解析了Unicode字符集的结构,包括基本多语言面(BMP)和增补字符的表示方法,以及UTF-16编码中surrogate pair的使用。同时介绍了代码点和代码单元的概念,并解释了UTF-8的编码规则及其兼容性。
118 60
|
3月前
|
Java 开发者 微服务
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
121 7
Spring Boot 入门:简化 Java Web 开发的强大工具
|
3月前
|
监控 架构师 Java
Java虚拟机调优的艺术:从入门到精通####
本文作为一篇深入浅出的技术指南,旨在为Java开发者揭示JVM调优的神秘面纱,通过剖析其背后的原理、分享实战经验与最佳实践,引领读者踏上从调优新手到高手的进阶之路。不同于传统的摘要概述,本文将以一场虚拟的对话形式,模拟一位经验丰富的架构师向初学者传授JVM调优的心法,激发学习兴趣,同时概括性地介绍文章将探讨的核心议题——性能监控、垃圾回收优化、内存管理及常见问题解决策略。 ####
|
4月前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
486 6
|
4月前
|
设计模式 安全 Java
Java 多线程并发编程
Java多线程并发编程是指在Java程序中使用多个线程同时执行,以提高程序的运行效率和响应速度。通过合理管理和调度线程,可以充分利用多核处理器资源,实现高效的任务处理。本内容将介绍Java多线程的基础概念、实现方式及常见问题解决方法。
225 1
|
4月前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin

热门文章

最新文章