Java处理并发编程工具集合(JUC)详解2

简介: Java处理并发编程工具集合(JUC)详解2

4 AQS

4.1 前言

如果要想真正的理解JUC下的并发工具的实现原理,我们必须要来学习AQS,因为它是JUC下很多类的基石。

在讲解AQS之前,如果老板让你自己写一个SDK层面的锁,给其他同事去使用,你会如何写呢?

1、搞一个状态标记,用来表示持有或未持有锁,但得是volatile类型的保证线程可见性。

2、编写一个lockunlock函数用于抢锁和释放锁,就是对状态标记的修改操作

3、lock函数要保证并发下只能有一个线程能抢到锁,其他线程要等待获取锁(阻塞式),可以采用CAS+自旋的方式实现

初步实现如下:

public class MyLock {
    // 定义一个状态变量status:为1表示锁被持有,为0表示锁未被持有
    private volatile int status;
    private static final Unsafe unsafe = reflectGetUnsafe();
    private static final long valueOffset;
    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                    (MyLock.class.getDeclaredField("status"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    private static Unsafe reflectGetUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
    /**
     * 阻塞式获取锁
     * @return
     */
    public boolean lock() {
        while (!compareAndSet(0,1)) {
        }
        return true;
    }
    // cas 设置 status
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
    /**
     * 释放锁
     */
    public void unlock() {
        status = 0;
    }
}

问题:获取不到锁自旋时,是空转,浪费CPU

1、使用yield让出CPU执行权,等待调度

public boolean lock() {
    while (!compareAndSet(0,1)) {
        Thread.yield();//yield+自旋,尽可能的防止CPU空转,让出CPU资源
    }
    return true;
}

或者可以采用线程休眠的方式,但是休眠时间不太好确定,太长太短都不好。

2、采用等待唤醒机制,但是这里由于没有使用synchronized关键字,所以也无法使用wait/notify,但是我们可以使用park/unpark,获取不到锁的线程park并且去队列排队,释放锁时从队列拿出一个线程unpark

private static final Queue<Thread> QUEUE = new LinkedBlockingQueue<>();
public boolean lock() {
    while (!compareAndSet(0,1)) {
        QUEUE.offer(Thread.currentThread());
        LockSupport.park();//线程休眠
    }
    return true;
}
public void unlock() {
    status = 0;
    LockSupport.unpark(QUEUE.poll());
}

4.2 AQS概述

AQS(AbstractQueuedSynchronizer):抽象队列同步器,定义了一套多线程访问共享资源的同步器框架,提供了SDK层面的锁机制,JUC中的很多类譬如:ReentrantLock/Semaphore/CountDownLatch…等都是基于它。

通过查阅作者的对于该类的文档注释可以得到如下核心信息:

1、AQS用一个volatile int state;属性表示锁状态,1表示锁被持有,0表示未被持有,具体的维护由子类去维护,但是提供了修改该属性的三个方法:getState()setState(int newState)compareAndSetState(int expect, int update),其中CAS方法是核心。

2、框架内部维护了一个FIFO的等待队列,是用双向链表实现的,我们称之为CLH队列,

3、框架也内部也实现了条件变量Condition,用它来实现等待唤醒机制,并且支持多个条件变量

4、AQS支持两种资源共享的模式:独占模式(Exclusive)和共享模式(Share),所谓独占模式就是任意时刻只允许一个线程访问共享资源,譬如ReentrantLock;而共享模式指的就是允许多个线程同时访问共享资源,譬如Semaphore/CountDownLatch

5、使用者只需继承AbstractQueuedSynchronizer并重写指定的方法,在方法内完成对共享资源state的获取和释放,至于具体线程等待队列的维护,AQS已经在顶层实现好了,在那些final的模板方法里。

* <p>To use this class as the basis of a synchronizer, redefine the
 * following methods, as applicable, by inspecting and/or modifying
 * the synchronization state using {@link #getState}, {@link
 * #setState} and/or {@link #compareAndSetState}:
 *
 * <ul>
 * <li> {@link #tryAcquire}
 * <li> {@link #tryRelease}
 * <li> {@link #tryAcquireShared}
 * <li> {@link #tryReleaseShared}
 * <li> {@link #isHeldExclusively}
 * </ul>
 *
 * Each of these methods by default throws {@link
 * UnsupportedOperationException}.  Implementations of these methods
 * must be internally thread-safe, and should in general be short and
 * not block. Defining these methods is the <em>only</em> supported
 * means of using this class. All other methods are declared
 * {@code final} because they cannot be independently varied.

6、AQS底层使用了模板方法模式,给我们提供了许多模板方法,我们直接使用即可。

API 说明
final void acquire(int arg) 独占模式获取锁,AQS顶层已实现,内部调用了tryAcquire 模板方法
boolean tryAcquire(int arg) 独占模式 尝试获取锁,AQS中未实现,由子类去实现,获取到锁返回true
final boolean release(int arg) 释放独占锁,AQS顶层已实现,内部调用了tryRelease 模板方法
boolean tryRelease(int arg) 尝试释放独占锁,AQS中未实现,由子类去实现,成功释放返回true
final void acquireShared(int arg) 共享模式获取锁,AQS顶层已实现,内部调用了tryAcquireShared 模板方法
int tryAcquireShared(int arg) 尝试获取共享锁,返回负数表示失败,0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源,AQS中未实现,由子类实现
final boolean releaseShared(int arg) 释放共享锁,返回true代表释放成功,AQS中已实现,内部调用了tryReleaseShared 模板方法
boolean tryReleaseShared(int arg) 尝试释放锁,释放后允许唤醒后续等待结点返回true,否则返回false,AQS中未实现,需要由子类实现
boolean isHeldExclusively() 共享资源是否被独占

4.3 基本使用

此时老板给你加了需求,要求你实现一个基于AQS的锁,那该怎么办呢?

AbstractQueuedSynchronizer的类注释中给出了使用它的基本方法,我们按照它的写法尝试即可

/**
 * 基于 aqs实现锁
 */
public class MyLock implements Lock {
    //同步器
    private Syn syn = new Syn();
    @Override
    public void lock() {
        //调用模板方法
        syn.acquire(1);
    }
    @Override
    public void unlock() {
        //调用模板方法
        syn.release(0);
    }
    // 其他接口方法暂时先不实现 省略
    // 实现一个独占同步器
    class Syn extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0,arg)) {
                return true;
            }
            return false;
        }
        @Override
        protected boolean tryRelease(int arg) {
            setState(arg);
            return true;
        }
    }
}

4.4 原理解析

自己实现的锁在使用过程中发现一个问题,就是有时候有的线程特别容易抢到锁,而有的线程老是抢不到锁,虽说线程们抢锁确实看命,但能不能加入一种设计,让各个线程机会均等些,起码不要出现某几个线程总是特倒霉抢不到锁的情况吧!

这其实就是涉及到锁是否是公平的,那么什么是公平锁什么是非公平锁呢?

这我们就不得不深入我们使用的模板方法中看一眼了

/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
//结合我自己写的尝试获取锁的方法
protected boolean tryAcquire(int arg) {
    if (compareAndSetState(0,arg)) {
        return true;
    }
    return false;
}

这里大概描述如下:

1、线程一来首先调用tryAcquire,在tryAcquire中直接CAS获取锁,如果获取不成功通过addWaiter加入等待队列,然后走acquireQueued让队列中的某个等待线程去获取锁。

2、不公平就体现在这里,线程来了也不先看一下等待队列中是否有线程在等待,如果没有线程等待,那直接获取锁没什么 问题,如果有线程等待就直接去获取锁不就相当于插队么?

那如何实现这种公平性呢?这就不得不探究一下AQS的内部的实现原理了,下面我们依次来看:

1、查看AbstractQueuedSynchronizer的类定义,虽然它里面代码很多,但重要的属性就那么几个,

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private volatile int state;
    private transient volatile Node head;
    private transient volatile Node tail;
    static final class Node {
        //其他不重要的略
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
    }
    public class ConditionObject implements Condition, java.io.Serializable {...}
}

结合前面讲的AQS的类文档注释不难猜到,内部类 Node以及其类型的变量 headtail 就表示 AQS 内部的一个等待队列,而剩下的 state 变量就用来表示锁的状态。

等待队列应该就是线程获取锁失败时,需要临时存放的一个地方,用来等待被唤醒并尝试获取锁。再看 Node 的属性我们知道,Node 存放了当前线程的指针 thread,也即可以表示当前线程并对其进行某些操作,prevnext 说明它构成了一个双向链表,也就是为某些需要得到前驱或后继节点的算法提供便利。

2、AQS加锁最核心的代码就是如下,我们要来探究它的实现原理

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

它的原理及整个过程我们以图的形式说明如下:

3、原理搞懂了,那如何让自定义的锁是公平的呢?

其实导致不公平的原因就是线程每次调用acquire时,都会先去tryAcquire,而该方法目前的实现时直接去抢锁,也不看现在等待队列中有没有线程在排队,如果有线程在排队,那岂不是变成了插队,导致不公平。

所以现在的解决办法就是,在tryAcquire时先看一下等待队列中是否有在排队的,如果有那就乖乖去排队,不插队,如果没有则可以直接去获取锁。

那如何知道线程AQS等待队列中是否有线程排队呢?其实AQS顶层已经实现好了,它提供了一个hasQueuedPredecessors函数:如果在当前线程之前有一个排队的线程,则为True; 如果当前线程位于队列的头部(head.next)或队列为空,则为false。

protected boolean tryAcquire(int arg) {
    //先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
    if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) {
        return true;
    }
    return false;
}

4、现在已经有公平锁了,但是成年人的世界不是做选择题,而是都想要,自己编写的锁既能支持公平锁,也支持非公平锁,让使用者可以自由选择,怎么办?

其实只要稍微改造一下即可,

public class MyLock implements Lock {
    //同步器
    private Sync syn ;
    MyLock () {
        syn = new NoFairSync();
    }
    MyLock (boolean fair) {
        syn = fair ? new FairSync():new NoFairSync();
    }
    @Override
    public void lock() {
        //调用模板方法
        syn.acquire(1);
    }
    @Override
    public void unlock() {
        //调用模板方法
        syn.release(0);
    }
    // Lock接口其他方法暂时先不实现 略
    // 实现一个独占同步器
    class Sync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryRelease(int arg) {
            setState(arg);
            return true;
        }
    }
    class FairSync extends Sync {
        @Override
        protected boolean tryAcquire(int arg) {
            //先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
            if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) {
                return true;
            }
            return false;
        }
    }
    class NoFairSync extends Sync {
        @Override
        protected boolean tryAcquire(int arg) {
            //直接去获取锁
            if (compareAndSetState(0,arg)) {
                return true;
            }
            return false;
        }
    }
}

