Java并发编程—并发流程控制与AQS原理及相关源码解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
简介: Java并发编程—并发流程控制与AQS原理及相关源码解析


Java并发编程

代码GitHub地址 github.com/imyiren/con…

  1. 刨根问底搞懂创建线程到底有几种方法?
  2. 如何正确得启动和停止一个线程 最佳实践与源码分析
  3. 多案例理解Object的wait,notify,notifyAll与Thread的sleep,yield,join等方法
  4. 了解线程属性,如何处理子线程异常
  5. 多线程安全和性能问题
  6. JMM(Java内存模型)在并发中的原理与应用
  7. 深入理解死锁问题及其解决方案
  8. 剖析线程池的使用与组成
  9. 带你一文搞懂ThreadLocal的用法以及内部原理
  10. J.U.C下Lock的分类及特点详解(结合案例和源码)
  11. J.U.C下各种Atomic类使用及CAS相关源码分析
  12. 结合源码分析ConcurrenthashMap与CopyOnWriteArrayList的原理
  13. 并发流程控制与AQS原理及相关源码解析

0. 主要内容

  • 文章分为两部分:
  1. 第一个部分主要讲并发流程控制的各大类的使用及案例
  2. 第二部分主要是先将AQS的组成及原理,然后结合CountDownLatch、Semaphore等分析源码逻辑

ps: 文章内容比较多

1. 并发流程控制

1.1 什么是并发流程控制

  • 并发流程控制,就是让线程之间相互配合完成任务,来满足业务逻辑
  • 如:让线程A等待线程B完成后再执行等策略

1.2 并发流程控制的工具

作用 说明
Semaphore 信号量:可以通过控制“许可”的数量,来保证线程间配合 线程只有拿到了许可才可以继续运行
CyclicBarrier 循环栅栏:线程会等待,直到足够多线程达到了规定数量,再执行下一步任务 适用于线程间相互等待处理结果就绪的场景
Phaser 和CyclicBarrier类似,但是计数可变 java7加入的新类
CountDownLatch 也是一个计数等待相关,数量地见到0时,触发动作 不可重复使用
Exchanger 让两个线程在合适时交换对象 适用于两个线程工作在同一个类的不同实例上时,用于交换数据
Condition 可以控制线程的等待和唤醒 是Object.wati()的升级版

2. CountDownLatch计数门闩

2.1 作用

  • 并发流程控制的工具,用于等待数量(我们设定的)足够后再执行某些任务

2.2 主要方法

  • CountDownLatch(int count):只有一个构造方法,参数count为需要倒数的值
  • await():调用此方法的线程会被挂起,它会等到count值为零的时候才继续执行
  • countdown():讲count减1,直到0,等待的线程会被唤醒

2.3 用法一:等待线程执行完毕

/**
 * @author yiren
 */
public class CountDownLatchExample01 {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger integer = new AtomicInteger(1);
        CountDownLatch latch = new CountDownLatch(5);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName()+ " produce ....");
                    TimeUnit.SECONDS.sleep(1);
                    integer.incrementAndGet();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    latch.countDown();
                }
            });
        }
        System.out.println(Thread.currentThread().getName() + " waiting....");
        latch.await();
        System.out.println(Thread.currentThread().getName() + " finished!");
        System.out.println(Thread.currentThread().getName() + " num: " +  integer.get());
        executorService.shutdown();
    }
}
复制代码
pool-1-thread-1 produce ....
pool-1-thread-2 produce ....
pool-1-thread-3 produce ....
main waiting....
pool-1-thread-4 produce ....
pool-1-thread-5 produce ....
main finished!
main num: 6
Process finished with exit code 0
复制代码

2.4 用法二:多等一

/**
 * @author yiren
 */
public class CountDownLatchExample02 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " ready!");
                try {
                    latch.await();
                    System.out.println(Thread.currentThread().getName()+ " produce ....");
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    latch.countDown();
                }
            });
        }
        Thread.sleep(10);
        System.out.println(Thread.currentThread().getName() + " ready!");
        latch.countDown();
        System.out.println(Thread.currentThread().getName() + " go!");
        executorService.shutdown();
    }
}
复制代码
pool-1-thread-1 ready!
pool-1-thread-4 ready!
pool-1-thread-3 ready!
pool-1-thread-2 ready!
pool-1-thread-5 ready!
main ready!
main go!
pool-1-thread-1 produce ....
pool-1-thread-2 produce ....
pool-1-thread-5 produce ....
pool-1-thread-3 produce ....
pool-1-thread-4 produce ....
Process finished with exit code 0
复制代码

