深入浅出理解Java并发AQS的共享锁模式

简介: 深入浅出理解Java并发AQS的共享锁模式

概述


这篇文章深入浅出理解Java并发AQS的独占锁模式讲解了AQS的独占锁实现原理,那么本篇文章在阐述AQS另外一个重要模式,共享锁模式,那什么是共享锁呢?

共享锁可以由多个线程同时获取, 比较典型的就是读锁,读操作并不会产生副作用,所以可以允许多个线程同时对数据进行读操作而不会有线程安全问题,jdk中的很多并发工具比如ReadWriteLock和CountdownLatch就是依赖AQS的共享锁实现的。

本文重点讲解下AQS是如何实现共享锁的。


自定义共享锁例子


首先我们通过AQS实现一个非常最最最轻量简单的共享锁例子,帮助大家对共享锁有一个整体的感知。

@Slf4j
public class ShareLock {
    /**
     * 共享锁帮助类
     */
    private static class ShareSync extends AbstractQueuedSynchronizer {
        private int lockCount;
        /**
         * 创建共享锁帮助类,最多有count把共享锁,超过了则阻塞
         *
         * @param count 共享锁数量
         */
        public ShareSync(int count) {
           this.lockCount = count;
        }
        /**
         * 尝试获取共享锁
         *
         * @param arg 每次获取锁的数量
         * @return 返回正数,表示后续其他线程获取共享锁可能成功; 返回0,表示后续其他线程无法获取共享锁;返回负数,表示当前线程获取共享锁失败
         */
        @Override
        protected int tryAcquireShared(int arg) {
            // 自旋
            for (;;) {
                int c = getState();
                // 如果持有锁的数量大于指定数量,返回-1,线程进入阻塞
                if(c >= lockCount) {
                    return -1;
                }
                int nextc = c + 1;
                // cas设置成功,返回1,获取到共享锁
                if (compareAndSetState(c, nextc)) {
                    return 1;
                }
            }
        }
        /**
         * 尝试释放共享锁
         *
         * @param arg 释放锁的数量
         * @return 如果释放后允许唤醒后续等待结点返回true,否则返回false
         */
        @Override
        protected boolean tryReleaseShared(int arg) {
            // 自旋操作
            for (; ; ) {
                int c = getState();
                // 如果没有锁了
                if (c == 0) {
                    return false;
                }
                // 否则锁量-1
                int nextc = c - 1;
                // cas修改状态
                if (compareAndSetState(c, nextc)) {
                    return true;
                }
            }
        }
    }
    private final ShareSync sync;
    public ShareLock(int count) {
        this.sync = new ShareSync(count);
    }
    /**
     * 加共享锁
     */
    public void lockShare() {
        sync.acquireShared(1);
    }
    /**
     * 释放共享锁
     */
    public void releaseShare() {
        sync.releaseShared(1);
    }
}
  • 创建内部类共享帮助锁ShareSync类,继承自AbstractQueuedSynchronizer类,实现了共享锁相关的方法tryAcquireShared()tryReleaseShared()
  • 创建ShareLock,提供了lockShare()加锁和releaseShare()两个API。

验证:

public static void main(String[] args) throws InterruptedException {
        ShareLock shareLock = new ShareLock(3);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                shareLock.lockShare();
                try {
                    log.info("lock success");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    shareLock.releaseShare();
                    log.info("release success");
                }
            }, "thread-" + i).start();
        }
        Thread.sleep(10000);
    }
  • 一共创建最多共同有3个线程共享的共享锁。
  • 创建5个线程去竞争共享锁。

运行结果:

1671194328586.jpg

  • 运行结果显示每次最多只有3个lock success,说明同时只有3个线程共享。
  • 只有在释放共享锁以后,其他线程才能获取锁。

下面对它的实现原理一探究竟。


核心原理机制


共享模式也是由AQS提供的,首先我们关注下AQS的数据结构。

1671194338192.jpg

AQS内部维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。

AQS作为一个抽象方法,提供了加锁、和释放锁的框架,这里采用的模板方模式,在上面中提到的tryAcquireSharedtryReleaseShared就是和共享模式相关的模板方法。

方法名 描述
protected int tryAcquireShared(int arg) 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected boolean tryReleaseShared(int arg) 共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。

共享模式的入口方法如下:

方法名 描述
void acquireShared(int arg) 共享模式获取锁,不响应中断。
void acquireSharedInterruptibly(int arg) 共享模式获取锁,响应中断。
tryAcquireSharedNanos(int arg, long nanosTimeout) 尝试在共享模式下获取锁,如果中断则中止,如果超过给定超时则失败。
boolean releaseShared(int arg) 共享模式下释放锁。


源码解析


1671194350898.jpg

