Java处理并发编程工具集合(JUC)详解2

简介: Java处理并发编程工具集合(JUC)详解2
+关注继续查看

4 AQS

4.1 前言

如果要想真正的理解JUC下的并发工具的实现原理,我们必须要来学习AQS,因为它是JUC下很多类的基石。

在讲解AQS之前,如果老板让你自己写一个SDK层面的锁,给其他同事去使用,你会如何写呢?

1、搞一个状态标记,用来表示持有或未持有锁,但得是volatile类型的保证线程可见性。

2、编写一个lockunlock函数用于抢锁和释放锁,就是对状态标记的修改操作

3、lock函数要保证并发下只能有一个线程能抢到锁,其他线程要等待获取锁(阻塞式),可以采用CAS+自旋的方式实现

初步实现如下:

public class MyLock {
    // 定义一个状态变量status:为1表示锁被持有,为0表示锁未被持有
    private volatile int status;

    private static final Unsafe unsafe = reflectGetUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                    (MyLock.class.getDeclaredField("status"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private static Unsafe reflectGetUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 阻塞式获取锁
     * @return
     */
    public boolean lock() {
        while (!compareAndSet(0,1)) {

        }
        return true;
    }

    // cas 设置 status
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    /**
     * 释放锁
     */
    public void unlock() {
        status = 0;
    }
}

问题:获取不到锁自旋时,是空转,浪费CPU

1、使用yield让出CPU执行权,等待调度

public boolean lock() {
    while (!compareAndSet(0,1)) {
        Thread.yield();//yield+自旋,尽可能的防止CPU空转,让出CPU资源
    }
    return true;
}

或者可以采用线程休眠的方式,但是休眠时间不太好确定,太长太短都不好。

2、采用等待唤醒机制,但是这里由于没有使用synchronized关键字,所以也无法使用wait/notify,但是我们可以使用park/unpark,获取不到锁的线程park并且去队列排队,释放锁时从队列拿出一个线程unpark

private static final Queue<Thread> QUEUE = new LinkedBlockingQueue<>();

public boolean lock() {
    while (!compareAndSet(0,1)) {
        QUEUE.offer(Thread.currentThread());
        LockSupport.park();//线程休眠
    }
    return true;
}

public void unlock() {
    status = 0;
    LockSupport.unpark(QUEUE.poll());
}

4.2 AQS概述

AQS(AbstractQueuedSynchronizer):抽象队列同步器,定义了一套多线程访问共享资源的同步器框架,提供了SDK层面的锁机制,JUC中的很多类譬如:ReentrantLock/Semaphore/CountDownLatch…等都是基于它。

通过查阅作者的对于该类的文档注释可以得到如下核心信息:

1、AQS用一个volatile int state;属性表示锁状态,1表示锁被持有,0表示未被持有,具体的维护由子类去维护,但是提供了修改该属性的三个方法:getState()setState(int newState)compareAndSetState(int expect, int update),其中CAS方法是核心。

2、框架内部维护了一个FIFO的等待队列,是用双向链表实现的,我们称之为CLH队列,

3、框架也内部也实现了条件变量Condition,用它来实现等待唤醒机制,并且支持多个条件变量

4、AQS支持两种资源共享的模式:独占模式(Exclusive)和共享模式(Share),所谓独占模式就是任意时刻只允许一个线程访问共享资源,譬如ReentrantLock;而共享模式指的就是允许多个线程同时访问共享资源,譬如Semaphore/CountDownLatch

5、使用者只需继承AbstractQueuedSynchronizer并重写指定的方法,在方法内完成对共享资源state的获取和释放,至于具体线程等待队列的维护,AQS已经在顶层实现好了,在那些final的模板方法里。

* <p>To use this class as the basis of a synchronizer, redefine the
 * following methods, as applicable, by inspecting and/or modifying
 * the synchronization state using {@link #getState}, {@link
 * #setState} and/or {@link #compareAndSetState}:
 *
 * <ul>
 * <li> {@link #tryAcquire}
 * <li> {@link #tryRelease}
 * <li> {@link #tryAcquireShared}
 * <li> {@link #tryReleaseShared}
 * <li> {@link #isHeldExclusively}
 * </ul>
 *
 * Each of these methods by default throws {@link
 * UnsupportedOperationException}.  Implementations of these methods
 * must be internally thread-safe, and should in general be short and
 * not block. Defining these methods is the <em>only</em> supported
 * means of using this class. All other methods are declared
 * {@code final} because they cannot be independently varied.

6、AQS底层使用了模板方法模式,给我们提供了许多模板方法,我们直接使用即可。

API说明
final void acquire(int arg)独占模式获取锁,AQS顶层已实现,内部调用了tryAcquire模板方法
boolean tryAcquire(int arg)独占模式 尝试获取锁,AQS中未实现,由子类去实现,获取到锁返回true
final boolean release(int arg)释放独占锁,AQS顶层已实现,内部调用了tryRelease模板方法
boolean tryRelease(int arg)尝试释放独占锁,AQS中未实现,由子类去实现,成功释放返回true
final void acquireShared(int arg)共享模式获取锁,AQS顶层已实现,内部调用了tryAcquireShared模板方法
int tryAcquireShared(int arg)尝试获取共享锁,返回负数表示失败,0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源,AQS中未实现,由子类实现
final boolean releaseShared(int arg)释放共享锁,返回true代表释放成功,AQS中已实现,内部调用了tryReleaseShared模板方法
boolean tryReleaseShared(int arg)尝试释放锁,释放后允许唤醒后续等待结点返回true,否则返回false,AQS中未实现,需要由子类实现
boolean isHeldExclusively()共享资源是否被独占

4.3 基本使用

此时老板给你加了需求,要求你实现一个基于AQS的锁,那该怎么办呢?

AbstractQueuedSynchronizer的类注释中给出了使用它的基本方法,我们按照它的写法尝试即可

/**
 * 基于 aqs实现锁
 */
public class MyLock implements Lock {

    //同步器
    private Syn syn = new Syn();

    @Override
    public void lock() {
        //调用模板方法
        syn.acquire(1);
    }

    @Override
    public void unlock() {
        //调用模板方法
        syn.release(0);
    }

    // 其他接口方法暂时先不实现 省略


    // 实现一个独占同步器
    class Syn extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0,arg)) {
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(arg);
            return true;
        }

    }

}

4.4 原理解析

自己实现的锁在使用过程中发现一个问题,就是有时候有的线程特别容易抢到锁,而有的线程老是抢不到锁,虽说线程们抢锁确实看命,但能不能加入一种设计,让各个线程机会均等些,起码不要出现某几个线程总是特倒霉抢不到锁的情况吧!

这其实就是涉及到锁是否是公平的,那么什么是公平锁什么是非公平锁呢?

这我们就不得不深入我们使用的模板方法中看一眼了

/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

//结合我自己写的尝试获取锁的方法
protected boolean tryAcquire(int arg) {
    if (compareAndSetState(0,arg)) {
        return true;
    }
    return false;
}

这里大概描述如下:

1、线程一来首先调用tryAcquire,在tryAcquire中直接CAS获取锁,如果获取不成功通过addWaiter加入等待队列,然后走acquireQueued让队列中的某个等待线程去获取锁。

2、不公平就体现在这里,线程来了也不先看一下等待队列中是否有线程在等待,如果没有线程等待,那直接获取锁没什么 问题,如果有线程等待就直接去获取锁不就相当于插队么?

那如何实现这种公平性呢?这就不得不探究一下AQS的内部的实现原理了,下面我们依次来看:

1、查看AbstractQueuedSynchronizer的类定义,虽然它里面代码很多,但重要的属性就那么几个,

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    private volatile int state;
    private transient volatile Node head;
    private transient volatile Node tail;
    
    static final class Node {
        //其他不重要的略
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
    }
    
    public class ConditionObject implements Condition, java.io.Serializable {...}
}

结合前面讲的AQS的类文档注释不难猜到,内部类 Node以及其类型的变量 head 和 tail 就表示 AQS 内部的一个等待队列,而剩下的 state 变量就用来表示锁的状态。

等待队列应该就是线程获取锁失败时,需要临时存放的一个地方,用来等待被唤醒并尝试获取锁。再看 Node 的属性我们知道,Node 存放了当前线程的指针 thread,也即可以表示当前线程并对其进行某些操作,prevnext 说明它构成了一个双向链表,也就是为某些需要得到前驱或后继节点的算法提供便利。

2、AQS加锁最核心的代码就是如下,我们要来探究它的实现原理

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

它的原理及整个过程我们以图的形式说明如下:

image

3、原理搞懂了,那如何让自定义的锁是公平的呢?

其实导致不公平的原因就是线程每次调用acquire时,都会先去tryAcquire,而该方法目前的实现时直接去抢锁,也不看现在等待队列中有没有线程在排队,如果有线程在排队,那岂不是变成了插队,导致不公平。

所以现在的解决办法就是,在tryAcquire时先看一下等待队列中是否有在排队的,如果有那就乖乖去排队,不插队,如果没有则可以直接去获取锁。

那如何知道线程AQS等待队列中是否有线程排队呢?其实AQS顶层已经实现好了,它提供了一个hasQueuedPredecessors函数:如果在当前线程之前有一个排队的线程,则为True; 如果当前线程位于队列的头部(head.next)或队列为空,则为false。

protected boolean tryAcquire(int arg) {
    //先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
    if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) {
        return true;
    }
    return false;
}

4、现在已经有公平锁了,但是成年人的世界不是做选择题,而是都想要,自己编写的锁既能支持公平锁,也支持非公平锁,让使用者可以自由选择,怎么办?

其实只要稍微改造一下即可,

public class MyLock implements Lock {

    //同步器
    private Sync syn ;

    MyLock () {
        syn = new NoFairSync();
    }

    MyLock (boolean fair) {
        syn = fair ? new FairSync():new NoFairSync();
    }


    @Override
    public void lock() {
        //调用模板方法
        syn.acquire(1);
    }

    @Override
    public void unlock() {
        //调用模板方法
        syn.release(0);
    }

    // Lock接口其他方法暂时先不实现 略
    


    // 实现一个独占同步器
    class Sync extends AbstractQueuedSynchronizer{


        @Override
        protected boolean tryRelease(int arg) {
            setState(arg);
            return true;
        }

    }

    class FairSync extends Sync {
        @Override
        protected boolean tryAcquire(int arg) {
            //先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
            if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) {
                return true;
            }
            return false;
        }
    }

    class NoFairSync extends Sync {
        @Override
        protected boolean tryAcquire(int arg) {
            //直接去获取锁
            if (compareAndSetState(0,arg)) {
                return true;
            }
            return false;
        }
    }

}

5、现在锁的公平性问题解决了,但是老板又出了新的需求,要求我们的锁支持可重入,因为它写了如下一段代码,发现一直获取不到锁

static Lock lock = new MyLock();
static void test3() {
    lock.lock();
    try {
        System.out.println("test3 get lock,then do something ");
        test4();
    } finally {
        lock.unlock();
    }
}
static void test4() {
    lock.lock();
    try {
        System.out.println("test4 get lock,then do something ");
    } finally {
        lock.unlock();
    }
}

那如何让锁支持可重入呢?也就是说如果一个线程持有锁之后,还能继续获取锁,也就是说让锁只对不同线程互斥。

查看AbstractQueuedSynchronizer的定义我们发现,它还继承自另一个类:AbstractOwnableSynchronizer

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {...}

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    private transient Thread exclusiveOwnerThread;
    
    protected final void setExclusiveOwnerThread(Thread thread) {...}
    protected final Thread getExclusiveOwnerThread(){...}
}

看到这我们明白了,原来AQS中有个变量是可以保存当前持有独占锁的线程的。那好办了,当我们获取锁时,如果发现锁被持有不要着急放弃,先看看持有锁的线程是否时当前线程,如果是还能继续获取锁。

另外关于可重入锁,还要注意一点,锁的获取和释放操作是成对出现的,就像下面这样

lock
    lock
        lock
            lock
                ....
            unlock
        unlock
    unlock
unlock

所以对于重入锁不仅要能记录锁被持有,还要记录重入的次数,释放的时候也不是直接将锁真实的释放,而是先减少重入次数,能释放的时候在释放。


故此时状态变量state不在只有两个取值0,1,某线程获取到锁state=1,如果当前线程重入获取只需增加状态值state=2,依次同理,锁释放时释放一次状态值-1,当state=0时才真正释放,其他线程才能继续获取锁。


修改我们锁的代码如下:公平非公平在可重入上的逻辑是一样的

public class MyLock implements Lock {

