Java处理并发编程工具集合(JUC)详解2

简介: Java处理并发编程工具集合(JUC)详解2

4 AQS

4.1 前言

如果要想真正的理解JUC下的并发工具的实现原理,我们必须要来学习AQS,因为它是JUC下很多类的基石。

在讲解AQS之前,如果老板让你自己写一个SDK层面的锁,给其他同事去使用,你会如何写呢?

1、搞一个状态标记,用来表示持有或未持有锁,但得是volatile类型的保证线程可见性。

2、编写一个lockunlock函数用于抢锁和释放锁,就是对状态标记的修改操作

3、lock函数要保证并发下只能有一个线程能抢到锁,其他线程要等待获取锁(阻塞式),可以采用CAS+自旋的方式实现

初步实现如下:

public class MyLock {
    // 定义一个状态变量status:为1表示锁被持有,为0表示锁未被持有
    private volatile int status;
    private static final Unsafe unsafe = reflectGetUnsafe();
    private static final long valueOffset;
    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                    (MyLock.class.getDeclaredField("status"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    private static Unsafe reflectGetUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
    /**
     * 阻塞式获取锁
     * @return
     */
    public boolean lock() {
        while (!compareAndSet(0,1)) {
        }
        return true;
    }
    // cas 设置 status
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
    /**
     * 释放锁
     */
    public void unlock() {
        status = 0;
    }
}

问题:获取不到锁自旋时,是空转,浪费CPU

1、使用yield让出CPU执行权,等待调度

public boolean lock() {
    while (!compareAndSet(0,1)) {
        Thread.yield();//yield+自旋,尽可能的防止CPU空转,让出CPU资源
    }
    return true;
}

或者可以采用线程休眠的方式,但是休眠时间不太好确定,太长太短都不好。

2、采用等待唤醒机制,但是这里由于没有使用synchronized关键字,所以也无法使用wait/notify,但是我们可以使用park/unpark,获取不到锁的线程park并且去队列排队,释放锁时从队列拿出一个线程unpark

private static final Queue<Thread> QUEUE = new LinkedBlockingQueue<>();
public boolean lock() {
    while (!compareAndSet(0,1)) {
        QUEUE.offer(Thread.currentThread());
        LockSupport.park();//线程休眠
    }
    return true;
}
public void unlock() {
    status = 0;
    LockSupport.unpark(QUEUE.poll());
}

4.2 AQS概述

AQS(AbstractQueuedSynchronizer):抽象队列同步器,定义了一套多线程访问共享资源的同步器框架,提供了SDK层面的锁机制,JUC中的很多类譬如:ReentrantLock/Semaphore/CountDownLatch…等都是基于它。

通过查阅作者的对于该类的文档注释可以得到如下核心信息:

1、AQS用一个volatile int state;属性表示锁状态,1表示锁被持有,0表示未被持有,具体的维护由子类去维护,但是提供了修改该属性的三个方法:getState()setState(int newState)compareAndSetState(int expect, int update),其中CAS方法是核心。

2、框架内部维护了一个FIFO的等待队列,是用双向链表实现的,我们称之为CLH队列,

3、框架也内部也实现了条件变量Condition,用它来实现等待唤醒机制,并且支持多个条件变量

4、AQS支持两种资源共享的模式:独占模式(Exclusive)和共享模式(Share),所谓独占模式就是任意时刻只允许一个线程访问共享资源,譬如ReentrantLock;而共享模式指的就是允许多个线程同时访问共享资源,譬如Semaphore/CountDownLatch

5、使用者只需继承AbstractQueuedSynchronizer并重写指定的方法,在方法内完成对共享资源state的获取和释放,至于具体线程等待队列的维护,AQS已经在顶层实现好了,在那些final的模板方法里。

* <p>To use this class as the basis of a synchronizer, redefine the
 * following methods, as applicable, by inspecting and/or modifying
 * the synchronization state using {@link #getState}, {@link
 * #setState} and/or {@link #compareAndSetState}:
 *
 * <ul>
 * <li> {@link #tryAcquire}
 * <li> {@link #tryRelease}
 * <li> {@link #tryAcquireShared}
 * <li> {@link #tryReleaseShared}
 * <li> {@link #isHeldExclusively}
 * </ul>
 *
 * Each of these methods by default throws {@link
 * UnsupportedOperationException}.  Implementations of these methods
 * must be internally thread-safe, and should in general be short and
 * not block. Defining these methods is the <em>only</em> supported
 * means of using this class. All other methods are declared
 * {@code final} because they cannot be independently varied.

6、AQS底层使用了模板方法模式,给我们提供了许多模板方法,我们直接使用即可。

API 说明
final void acquire(int arg) 独占模式获取锁,AQS顶层已实现,内部调用了tryAcquire 模板方法
boolean tryAcquire(int arg) 独占模式 尝试获取锁,AQS中未实现,由子类去实现,获取到锁返回true
final boolean release(int arg) 释放独占锁,AQS顶层已实现,内部调用了tryRelease 模板方法
boolean tryRelease(int arg) 尝试释放独占锁,AQS中未实现,由子类去实现,成功释放返回true
final void acquireShared(int arg) 共享模式获取锁,AQS顶层已实现,内部调用了tryAcquireShared 模板方法
int tryAcquireShared(int arg) 尝试获取共享锁,返回负数表示失败,0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源,AQS中未实现,由子类实现
final boolean releaseShared(int arg) 释放共享锁,返回true代表释放成功,AQS中已实现,内部调用了tryReleaseShared 模板方法
boolean tryReleaseShared(int arg) 尝试释放锁,释放后允许唤醒后续等待结点返回true,否则返回false,AQS中未实现,需要由子类实现
boolean isHeldExclusively() 共享资源是否被独占

4.3 基本使用

此时老板给你加了需求,要求你实现一个基于AQS的锁,那该怎么办呢?

AbstractQueuedSynchronizer的类注释中给出了使用它的基本方法,我们按照它的写法尝试即可

/**
 * 基于 aqs实现锁
 */
public class MyLock implements Lock {
    //同步器
    private Syn syn = new Syn();
    @Override
    public void lock() {
        //调用模板方法
        syn.acquire(1);
    }
    @Override
    public void unlock() {
        //调用模板方法
        syn.release(0);
    }
    // 其他接口方法暂时先不实现 省略
    // 实现一个独占同步器
    class Syn extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0,arg)) {
                return true;
            }
            return false;
        }
        @Override
        protected boolean tryRelease(int arg) {
            setState(arg);
            return true;
        }
    }
}

4.4 原理解析

自己实现的锁在使用过程中发现一个问题,就是有时候有的线程特别容易抢到锁,而有的线程老是抢不到锁,虽说线程们抢锁确实看命,但能不能加入一种设计,让各个线程机会均等些,起码不要出现某几个线程总是特倒霉抢不到锁的情况吧!

这其实就是涉及到锁是否是公平的,那么什么是公平锁什么是非公平锁呢?

这我们就不得不深入我们使用的模板方法中看一眼了

/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
//结合我自己写的尝试获取锁的方法
protected boolean tryAcquire(int arg) {
    if (compareAndSetState(0,arg)) {
        return true;
    }
    return false;
}

这里大概描述如下:

1、线程一来首先调用tryAcquire,在tryAcquire中直接CAS获取锁,如果获取不成功通过addWaiter加入等待队列,然后走acquireQueued让队列中的某个等待线程去获取锁。

2、不公平就体现在这里,线程来了也不先看一下等待队列中是否有线程在等待,如果没有线程等待,那直接获取锁没什么 问题,如果有线程等待就直接去获取锁不就相当于插队么?

那如何实现这种公平性呢?这就不得不探究一下AQS的内部的实现原理了,下面我们依次来看:

1、查看AbstractQueuedSynchronizer的类定义,虽然它里面代码很多,但重要的属性就那么几个,

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private volatile int state;
    private transient volatile Node head;
    private transient volatile Node tail;
    static final class Node {
        //其他不重要的略
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
    }
    public class ConditionObject implements Condition, java.io.Serializable {...}
}

结合前面讲的AQS的类文档注释不难猜到,内部类 Node以及其类型的变量 headtail 就表示 AQS 内部的一个等待队列,而剩下的 state 变量就用来表示锁的状态。

等待队列应该就是线程获取锁失败时,需要临时存放的一个地方,用来等待被唤醒并尝试获取锁。再看 Node 的属性我们知道,Node 存放了当前线程的指针 thread,也即可以表示当前线程并对其进行某些操作,prevnext 说明它构成了一个双向链表,也就是为某些需要得到前驱或后继节点的算法提供便利。

2、AQS加锁最核心的代码就是如下,我们要来探究它的实现原理

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

它的原理及整个过程我们以图的形式说明如下:

3、原理搞懂了,那如何让自定义的锁是公平的呢?

其实导致不公平的原因就是线程每次调用acquire时,都会先去tryAcquire,而该方法目前的实现时直接去抢锁,也不看现在等待队列中有没有线程在排队,如果有线程在排队,那岂不是变成了插队,导致不公平。

所以现在的解决办法就是,在tryAcquire时先看一下等待队列中是否有在排队的,如果有那就乖乖去排队,不插队,如果没有则可以直接去获取锁。

那如何知道线程AQS等待队列中是否有线程排队呢?其实AQS顶层已经实现好了,它提供了一个hasQueuedPredecessors函数:如果在当前线程之前有一个排队的线程,则为True; 如果当前线程位于队列的头部(head.next)或队列为空,则为false。

protected boolean tryAcquire(int arg) {
    //先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
    if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) {
        return true;
    }
    return false;
}

4、现在已经有公平锁了,但是成年人的世界不是做选择题,而是都想要,自己编写的锁既能支持公平锁,也支持非公平锁,让使用者可以自由选择,怎么办?

其实只要稍微改造一下即可,

public class MyLock implements Lock {
    //同步器
    private Sync syn ;
    MyLock () {
        syn = new NoFairSync();
    }
    MyLock (boolean fair) {
        syn = fair ? new FairSync():new NoFairSync();
    }
    @Override
    public void lock() {
        //调用模板方法
        syn.acquire(1);
    }
    @Override
    public void unlock() {
        //调用模板方法
        syn.release(0);
    }
    // Lock接口其他方法暂时先不实现 略
    // 实现一个独占同步器
    class Sync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryRelease(int arg) {
            setState(arg);
            return true;
        }
    }
    class FairSync extends Sync {
        @Override
        protected boolean tryAcquire(int arg) {
            //先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
            if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) {
                return true;
            }
            return false;
        }
    }
    class NoFairSync extends Sync {
        @Override
        protected boolean tryAcquire(int arg) {
            //直接去获取锁
            if (compareAndSetState(0,arg)) {
                return true;
            }
            return false;
        }
    }
}

