维护AQS提供的队列 - 入队操作
最后,我们来看看如何维护AQS提供的队列,主要看入队操作。
入队操作: 当一个线程获取锁失败后该线程会被转换为Node节点,然后就会使用enq(final Node node)方法将该节点插入到AQS的阻塞队列.
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; // 1 if (t == null) { // Must initialize if (compareAndSetHead(new Node()))// 2 tail = head; } else { node.prev = t; // 3 if (compareAndSetTail(t, node)) { // 4 t.next = node; return t; } } } }
下面结合代码和节点图来讲解入队的过程。
【第一次循环】
- 如上代码在第一次循环中,当要在AQS队列尾部插入元素时,AQS队列状态如下所示
也就是队列头、尾节点都指向null;
- 当执行代码(1)后节点t指向了尾部节点,这时候队列状态如下图所示。
- 这时候t为null,故执行代码(2),使用CAS算法设置一个哨兵节点为头节点,如果CAS设置成功,则让尾部节点也指向哨兵节点,这时候队列状态如下图所示
【第二次循环】
- 到现在为止只插入了一个哨兵节点,还需要插入node节点,所以在第二次循环后执行到代码(1),这时候队列状态如下图所示
- 然后执行代码(3)设置node(入参)的前驱节点为尾部节点,这时候队列状态如下图所示
- 然后通过CAS算法设置node节点为尾部节点,CAS成功后队列状态如下图所示
CAS成功后再设置原来的尾部节点的后驱节点为node,这时候就完成了双向链表的插入,此时队列状态如下图所示。
AQS——条件变量的支持
我们知道notify和wait,是配合synchronized内置锁实现线程间同步的基础设施一样,条件变量的signal和await方法也是用来配合锁(使用AQS实现的锁)实现线程间同步的基础设施。
它们的不同在于,synchronized同时只能与一个共享变量的notify或wait方法实现同步,而AQS的一个锁可以对应多个条件变量。
在调用共享变量的notify和wait方法前必须先获取该共享变量的内置锁,同理,在调用条件变量的signal和await方法前也必须先获取条件变量对应的锁。
那么,到底什么是条件变量呢?如何使用呢?不急,下面看一个例子。
// 1 ReentrantLock lock = new ReentrantLock(); // 2 Condition condition = lock.newCondition(); // 3 lock.lock(); try { System.out.println("begin wait"); // 4 condition.await(); System.out.println("end wait"); } catch (Exception e) { e.printStackTrace(); } finally { // 5 lock.unlock(); } // 6 lock.lock(); try { System.out.println("begin single"); // 7 condition.signal(); System.out.println("end single"); } catch (Exception e) { e.printStackTrace(); } finally { // 8 lock.unlock(); }
代码(1)创建了一个独占锁ReentrantLock对象,ReentrantLock是基于AQS实现的锁。
代码(2)使用创建的Lock对象的newCondition()方法创建了一个ConditionObject变量,这个变量就是Lock锁对应的一个条件变量。需要注意的是,一个Lock对象可以创建多个条件变量。
代码(3)首先获取了独占锁
代码(4)则调用了条件变量的await()方法阻塞挂起了当前线程。 当其他线程调用条件变量的signal方法时,被阻塞的线程才会从await处返回。需要注意的是,和调用Object的wait方法一样,如果在没有获取到锁前调用了条件变量的await方法则会抛出 java.lang.IllegalMonitorStateException异常。
代码(5)则释放了获取的锁。
其实这里的Lock对象等价于synchronized加上共享变量,调用lock.lock()方法就相当于进入了synchronized块(获取了共享变量的内置锁),调用lock.unLock()方法就相当于退出synchronized块。 调用条件变量的await()方法就相当于调用共享变量的wait()方法,调用条件变量的signal方法就相当于调用共享变量的notify()方法。调用条件变量的signalAll()方法就相当于调用共享变量的notifyAll()方法。
经过上面解释,知道条件变量是什么,它是用来做什么的了。
在上面代码中,lock.newCondition()的作用其实是new了一个在AQS内部声明的ConditionObject对象,ConditionObject是AQS的内部类,可以访问AQS内部的变量(例如状态变量state)和方法。在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()方法时被阻塞的线程。注意这个条件队列和AQS队列不是一回事。
在如下代码中,当线程调用条件变量的await()方法时(必须先调用锁的lock()方法获取锁),在内部会构造一个类型为Node.CONDITION的node节点,然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁(也就是会操作锁对应的state变量的值),并被阻塞挂起。
这时候如果有其他线程调用lock.lock()尝试获取锁,就会有一个线程获取到锁,如果获取到锁的线程调用了条件变量的await()方法,则该线程也会被放入条件变量的阻塞队列,然后释放获取到的锁,在await()方法处阻塞。
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 9 创建新的node节点,并插入到条件队列的对尾 Node node = addConditionWaiter(); // 10 释放当前线程获取的锁 int savedState = fullyRelease(node); int interruptMode = 0; // 11 调用park方法阻塞挂起当前线程 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
在如下代码中,当另外一个线程调用条件变量的signal方法时(必须先调用锁的lock()方法获取锁),在内部会把条件队列里面队头的一个线程节点从条件队列里面移除并放入AQS的阻塞队列里面,然后激活这个线程。
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) // 将条件队列的队首移动到AQS队列 doSignal(first); }
需要注意的是,AQS只提供了ConditionObject的实现,并没有提供newCondition函数,该函数用来new一个ConditionObject对象。需要由AQS的子类来提供newCondition函数。
下面来看当一个线程调用条件变量的await()方法而被阻塞后,如何将其放入条件队列。
/** * Adds a new waiter to wait queue. * @return its new wait node */ private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 1 Node node = new Node(Thread.currentThread(), Node.CONDITION); // 2 if (t == null) firstWaiter = node; else // 3 t.nextWaiter = node; // 4 lastWaiter = node; return node; }
代码(1)首先根据当前线程创建一个类型为Node.CONDITION的节点,然后通过代码(2)(3)(4)在单向条件队列尾部插入一个元素。
注意:当多个线程同时调用lock.lock()方法获取锁时,只有一个线程获取到了锁,其他线程会被转换为Node节点插入到lock锁对应的AQS阻塞队列里面,并做自旋CAS尝试获取锁。
如果获取到锁的线程又调用了对应的条件变量的await()方法,则该线程会释放获取到的锁,并被转换为Node节点插入到条件变量对应的条件队列里面。
这时候因为调用lock.lock()方法被阻塞到AQS队列里面的一个线程会获取到被释放的锁,如果该线程也调用了条件变量的await()方法则该线程也会被放入条件变量的条件队列里面。
当另外一个线程调用条件变量的signal()或者signalAll()方法时,会把条件队列里面的一个或者全部Node节点移动到AQS的阻塞队列里面,等待时机获取锁。
最后使用一个图总结如下:一个锁对应一个AQS阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列。
基于AQS实现自定义同步器
我们基于AQS实现一个不可重入的独占锁,。
自定义AQS需要重写一系列函数,还需要定义原子变量state的含义。这里我们定义,state为0表示目前锁没有被线程持有,state为1表示锁已经被某一个线程持有,由于是不可重入锁,所以不需要记录持有锁的线程获取锁的次数。另外,我们自定义的锁支持条件变量。
【基于AQS实现的不可重入的独占锁】
import java.io.Serializable; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/12/5 22:35 * @mark: show me the code , change the world */ public class NonReentrantLock implements Lock, Serializable { //静态内部类,用于辅助 private static class Sync extends AbstractQueuedSynchronizer{ @Override protected boolean tryAcquire(int arg) { assert arg == 1;//如果state为0,则尝试获取锁 if (compareAndSetState(0,1)){ setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { assert arg == 1;//如果state为0,则尝试获取锁 if (getState()==0){ throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } @Override protected boolean isHeldExclusively() { // 是否锁已经被持有 return getState()==1; } //提供条件变量接口 public Condition newCondition(){ return new ConditionObject(); } } Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } }
在如上代码中,NonReentrantLock定义了一个内部类Sync用来实现具体的锁的操作,Sync则继承了AQS。由于我们实现的是独占模式的锁,所以Sync重写了tryAcquire、tryRelease和isHeldExclusively 3个方法。另外,Sync提供了newCondition这个方法用来支持条件变量。
【使用自定义锁实现生产—消费模型】
import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.Condition; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/12/5 22:59 * @mark: show me the code , change the world */ public class NonReentrantLockTest { static NonReentrantLock lock = new NonReentrantLock(); static Condition notFull = lock.newCondition(); static Condition notEmpty = lock.newCondition(); static Queue<String> queue = new LinkedBlockingQueue<>(); static int queueSize = 10; public static void main(String[] args) { Thread producer = new Thread(() -> { lock.lock(); try { //如果队列满了,则等待 while (queue.size() == queueSize) { notEmpty.await(); } //添加队列元素 queue.add("element "); //唤醒消费线程 notFull.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); }finally { //释放锁 lock.unlock(); } }); Thread consumer = new Thread(() -> { lock.lock(); try { //队列为空,则等待 while (queue.size()==0){ notFull.await(); } //消费元素 queue.poll(); //唤醒生产线程 notEmpty.signalAll(); }catch (InterruptedException e){ e.printStackTrace(); } finally { lock.unlock(); } }); producer.start(); consumer.start(); } }
如上代码首先创建了NonReentrantLock的一个对象lock,然后调用lock.newCondition创建了两个条件变量,用来进行生产者和消费者线程之间的同步。
在main 函数里面,首先创建了producer生产线程,在线程内部首先调用lock.lock()获取独占锁,然后判断当前队列是否已经满了,如果满了则调用notEmpty.await()阻塞挂起当前线程。需要注意的是,这里使用while 而不是if是为了避免虚假唤醒。如果队列不满则直接向队列里面添加元素,然后调用notFull.signalAll()唤醒所有因为消费元素而被阻塞的消费线程,最后释放获取的锁。
然后在main函数里面创建了consumer生产线程,在线程内部首先调用lock.lock()获取独占锁,然后判断当前队列里面是不是有元素,如果队列为空则调用notFull.await()阻塞挂起当前线程。需要注意的是,这里使用while 而不是if是为了避免虚假唤醒。如果队列不为空则直接从队列里面获取并移除元素,然后唤醒因为队列满而被阻塞的生产线程,最后释放获取的锁。