    //同步器
    private Sync syn ;

    MyLock () {
        syn = new NoFairSync();
    }

    MyLock (boolean fair) {
        syn = fair ? new FairSync():new NoFairSync();
    }


    @Override
    public void lock() {
        //调用模板方法
        syn.acquire(1);
    }

    @Override
    public void unlock() {
        //调用模板方法
        syn.release(1);
    }

    // Lock接口其他方法暂时先不实现 略


    // 实现一个独占同步器
    class Sync extends AbstractQueuedSynchronizer{


        @Override
        protected boolean tryRelease(int arg) {
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            boolean realRelease = false;
            int nextState = getState() - arg;
            if (nextState == 0) {
                realRelease = true;
                setExclusiveOwnerThread(null);
            }
            setState(nextState);
            return realRelease;
        }

    }

    class FairSync extends Sync {
        @Override
        protected boolean tryAcquire(int arg) {
            final Thread currentThread = Thread.currentThread();
            int currentState = getState();
            if (currentState == 0 ) { // 可以获取锁
                //先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
                if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) {
                    setExclusiveOwnerThread(currentThread);
                    return true;
                }
            }else if (currentThread == getExclusiveOwnerThread()) {
                //重入逻辑 增加 state值
                int nextState = currentState + arg;
                if (nextState < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                setState(nextState);
                return true;
            }
            return false;
        }
    }