5、现在锁的公平性问题解决了,但是老板又出了新的需求,要求我们的锁支持可重入,因为它写了如下一段代码,发现一直获取不到锁

static Lock lock = new MyLock();
static void test3() {
    lock.lock();
    try {
        System.out.println("test3 get lock,then do something ");
        test4();
    } finally {
        lock.unlock();
    }
}
static void test4() {
    lock.lock();
    try {
        System.out.println("test4 get lock,then do something ");
    } finally {
        lock.unlock();
    }
}

那如何让锁支持可重入呢?也就是说如果一个线程持有锁之后,还能继续获取锁,也就是说让锁只对不同线程互斥。

查看AbstractQueuedSynchronizer的定义我们发现,它还继承自另一个类:AbstractOwnableSynchronizer

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {...}
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private transient Thread exclusiveOwnerThread;
    protected final void setExclusiveOwnerThread(Thread thread) {...}
    protected final Thread getExclusiveOwnerThread(){...}
}

看到这我们明白了,原来AQS中有个变量是可以保存当前持有独占锁的线程的。那好办了,当我们获取锁时,如果发现锁被持有不要着急放弃,先看看持有锁的线程是否时当前线程,如果是还能继续获取锁。

另外关于可重入锁,还要注意一点,锁的获取和释放操作是成对出现的,就像下面这样

