Java并发编程之AbstractQueuedSychronizer(抽象队列同步器,简称AQS)

简介: Java并发编程之AbstractQueuedSychronizer(抽象队列同步器,简称AQS)

AbstractQueuedSychronizer(抽象队列同步器,简称AQS)



1.JDK的并发包(包名:java.util.concurrent,以下简称JUC)下面提供了很多并发操作的工具类,如:ReentrantLock,CountDownLatch等。这些并发操作工具类的基础是AbstractQueuedSychronizer


2.*AQS内部维护了一个共享资源和两个队列:*一个是同步队列;一个是条件队列。


public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    //同步队列的头结点
    private transient volatile Node head;
    //同步队列的尾结点
    private transient volatile Node tail;
    //同步队列的共享资源,队列的同步状态
    private volatile int state;
}


3.Node类的主要信息


static final class Node {
    //静态变量,标识节点以共享模式等待资源
    static final Node SHARED = new Node();
    //标识节点以独占模式等待资源
    static final Node EXCLUSIVE = null;
    //等待状态,节点取消等待资源
    static final int CANCELLED =  1;
    //等待状态,标识后继节点需要唤醒
    static final int SIGNAL    = -1;
    //等待状态,标识线程处于条件等待状态
    static final int CONDITION = -2;
    //等待状态,标识线程以共享模式获取资源,释放锁的行为将传播到后续节点,该状态作用于头节点
    static final int PROPAGATE = -3;
    //节点等待状态,是上述4中状态之一,或者为0
    volatile int waitStatus;
    //节点的前驱节点
    volatile Node prev;
    //节点的后继节点
    volatile Node next;
    //节点对应的线程
    volatile Thread thread;
    //条件等待时标识下一个等待条件的节点,指向条件队列中的下一个节点
    //或者,标识共享模式
    Node nextWaiter;
}


4.同步队列是用双向链表实现的,主要用于记录等待获取共享资源的线程


5.条件队列是一个单向链表的结构,链表中的元素也是Node,只不过条件队列中的元素使用Node的nextWaiter指向下一个元素。


6.AQS对外提供的protected类型的方法入手分析一下AQS的工作原理:


/**
    *尝试以独占模式获取共享资源
    *@param arg 表示需要获取资源的个数
    *@return true表示获取成功,false获取失败
    **/
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
/**
    *尝试以独占模式释放共享资源
    *@param arg 表示释放资源的个数
    *@return true表示释放成功,false释放失败
    **/
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
/**
    *尝试以共享模式获取共享资源
    *@param arg 表示获取资源的个数
    *@return true表示获取成功,false获取失败
    **/
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
/**
    *尝试以共享模式释放共享资源
    *@param arg 表示释放资源的个数
    *@return true表示释放成功,false释放失败
    **/
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
/**
    * 当前线程是否以独占模式获取了共享资源,该方法是在条件对象ConditionObject内部使用的,如果不需要条件等待,则无需实现该方法
    *@return true表示是,false表示否
    **/
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}


AQS以模板方法的模式,提供了多个线程对共享资源(state)操作的算法框架,上面的五个protected类型的方法主要用于尝试获取和释放共享资源,并不会阻塞当前线程,是AQS留给具体的业务操作类(如:ReentrantLock)来实现的。


7.AQS获取资源,释放资源等方法的具体代码:


/**
    *以独占模式获取共享资源,获取成功则返回,否则阻塞当前线程,并将当前线程放入同步队列等待获取资源
    *@param arg 表示需要获取资源的个数
    **/