2.4 注意

  • CountDownLatch不仅可以无限等待,还可以给参数,在指定的事件内如果等到就唤醒线程继续执行
boolean await(long timeout, TimeUnit unit)
复制代码
  • CountDownLatch不能重用,如果涉及重新计数,可以使用CyclicBarrier或者新创建CountDownLatch

3. Semaphore信号量

3.1 信号量作用

  • Semaphore可以用来限制或管理数量有限的资源使用情况
  • 信号量的租用是维护一个许可计数,线程可以获取许可,然后信号量减一;线程也可以释放许可,信号量就加一;如果信号量的许可颁发完了,其他线程想要获取,就需要等待,直到有另外的线程释放了许可。

3.2 信号量使用

  1. 初始化Semaphore指定许可数量
  2. 在需要获取许可的代码前面加上acquire()或者acquireUniterruptibly()方法
  3. 任务执行完成有调用release()释放许可

3.3 主要方法

  • Semaphore(int permits, boolean fair)这里设置许可数量,以及是否使用公平策略。
  • 如果传入true那么久吧等待线程放入到FIFO的队列里面。
  • aquire()请求许可,可以响应中断
  • aquireUnniterruptibly()请求许可不可中断
  • tryAcquire()看看现在有没有空闲的许可,如果有那就返回true;这个方法还可以设置等待时间给一个timeout,让线程等待一段时间。
  • release()释放许可

3.4 案例演示

/**
 * @author yiren
 */
public class SemaphoreExample01 {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3, true);
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 8; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName()+" start to get permit");
                    semaphore.acquire();
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " " + LocalDateTime.now() +" finished!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release();
                }
            });
        }
        executorService.shutdown();
    }
}
复制代码
pool-1-thread-1 start to get permit
pool-1-thread-4 start to get permit
pool-1-thread-3 start to get permit
pool-1-thread-2 start to get permit
pool-1-thread-5 start to get permit
pool-1-thread-6 start to get permit
pool-1-thread-7 start to get permit
pool-1-thread-8 start to get permit
pool-1-thread-3 2020-02-21T19:54:47.392 finished!
pool-1-thread-1 2020-02-21T19:54:47.392 finished!
pool-1-thread-4 2020-02-21T19:54:47.392 finished!
pool-1-thread-6 2020-02-21T19:54:49.396 finished!
pool-1-thread-2 2020-02-21T19:54:49.396 finished!
pool-1-thread-5 2020-02-21T19:54:49.396 finished!
pool-1-thread-8 2020-02-21T19:54:51.401 finished!
pool-1-thread-7 2020-02-21T19:54:51.401 finished!
Process finished with exit code 0
复制代码

3.5 注意点

  • 获取和释放的许可证必须一致,acquire和release都是可以传入数值的来确定获取和释放的数量。如果我们获取和释放不一致,就会容易导致程序bug。当然也不是绝对,除非有特殊业务需求,否则都获取释放设置为一样的
  • 注意在初始化Semaphore的时候设置公平性,一般设置为true会比较合理。如果插队情况比较严重的话,某些线程可能一直阻塞
  • 获取和释放许可对线程并不要求,线程A获取了可以线程B释放。

4. Condition接口

4.1 作用

  • 当线程A需要等待某个任务或者某个资源,就可以执行condition.await()方法,然后就会陷入阻塞状态。
  • 此时另一个线程B,去获取资源或者执行任务完成后,调用condition.signal()或者signalAll()方法,通知线程A,继续执行
  • 这个类似于object.wait()notify()notifyAll()
  • signal()方法如果遇到多个线程都在等待的时候,会去唤醒等待时间最长的那个
  • 在我们ReentrantLock中就可以直接新建Condition。看下面案例

4.2 案例演示

  • 普通用法
/**
 * @author yiren
 */