    class NoFairSync extends Sync {
        @Override
        protected boolean tryAcquire(int arg) {
            final Thread currentThread = Thread.currentThread();
            int currentState = getState();
            if (currentState ==0 ) { // 可以获取锁
                //直接去获取锁
                if (compareAndSetState(0,arg)) {
                    setExclusiveOwnerThread(currentThread);
                    return true;
                }
            }else if (currentThread == getExclusiveOwnerThread()) {
                //重入逻辑 增加 state值
                int nextState = currentState + arg;
                if (nextState < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                setState(nextState);
                return true;
            }
            return false;
        }
    }

}

好了至此我们已经掌握了AQS的核心原理以及它的一个经典实现ReentrantLock几乎全部的知识点,此时打开ReentrantLock的源码你会发现一切都很清爽!!!

5 并发容器

juc中还包含很多其他的并发容器(了解)

1.ConcurrentHashMap

对应:HashMap

目标:代替Hashtable、synchronizedMap,使用最多,源码篇会详细讲解

原理:JDK7中采用Segment分段锁,JDK8中采用CAS+synchronized

2.CopyOnWriteArrayList

对应:ArrayList

目标:代替Vector、synchronizedList

原理:高并发往往是读多写少的特性,读操作不加锁,而对写操作加Lock独享锁,先复制一份新的集合,在新的集合上面修改,然后将新集合赋值给旧的引用,并通过volatile 保证其可见性。

查看源码:volatile array,lock加锁,数组复制

3.CopyOnWriteArraySet

对应:HashSet

目标:代替synchronizedSet

原理:与CopyOnWriteArrayList实现原理类似。

4.ConcurrentSkipListMap

对应:TreeMap

目标:代替synchronizedSortedMap(TreeMap)

原理:基于Skip list(跳表)来代替平衡树,按照分层key上下链接指针来实现。

附加:跳表

5.ConcurrentSkipListSet

对应:TreeSet

目标:代替synchronizedSortedSet(TreeSet)

原理:内部基于ConcurrentSkipListMap实现,原理一致

6.ConcurrentLinkedQueue

对应:LinkedList

对应:无界线程安全队列

原理:通过队首队尾指针,以及Node类元素的next实现FIFO队列

7.BlockingQueue

对应:Queue

特点:拓展了Queue,增加了可阻塞的插入和获取等操作

原理:通过ReentrantLock实现线程安全,通过Condition实现阻塞和唤醒

实现类:

