AQS(abstractQueuedSynchronizer)锁实现原理详解

简介: AQS(abstractQueuedSynchronizer)抽象队列同步器。其本身是一个抽象类,提供lock锁的实现。聚合大量的锁机制实现的共用方法。

AQS(abstractQueuedSynchronizer)锁实现原理详解

AQS是什么

AQS(abstractQueuedSynchronizer)抽象队列同步器。其本身是一个抽象类,提供lock锁的实现。聚合大量的锁机制实现的共用方法。

AQS(abstractQueuedSynchronizer)结构图

在这里插入图片描述
在这里插入图片描述

为什么要使用AQS

1、我们使用的可重入锁(公平锁/非公平锁)底层实现是AQS。
通过实现AQS(队列同步器)子类Sync的FairSync和NonfairSync

2、为什么线程争抢锁时,没有抢到资源,线程会进入队列。

3、没有抢到资源的线程为什么不是等待而是阻塞

在这里插入图片描述

AQS实现原理

概况图:

在这里插入图片描述

在这里插入图片描述

简略介绍

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        // 线程取消
        static final int CANCELLED =  1;
        //后继线程需要唤醒
        static final int SIGNAL    = -1;
       

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         */
        volatile int waitStatus;

          // 指向前一个结点的指针
        volatile Node prev;

       // 指向下一个节点的指针
        volatile Node next;

        // 节点中的线程
        volatile Thread thread;

        
        Node nextWaiter;

        //获取当前节点的前驱节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {  
        }
}

    // head指针,指向双向链表队列虚节点/或者空节点
    private transient volatile Node head;

    // tail指针,指向双向队列尾节点
    private transient volatile Node tail;

    // 锁状态 0 表示无线程使用,大于等于表示存在线程占用
    private volatile int state;

**注意: 锁状态 0 表示无线程使用,大于等于表示存在线程占用

private volatile int state;**



示例
现在我们有三个线程,使用lock锁实现互斥访问。

public static void main(String[] args) throws InterruptedException {

        Lock lock = new ReentrantLock();

        new Thread(()->{
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName()+" come in");
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+" out");
            }finally {
                lock.unlock();
            }
        },"A").start();

        new Thread(()->{
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName()+" come in");
            }finally {
                lock.unlock();
            }
            System.out.println(Thread.currentThread().getName()+" out");
        },"B").start();

        new Thread(()->{
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName()+" come in");
            }finally {
                lock.unlock();
            }
            System.out.println(Thread.currentThread().getName()+" out");
        },"C").start();
    }

执行结果
在这里插入图片描述

ABC三个线程同时通过lock.lock()获取锁,只有拿到锁才能访问资源。

注意:我们创建的是不公平锁

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

      
        abstract void lock();

        ......

      }  
    }
    
    public void lock() {
        sync.lock(); //sync为AQS子抽象类,由于其为抽象类
        //通过子类NonfairSync实现 
        // 此时A线程访问这里
    }
    
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

      
        final void lock() {
            // A线程通过unsafe的CAS原语,判断state(锁状态)是否等于0线程。 等于0将state设置为1.表示锁已经线程占用。
            // A结果为true
            if (compareAndSetState(0, 1))
             //将A线程设置为独占线程(独占锁的线程)
              setExclusiveOwnerThread(Thread.currentThread());
            else
                //state锁状态为1,已经被占用。 线程 B,线程C进入这里
                // 尝试获取
                acquire(1);
        }
    }


}

此时图为这样,线程A已经占有锁状态state ,其它线程入队

在这里插入图片描述

 protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

 protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
// AQS方法
// B,C线程会进入这里,假设B线程先进
public final void acquire(int arg) {
        // B线程 尝试获取锁,返回false,取反为true,
        // addWaiter 将B线程加入双向链表队列 
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

// 由于是不公平锁,调用的是NonFairSync
protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

final boolean nonfairTryAcquire(int acquires) {
            // 获取进入的线程  B
            final Thread current = Thread.currentThread();
            // 获取锁状态
            int c = getState();
            // 锁是否被占用  c != 0 ,B线程无法进入
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 当前线程是否为栈有锁的线程  B != A
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
private Node addWaiter(Node mode) {
        // 创建包含B线程的Node
        Node node = new Node(Thread.currentThread(), mode);
       
           // tail默认为null,pred指向null
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 将B线程节点加入队列,返回B节点
        enq(node);
        return node;
    }
 private Node enq(final Node node) {
        for (;;) {
            // 第一次 t指向 taill 为null
            // 第二次 t指向 taill 为空节点
            Node t = tail;
            if (t == null) { // Must initialize
                // 通过CAS原语  将head指向空节点
                if (compareAndSetHead(new Node()))
                    // 将tail也指向 空节点
                    tail = head;
            } else {
                // 将B线程 所在节点,设置前驱节点为空节点
                node.prev = t;
                // 将B线程 所在节点,设置尾节点。tail指向  B线程所在节点
                if (compareAndSetTail(t, node)) {
                    // 空节点后继节点指向 B节点
                    t.next = node;
                 
                    return t;
                }
            }
        }
    }

 private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }  

private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
      

到此处,线程B所在节点Node已经入队,并返回了B节点Node

在这里插入图片描述

// AQS方法
// B,C线程会进入这里,假设B线程先进
public final void acquire(int arg) {
        // B线程 尝试获取锁,返回false,取反为true,
        // addWaiter 将B线程加入双向链表队列 ,并返回B节点
        // acquireQueued(Node,arg)堵塞B线程,自旋获取锁
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 第一次. 获取B节点的前驱节点,空节点 == head ,尝试获取锁,false,因为state为1
                // 第二次,获取B节点的前驱节点,空节点 == head ,尝试获取锁,false,因为state为1
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 第一次 shouldParkAfterFailedAcquire,结果false,p为空节点,node为b节点        
                // 第二次进入 shouldParkAfterFailedAcquire。如果B线程获取锁失败应该堵塞,parkAndCheckInterrupt堵塞B线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


// 线程获取锁失败之后,应该堵塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 节点等待状态默认为0
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 第一次进入,ws为0 设置为-1  返回false
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }


private final boolean parkAndCheckInterrupt() {    
        // 使用lockSupport堵塞线程,B线程停止运行
        LockSupport.park(this);
        return Thread.interrupted();
    }

到目前为止,进入双向链表队列的B线程正式被堵塞

在这里插入图片描述

如果此时A线程unlock,双向列表中队列节点会不在堵塞,获取state执行

new Thread(()->{
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName()+" come in");
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+" out");
            }finally {
                lock.unlock();
            }
        },"A").start();