lock
    lock
        lock
            lock
                ....
            unlock
        unlock
    unlock
unlock

所以对于重入锁不仅要能记录锁被持有,还要记录重入的次数,释放的时候也不是直接将锁真实的释放,而是先减少重入次数,能释放的时候在释放。


故此时状态变量state不在只有两个取值0,1,某线程获取到锁state=1,如果当前线程重入获取只需增加状态值state=2,依次同理,锁释放时释放一次状态值-1,当state=0时才真正释放,其他线程才能继续获取锁。


修改我们锁的代码如下:公平非公平在可重入上的逻辑是一样的

public class MyLock implements Lock {
    //同步器
    private Sync syn ;
    MyLock () {
        syn = new NoFairSync();
    }
    MyLock (boolean fair) {
        syn = fair ? new FairSync():new NoFairSync();
    }
    @Override
    public void lock() {
        //调用模板方法
        syn.acquire(1);
    }
    @Override
    public void unlock() {
        //调用模板方法
        syn.release(1);
    }
    // Lock接口其他方法暂时先不实现 略
    // 实现一个独占同步器
    class Sync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryRelease(int arg) {
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            boolean realRelease = false;
            int nextState = getState() - arg;
            if (nextState == 0) {
                realRelease = true;
                setExclusiveOwnerThread(null);
            }
            setState(nextState);
            return realRelease;
        }
    }
    class FairSync extends Sync {
        @Override
        protected boolean tryAcquire(int arg) {
            final Thread currentThread = Thread.currentThread();
            int currentState = getState();
            if (currentState == 0 ) { // 可以获取锁
                //先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
                if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) {
                    setExclusiveOwnerThread(currentThread);
                    return true;
                }
            }else if (currentThread == getExclusiveOwnerThread()) {
                //重入逻辑 增加 state值
                int nextState = currentState + arg;
                if (nextState < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                setState(nextState);
                return true;
            }
            return false;
        }
    }
    class NoFairSync extends Sync {
        @Override
        protected boolean tryAcquire(int arg) {
            final Thread currentThread = Thread.currentThread();
            int currentState = getState();
            if (currentState ==0 ) { // 可以获取锁
                //直接去获取锁
                if (compareAndSetState(0,arg)) {
                    setExclusiveOwnerThread(currentThread);
                    return true;
                }
            }else if (currentThread == getExclusiveOwnerThread()) {
                //重入逻辑 增加 state值
                int nextState = currentState + arg;
                if (nextState < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                setState(nextState);
                return true;
            }
            return false;
        }
    }
}