public class ConditionExample01 {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            task1();
        });
        Thread thread2 = new Thread(() -> {
            task2();
        });
        thread1.start();
        Thread.sleep(100);
        thread2.start();
    }
    private static void task1() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " start await()");
            condition.await();
            System.out.println(Thread.currentThread().getName() + " await finished!");
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    private static void task2() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " start signal()");
            Thread.sleep(1000);
            condition.signal();
            System.out.println(Thread.currentThread().getName() + " signal finished!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
复制代码
Thread-0 start await()
Thread-1 start signal()
Thread-1 signal finished!
Thread-0 await finished!
Process finished with exit code 0
复制代码
  • 生产者消费者模式
/**
 * @author yiren
 */
public class ConditionExample02 {
    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();
    public static void main(String[] args) {
        ConditionExample02 conditionDemo2 = new ConditionExample02();
        Producer producer = conditionDemo2.new Producer();
        Consumer consumer = conditionDemo2.new Consumer();
        producer.start();
        consumer.start();
    }
    class Consumer extends Thread {
        @Override
        public void run() {
            consume();
        }
        private void consume() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("队列空,等待数据");
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();
                    notFull.signalAll();
                    System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
                } finally {
                    lock.unlock();
                }
            }
        }
    }
    class Producer extends Thread {
        @Override
        public void run() {
            produce();
        }
        private void produce() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("队列满,等待有空余");
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(1);
                    notEmpty.signalAll();
                    System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}
复制代码
  • 以上使用两个Condition作为队列满和空的通知传递工具在生产者和消费者之间互通

4.3 注意点

  • 我们知道Lock可以看做synchronized的替代方案,而Condition就是用来替代object.wait/notify的,在用法上几乎一致。
  • 调用await()方法时必须持有Lock锁,否则会抛出异常,并且await()方法会释放当前持有的Lock锁,
  • 一个Lock锁可以有多个Condition更加灵活

5. CyclicBarrier循环栅栏

5.1 作用

  • CyclicBarrier循环栅栏和CountDownLatch很类似,都能阻塞一组线程
  • 当需要多个线程配合完成任务,并最后需要统一汇总时,我们就可以使用CyclicBarrier,当某个线程完成任务后,它先会等待,等到所有线程都执行好了任务,再一起继续执行剩下的任务
  • 比如:同时出去聚餐约在了公司,等大家到公司了一起走过去。
  • 但是注意CyclicBarrier是可以重复使用的,这个和CountDownLatch不同

5.2 案例

/**
 * @author yiren
 */
public class CyclicBarrierExample {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有人都到场了, 大家统一出发!");
            }
        });
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(i, cyclicBarrier)).start();
        }
    }
    static class Task implements Runnable {
        private int id;
        private CyclicBarrier cyclicBarrier;
        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            System.out.println("线程" + id + "现在前往集合地点");
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("线程" + id + "到了集合地点,开始等待其他人到达");
                cyclicBarrier.await();
                System.out.println("线程" + id + "出发了");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}
复制代码
线程0现在前往集合地点
线程2现在前往集合地点
线程3现在前往集合地点
线程1现在前往集合地点
线程4现在前往集合地点
线程5现在前往集合地点
线程6现在前往集合地点
线程7现在前往集合地点
线程8现在前往集合地点
线程9现在前往集合地点
线程3到了集合地点,开始等待其他人到达
线程9到了集合地点,开始等待其他人到达
线程8到了集合地点,开始等待其他人到达
线程4到了集合地点,开始等待其他人到达
线程5到了集合地点,开始等待其他人到达
所有人都到场了, 大家统一出发!
线程5出发了
线程3出发了
线程8出发了
线程4出发了
线程9出发了
线程1到了集合地点,开始等待其他人到达
线程6到了集合地点,开始等待其他人到达
线程0到了集合地点,开始等待其他人到达
线程7到了集合地点,开始等待其他人到达
线程2到了集合地点,开始等待其他人到达
所有人都到场了, 大家统一出发!
线程2出发了
线程1出发了
线程7出发了
线程0出发了
线程6出发了
Process finished with exit code 0
复制代码
  • 每五个人到了过后,就出发一批

