AQS的应用:基于AQS实现自定义同步器

简介: AQS的应用:基于AQS实现自定义同步器

三、基于AQS实现自定义同步器


之前学习了这么多关于AQS的原理性的知识,这一期,我们来基于AQS实现一个不可重入的独占锁, 自定义AQS需要重写一系列函数,还需要定义原子变量state的含义。这里我们定义, state=0 表示目前锁没有被线程持有 ,state=1 表示锁己经被某一个线程持有。 由于是不可重入锁,所以不需要记录持有锁的线程获取锁的次数。另外,还要自定义锁的支持条件变量。


1、代码实现


如下代码是基于AQS实现不可重入的独占锁。

package MyNonReentrantLock;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class NonReentrantLock implements Lock, Serializable {
    //静态内部类,用于辅助
    private static class Sync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
           assert arg == 1;//如果state为0,则尝试获取锁
            if (compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        @Override
        protected boolean tryRelease(int arg) {
            assert arg == 1;//如果state为0,则尝试获取锁
            if (getState()==0){
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        @Override
        protected boolean isHeldExclusively() {
            // 是否锁已经被持有
            return getState()==1;
        }
        //提供条件变量接口
        public Condition newCondition(){
            return new ConditionObject();
        }
    }
    Sync sync = new Sync();
    @Override
    public void lock() {
        sync.acquire(1);
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }
    @Override
    public void unlock() {
        sync.release(1);
    }
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

在如上代码中, NonReentrantLock 定义了一个内部类 Sync 用来实现具体的锁的操作, Sync 继承了AQS 。由于实现的是独占模式的锁,所以Sync重写了tryAcquire/tryRelease/isHeldExclusively个方法。另 外, Sync 提供 newCondition 这个方法用来支持条件变量。


2、使用自定义锁实现生产一消费模型


代码如下:

package MyNonReentrantLock;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
public class 自定义生产消费模型 {
    static NonReentrantLock lock = new NonReentrantLock();
    static Condition notFull = lock.newCondition();
    static Condition notEmpty = lock.newCondition();
    static Queue<String> queue = new LinkedBlockingQueue<>();
    static int queueSize = 10;
    public static void main(String[] args) {
        Thread producer = new Thread(() -> {
            lock.lock();
            try {
                //如果队列满了,则等待
                while (queue.size() == queueSize) {
                    notEmpty.await();
                }
                //添加队列元素
                queue.add("element ");
                //唤醒消费线程
                notFull.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                //释放锁
                lock.unlock();
            }
        });
        Thread consumer = new Thread(() -> {
            lock.lock();
            try {
                //队列为空,则等待
                while (queue.size()==0){
                    notFull.await();
                }
                //消费元素
                queue.poll();
                //唤醒生产线程
                notEmpty.signalAll();
            }catch (InterruptedException e){
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });
        producer.start();
        consumer.start();
    }
}

如上代码首先创建了NonReentrantLock的一个对象lock ,然后调用 lock.newCondition创建了两个条件变量,用来进行生产者和消费者线程之间的同步。


在 main 函数里面,首先创建了 producer 生产线程,在线程内部首先调用 lock.lock()获取独占锁,然后判断当前队列是否己经满了 ,如果满了则调用 notEmpty.await()阻塞挂起当前线程。需要注意的是,这里使用 while 而不是 if 是为了避免虚假唤醒 。如果队列不满则直接向队列里面添加元素,然后调用 notFull.signalAll唤醒所有因为消费元素而被阻塞的消费线程,最后释放获取的锁。


然后在 main 函数里面创建了 consumer 生产线程,在线程内部首先调用 lock.lock()获取独占锁,然后判断当前队列里面是不是有元素,如果队列为空则调用 notFull.await()阻塞挂起当前线程。需要注意的是,这里使用 while 而不 if 是为了避免虚假唤醒。如果队列不为空则直接从队列里面获取并移除元素,然后唤醒因为队列满而被阻塞的生产线程,最后释放获取的锁。


到目前为止,AQS相关的知识就告一段落,后续的多线程学习中我们还是会继续看到AQS的影子!我是Zhongger,一个在互联网行业摸鱼写代码的打工人,你们的【关注】和【在看】与支持是我创作的最大动力,我们下期见~

相关文章
|
缓存 Java API
【云原生】Spring Cloud Gateway的底层原理与实践方法探究
【云原生】Spring Cloud Gateway的底层原理与实践方法探究
|
消息中间件
RabbitMQ中的消息优先级是如何实现的?
RabbitMQ中的消息优先级是如何实现的?
716 0
|
jenkins 持续交付
Jenkins 管理界面里提示“反向代理设置有误“的问题解决办法
Jenkins 管理界面里提示“反向代理设置有误“的问题解决办法
1952 0
Jenkins 管理界面里提示“反向代理设置有误“的问题解决办法
|
11月前
|
数据采集 安全 数据管理
通信行业数据治理:如何实现高效、安全的数据管理?
在未来的发展中,通信行业的企业应加强数据治理意识,提高数据治理能力;同时,积极开展跨行业的合作创新,共同推动行业的繁荣与发展。相信在不久的将来,通信行业将迎来更加美好的明天。
|
Ubuntu
在树莓派4B上安装ubuntu系统
在树莓派4B上安装ubuntu系统
|
存储 分布式计算 前端开发
jvm性能调优实战 - 26一个每秒10万并发的系统如何频繁发生Young GC的
jvm性能调优实战 - 26一个每秒10万并发的系统如何频繁发生Young GC的
322 0
|
分布式计算 Hadoop Shell
Hbase集群搭建
Hbase集群搭建
359 0
|
API
Camera2预览方向、拍照方向设置
Camera2预览方向、拍照方向设置
961 2
|
弹性计算
包月的ecs实例快到期了,需要手动释放吗?
包月的ecs实例快到期了,需要手动释放吗?
222 2
|
监控 前端开发 JavaScript
SVG实现流程动态效果
SVG实现流程动态效果
189 0