好了至此我们已经掌握了AQS的核心原理以及它的一个经典实现ReentrantLock几乎全部的知识点,此时打开ReentrantLock的源码你会发现一切都很清爽!!!

5 并发容器

juc中还包含很多其他的并发容器(了解)

1.ConcurrentHashMap

对应:HashMap

目标:代替Hashtable、synchronizedMap,使用最多,源码篇会详细讲解

原理:JDK7中采用Segment分段锁,JDK8中采用CAS+synchronized

2.CopyOnWriteArrayList

对应:ArrayList

目标:代替Vector、synchronizedList

原理:高并发往往是读多写少的特性,读操作不加锁,而对写操作加Lock独享锁,先复制一份新的集合,在新的集合上面修改,然后将新集合赋值给旧的引用,并通过volatile 保证其可见性。

查看源码:volatile array,lock加锁,数组复制

3.CopyOnWriteArraySet

对应:HashSet

目标:代替synchronizedSet

原理:与CopyOnWriteArrayList实现原理类似。

4.ConcurrentSkipListMap

对应:TreeMap

目标:代替synchronizedSortedMap(TreeMap)

原理:基于Skip list(跳表)来代替平衡树,按照分层key上下链接指针来实现。

附加:跳表

5.ConcurrentSkipListSet

对应:TreeSet

