AQS详解-阿里云开发者社区

开发者社区> 茕祇> 正文

AQS详解

简介: AQS详解
+关注继续查看

作用

提供一个框架用于实现依赖先进先出等待队列的阻塞锁和相关同步器(信号量,事件)

使用

子类应该定义为非公共内部帮助类,用于实现其封闭类的同步属性,AQS并不实现任何同步接口,这一部分主要是从源码里搬过来的

class Mutex implements Lock, java.io.Serializable {
   // Our internal helper class
   private static class Sync extends AbstractQueuedSynchronizer {
     // Reports whether in locked state
     protected boolean isHeldExclusively() {
       return getState() == 1;
     }
     // Acquires the lock if state is zero
     public boolean tryAcquire(int acquires) {
       assert acquires == 1; // Otherwise unused
       if (compareAndSetState(0, 1)) {
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }

     // Releases the lock by setting state to zero
     protected boolean tryRelease(int releases) {
       assert releases == 1; // Otherwise unused
       if (getState() == 0) throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }

     // Provides a Condition
     Condition newCondition() { return new ConditionObject(); }

     // Deserializes properly
     private void readObject(ObjectInputStream s)
         throws IOException, ClassNotFoundException {
       s.defaultReadObject();
       setState(0); // reset to unlocked state
     }
   }

   // The sync object does all the hard work. We just forward to it.
   private final Sync sync = new Sync();

   public void lock()                { sync.acquire(1); }
   public boolean tryLock()          { return sync.tryAcquire(1); }
   public void unlock()              { sync.release(1); }
   public Condition newCondition()   { return sync.newCondition(); }
   public boolean isLocked()         { return sync.isHeldExclusively(); }
   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
   public void lockInterruptibly() throws InterruptedException {
     sync.acquireInterruptibly(1);
   }
   public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }
 }
 class BooleanLatch {

   private static class Sync extends AbstractQueuedSynchronizer {
     boolean isSignalled() { return getState() != 0; }

     protected int tryAcquireShared(int ignore) {
       return isSignalled() ? 1 : -1;
     }

     protected boolean tryReleaseShared(int ignore) {
       setState(1);
       return true;
     }
   }

   private final Sync sync = new Sync();
   public boolean isSignalled() { return sync.isSignalled(); }
   public void signal()         { sync.releaseShared(1); }
   public void await() throws InterruptedException {
     sync.acquireSharedInterruptibly(1);
   }
 }

实现

主要分为两个大的部分

一为对于state的访问与维护,聚焦于锁本身

二为对于需要获取锁的线程的访问与维护,聚焦于想要获取锁的线程

三为获取释放锁的过程,聚焦于线程与锁的交互

state维护

AQS是这样定义state的volatile int state

初始化

访问

读取和写入分别有一个常规的set,get方法getState(),setState(int newState)

但这会有一个问题,我们经常需要先判断state之前的状态,然后再对其进行修改,如果采用if+set的形式,在并发情况下很可能产生问题.

AQS采用CAS操作提供原子性,从而避免了这个问题,提供的方法为compareAndSetState(int expect, int update)

这个方法事实上也是调用的Unsafe类提供的一个native方法

同步队列

在并发的语境下,几乎都要考虑多个线程去竞争一把锁的情形,而往往锁又是互斥的或者是独占的.如果一个线程获取到了锁,那其他线程显然不能直接放弃,而AQS则通过内建的同步队列去存储这些线程.

Node

AQS使用静态内部类Node去作为这个同步队列的元素,一个Node里,不仅仅包含了一个Thread对象,还存储着一些其他信息,例如线程此时的等待状态,前后节点的指针

Queue

其实通过Node的数据结构就可以看出来,队列是以一种链表的形式存在的(AQS并没有使用现成的集合框架),通过两个Node类型的变量tail,head去定位到自己想要操纵的数据.

说完了Queue的一些重要属性,我们再来看看他的一些方法. 大部分都是类似于get,set方法区做一些简单的访问.

private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
private static final boolean compareAndSetNext(Node node,Node expect,Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

以上这三个方法都是原子操作,但显然我们新加入一个节点的话至少需要设置tail,next两个指针的指向,这却不是原子性的了

AQS采用addWaiter方法包装这一系列操作

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

通过CompareAndSetTail(实际上是Unsafe类的compareAndSetObject()去设置)来设置tail节点,通过后再将尾节点与前一个节点进行连接,设置不成功则进入死循环,不断获取当前的tail节点,然后CAS去设置

节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程)

锁的获取与释放

这一部分,AQS采取了模板方法的设计模式,将个性化的操作留于具体的锁或者其他同步组件.从这个模块的名字就很容易想到,最为核心的就只有两个方法acquirerelease(当然,这里暂且先不涉及share与exclusive的划分)

