92. 你说你精通Java并发,那给我讲讲JUC吧(二)

简介: 92. 你说你精通Java并发,那给我讲讲JUC吧(二)

92. 你说你精通Java并发,那给我讲讲JUC吧(二)


自定义资源共享方式

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch(CountDownLatch是并发的))。  不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。  再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。  一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

源码解析

1. acquire(int)

acquire是一种以独占方式获取资源,如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。该方法是独占模式下线程获取共享资源的顶层入口。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:

public final void acquire(int arg) {        
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))    
        selfInterrupt();    
}

函数流程如下:

tryAcquire()尝试直接去获取资源,如果成功则直接返回;

addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。

如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

接下来介绍相关方法。

1.1 tryAcquire(int)

tryAcquire尝试以独占的方式获取资源,如果获取成功,则直接返回true,否则直接返回false。该方法可以用于实现Lock中的tryLock()方法。该方法的默认实现是抛出UnsupportedOperationException,具体实现由自定义的扩展了AQS的同步类来实现。AQS在这里只负责定义了一个公共的方法框架。这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。

protected boolean tryAcquire(int arg) {    
    throw new UnsupportedOperationException();
}

1.2 addWaiter(Node)

该方法用于将当前线程根据不同的模式(Node.EXCLUSIVE互斥模式、Node.SHARED共享模式)加入到等待队列的队尾,并返回当前线程所在的结点。如果队列不为空,则以通过compareAndSetTail方法以CAS(CAS (compare and swap) 比较并交换,就是将内存值与预期值进行比较,如果相等才将新值替换到内存中,并返回true表示操作成功;如果不相等,则直接返回false表示操作失败。)的方式将当前线程节点加入到等待队列的末尾。否则,通过enq(node)方法初始化一个等待队列,并返回当前节点。源码如下:

private Node addWaiter(Node mode) {
    // 以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享) 
    Node node = new Node(Thread.currentThread(), mode); 
    // 尝试快速方式直接放到队尾。    
    Node pred = tail;   
    if (pred != null) {      
        node.prev = pred;        
        if (compareAndSetTail(pred, node)) {        
            pred.next = node;           
            return node;   
        }
    } 
    // 上一步失败则通过enq入队。 
    enq(node);  
    return node;
}

1.2.1 enq(node)

enq(node)用于将当前节点插入等待队列,如果队列为空,则初始化当前队列。整个过程以CAS自旋的方式进行,直到成功加入队尾为止。源码如下:

private Node enq(final Node node) {    
    // CAS"自旋",直到成功加入队尾    
    for (;;) {  
        Node t = tail;     
        if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。   
            if (compareAndSetHead(new Node()))   
                tail = head;        
        } else {
            // 正常流程,放入队尾         
            node.prev = t;   
            if (compareAndSetTail(t, node)) {          
                t.next = node;        
                return t;       
            }   
        }
    }
}

1.3 acquireQueued(Node, int)

通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了,接下来就是等待队列前面的线程依次出队列,最后轮到自己被唤醒。acquireQueued(Node, int)函数的作用就是这个。  acquireQueued()用于队列中的线程自旋地以独占且不可中断的方式获取同步状态(acquire),直到拿到锁之后再返回。该方法的实现分成两部分:如果当前节点已经成为头结点,尝试获取锁(tryAcquire)成功,然后返回;否则检查当前节点是否应该被park(即进入waiting状态),然后将该线程park并且检查当前线程是否被可以被中断。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; // 标记是否成功拿到资源   
    try {        
        boolean interrupted = false; // 标记等待过程中是否被中断过   
        // 又是一个“自旋”!        
        for (;;) {     
          final Node p = node.predecessor();// 拿到前驱            
            // 如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。      
            if (p == head && tryAcquire(arg)) {         
                setHead(node); // 拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
                p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!                
                failed = false;               
                return interrupted; // 返回等待过程中是否被中断过         
            }            
            // 如果自己可以休息了,就进入waiting状态,直到被unpark()        
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())                
                interrupted = true; // 如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true      
        }   
    } finally {        
        if (failed)    
            cancelAcquire(node);  
    }
}

1.3.1 shouldParkAfterFailedAcquire(Node, Node)