5、现在锁的公平性问题解决了,但是老板又出了新的需求,要求我们的锁支持可重入,因为它写了如下一段代码,发现一直获取不到锁

static Lock lock = new MyLock();
static void test3() {
    lock.lock();
    try {
        System.out.println("test3 get lock,then do something ");
        test4();
    } finally {
        lock.unlock();
    }
}
static void test4() {
    lock.lock();
    try {
        System.out.println("test4 get lock,then do something ");
    } finally {
        lock.unlock();
    }
}

那如何让锁支持可重入呢?也就是说如果一个线程持有锁之后,还能继续获取锁,也就是说让锁只对不同线程互斥。

查看AbstractQueuedSynchronizer的定义我们发现,它还继承自另一个类:AbstractOwnableSynchronizer

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {...}
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private transient Thread exclusiveOwnerThread;
    protected final void setExclusiveOwnerThread(Thread thread) {...}
    protected final Thread getExclusiveOwnerThread(){...}
}

看到这我们明白了,原来AQS中有个变量是可以保存当前持有独占锁的线程的。那好办了,当我们获取锁时,如果发现锁被持有不要着急放弃,先看看持有锁的线程是否时当前线程,如果是还能继续获取锁。

另外关于可重入锁,还要注意一点,锁的获取和释放操作是成对出现的,就像下面这样

