并发编程之常用概念类和框架(二)

简介: 并发编程之常用概念类和框架

6.AQS架构

  6.1它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)

 6.2AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)

 6.3不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种模板方法

isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

tryAcquire(int): 独占方式。尝试获取资源,成功则返回true,失败则返回false。

tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

6.4以ReentrantLock的源码为例来深入理解AQS

ReentrantLock是基于AQS的独占锁来实现的

6.4.1.state初始化时为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1

6.4.2.此后,其他线程再tryAcquire()会失败,直到A线程unlock()到state=0(即释放锁)为止,其他线程才有机会获得该锁

6.4.3 ReentrantLock 默认采用非公平锁,除非在构造方法中传入参数 true 。非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。

ReentrantLock类结构

下面看下Sync类源代码:

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
        abstract void lock();
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) 
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
        protected final boolean tryRelease(int releases) {
            //releasesy一般传1
            int c = getState() - releases;
            //当前线程不是独占的不满足条件抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
           //如果state - releasesd等于0把空闲状态free置为true,同时把独占线程置为null
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
             //设置新的state
            setState(c);
            return free;
        }
        protected final boolean isHeldExclusively() {
            //判断Thread是否和当前线程相等从而判断是否为独占的
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
        final Thread getOwner() {
            //等于0表示没有任何线程占用这个资源,有的话返回占用的线程
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
        final boolean isLocked() {
            return getState() != 0;
        }
        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

FairSync 类里的tryAcquire方法:

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //队列里找不到等待的其他线程,就设置为当前线程独占,非公平锁(具体实现是Sync类的
                //nonfairTryAcquire方法,见上面代码)下的tryAcquire没有!hasQueuedPredecessors
                //判断其他代码都一样
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //可重入锁主要体现在这段逻辑,占用的线程就是当前线程把state加1
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

AbstractQueuedSynchronizer的 acquire方法

    public final void acquire(int arg) {
       //1.addWaiter(Node.EXCLUSIVE)把最后一个线程放在队列尾部
       //2.acquireQueued循环判断新入的节点是否为前驱节点,是前驱节点时
       //3.整个if的判断逻辑是没有获得锁成功的话加入等待队列的的再打断自己
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire方法调用流程图

AbstractQueuedSynchronizer的 acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            //不断自旋
            for (;;) {
                final Node p = node.predecessor();
                //是前驱节点时,尝试获取许可tryAcquire
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

6.5以CountDownLatch的原理为例来深入理解AQS

CountDownLatch是基于AQS的共享锁来实现的,由于内部类继承了AQS,所以它内部也是FIFO队列,同时也一样是前驱节点唤醒后继节点,不能像CyclicBarrier那样使用完毕后还可以复用;

6.5.1任务分为N个任子线程去执行,state也初始化为N(注意N要要与线程个数一致)

6.5.2这N个线程也是并行执行的,每个子线程执行完后后countDown()一次,state会CAS减1

6.5.3等到所有子线程都执行完成之后即(state==0),会调用LockSupport.unpark(s.thread)

6.5.4然后主调用会从await()函数返回(之前是通过LockSupport.park(this);阻塞),继续后余动作

源码分析: 【JDK源码分析】并发包同步工具CountDownLatch - 还是搬砖踏实 - 博客园

7.ReentrantLock使用场景

场景1:如果发现该操作已经在执行中则不再执行(有状态执行)

ReentrantLock lock = new ReentrantLock();  
if(lock.tryLock()){
    try{
    }finally{
         lock.unlock();
    }
}

场景2:如果发现该操作已经在执行,等待一个一个执行(同步执行,类似synchronized)

场景3:如果发现该操作已经在执行,则尝试等待一段时间,等待超时则不执行(尝试等待执行)

import java.util.concurrent.*;  
import java.util.concurrent.locks.*;  
public class AttemptLocking {  
    private ReentrantLock lock = new ReentrantLock();  
    public void untimed() {  
        boolean captured = lock.tryLock();  
        try {  
            System.out.println("tryLock(): " + captured);  
        } finally {  
            if (captured)  
                lock.unlock();  
        }  
    }  
    public void timed() {  
        boolean captured = false;  
        try {  
            captured = lock.tryLock(2, TimeUnit.SECONDS);  
        } catch (InterruptedException e) {  
            throw new RuntimeException(e);  
        }  
        try {  
            System.out.println("tryLock(2, TimeUnit.SECONDS): " + captured);  
        } finally {  
            if (captured)  
                lock.unlock();  
        }  
    }  
    public static void main(String[] args) throws InterruptedException {  
        final AttemptLocking al = new AttemptLocking();  
        al.untimed(); // True -- 可以成功获得锁  
        al.timed(); // True --可以成功获得锁  
        //新创建一个线程获得锁并且不释放  
        new Thread() {  
            {  
                setDaemon(true);  
            }  
            public void run() {  
                al.lock.lock();  
                System.out.println("acquired");  
            }  
        }.start();  
        Thread.sleep(100);// 保证新线程能够先执行  
        al.untimed(); // False -- 马上中断放弃  
        al.timed(); // False -- 等两秒超时后中断放弃  
    }  
}  

场景4:如果发现该操作已经在执行,等待执行。这时可中断正在进行的操作立刻释放锁继续下一操作(类似于wait())

ReentrantLock lock = new ReentrantLock();  
try{
     lock.lockInterruptibly();
}finally{
     lock.unlock();
}

另外在ReentrantLock类中定义了很多方法,比如:

  isFair()        //判断锁是否是公平锁

  isLocked()    //判断锁是否被任何线程获取了

  isHeldByCurrentThread()   //判断锁是否被当前线程获取了

  hasQueuedThreads()   //判断是否有线程在等待该锁

8、线程中断

Thread.currentThread().interrupt()

线程中断只是一个状态而已,true表示已中断,false表示未中断

//获取线程中断状态,如果中断了返回true,否则返回false

Thread.currentThread().isInterrupted()

 设置线程中断不影响线程的继续执行,但是线程设置中断后,线程内调用了wait、jion、sleep方法中的一种, 立马抛出一个 InterruptedException,且中断标志被清除,重新设置为false。

 

这个恢复过来就可以包含两个目的:

  一、[可以使线程继续执行],那就是在catch语句中招待醒来后的逻辑,或由catch语句转回正常的逻辑。总之它是从wait,sleep,join的暂停状态活过来了。

  二、[可以直接停止线程的运行],当然在catch中什么也不处理,或return,那么就完成了当前线程的使命,可以使在上面"暂停"的状态中立即真正的"停止"。

9、ABA问题

ABA问题是指在CAS操作中带来的潜在问题

CAS意思是 compare and swap 或者 compare and set , CAS 指令需要有 3 个操作数,分别是内存地址 V、旧的预期值 A 和新值 B。当执行操作时,只有当 V 的值等于 A,才将 V 的值更新为 B。

如果CAS操作是基于CPU内核的原子操作,那基本是不会出现ABA问题的,但是如果CAS本身操作不满足原子性,则会带来ABA问题,

比如两个线程

  • 线程1 查询A的值为a,与旧值a比较,
  • 线程2 查询A的值为a,与旧值a比较,相等,更新为b值
  • 线程2 查询A的值为b,与旧值b比较,相等,更新为a值
  • 线程1 相等,更新B的值为c

可以看到这样的情况下,线程1 可以正常 进行CAS操作,将值从a变为c 但是在这之间,实际A值已经发了a->b b->a的转换

仔细思考,这样可能带来的问题是,如果需要关注A值变化过程,是会漏掉一段时间窗口的监控

10、Lock锁和Condition条件

Lock的特性:

  1).Lock不是Java语言内置的;

  2).synchronized是在JVM层面上实现的,如果代码执行出现异常,JVM会自动释放锁,但是Lock不行,要保证锁一定会被释放,就必须将unLock放到finally{}中(手动释放);

  3).在资源竞争不是很激烈的情况下,Synchronized的性能要优于ReetarntLock,但是在很激烈的情况下,synchronized的性能会下降几十倍(新版本的jdk新能相差不大);

  4).ReentrantLock增加了锁:

    a. void lock(); // 无条件的锁;

    b. void lockInterruptibly throws InterruptedException;//可中断的锁;