shouldParkAfterFailedAcquire方法通过对当前节点的前一个节点的状态进行判断,对当前节点做出不同的操作(进入waiting状态或者继续往前找)。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  
    int ws = pred.waitStatus; // 拿到前驱的状态    
    if (ws == Node.SIGNAL)   
        // 如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了        
        return true;
    if (ws > 0) {     
        /* 
         * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。     
         * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!     
         */     
        do {    
            node.prev = pred = pred.prev;     
        } while (pred.waitStatus > 0); 
        pred.next = node;   
    } else {  
        // 如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢! 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    
    }    
    return false;
}

1.3.2 parkAndCheckInterrupt()

该方法让线程去休息,真正进入等待状态。park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。

private final boolean parkAndCheckInterrupt() {   
    LockSupport.park(this); // 调用park()使线程进入waiting状态    
    return Thread.interrupted(); // 如果被唤醒,查看自己是不是被中断的。
}

1.3.3 acquireQueued()小结

acquireQueued()函数的具体流程:

结点进入队尾后,检查状态,找到安全休息点;

调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;

被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。

1.4 acquire()小结

acquire()的流程:

调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;

没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。

如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

目录
相关文章
|
5月前
|
Java 大数据 Go
从混沌到秩序:Java共享内存模型如何通过显式约束驯服并发?
并发编程旨在混乱中建立秩序。本文对比Java共享内存模型与Golang消息传递模型,剖析显式同步与隐式因果的哲学差异,揭示happens-before等机制如何保障内存可见性与数据一致性,展现两大范式的深层分野。(238字)
153 4
|
5月前
|
缓存 安全 Java
如何理解Java中的并发?
Java并发指多任务交替执行,提升资源利用率与响应速度。通过线程实现,涉及线程安全、可见性、原子性等问题,需用synchronized、volatile、线程池及并发工具类解决,是高并发系统开发的关键基础。(238字)
321 5
|
5月前
|
缓存 安全 Java
JUC系列《深入浅出Java并发容器:CopyOnWriteArrayList全解析》
CopyOnWriteArrayList是Java中基于“写时复制”实现的线程安全List,读操作无锁、性能高,适合读多写少场景,如配置管理、事件监听器等,但频繁写入时因复制开销大需谨慎使用。
|
5月前
|
设计模式 算法 安全
JUC系列之《深入理解AQS:Java并发锁的基石与灵魂 》
本文深入解析Java并发核心组件AQS(AbstractQueuedSynchronizer),从其设计动机、核心思想到源码实现,系统阐述了AQS如何通过state状态、CLH队列和模板方法模式构建通用同步框架,并结合独占与共享模式分析典型应用,最后通过自定义锁的实战案例,帮助读者掌握其原理与最佳实践。
|
前端开发 Java C++
JUC系列之《CompletableFuture:Java异步编程的终极武器》
本文深入解析Java 8引入的CompletableFuture,对比传统Future的局限,详解其非阻塞回调、链式编排、多任务组合及异常处理等核心功能,结合实战示例展示异步编程的最佳实践,助你构建高效、响应式的Java应用。
|
8月前
|
SQL 缓存 安全
深度理解 Java 内存模型:从并发基石到实践应用
本文深入解析 Java 内存模型(JMM),涵盖其在并发编程中的核心作用与实践应用。内容包括 JMM 解决的可见性、原子性和有序性问题,线程与内存的交互机制,volatile、synchronized 和 happens-before 等关键机制的使用,以及在单例模式、线程通信等场景中的实战案例。同时,还介绍了常见并发 Bug 的排查与解决方案,帮助开发者写出高效、线程安全的 Java 程序。
436 0
|
8月前
|
Java API 调度
从阻塞到畅通:Java虚拟线程开启并发新纪元
从阻塞到畅通:Java虚拟线程开启并发新纪元
418 83
|
8月前
|
存储 Java 调度
Java虚拟线程:轻量级并发的革命性突破
Java虚拟线程:轻量级并发的革命性突破
456 83
|
运维 Java 大数据
Java并发JUC(java.util.concurrent)线程池
Java并发JUC(java.util.concurrent)线程池
Java并发JUC(java.util.concurrent)线程池