lock
    lock
        lock
            lock
                ....
            unlock
        unlock
    unlock
unlock

所以对于重入锁不仅要能记录锁被持有,还要记录重入的次数,释放的时候也不是直接将锁真实的释放,而是先减少重入次数,能释放的时候在释放。


故此时状态变量state不在只有两个取值0,1,某线程获取到锁state=1,如果当前线程重入获取只需增加状态值state=2,依次同理,锁释放时释放一次状态值-1,当state=0时才真正释放,其他线程才能继续获取锁。


修改我们锁的代码如下:公平非公平在可重入上的逻辑是一样的

public class MyLock implements Lock {
    //同步器
    private Sync syn ;
    MyLock () {
        syn = new NoFairSync();
    }
    MyLock (boolean fair) {
        syn = fair ? new FairSync():new NoFairSync();
    }
    @Override
    public void lock() {
        //调用模板方法
        syn.acquire(1);
    }
    @Override
    public void unlock() {
        //调用模板方法
        syn.release(1);
    }
    // Lock接口其他方法暂时先不实现 略
    // 实现一个独占同步器
    class Sync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryRelease(int arg) {
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            boolean realRelease = false;
            int nextState = getState() - arg;
            if (nextState == 0) {
                realRelease = true;
                setExclusiveOwnerThread(null);
            }
            setState(nextState);
            return realRelease;
        }
    }
    class FairSync extends Sync {
        @Override
        protected boolean tryAcquire(int arg) {
            final Thread currentThread = Thread.currentThread();
            int currentState = getState();
            if (currentState == 0 ) { // 可以获取锁
                //先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
                if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) {
                    setExclusiveOwnerThread(currentThread);
                    return true;
                }
            }else if (currentThread == getExclusiveOwnerThread()) {
                //重入逻辑 增加 state值
                int nextState = currentState + arg;
                if (nextState < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                setState(nextState);
                return true;
            }
            return false;
        }
    }
    class NoFairSync extends Sync {
        @Override
        protected boolean tryAcquire(int arg) {
            final Thread currentThread = Thread.currentThread();
            int currentState = getState();
            if (currentState ==0 ) { // 可以获取锁
                //直接去获取锁
                if (compareAndSetState(0,arg)) {
                    setExclusiveOwnerThread(currentThread);
                    return true;
                }
            }else if (currentThread == getExclusiveOwnerThread()) {
                //重入逻辑 增加 state值
                int nextState = currentState + arg;
                if (nextState < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                setState(nextState);
                return true;
            }
            return false;
        }
    }
}

