并发编程之常用概念类和框架(二)

简介: 并发编程之常用概念类和框架

6.AQS架构

  6.1它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)

 6.2AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)

 6.3不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种模板方法

isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

tryAcquire(int): 独占方式。尝试获取资源,成功则返回true,失败则返回false。

tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

6.4以ReentrantLock的源码为例来深入理解AQS

ReentrantLock是基于AQS的独占锁来实现的

6.4.1.state初始化时为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1

6.4.2.此后,其他线程再tryAcquire()会失败,直到A线程unlock()到state=0(即释放锁)为止,其他线程才有机会获得该锁

6.4.3 ReentrantLock 默认采用非公平锁,除非在构造方法中传入参数 true 。非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。

ReentrantLock类结构

下面看下Sync类源代码:

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
        abstract void lock();
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) 
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
        protected final boolean tryRelease(int releases) {
            //releasesy一般传1
            int c = getState() - releases;
            //当前线程不是独占的不满足条件抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
           //如果state - releasesd等于0把空闲状态free置为true,同时把独占线程置为null
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
             //设置新的state
            setState(c);
            return free;
        }
        protected final boolean isHeldExclusively() {
            //判断Thread是否和当前线程相等从而判断是否为独占的
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
        final Thread getOwner() {
            //等于0表示没有任何线程占用这个资源,有的话返回占用的线程
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
        final boolean isLocked() {
            return getState() != 0;
        }
        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

FairSync 类里的tryAcquire方法:

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //队列里找不到等待的其他线程,就设置为当前线程独占,非公平锁(具体实现是Sync类的
                //nonfairTryAcquire方法,见上面代码)下的tryAcquire没有!hasQueuedPredecessors
                //判断其他代码都一样
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //可重入锁主要体现在这段逻辑,占用的线程就是当前线程把state加1
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

AbstractQueuedSynchronizer的 acquire方法

    public final void acquire(int arg) {
       //1.addWaiter(Node.EXCLUSIVE)把最后一个线程放在队列尾部
       //2.acquireQueued循环判断新入的节点是否为前驱节点,是前驱节点时
       //3.整个if的判断逻辑是没有获得锁成功的话加入等待队列的的再打断自己
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire方法调用流程图

AbstractQueuedSynchronizer的 acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            //不断自旋
            for (;;) {
                final Node p = node.predecessor();
                //是前驱节点时,尝试获取许可tryAcquire
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

6.5以CountDownLatch的原理为例来深入理解AQS

CountDownLatch是基于AQS的共享锁来实现的,由于内部类继承了AQS,所以它内部也是FIFO队列,同时也一样是前驱节点唤醒后继节点,不能像CyclicBarrier那样使用完毕后还可以复用;

6.5.1任务分为N个任子线程去执行,state也初始化为N(注意N要要与线程个数一致)

6.5.2这N个线程也是并行执行的,每个子线程执行完后后countDown()一次,state会CAS减1

6.5.3等到所有子线程都执行完成之后即(state==0),会调用LockSupport.unpark(s.thread)

6.5.4然后主调用会从await()函数返回(之前是通过LockSupport.park(this);阻塞),继续后余动作

源码分析: 【JDK源码分析】并发包同步工具CountDownLatch - 还是搬砖踏实 - 博客园

7.ReentrantLock使用场景

场景1:如果发现该操作已经在执行中则不再执行(有状态执行)

ReentrantLock lock = new ReentrantLock();  
if(lock.tryLock()){
    try{
    }finally{
         lock.unlock();
    }
}

场景2:如果发现该操作已经在执行,等待一个一个执行(同步执行,类似synchronized)

场景3:如果发现该操作已经在执行,则尝试等待一段时间,等待超时则不执行(尝试等待执行)

import java.util.concurrent.*;  
import java.util.concurrent.locks.*;  
public class AttemptLocking {  
    private ReentrantLock lock = new ReentrantLock();  
    public void untimed() {  
        boolean captured = lock.tryLock();  
        try {  
            System.out.println("tryLock(): " + captured);  
        } finally {  
            if (captured)  
                lock.unlock();  
        }  
    }  
    public void timed() {  
        boolean captured = false;  
        try {  
            captured = lock.tryLock(2, TimeUnit.SECONDS);  
        } catch (InterruptedException e) {  
            throw new RuntimeException(e);  
        }  
        try {  
            System.out.println("tryLock(2, TimeUnit.SECONDS): " + captured);  
        } finally {  
            if (captured)  
                lock.unlock();  
        }  
    }  
    public static void main(String[] args) throws InterruptedException {  
        final AttemptLocking al = new AttemptLocking();  
        al.untimed(); // True -- 可以成功获得锁  
        al.timed(); // True --可以成功获得锁  
        //新创建一个线程获得锁并且不释放  
        new Thread() {  
            {  
                setDaemon(true);  
            }  
            public void run() {  
                al.lock.lock();  
                System.out.println("acquired");  
            }  
        }.start();  
        Thread.sleep(100);// 保证新线程能够先执行  
        al.untimed(); // False -- 马上中断放弃  
        al.timed(); // False -- 等两秒超时后中断放弃  
    }  
}  

场景4:如果发现该操作已经在执行,等待执行。这时可中断正在进行的操作立刻释放锁继续下一操作(类似于wait())

ReentrantLock lock = new ReentrantLock();  
try{
     lock.lockInterruptibly();
}finally{
     lock.unlock();
}

另外在ReentrantLock类中定义了很多方法,比如:

  isFair()        //判断锁是否是公平锁

  isLocked()    //判断锁是否被任何线程获取了

  isHeldByCurrentThread()   //判断锁是否被当前线程获取了

  hasQueuedThreads()   //判断是否有线程在等待该锁

8、线程中断

Thread.currentThread().interrupt()

线程中断只是一个状态而已,true表示已中断,false表示未中断

//获取线程中断状态,如果中断了返回true,否则返回false

Thread.currentThread().isInterrupted()

 设置线程中断不影响线程的继续执行,但是线程设置中断后,线程内调用了wait、jion、sleep方法中的一种, 立马抛出一个 InterruptedException,且中断标志被清除,重新设置为false。

 

这个恢复过来就可以包含两个目的:

  一、[可以使线程继续执行],那就是在catch语句中招待醒来后的逻辑,或由catch语句转回正常的逻辑。总之它是从wait,sleep,join的暂停状态活过来了。

  二、[可以直接停止线程的运行],当然在catch中什么也不处理,或return,那么就完成了当前线程的使命,可以使在上面"暂停"的状态中立即真正的"停止"。

9、ABA问题

ABA问题是指在CAS操作中带来的潜在问题

CAS意思是 compare and swap 或者 compare and set , CAS 指令需要有 3 个操作数,分别是内存地址 V、旧的预期值 A 和新值 B。当执行操作时,只有当 V 的值等于 A,才将 V 的值更新为 B。

如果CAS操作是基于CPU内核的原子操作,那基本是不会出现ABA问题的,但是如果CAS本身操作不满足原子性,则会带来ABA问题,

比如两个线程

  • 线程1 查询A的值为a,与旧值a比较,
  • 线程2 查询A的值为a,与旧值a比较,相等,更新为b值
  • 线程2 查询A的值为b,与旧值b比较,相等,更新为a值
  • 线程1 相等,更新B的值为c

可以看到这样的情况下,线程1 可以正常 进行CAS操作,将值从a变为c 但是在这之间,实际A值已经发了a->b b->a的转换

仔细思考,这样可能带来的问题是,如果需要关注A值变化过程,是会漏掉一段时间窗口的监控

10、Lock锁和Condition条件

Lock的特性:

  1).Lock不是Java语言内置的;

  2).synchronized是在JVM层面上实现的,如果代码执行出现异常,JVM会自动释放锁,但是Lock不行,要保证锁一定会被释放,就必须将unLock放到finally{}中(手动释放);

  3).在资源竞争不是很激烈的情况下,Synchronized的性能要优于ReetarntLock,但是在很激烈的情况下,synchronized的性能会下降几十倍(新版本的jdk新能相差不大);

  4).ReentrantLock增加了锁:

    a. void lock(); // 无条件的锁;

    b. void lockInterruptibly throws InterruptedException;//可中断的锁;