先从acquire来分析

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

这个方法主要是调用了其他的四个方法,其中tryAcquire(arg)是交由子类实现的,addWaiter上面已经介绍过,selfInterrupt()只是对于Thread类的interrupt()方法的简单包装.也就是说,现在值得我们注意的仅仅只有acquireQueued()这一个方法

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取前一个节点
                final Node p = node.predecessor();
                //如果前一个节点是头结点且已经获取到了锁则返回此时的中断状态
                if (p == head && tryAcquire(arg)) {
                    //把当前节点设置为头结点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire(p, node)判断线程是否应该阻塞,

parkAndCheckInterrupt()里也就两个方法

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

我们知道,interrupt()相关的方法只是置一个标记,而不会真正去使线程被中断.也就是说最核心的使当前线程暂停运行的方法就是

LockSupport.park(this)这个方法了

再说release,这个方法要简单一些

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease依然交由子类去实现,核心一眼便知unparkSucccessor

 private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //与之前的park()对应
            LockSupport.unpark(s.thread);
    }

由以上代码可知,事实上,我们唤醒的并不是头结点而是头结点的下一个节点 ,这一点从enq这个方法里就能找到源头 (头结点是哑结点,里面是没有数据的).

到现在为止,我们可以发现,真正使得一个线程得以暂停的就只是LockSupport的park.我们完全可以仅仅只用这个park去实现一个最淳朴,最简陋的锁,示例代码如下


public class SimpleLock {
   //利用原子变量自带的CAS
   AtomicInteger state = new AtomicInteger(0);
   //代表竞争的另一个线程
   Thread next;

   public void lock(){
       if(!state.compareAndSet(0,1)){
           next = Thread.currentThread();
           LockSupport.park();
       }
   }
   public void unlock(){
           if(next!=null&&state.compareAndSet(1,0)){
               LockSupport.unpark(next);
           }
   }
}

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
怎么设置阿里云服务器安全组?阿里云安全组规则详细解说
阿里云服务器安全组设置规则分享,阿里云服务器安全组如何放行端口设置教程
6402 0
阿里云服务器ECS远程登录用户名密码查询方法
阿里云服务器ECS远程连接登录输入用户名和密码,阿里云没有默认密码,如果购买时没设置需要先重置实例密码,Windows用户名是administrator,Linux账号是root,阿小云来详细说下阿里云服务器远程登录连接用户名和密码查询方法
2248 0
阿里云服务器端口号设置
阿里云服务器初级使用者可能面临的问题之一. 使用tomcat或者其他服务器软件设置端口号后,比如 一些不是默认的, mysql的 3306, mssql的1433,有时候打不开网页, 原因是没有在ecs安全组去设置这个端口号. 解决: 点击ecs下网络和安全下的安全组 在弹出的安全组中,如果没有就新建安全组,然后点击配置规则 最后如上图点击添加...或快速创建.   have fun!  将编程看作是一门艺术,而不单单是个技术。
3973 0
使用OpenApi弹性释放和设置云服务器ECS释放
云服务器ECS的一个重要特性就是按需创建资源。您可以在业务高峰期按需弹性的自定义规则进行资源创建,在完成业务计算的时候释放资源。本篇将提供几个Tips帮助您更加容易和自动化的完成云服务器的释放和弹性设置。
7623 0
windows server 2008阿里云ECS服务器安全设置
最近我们Sinesafe安全公司在为客户使用阿里云ecs服务器做安全的过程中,发现服务器基础安全性都没有做。为了为站长们提供更加有效的安全基础解决方案,我们Sinesafe将对阿里云服务器win2008 系统进行基础安全部署实战过程! 比较重要的几部分 1.
4998 0
阿里云服务器安全组设置内网互通的方法
虽然0.0.0.0/0使用非常方便,但是发现很多同学使用它来做内网互通,这是有安全风险的,实例有可能会在经典网络被内网IP访问到。下面介绍一下四种安全的内网互联设置方法。 购买前请先:领取阿里云幸运券,有很多优惠,可到下文中领取。
9325 0
腾讯云服务器 设置ngxin + fastdfs +tomcat 开机自启动
在tomcat中新建一个可以启动的 .sh 脚本文件 /usr/local/tomcat7/bin/ export JAVA_HOME=/usr/local/java/jdk7 export PATH=$JAVA_HOME/bin/:$PATH export CLASSPATH=.
2032 0
+关注
7
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
文娱运维技术
立即下载
《SaaS模式云原生数据仓库应用场景实践》
立即下载
《看见新力量:二》电子书
立即下载