好了至此我们已经掌握了AQS的核心原理以及它的一个经典实现ReentrantLock几乎全部的知识点,此时打开ReentrantLock的源码你会发现一切都很清爽!!!

5 并发容器

juc中还包含很多其他的并发容器(了解)

1.ConcurrentHashMap

对应:HashMap

目标:代替Hashtable、synchronizedMap,使用最多,源码篇会详细讲解

原理:JDK7中采用Segment分段锁,JDK8中采用CAS+synchronized

2.CopyOnWriteArrayList

对应:ArrayList

目标:代替Vector、synchronizedList

原理:高并发往往是读多写少的特性,读操作不加锁,而对写操作加Lock独享锁,先复制一份新的集合,在新的集合上面修改,然后将新集合赋值给旧的引用,并通过volatile 保证其可见性。

查看源码:volatile array,lock加锁,数组复制

3.CopyOnWriteArraySet

对应:HashSet

目标:代替synchronizedSet

原理:与CopyOnWriteArrayList实现原理类似。

4.ConcurrentSkipListMap

对应:TreeMap

目标:代替synchronizedSortedMap(TreeMap)

原理:基于Skip list(跳表)来代替平衡树,按照分层key上下链接指针来实现。

附加:跳表

5.ConcurrentSkipListSet

对应:TreeSet