上图是AQS的类结构图,其中标红部分是组成AQS的重要成员变量。


成员变量


  1. state共享变量

AQS中里一个很重要的字段state,表示同步状态,是由volatile修饰的,用于展示当前临界资源的获锁情况。通过getState(),setState(),compareAndSetState()三个方法进行维护。

关于state的几个要点:

  • 使用volatile修饰,保证多线程间的可见性。
  • getState()、setState()、compareAndSetState()使用final修饰,限制子类不能对其重写。
  • compareAndSetState()采用乐观锁思想的CAS算法,保证原子性操作。
  1. CLH队列(FIFO队列)

AQS里另一个重要的概念就是CLH队列,它是一个双向链表队列,其内部由head和tail分别记录头结点和尾结点,队列的元素类型是Node。

private transient volatile Node head;
private transient volatile Node tail;

Node的结构如下:

static final class Node {
    //共享模式下的等待标记
    static final Node SHARED = new Node();
    //独占模式下的等待标记
    static final Node EXCLUSIVE = null;
    //表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
    static final int CANCELLED =  1;
    //表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
    static final int SIGNAL    = -1;
    //表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
    static final int CONDITION = -2;
    //共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
    static final int PROPAGATE = -3;
    //状态,包括上面的四种状态值,初始值为0,一般是节点的初始状态
    volatile int waitStatus;
    //上一个节点的引用
    volatile Node prev;
    //下一个节点的引用
    volatile Node next;
    //保存在当前节点的线程引用
    volatile Thread thread;
    //condition队列的后续节点
    Node nextWaiter;
}

注意,waitSstatus负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常。

  1. exclusiveOwnerThread

AQS通过继承AbstractOwnableSynchronizer类,拥有的属性。表示独占模式下同步器持有的线程。


共享锁获取acquireShared(int)


acquireShared(int)是共享锁模式下线程获取共享资源的入口方法,它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程无法响应中断。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

方法的整体流程如下:

  1. tryAcquireShared()尝试获取资源,需要自定义同步器去实现,返回负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。
  2. 如果失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。

doAcquireShared(int)

此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。