5.3 CountDownLatch和CyclicBarrier`区别

  • 作用不同:CountDownLatch使用countDown()是用于事件的,而CyclicBarrier使用await()是用于线程的
  • 可重用性不同:CountDownLatch在倒数到0后不能再次重用,除非创建新对象;而CyclicBarrier是可以直接重用的

6. 深入AQS理解J.U.C的根基

6.1 AQS作用及其重要性

  • AQS在CountDownLatch等工具内都有使用,全称是:AbstractQueuedSynchronizer是一个抽象类
  • 锁和上面的线程并发控制类(Semaphore等)都有类似的地方。 其实他们底层都是使用了AQS作为基类的拓展
  • 正因为他们很多工作都类似,JDK就把这部分通用逻辑抽离了出来,提供给他们直接使用,使其不必关注很多深层次的细节,从而完成他们的功能。
  • 我们可以大致看一下我们锁用到的这些并发控制的工具类和锁的内部实现
  • `Semaphore``
public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        Sync(int permits) {
            setState(permits);
        }
  ......
复制代码
  • ReentrantLock
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
    ......
复制代码
  • CountDownLatch
public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {
            setState(count);
        }
    ......
复制代码
  • 由上源码我们可以看到,里面都有一个内部类,Sync继承自AbstractQueuedSynchronizer
  • 那么AQS是用来干些什么事情的呢?
  • J.U.C基本都是是基于AQS实现的,AQS是一个用于构建锁、同步器、线程协作工具类的框架供给子类使用,主要使用模板模式来设计。
  • 它主要工作就是管理线程的阻塞与唤醒,实现同步的管理,以及阻塞线程的队列管理工作

6.2 AQS的组成及内部原理

  • AbstractQueuedSynchronizer自JDK1.5加入,是基于FIFO等待队列实现的一个用于同步器的基础框架。
  • JDK1.8 继承AQS实现的类:

  • 我们可以看到,在可重入锁,读写锁,计数门闩等,信号量里面都是用了AQS的子类,接下来我们就学习一下AQS的内部原理
  1. AQS的三大部分
  • state:状态,
  • FIFO队列:线程竞争锁的管理队列
  • 获取和释放方法:需要工具类去实现的方法
  1. state:状态
/**
     * The synchronization state.
     */
    private volatile int state;
复制代码
  • 它的含义并不具体,根据实现的不同而不同,如:Semaphore内是剩余许可数量、CountDownLatch内是还需要倒数的数量,可看做一个计数器,只是不同类的作用及意义不用
protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
复制代码
  • 状态值的更新,是使用Unsafe的CAS完成
  • 在ReentrantLock中:state表示锁的占用情况,可重入的计数,每重入一次就加一,当要释放锁时,它的值就会变成0,表示不被任何线程占有。
  1. FIFO队列:
/**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;
    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;
复制代码
  • 这个队列是用来存放等待的线程的,AQS会对这个队列进行管理。当多个线程竞争锁时,没有拿到锁的,就会被翻到队列中,当前拿到锁的执行任务的线程结束,AQS就会从队列中选一个线程来占有这个锁。
  • AQS维护一个双向链表的等待队列,把等待线程都放到这个队列里面管理;队列头节点是当前拿到锁的线程;在AQS中保存了这个队列的头尾节点。
  1. 获取和释放的方法
  • 获取方法:
  • 获取操作会依赖state变量,经常会阻塞,如:获取不到锁的时候,获取不到许可的时候等
  • ReentrantLock中,就是获取锁。state+1
  • Semaphore中就是acquire获取许可,state-1,当state==0就会阻塞
  • CountDownLatch中就是await方法,就是等待state==0
  • 释放方法:
  • 释放操作不会阻塞
  • ReentrantLock中就是unlock方法调用release(1)对应state-1
  • Semaphore中就是realease,也是state-1
  • CountDownLatch中就是countDown方法,也是state-1
  • 一般情况下,实现类都会实现tryAcquiretryRelease相关方法,以对应各个类的需求

6.3 AQS的用法

  1. 指定协作逻辑,实现获取和释放方法
  2. 在内部写一个Sync类继承AQS
  3. 根据是否独占来决定重写的方法:独占使用tryAcquire/tryRelease、共享使用tryAcquireShared(int acquires)/tryReleaseShared(int releases),在主逻辑里面的获取释放相关方法中调用Sync的方法

7. AQS在CountDownLatch中的源码剖析

  • 下面我们以CountDownLatch为例分析源码:
  • 构造函数
  • 我们看到内部实现就是初始化一个Sync然后把计数值传入
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
复制代码
  • 我们可以看下面的CountDownlatchSync的实现,在构造方法创建的Sync传入的count调用了setState方法传入了AQSstate
  • CountDownLatch内部有一个继承AQS的Sync
private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
复制代码
  • CountDownLatchgetCount()方法
public long getCount() {
        return sync.getCount();
    }
复制代码
  • 我们可以看到getCount实际也是调用SyncgetCount()来获取state并返回
  • CountDownLatchcountDown()方法
public void countDown() {
        sync.releaseShared(1);
    }
复制代码
  • 我们看一看到它直接调用了AQSreleaseShared(1)
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
复制代码
  • releaseShared则是回去调用CountDownLatch中实现的tryReleaseShared
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
复制代码
  • 而在tryReleaseShared中则是主要对state的值做-1操作,如果state大于零可以获取到就减一并且用CAS并发更新值,如果最新值为0就返回true
  • 返回true过后就doReleaseShared释放锁,唤醒队列里面的等待线程。也就是调用了await()方法的线程
  • CountDownLatchawait()方法
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
复制代码
  • await则会调用AQS中的默认实现sync.acquireSharedInterruptibly(1);
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
复制代码
  • 而里面则是调用tryAcquireShared(arg) < 0看是否小于0,如果小于0就代表没有获取到锁,就调用doAcquireSharedInterruptibly(arg);入队
  • tryAcquireShared则是在CountDownLatch中的Sync实现的
protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
复制代码
  • 如果当前state为0了(也就是说计数已经到0了)就返回一个1就不会满住上面的acquireSharedInterruptibly方法中的条件,就会放行,如果不等于0就会返回-1,此时就会入队。调用doAcquireSharedInterruptibly方法
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码
  • 这个方法首先会把当前线程在addWaiter中包装成一个Node节点并添加到队列尾部;而这个Node节点就是FIFO队列的节点。
  • 然后就会进入循环,如果当前节点不是head,那么就会进入到后面的判断,其中重要的是parkAndCheckInterrupt,方法如下:
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
复制代码
  • 它会调用LockSupportpark并且此park方法就是封装了Unsafe的native方法park()来把线程挂起进入阻塞状态
public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
复制代码
  • 再往下就没意义了。我们只需要知道doAcquireSharedInterruptibly方法就是把当前线程放到阻塞队列中,并且把线程阻塞就OK了。
  • AQS在CountDownLatch中使用的一些点:
  • 调用CountDownLatchawait()时,便会尝试获取共享锁,开始时是获取不到锁的,于是就被阻塞
  • 可以获取到的条件就是计数器为0,也就是state==0的时候。
  • 只有每次调用countDown方法才会使得计数器减一,减到0时就回去唤醒阻塞中的线程。

8. AQS在Semaphore中的源码剖析

  • 由于上面讲得很细了,接下来就简略一些
  • Semaphorestate就是许可证的数量
  • 主要的操作就是acquire和release,也是借用Sync对state的操作来控制线程的阻塞与唤醒
public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
复制代码
public void release() {
        sync.releaseShared(1);
    }
复制代码
  • 先看下acquire调用的acquireSharedInterruptibly此方法在上面已经说过。
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
复制代码
  • 而在Semaphore中Sync有两个实现:NonfairSyncFairSync
  • 在FairSync中tryAcquireShared就会有hasQueuedPredecessors判断,如果不是头节点,那就返回-1,在acquireSharedInterruptibly方法中去调用doAcquireSharedInterruptibly入队并且阻塞线程
protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
复制代码
  • 而在NonfairSync中而是直接调用SyncnonfairTryAcquireShared
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
复制代码
final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
复制代码
  • 可以看到其中并没有对是否阻塞队列的头节点判断,直接去获取值,判断是会否许可足够。
  • release中则是调用AQS的releaseShared其也是调用SemaphoreSynctryReleaseShared来判断是否需要释放锁,去唤醒阻塞线程
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
复制代码
  • tryReleaseShared
protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
复制代码
  • 我们可以看到此处就是关于Semaphore的已获取许可的释放 把state加回去然后用CAS更新state

9. AQS在ReentrantLock中的应用

  • 源码就不分析了
  • ReentrantLock中,state主要是重入的次数,加锁的时候state+1 ,而在释放锁的时候,state-1然后判断当前的state==0
  • ReentrantLock中与AQS相关的有三个类:UnfairSyncFairSyncSync
  • 关于加锁和解锁的逻辑也是AQS中的acquire方法的逻辑(获取锁失败就会放入队列中)和release方法(调用子类的tryRelease来去掉头部,并且唤醒线程)
  • 而加锁解锁中的逻辑,主要是公平锁和非公平锁的区别,公平锁会去判断是否在队列头部,如果在才会去执行,而非公平锁则会抢锁。不会管你是不是在队列头部。
  • 相信在上面的源码分析过后,分析ReentrantLock是十分简单的。大家可以自行分析。



目录
相关文章
|
2天前
|
监控 安全 开发工具
鸿蒙HarmonyOS应用开发 | HarmonyOS Next-从应用开发到上架全流程解析
HarmonyOS Next是华为推出的最新版本鸿蒙操作系统,强调多设备协同和分布式技术,提供丰富的开发工具和API接口。本文详细解析了从应用开发到上架的全流程,包括环境搭建、应用设计与开发、多设备适配、测试调试、应用上架及推广等环节,并介绍了鸿蒙原生应用开发者激励计划,帮助开发者更好地融入鸿蒙生态。通过DevEco Studio集成开发环境和华为提供的多种支持工具,开发者可以轻松创建并发布高质量的鸿蒙应用,享受技术和市场推广的双重支持。
117 11
|
5天前
|
域名解析 弹性计算 安全
阿里云服务器租用、注册域名、备案及域名解析完整流程参考(图文教程)
对于很多初次建站的用户来说,选购云服务器和注册应及备案和域名解析步骤必须了解的,目前轻量云服务器2核2G68元一年,2核4G4M服务器298元一年,域名注册方面,阿里云推出域名1元购买活动,新用户注册com和cn域名2年首年仅需0元,xyz和top等域名首年仅需1元。对于建站的用户来说,购买完云服务器并注册好域名之后,下一步还需要操作备案和域名绑定。本文为大家展示阿里云服务器的购买流程,域名注册、绑定以及备案的完整流程,全文以图文教程形式为大家展示具体细节及注意事项,以供新手用户参考。
|
21天前
|
缓存 Java 调度
多线程编程核心:上下文切换深度解析
在现代计算机系统中,多线程编程已成为提高程序性能和响应速度的关键技术。然而,多线程编程中一个不可避免的概念就是上下文切换(Context Switching)。本文将深入探讨上下文切换的概念、原因、影响以及优化策略,帮助你在工作和学习中深入理解这一技术干货。
37 10
|
23天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
51 12
|
18天前
|
PyTorch Shell API
Ascend Extension for PyTorch的源码解析
本文介绍了Ascend对PyTorch代码的适配过程,包括源码下载、编译步骤及常见问题,详细解析了torch-npu编译后的文件结构和三种实现昇腾NPU算子调用的方式:通过torch的register方式、定义算子方式和API重定向映射方式。这对于开发者理解和使用Ascend平台上的PyTorch具有重要指导意义。
|
21天前
|
存储 编译器 C语言
【C语言】数据类型全解析:编程效率提升的秘诀
在C语言中,合理选择和使用数据类型是编程的关键。通过深入理解基本数据类型和派生数据类型,掌握类型限定符和扩展技巧,可以编写出高效、稳定、可维护的代码。无论是在普通应用还是嵌入式系统中,数据类型的合理使用都能显著提升程序的性能和可靠性。
40 8
|
21天前
|
算法 调度 开发者
多线程编程核心:上下文切换深度解析
在多线程编程中,上下文切换是一个至关重要的概念,它直接影响到程序的性能和响应速度。本文将深入探讨上下文切换的含义、原因、影响以及如何优化,帮助你在工作和学习中更好地理解和应用多线程技术。
32 4
|
1月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
73 2
|
2月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
78 0
|
2月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
62 0

推荐镜像

更多