解释:  使用ReentrantLock如果获取了锁立即返回,如果没有获取锁,当前线程处于休眠状态,直到获得锁或者当前线程可以被别的线程中断去做其他的事情;但是如果是synchronized的话,如果没有获取到锁,则会一直等待下去;

    c. boolean tryLock();//如果获取了锁立即返回true,如果别的线程正持有,立即返回false,不会等待;

    d. boolean tryLock(long timeout,TimeUnit unit);//如果获取了锁立即返回true,如果别的线程正持有锁,会等待参数给的时间,在等待的过程中,如果获取锁,则返回true,如果等待超时,返回false;

Condition的特性:

 1.Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的这些方法是和同步锁捆绑使用的;而Condition是需要与互斥锁/共享锁捆绑使用的。

 2.Condition它更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。

    例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒"读线程";当从缓冲区读出数据之后,唤醒"写线程";并且当缓冲区满的时候,"写线程"需要等待;当缓冲区为空时,"读线程"需要等待。      

      如果采用Object类中的wait(), notify(), notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒"读线程"时,不可能通过notify()或notifyAll()明确的指定唤醒"读线程",而只能通过notifyAll唤醒所有线程(但是notifyAll无法区分唤醒的线程是读线程,还是写线程)。  但是,通过Condition,就能明确的指定唤醒读线程。

