92. 你说你精通Java并发,那给我讲讲JUC吧(二)

简介: 92. 你说你精通Java并发,那给我讲讲JUC吧(二)

92. 你说你精通Java并发,那给我讲讲JUC吧(二)


自定义资源共享方式

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch(CountDownLatch是并发的))。  不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。  再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。  一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

源码解析

1. acquire(int)

acquire是一种以独占方式获取资源,如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。该方法是独占模式下线程获取共享资源的顶层入口。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:

public final void acquire(int arg) {        
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))    
        selfInterrupt();    
}

函数流程如下:

tryAcquire()尝试直接去获取资源,如果成功则直接返回;

addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。

如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

接下来介绍相关方法。

1.1 tryAcquire(int)

tryAcquire尝试以独占的方式获取资源,如果获取成功,则直接返回true,否则直接返回false。该方法可以用于实现Lock中的tryLock()方法。该方法的默认实现是抛出UnsupportedOperationException,具体实现由自定义的扩展了AQS的同步类来实现。AQS在这里只负责定义了一个公共的方法框架。这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。

protected boolean tryAcquire(int arg) {    
    throw new UnsupportedOperationException();
}

1.2 addWaiter(Node)

该方法用于将当前线程根据不同的模式(Node.EXCLUSIVE互斥模式、Node.SHARED共享模式)加入到等待队列的队尾,并返回当前线程所在的结点。如果队列不为空,则以通过compareAndSetTail方法以CAS(CAS (compare and swap) 比较并交换,就是将内存值与预期值进行比较,如果相等才将新值替换到内存中,并返回true表示操作成功;如果不相等,则直接返回false表示操作失败。)的方式将当前线程节点加入到等待队列的末尾。否则,通过enq(node)方法初始化一个等待队列,并返回当前节点。源码如下:

private Node addWaiter(Node mode) {
    // 以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享) 
    Node node = new Node(Thread.currentThread(), mode); 
    // 尝试快速方式直接放到队尾。    
    Node pred = tail;   
    if (pred != null) {      
        node.prev = pred;        
        if (compareAndSetTail(pred, node)) {        
            pred.next = node;           
            return node;   
        }
    } 
    // 上一步失败则通过enq入队。 
    enq(node);  
    return node;
}

1.2.1 enq(node)

enq(node)用于将当前节点插入等待队列,如果队列为空,则初始化当前队列。整个过程以CAS自旋的方式进行,直到成功加入队尾为止。源码如下:

private Node enq(final Node node) {    
    // CAS"自旋",直到成功加入队尾    
    for (;;) {  
        Node t = tail;     
        if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。   
            if (compareAndSetHead(new Node()))   
                tail = head;        
        } else {
            // 正常流程,放入队尾         
            node.prev = t;   
            if (compareAndSetTail(t, node)) {          
                t.next = node;        
                return t;       
            }   
        }
    }
}

1.3 acquireQueued(Node, int)

通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了,接下来就是等待队列前面的线程依次出队列,最后轮到自己被唤醒。acquireQueued(Node, int)函数的作用就是这个。  acquireQueued()用于队列中的线程自旋地以独占且不可中断的方式获取同步状态(acquire),直到拿到锁之后再返回。该方法的实现分成两部分:如果当前节点已经成为头结点,尝试获取锁(tryAcquire)成功,然后返回;否则检查当前节点是否应该被park(即进入waiting状态),然后将该线程park并且检查当前线程是否被可以被中断。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; // 标记是否成功拿到资源   
    try {        
        boolean interrupted = false; // 标记等待过程中是否被中断过   
        // 又是一个“自旋”!        
        for (;;) {     
          final Node p = node.predecessor();// 拿到前驱            
            // 如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。      
            if (p == head && tryAcquire(arg)) {         
                setHead(node); // 拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
                p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!                
                failed = false;               
                return interrupted; // 返回等待过程中是否被中断过         
            }            
            // 如果自己可以休息了,就进入waiting状态,直到被unpark()        
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())                
                interrupted = true; // 如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true      
        }   
    } finally {        
        if (failed)    
            cancelAcquire(node);  
    }
}

1.3.1 shouldParkAfterFailedAcquire(Node, Node)

shouldParkAfterFailedAcquire方法通过对当前节点的前一个节点的状态进行判断,对当前节点做出不同的操作(进入waiting状态或者继续往前找)。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  
    int ws = pred.waitStatus; // 拿到前驱的状态    
    if (ws == Node.SIGNAL)   
        // 如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了        
        return true;
    if (ws > 0) {     
        /* 
         * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。     
         * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!     
         */     
        do {    
            node.prev = pred = pred.prev;     
        } while (pred.waitStatus > 0); 
        pred.next = node;   
    } else {  
        // 如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢! 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    
    }    
    return false;
}