目标:代替synchronizedSortedSet(TreeSet)

原理:内部基于ConcurrentSkipListMap实现,原理一致

6.ConcurrentLinkedQueue

对应:LinkedList

对应:无界线程安全队列

原理:通过队首队尾指针,以及Node类元素的next实现FIFO队列

7.BlockingQueue

对应:Queue

特点:拓展了Queue,增加了可阻塞的插入和获取等操作

原理:通过ReentrantLock实现线程安全,通过Condition实现阻塞和唤醒

实现类:

  • LinkedBlockingQueue:基于链表实现的可阻塞的FIFO队列
  • ArrayBlockingQueue:基于数组实现的可阻塞的FIFO队列
  • PriorityBlockingQueue:按优先级排序的队列



目录
相关文章
|
5天前
|
SQL Java 索引
java小工具util系列2:字符串工具
java小工具util系列2:字符串工具
130 83
|
18天前
|
Java
Java 8 引入的 Streams 功能强大,提供了一种简洁高效的处理数据集合的方式
Java 8 引入的 Streams 功能强大,提供了一种简洁高效的处理数据集合的方式。本文介绍了 Streams 的基本概念和使用方法,包括创建 Streams、中间操作和终端操作,并通过多个案例详细解析了过滤、映射、归并、排序、分组和并行处理等操作,帮助读者更好地理解和掌握这一重要特性。
25 2
|
18天前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。
|
23天前
|
存储 Java
判断一个元素是否在 Java 中的 Set 集合中
【10月更文挑战第30天】使用`contains()`方法可以方便快捷地判断一个元素是否在Java中的`Set`集合中,但对于自定义对象,需要注意重写`equals()`方法以确保正确的判断结果,同时根据具体的性能需求选择合适的`Set`实现类。
|
23天前
|
存储 Java 开发者
在 Java 中,如何遍历一个 Set 集合?
【10月更文挑战第30天】开发者可以根据具体的需求和代码风格选择合适的遍历方式。增强for循环简洁直观,适用于大多数简单的遍历场景;迭代器则更加灵活,可在遍历过程中进行更多复杂的操作;而Lambda表达式和`forEach`方法则提供了一种更简洁的函数式编程风格的遍历方式。
|
23天前
|
Java 开发者
|
24天前
|
存储 缓存 安全
Java内存模型(JMM):深入理解并发编程的基石####
【10月更文挑战第29天】 本文作为一篇技术性文章,旨在深入探讨Java内存模型(JMM)的核心概念、工作原理及其在并发编程中的应用。我们将从JMM的基本定义出发,逐步剖析其如何通过happens-before原则、volatile关键字、synchronized关键字等机制,解决多线程环境下的数据可见性、原子性和有序性问题。不同于常规摘要的简述方式,本摘要将直接概述文章的核心内容,为读者提供一个清晰的学习路径。 ####
37 2
|
22天前
|
存储 Java 开发者
Java中的集合框架深入解析
【10月更文挑战第32天】本文旨在为读者揭开Java集合框架的神秘面纱,通过深入浅出的方式介绍其内部结构与运作机制。我们将从集合框架的设计哲学出发,探讨其如何影响我们的编程实践,并配以代码示例,展示如何在真实场景中应用这些知识。无论你是Java新手还是资深开发者,这篇文章都将为你提供新的视角和实用技巧。
21 0
|
4月前
|
存储 安全 Java
【Java集合类面试二十五】、有哪些线程安全的List?
线程安全的List包括Vector、Collections.SynchronizedList和CopyOnWriteArrayList,其中CopyOnWriteArrayList通过复制底层数组实现写操作,提供了最优的线程安全性能。
|
4月前
|
Java
【Java集合类面试二十三】、List和Set有什么区别?
List和Set的主要区别在于List是一个有序且允许元素重复的集合,而Set是一个无序且元素不重复的集合。