private void doAcquireShared(int arg) {
    //封装线程为共享Node 加入队列尾部
    final Node node = addWaiter(Node.SHARED);
    //是否成功标志
    boolean failed = true;
    try {
        //等待过程中是否被中断过的标志
        boolean interrupted = false;
        // 自旋操作
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            //如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
            if (p == head) {
                //尝试获取资源
                int r = tryAcquireShared(arg);
                //成功
                if (r >= 0) {
                    //将head指向自己,还有剩余资源可以再唤醒之后的线程
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    //如果等待过程中被打断过,此时将中断补上。
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireShared方法的实现和获取独占锁中的acquireQueued方法很类似,但是主要有一点不同,那就是线程在被唤醒后,若成功获取到了共享锁,还需要判断共享锁是否还能被其他线程获取,若可以,则继续向后唤醒它的下一个节点对应的线程。

setHeadAndPropagate(Node, int)

该方法主要将当前节点设置为头节点,同时判断条件是否符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    //head指向自己
    setHead(node);
     //如果还有剩余量,继续唤醒下一个邻居线程
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 唤醒操作
            doReleaseShared();
    }
}


共享释放releaseShared(int)


releaseShared(int)是共享模式下线程释放共享资源的入口,它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。

public final boolean releaseShared(int arg) {
    //尝试释放资源
    if (tryReleaseShared(arg)) {
        //唤醒后继结点
        doReleaseShared();
        return true;
    }
    return false;
}

方法的整体流程如下:

  • tryReleaseShared尝试释放锁,这由自定义同步器去实现, 返回true表示释放成功。
  • doReleaseShared唤醒后续队列中等待的节点,

doReleaseShared()

此方法主要用于唤醒队列中等待的共享节点。

private void doReleaseShared() {
    // 自旋操作
    for (;;) {
        // 获取头节点
        Node h = head;
        if (h != null && h != tail) {
            // 获取节点的等待状态
            int ws = h.waitStatus;
            // 如果节点等待状态是-1, -1表示有责任唤醒后续节点的状态
            if (ws == Node.SIGNAL) {
                // cas修改当前节点的等待状态为0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                //唤醒后续节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)// head发生变化
            break;
    }
}
  • 逻辑是一个死循环,每次循环中重新读取一次head,然后保存在局部变量h中,再配合if(h == head) break;,这样,循环检测到head没有变化时就会退出循环。注意,head变化一定是因为:acquire thread被唤醒,之后它成功获取锁,然后setHead设置了新head。而且注意,只有通过if(h == head) break;即head不变才能退出循环,不然会执行多次循环。
  • if (h != null && h != tail)判断队列是否至少有两个node,如果队列从来没有初始化过(head为null),或者head就是tail,那么中间逻辑直接不走,直接判断head是否变化了。
  • 如果队列中有两个或以上个node,那么检查局部变量h的状态:
  • 如果状态为SIGNAL,说明h的后继是需要被通知的。通过对CAS操作结果取反,将compareAndSetWaitStatus(h, Node.SIGNAL, 0)unparkSuccessor(h)绑定在了一起。说明了只要head成功得从SIGNAL修改为0,那么head的后继的代表线程肯定会被唤醒了。
  • 如果状态为0,说明h的后继所代表的线程已经被唤醒或即将被唤醒,并且这个中间状态即将消失,要么由于acquire thread获取锁失败再次设置head为SIGNAL并再次阻塞,要么由于acquire thread获取锁成功而将自己(head后继)设置为新head并且只要head后继不是队尾,那么新head肯定为SIGNAL。所以设置这种中间状态的head的status为PROPAGATE,让其status又变成负数,这样可能被被唤醒线程检测到。
  • 如果状态为PROPAGATE,直接判断head是否变化。
  • 两个continue保证了进入那两个分支后,只有当CAS操作成功后,才可能去执行if(h == head) break;,才可能退出循环。
  • if(h == head) break;保证了,只要在某个循环的过程中有线程刚获取了锁且设置了新head,就会再次循环。目的当然是为了再次执行unparkSuccessor(h),即唤醒队列中第一个等待的线程。


总结


本文主要讲解了AQS的共享模式,通过一个自定义简单的demo帮助大家深入浅出的理解,同时深入分析了源码实现,希望对大家有帮助。

目录
打赏
0
0
0
0
14
分享
相关文章
java中的公平锁、非公平锁、可重入锁、递归锁、自旋锁、独占锁和共享锁
本文介绍了几种常见的锁机制,包括公平锁与非公平锁、可重入锁与不可重入锁、自旋锁以及读写锁和互斥锁。公平锁按申请顺序分配锁,而非公平锁允许插队。可重入锁允许线程多次获取同一锁,避免死锁。自旋锁通过循环尝试获取锁,减少上下文切换开销。读写锁区分读锁和写锁,提高并发性能。文章还提供了相关代码示例,帮助理解这些锁的实现和使用场景。
java中的公平锁、非公平锁、可重入锁、递归锁、自旋锁、独占锁和共享锁
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面
本文介绍了如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面。通过合理配置初始连接数、最大连接数和空闲连接超时时间,确保系统性能和稳定性。文章还探讨了同步阻塞、异步回调和信号量等并发控制策略,并提供了异常处理的最佳实践。最后,给出了一个简单的连接池示例代码,并推荐使用成熟的连接池框架(如HikariCP、C3P0)以简化开发。
77 2
|
3月前
|
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
51 1
JAVA并发编程系列(10)Condition条件队列-并发协作者
本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。
JAVA并发编程AQS原理剖析
很多小朋友面试时候,面试官考察并发编程部分,都会被问:说一下AQS原理。面对并发编程基础和面试经验,专栏采用通俗简洁无废话无八股文方式,已陆续梳理分享了《一文看懂全部锁机制》、《JUC包之CAS原理》、《volatile核心原理》、《synchronized全能王的原理》,希望可以帮到大家巩固相关核心技术原理。今天我们聊聊AQS....
【Java新纪元启航】JDK 22:解锁未命名变量与模式,让代码更简洁,思维更自由!
【9月更文挑战第7天】JDK 22带来的未命名变量与模式匹配的结合,是Java编程语言发展历程中的一个重要里程碑。它不仅简化了代码,提高了开发效率,更重要的是,它激发了我们对Java编程的新思考,让我们有机会以更加自由、更加创造性的方式解决问题。随着Java生态系统的不断演进,我们有理由相信,未来的Java将更加灵活、更加强大,为开发者们提供更加广阔的舞台。让我们携手并进,共同迎接Java新纪元的到来!
93 11
【实战揭秘】如何运用Java发布-订阅模式,打造高效响应式天气预报App?
【8月更文挑战第30天】发布-订阅模式是一种消息通信模型,发送者将消息发布到公共队列,接收者自行订阅并处理。此模式降低了对象间的耦合度,使系统更灵活、可扩展。例如,在天气预报应用中,`WeatherEventPublisher` 类作为发布者收集天气数据并通知订阅者(如 `TemperatureDisplay` 和 `HumidityDisplay`),实现组件间的解耦和动态更新。这种方式适用于事件驱动的应用,提高了系统的扩展性和可维护性。
90 2
Java设计模式-工厂方法模式(4)
Java设计模式-工厂方法模式(4)
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等