public final void acquire(int arg) {
    //首先调用子类实现的tryAcquire方法,如果该方法返回true则表示获取成功,不进行后续判断
    //否则,调用acquireQueued方法将当前线程放入同步队列排队等待获取资源
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
/**
    *将当前线程包装成Node,并放入同步队列
    *@param mode 模式,共享模式或者独占模式
    *@return 当前线程所在的节点
    **/
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 先尝试将节点放入队列的尾部,如果成功则返回,否则将节点入队
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //将当前节点放入同步队列,cas操作设置头和尾节点
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
/**
    *在队列中不断尝试获取资源
    *@param node 当前线程所在节点
    *@param arg 获取资源的个数
    *@return 等待资源过程中线程是否被中断
    **/
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //获取当前节点的前驱节点
            final Node p = node.predecessor();
            //只有前驱是头节点的情况下才尝试获取锁,因为头结点是当前持有资源的线程所在的节点,如果前驱不是头节点那么没有必要尝试获取
            if (p == head && tryAcquire(arg)) {
                //获取成功后将当前节点设置为头结点
                setHead(node);
                //释放原来的头节点
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果节点前驱不是头结点或者获取资源失败则阻塞当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
/**
    *以独占模式释放共享资源
    *@param arg 表示释放资源的个数
    *@return true表示释放成功,false释放失败
    **/
public final boolean release(int arg) {
    //尝试释放资源,如果失败则直接返回
    if (tryRelease(arg)) {
        //释放成功后,唤醒后继节点
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}


AQS中独占模式获取和释放资源的方法,这两个方法可以用于实现锁的功能,事实上ReentrantLock就是基于以上方法实现的。以共享模式获取和释放资源的方法,与独占模式类似


8.条件对象的等待和唤醒方法:


public class ConditionObject implements Condition, java.io.Serializable {    
  //第一个条件等待的节点
    private transient Node firstWaiter;
    //最后一个条件等待的节点
    private transient Node lastWaiter;
    /**
      * 唤醒条件队列中的第一个等待的线程,此时该线程将进入同步队列重新等待获取资源
      */
    public final void signal() {
        //判断当前线程是否以独占模式占有资源
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            //将条件队列中的第一个线程,重新放入同步队列
            doSignal(first);
    }
    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    final boolean transferForSignal(Node node) {
        //如果CAS操作失败,说明线程取消获取共享资源,此时返回false,doSignal会尝试将下一个节点放入同步队列
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        //将节点放入同步队列
        Node p = enq(node);
        int ws = p.waitStatus;
        //设置节点的前驱节点的状态为Node.SIGNAL
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
    //使获取共享资源的线程等待并进入条件队列,如果当前线程被中断则退出
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //在条件队列中添加一个节点
        Node node = addConditionWaiter();
        //释放当前线程获取的共享资源
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        //判断当前节点是否在同步队列中,如果不在则暂停当前线程
        while (!isOnSyncQueue(node)) {
            //暂停当前线程,该方法响应中断;当调用signal()方法的线程释放共享资源时,会从该处继续执行
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //重新等待获取锁
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
  }


9.总结:当线程获取共享资源成功时返回,否则进入同步队列等待前驱节点唤醒,此时当前线程处于阻塞状态(LockSupport.park方法使线程阻塞);前驱节点释放共享资源后会唤醒(LockSupport.unpark方法唤醒线程)后继节点,需要说明的是获取共享资源成功的线程必定是头节点所在的线程。当获取共享资源的线程,调用Condition.await()方法时,当前线程会进入条件队列等待;当其他线程调用Condition.signal()方法,并释放共享资源时当前线程会重新进入同步队列等待获取共享资源。


目录
相关文章
|
1月前
|
存储 安全 Java
【用Java学习数据结构系列】探索栈和队列的无尽秘密
【用Java学习数据结构系列】探索栈和队列的无尽秘密
31 2
|
2月前
|
Java API 容器
JAVA并发编程系列(10)Condition条件队列-并发协作者
本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。
|
1月前
|
存储 算法 Java
【用Java学习数据结构系列】用堆实现优先级队列
【用Java学习数据结构系列】用堆实现优先级队列
31 0
|
3月前
|
Java
java中的队列
这篇文章通过Java代码示例介绍了使用数组实现队列操作,包括队列的初始化、入队、出队、判断队列满和空以及遍历队列的方法。
java中的队列
|
Java 调度
Java基础-抽象队列同步器:AbstractQueuedSynchronizer(2)-AQS的源码
一般来说,自定义同步器要么是独占方法,要么是共享方式; 他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。 AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
137 0
|
9天前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
16天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
8天前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
|
8天前
|
Java 开发者
Java多线程编程的艺术与实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的技术文档,本文以实战为导向,通过生动的实例和详尽的代码解析,引领读者领略多线程编程的魅力,掌握其在提升应用性能、优化资源利用方面的关键作用。无论你是Java初学者还是有一定经验的开发者,本文都将为你打开多线程编程的新视角。 ####
|
7天前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
下一篇
无影云桌面