1.3.2 parkAndCheckInterrupt()

该方法让线程去休息,真正进入等待状态。park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。

private final boolean parkAndCheckInterrupt() {   
    LockSupport.park(this); // 调用park()使线程进入waiting状态    
    return Thread.interrupted(); // 如果被唤醒,查看自己是不是被中断的。
}

1.3.3 acquireQueued()小结

acquireQueued()函数的具体流程:

结点进入队尾后,检查状态,找到安全休息点;

调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;

被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。

1.4 acquire()小结

acquire()的流程:

调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;

没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。

如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

目录
相关文章
|
5天前
|
算法 Java 程序员
Java中的线程同步与并发控制
【5月更文挑战第18天】随着计算机技术的不断发展,多核处理器的普及使得多线程编程成为提高程序性能的关键。在Java中,线程是实现并发的一种重要手段。然而,线程的并发执行可能导致数据不一致、死锁等问题。本文将深入探讨Java中线程同步的方法和技巧,以及如何避免常见的并发问题,从而提高程序的性能和稳定性。
|
5天前
|
安全 Java 容器
Java一分钟之-并发编程:并发容器(ConcurrentHashMap, CopyOnWriteArrayList)
【5月更文挑战第18天】本文探讨了Java并发编程中的`ConcurrentHashMap`和`CopyOnWriteArrayList`,两者为多线程数据共享提供高效、线程安全的解决方案。`ConcurrentHashMap`采用分段锁策略,而`CopyOnWriteArrayList`适合读多写少的场景。注意,`ConcurrentHashMap`的`forEach`需避免手动同步,且并发修改时可能导致`ConcurrentModificationException`。`CopyOnWriteArrayList`在写操作时会复制数组。理解和正确使用这些特性是优化并发性能的关键。
11 1
|
5天前
|
安全 Java 容器
Java一分钟之-高级集合框架:并发集合(Collections.synchronizedXXX)
【5月更文挑战第18天】Java集合框架的`Collections.synchronizedXXX`方法可将普通集合转为线程安全,但使用时需注意常见问题和易错点。错误的同步范围(仅同步单个操作而非迭代)可能导致并发修改异常;错误地同步整个集合类可能引起死锁;并发遍历和修改集合需使用`Iterator`避免`ConcurrentModificationException`。示例代码展示了正确使用同步集合的方法。在复杂并发场景下,推荐使用`java.util.concurrent`包中的并发集合以提高性能。
18 3
|
8天前
|
Java
【Java多线程】面试常考 —— JUC(java.util.concurrent) 的常见类
【Java多线程】面试常考 —— JUC(java.util.concurrent) 的常见类
26 0
|
8天前
|
存储 安全 算法
掌握Java并发编程:Lock、Condition与并发集合
掌握Java并发编程:Lock、Condition与并发集合
16 0
|
8天前
|
Java
Java并发Futures和Callables类
Java程序`TestThread`演示了如何在多线程环境中使用`Futures`和`Callables`。它创建了一个单线程`ExecutorService`,然后提交两个`FactorialService`任务,分别计算10和20的阶乘。每个任务返回一个`Future`对象,通过`get`方法获取结果,该方法会阻塞直到计算完成。计算过程中模拟延迟以展示异步执行。最终,打印出10!和20!的结果。
25 10
|
8天前
|
安全 Java
Java中的并发编程:理解并发性与线程安全
Java作为一种广泛应用的编程语言,在并发编程方面具有显著的优势和特点。本文将探讨Java中的并发编程概念,重点关注并发性与线程安全,并提供一些实用的技巧和建议,帮助开发人员更好地理解和应用Java中的并发机制。
|
8天前
|
存储 安全 Java
【亮剑】`ConcurrentHashMap`是Java中线程安全的哈希表,采用锁定分离技术提高并发性能
【4月更文挑战第30天】`ConcurrentHashMap`是Java中线程安全的哈希表,采用锁定分离技术提高并发性能。数据被分割成多个Segment,每个拥有独立锁,允许多线程并发访问不同Segment。当写操作发生时,计算键的哈希值定位Segment并获取其锁;读操作通常无需锁定。内部会根据负载动态调整Segment,减少锁竞争。虽然使用不公平锁,但Java 8及以上版本提供了公平锁选项。理解其工作原理对开发高性能并发应用至关重要。
|
8天前
|
数据可视化 Java 测试技术
Java 编程问题:十一、并发-深入探索1
Java 编程问题:十一、并发-深入探索
48 0
|
8天前
|
存储 设计模式 安全
Java 编程问题:十、并发-线程池、可调用对象和同步器2
Java 编程问题:十、并发-线程池、可调用对象和同步器
39 0