public void unlock() {
        sync.release(1);
    }

public final boolean release(int arg) {
        //A线程尝试释放锁
        if (tryRelease(arg)) {
            // 获取head指向的 空节点Node
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 解锁后继节点线程 B
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

// 线程A尝试释放锁,
protected final boolean tryRelease(int releases) {    
            // state 此时由 1变为0,返回true
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

// 尝试解锁后继 节点线程
private void unparkSuccessor(Node node) {
      
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // 获取后继节点 B节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // B线程不堵塞
        if (s != null)
            LockSupport.unpark(s.thread);
    }

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 第一次. 获取B节点的前驱节点,空节点 == head ,尝试获取锁,false,因为state为1
                // 第二次,获取B节点的前驱节点,空节点 == head ,尝试获取锁,false,因为state为1
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 第一次 shouldParkAfterFailedAcquire,结果false,p为空节点,node为b节点        
                // 第二次进入 shouldParkAfterFailedAcquire。如果B线程获取锁失败应该堵塞,parkAndCheckInterrupt堵塞B线程
                // 一旦B线程不堵塞之后,又开始循环获取state状态锁,
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

在这里插入图片描述

公平锁和不公平锁细微差比

上述中可以看到,当执行的线程释放锁后,直接让队列中空节点的后继节点线程不在堵塞获取锁资源。这是公平锁的体现啊???

但是我创建的是不公平锁啊?????

那么不公平体现在哪呢?

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    }

不公平锁的不公平性主要体现在 lock.lock()的时候,线程通过CAS原语来抢夺锁,谁先抢到state归谁。

公平锁则是直接从双向列表队列中获取

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
}        

CAS原理总结

1、默认不公平锁, lock.lock()的时候,就开始争夺锁state(锁状态)

**2、没有争抢到锁的线程,会被封装为node节点,进入队列,然后堵塞park。
注意:head指针指向虚节点(空节点),tail指针指向尾节点**

3、当执行的线程unlock时,会通知队列中后继节点线程不堵塞unpark,继续执行任务即可

相关文章
|
7月前
|
存储 Java
AQS(AbstractQueuedSynchronizer,队列同步器)源码解读
AQS(AbstractQueuedSynchronizer,队列同步器)源码解读
|
3月前
|
安全 Java
JUC锁: ReentrantReadWriteLock详解
`ReentrantReadWriteLock` 主要用于实现高性能的并发读取,而在写操作相对较少的场景中表现尤为突出。它保证了数据的一致性和线程安全,在合适的场合合理使用 `ReentrantReadWriteLock`,可以实现更加细粒度的控制,并显著提升应用性能。然而,需要注意它的复杂度较一般的互斥锁高,因此在选择使用时要仔细考虑其适用场景。
41 1
|
5月前
|
安全 Java
Java多线程中的锁机制:深入解析synchronized与ReentrantLock
Java多线程中的锁机制:深入解析synchronized与ReentrantLock
96 0
|
7月前
|
Java
ReentrantLock(可重入锁)源码解读与使用
ReentrantLock(可重入锁)源码解读与使用
|
7月前
|
安全 Java
利用AQS(AbstractQueuedSynchronizer)实现一个线程同步器
利用AQS(AbstractQueuedSynchronizer)实现一个线程同步器
|
7月前
|
存储 设计模式 算法
队列同步器AQS-AbstractQueuedSynchronizer 原理分析
队列同步器AQS-AbstractQueuedSynchronizer 原理分析
103 0
|
安全 Java
并发编程-19AQS同步组件之重入锁ReentrantLock、 读写锁ReentrantReadWriteLock、Condition
并发编程-19AQS同步组件之重入锁ReentrantLock、 读写锁ReentrantReadWriteLock、Condition
97 0
并发编程-19AQS同步组件之重入锁ReentrantLock、 读写锁ReentrantReadWriteLock、Condition
|
Java
JUC基础(三)—— Lock锁 及 AQS(2)
JUC基础(三)—— Lock锁 及 AQS
96 0
|
算法 调度
JUC基础(三)—— Lock锁 及 AQS(1)
JUC基础(三)—— Lock锁 及 AQS
134 0
Java并发之AbstractQueuedSynchronizer(AQS)详解
Java并发之AbstractQueuedSynchronizer(AQS)详解
Java并发之AbstractQueuedSynchronizer(AQS)详解