目录
相关文章
|
存储 自然语言处理 搜索推荐
Machine Learning机器学习之文本分析的知识图谱(详细讲解)
Machine Learning机器学习之文本分析的知识图谱(详细讲解)
|
网络安全 开发者 iOS开发
iOS技术博客:App备案指南
本文介绍了移动应用程序(App)备案的重要性和流程。备案是规范App开发和运营的必要手段,有助于保护用户权益、维护网络安全和社会秩序。为了帮助开发者更好地了解备案流程,本文提供了一份最新、最全、最详的备案指南,包括备案目的、好处、对象、时间、流程、条件和注意事项等内容。
iOS技术博客:App备案指南
|
JavaScript NoSQL Java
接替此文【下篇-服务端+后台管理】优雅草蜻蜓z系统JAVA版暗影版为例-【蜻蜓z系列通用】-2025年全新项目整合搭建方式-这是独立吃透代码以后首次改变-独立PC版本vue版搭建教程-优雅草卓伊凡
接替此文【下篇-服务端+后台管理】优雅草蜻蜓z系统JAVA版暗影版为例-【蜻蜓z系列通用】-2025年全新项目整合搭建方式-这是独立吃透代码以后首次改变-独立PC版本vue版搭建教程-优雅草卓伊凡
833 96
接替此文【下篇-服务端+后台管理】优雅草蜻蜓z系统JAVA版暗影版为例-【蜻蜓z系列通用】-2025年全新项目整合搭建方式-这是独立吃透代码以后首次改变-独立PC版本vue版搭建教程-优雅草卓伊凡
|
11月前
|
机器学习/深度学习 编解码 监控
人机融合智能 | 人智交互的神经人因学方法
本文探讨了智能时代神经人因学方法在人智交互研究和应用中的推动作用,重点介绍了内源性智能系统、面向认知的智能系统、智能穿戴可移动技术和外源性智能系统四大内容。其中,内源性智能系统以神经反馈技术为核心,涵盖多模态采集系统、脑状态解码及智能反馈控制;面向认知的智能系统涉及认知神经信号处理与特征提取;智能穿戴技术覆盖肌电、心电等外周生理数据采集;外源性智能系统则聚焦于认知功能增强新技术。文章还深入分析了神经反馈训练的发展历程、技术优势及智能化发展方向,为高级智能系统的构建提供了理论与实践支持。
352 3
|
存储 物联网 数据处理
什么数据中心最好?盘点全球十大数据中心!
在数字时代,数据中心作为关键基础设施,支撑着商业和社会的高效运转。从AWS、谷歌、微软到阿里云、苹果等巨头的数据中心,它们各具特色,涵盖高性能计算、液冷技术、绿色节能和高安全性等领域。这些“超级堡垒”不仅保障了在线交易、远程教育、智慧医疗等服务的稳定运行,还推动了云计算、大数据和物联网的发展,极大提升了社会效率和生活质量。每个数据中心根据自身优势,在不同应用场景中发挥着不可替代的作用,共同构建了数字化世界的基石。
1544 1
|
IDE 程序员 开发工具
Python编程入门:打造你的第一个程序
迈出编程的第一步,就像在未知的海洋中航行。本文是你启航的指南针,带你了解Python这门语言的魅力所在,并手把手教你构建第一个属于自己的程序。从安装环境到编写代码,我们将一步步走过这段旅程。准备好了吗?让我们开始吧!
|
人工智能 监控 物联网
数字孪生与智慧城市:构建未来城市模型
在信息化和智能化时代,数字孪生技术融合大数据、云计算、物联网和AI,成为推动智慧城市建设的关键力量。本文探讨其在城市规划、管理、交通、环保及公共服务中的应用,展现其如何优化城市运行,助力构建未来的理想城市模型。
|
运维 监控 关系型数据库
PostgreSQL运维核心技能之掌握并行查询
PostgreSQL运维核心技能之掌握并行查询
491 9
|
中间件
eggjs 怎么使用 egg-jwt 实现登录验证中间件?
eggjs 怎么使用 egg-jwt 实现登录验证中间件?
872 94
eggjs 怎么使用 egg-jwt 实现登录验证中间件?
|
弹性计算 关系型数据库 网络安全
【ECS最佳实践】ECS+RDS构建云服务器主动防御系统部署开源蜜罐系统Hfish及ECS周边功能测试
我已经是阿里云ECS产品的老用户了,阿里的云计算产品性能可靠性毋庸置疑,这次分享一个开源蜜罐系统Hfish的单节点搭建,并围绕ECS周边的技术功能做个简单举例。
29585 27
【ECS最佳实践】ECS+RDS构建云服务器主动防御系统部署开源蜜罐系统Hfish及ECS周边功能测试