目标:代替synchronizedSortedSet(TreeSet)

原理:内部基于ConcurrentSkipListMap实现,原理一致

6.ConcurrentLinkedQueue

对应:LinkedList

对应:无界线程安全队列

原理:通过队首队尾指针,以及Node类元素的next实现FIFO队列

7.BlockingQueue

对应:Queue

特点:拓展了Queue,增加了可阻塞的插入和获取等操作

原理:通过ReentrantLock实现线程安全,通过Condition实现阻塞和唤醒

实现类:

  • LinkedBlockingQueue:基于链表实现的可阻塞的FIFO队列
  • ArrayBlockingQueue:基于数组实现的可阻塞的FIFO队列
  • PriorityBlockingQueue:按优先级排序的队列



目录
打赏
0
1
1
0
110
分享
相关文章
Java 集合框架中的老炮与新秀:HashTable 和 HashMap 谁更胜一筹?
嗨,大家好,我是技术伙伴小米。今天通过讲故事的方式,详细介绍 Java 中 HashMap 和 HashTable 的区别。从版本、线程安全、null 值支持、性能及迭代器行为等方面对比,帮助你轻松应对面试中的经典问题。HashMap 更高效灵活,适合单线程或需手动处理线程安全的场景;HashTable 较古老,线程安全但性能不佳。现代项目推荐使用 ConcurrentHashMap。关注我的公众号“软件求生”,获取更多技术干货!
66 3
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
62 0
java常见的集合类有哪些
Map接口和Collection接口是所有集合框架的父接口: 1. Collection接口的子接口包括:Set接口和List接口 2. Map接口的实现类主要有:HashMap、TreeMap、Hashtable、ConcurrentHashMap以及 Properties等 3. Set接口的实现类主要有:HashSet、TreeSet、LinkedHashSet等 4. List接口的实现类主要有:ArrayList、LinkedList、Stack以及Vector等
FastExcel:开源的 JAVA 解析 Excel 工具,集成 AI 通过自然语言处理 Excel 文件,完全兼容 EasyExcel
FastExcel 是一款基于 Java 的高性能 Excel 处理工具,专注于优化大规模数据处理,提供简洁易用的 API 和流式操作能力,支持从 EasyExcel 无缝迁移。
425 9
FastExcel:开源的 JAVA 解析 Excel 工具,集成 AI 通过自然语言处理 Excel 文件,完全兼容 EasyExcel
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
126 7
Spring Boot 入门:简化 Java Web 开发的强大工具
Java 集合江湖:底层数据结构的大揭秘!
小米是一位热爱技术分享的程序员,本文详细解析了Java面试中常见的List、Set、Map的区别。不仅介绍了它们的基本特性和实现类,还深入探讨了各自的使用场景和面试技巧,帮助读者更好地理解和应对相关问题。
79 5
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
519 6
Java 集合框架优化:从基础到高级应用
《Java集合框架优化:从基础到高级应用》深入解析Java集合框架的核心原理与优化技巧,涵盖列表、集合、映射等常用数据结构,结合实际案例,指导开发者高效使用和优化Java集合。
89 4
Java 多线程并发编程
Java多线程并发编程是指在Java程序中使用多个线程同时执行,以提高程序的运行效率和响应速度。通过合理管理和调度线程,可以充分利用多核处理器资源,实现高效的任务处理。本内容将介绍Java多线程的基础概念、实现方式及常见问题解决方法。
235 1
|
9月前
|
关于《Java并发编程之线程池十八问》的补充内容
【6月更文挑战第6天】关于《Java并发编程之线程池十八问》的补充内容
69 5
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等