解释:  使用ReentrantLock如果获取了锁立即返回,如果没有获取锁,当前线程处于休眠状态,直到获得锁或者当前线程可以被别的线程中断去做其他的事情;但是如果是synchronized的话,如果没有获取到锁,则会一直等待下去;

    c. boolean tryLock();//如果获取了锁立即返回true,如果别的线程正持有,立即返回false,不会等待;

    d. boolean tryLock(long timeout,TimeUnit unit);//如果获取了锁立即返回true,如果别的线程正持有锁,会等待参数给的时间,在等待的过程中,如果获取锁,则返回true,如果等待超时,返回false;

Condition的特性:

 1.Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的这些方法是和同步锁捆绑使用的;而Condition是需要与互斥锁/共享锁捆绑使用的。

 2.Condition它更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。

    例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒"读线程";当从缓冲区读出数据之后,唤醒"写线程";并且当缓冲区满的时候,"写线程"需要等待;当缓冲区为空时,"读线程"需要等待。      

      如果采用Object类中的wait(), notify(), notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒"读线程"时,不可能通过notify()或notifyAll()明确的指定唤醒"读线程",而只能通过notifyAll唤醒所有线程(但是notifyAll无法区分唤醒的线程是读线程,还是写线程)。  但是,通过Condition,就能明确的指定唤醒读线程。