  • LinkedBlockingQueue:基于链表实现的可阻塞的FIFO队列
  • ArrayBlockingQueue:基于数组实现的可阻塞的FIFO队列
  • PriorityBlockingQueue:按优先级排序的队列



目录
相关文章
|
15天前
|
Java
Java接口:实现多重继承,促进代码复用与扩展的强大工具
Java接口:实现多重继承,促进代码复用与扩展的强大工具
|
16天前
|
存储 Java 数据库
Java Lambda表达式:简洁且强大的函数式编程工具
Lambda表达式是Java 8及以后版本中引入的一种函数式编程特性。它是一种匿名函数,允许开发人员以简洁和易读的方式编写代码,并且可以作为参数传递给方法或存储在变量中。Lambda表达式的基本语法如下:(parameters) -> expression,其中parameters是函数的输入参数,可以是零个或多个,箭头"->"将参数与表达式分开,expression是函数的执行体,它定义了Lambda表达式的功能。
|
16天前
|
消息中间件 Java 数据库
探索Java生态系统的其他技术与工具
探索Java生态系统的其他技术与工具
|
21天前
|
Arthas Dubbo Java
Alibaba Java诊断工具Arthas查看Dubbo动态代理类
Alibaba Java诊断工具Arthas查看Dubbo动态代理类
24 0
|
21天前
|
存储 Java BI
MAT工具定位分析Java堆内存泄漏问题方法
MAT,全称Memory Analysis Tools,是一款分析Java堆内存的工具,可以快速定位到堆内泄漏问题。该工具提供了两种使用方式,一种是插件版,可以安装到Eclipse使用,另一种是独立版,可以直接解压使用。
14 0
|
21天前
|
Java Maven
开源一套原创文本处理工具:Java+Bat脚本实现自动批量处理对账单工具
这款工具是笔者在2018年初开发完成的,时隔两载,偶然想起这款小工具,于是,决定将其开源,若有人需要做类似Java批处理实现整理文档的工具,可参考该工具逻辑思路来实现。
9 0
|
24天前
|
Java 开发工具 Maven
分布式系列教程(09) -分布式协调工具Zookeeper(Java基本操作)
分布式系列教程(09) -分布式协调工具Zookeeper(Java基本操作)
11 0
|
2月前
|
监控 Oracle Java
Java 最常见的面试题:说一下 jvm 调优的工具?
Java 最常见的面试题:说一下 jvm 调优的工具?
|
2月前
|
Java API Maven
【现成工具】java获取国家法定节假日包含指定月份节假日和周末
【现成工具】java获取国家法定节假日包含指定月份节假日和周末
264 0
|
2月前
|
Java 程序员 测试技术
Java程序员必备工具大全,助力开发效率提升!
Java程序员必备工具大全,助力开发效率提升!
33 0
相关产品
云迁移中心
推荐文章
更多