三、基于AQS实现自定义同步器
之前学习了这么多关于AQS的原理性的知识,这一期,我们来基于AQS实现一个不可重入的独占锁, 自定义AQS需要重写一系列函数,还需要定义原子变量state的含义。这里我们定义, state=0 表示目前锁没有被线程持有 ,state=1 表示锁己经被某一个线程持有。 由于是不可重入锁,所以不需要记录持有锁的线程获取锁的次数。另外,还要自定义锁的支持条件变量。
1、代码实现
如下代码是基于AQS实现不可重入的独占锁。
package MyNonReentrantLock; import java.io.Serializable; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class NonReentrantLock implements Lock, Serializable { //静态内部类,用于辅助 private static class Sync extends AbstractQueuedSynchronizer{ @Override protected boolean tryAcquire(int arg) { assert arg == 1;//如果state为0,则尝试获取锁 if (compareAndSetState(0,1)){ setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { assert arg == 1;//如果state为0,则尝试获取锁 if (getState()==0){ throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } @Override protected boolean isHeldExclusively() { // 是否锁已经被持有 return getState()==1; } //提供条件变量接口 public Condition newCondition(){ return new ConditionObject(); } } Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } }
在如上代码中, NonReentrantLock 定义了一个内部类 Sync 用来实现具体的锁的操作, Sync 继承了AQS 。由于实现的是独占模式的锁,所以Sync重写了tryAcquire/tryRelease/isHeldExclusively个方法。另 外, Sync 提供 newCondition 这个方法用来支持条件变量。
2、使用自定义锁实现生产一消费模型
代码如下:
package MyNonReentrantLock; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.Condition; public class 自定义生产消费模型 { static NonReentrantLock lock = new NonReentrantLock(); static Condition notFull = lock.newCondition(); static Condition notEmpty = lock.newCondition(); static Queue<String> queue = new LinkedBlockingQueue<>(); static int queueSize = 10; public static void main(String[] args) { Thread producer = new Thread(() -> { lock.lock(); try { //如果队列满了,则等待 while (queue.size() == queueSize) { notEmpty.await(); } //添加队列元素 queue.add("element "); //唤醒消费线程 notFull.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); }finally { //释放锁 lock.unlock(); } }); Thread consumer = new Thread(() -> { lock.lock(); try { //队列为空,则等待 while (queue.size()==0){ notFull.await(); } //消费元素 queue.poll(); //唤醒生产线程 notEmpty.signalAll(); }catch (InterruptedException e){ e.printStackTrace(); } finally { lock.unlock(); } }); producer.start(); consumer.start(); } }
如上代码首先创建了NonReentrantLock的一个对象lock ,然后调用 lock.newCondition创建了两个条件变量,用来进行生产者和消费者线程之间的同步。
在 main 函数里面,首先创建了 producer 生产线程,在线程内部首先调用 lock.lock()获取独占锁,然后判断当前队列是否己经满了 ,如果满了则调用 notEmpty.await()阻塞挂起当前线程。需要注意的是,这里使用 while 而不是 if 是为了避免虚假唤醒 。如果队列不满则直接向队列里面添加元素,然后调用 notFull.signalAll唤醒所有因为消费元素而被阻塞的消费线程,最后释放获取的锁。
然后在 main 函数里面创建了 consumer 生产线程,在线程内部首先调用 lock.lock()获取独占锁,然后判断当前队列里面是不是有元素,如果队列为空则调用 notFull.await()阻塞挂起当前线程。需要注意的是,这里使用 while 而不 if 是为了避免虚假唤醒。如果队列不为空则直接从队列里面获取并移除元素,然后唤醒因为队列满而被阻塞的生产线程,最后释放获取的锁。
到目前为止,AQS相关的知识就告一段落,后续的多线程学习中我们还是会继续看到AQS的影子!我是Zhongger,一个在互联网行业摸鱼写代码的打工人,你们的【关注】和【在看】与支持是我创作的最大动力,我们下期见~