相关文章
|
6月前
|
安全 Java 数据安全/隐私保护
|
4月前
|
设计模式 存储 安全
Java面试题:设计一个线程安全的单例类并解释其内存占用情况?使用Java多线程工具类实现一个高效的线程池,并解释其背后的原理。结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
Java面试题:设计一个线程安全的单例类并解释其内存占用情况?使用Java多线程工具类实现一个高效的线程池,并解释其背后的原理。结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
62 1
|
5月前
|
存储 安全 Java
深入探讨 Java 封装机制:为何它是面向对象编程的核心?
【6月更文挑战第16天】Java的封装是OOP核心,它将数据和操作数据的方法打包在类中,隐藏实现细节并提供公共接口。例如,`Student`类封装了私有属性`name`和`age`,通过`get/set`方法安全访问。封装提升代码稳定性、可维护性和复用性,防止外部直接修改导致的错误,确保数据安全。它是面向对象编程优于传统编程的关键,促进高效、可靠的开发。
62 7
|
4月前
|
安全 Java 数据库连接
Java面试题:解释Java内存模型的无锁编程支持,并讨论其优势和局限性,解释Java中的CompletableFuture的工作原理,并讨论其在异步编程中的应用
Java面试题:解释Java内存模型的无锁编程支持,并讨论其优势和局限性,解释Java中的CompletableFuture的工作原理,并讨论其在异步编程中的应用
29 0
|
4月前
|
设计模式 缓存 安全
Java面试题:设计模式在并发编程中的创新应用,Java内存管理与多线程工具类的综合应用,Java并发工具包与并发框架的创新应用
Java面试题:设计模式在并发编程中的创新应用,Java内存管理与多线程工具类的综合应用,Java并发工具包与并发框架的创新应用
41 0
|
4月前
|
存储 安全 Java
Java面试题:Java内存管理、多线程与并发框架:一道综合性面试题的深度解析,描述Java内存模型,并解释如何在应用中优化内存使用,阐述Java多线程的创建和管理方式,并讨论线程安全问题
Java面试题:Java内存管理、多线程与并发框架:一道综合性面试题的深度解析,描述Java内存模型,并解释如何在应用中优化内存使用,阐述Java多线程的创建和管理方式,并讨论线程安全问题
45 0
|
Java
多线程的相关概念
多线程的相关概念
64 1
|
存储 Kubernetes API
【k8s概念】一文搞懂k8s核心概念!!!(下)
【k8s概念】一文搞懂k8s核心概念!!!(下)
53353 8
|
存储 Kubernetes 调度
【k8s概念】一文搞懂k8s核心概念!!!(上)
【k8s概念】一文搞懂k8s核心概念!!!(上)
650 0
|
Kubernetes 应用服务中间件 nginx
【k8s概念】一文搞懂k8s核心概念!!!(中)
【k8s概念】一文搞懂k8s核心概念!!!(中)
374 1