Java并发编程—并发流程控制与AQS原理及相关源码解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: Java并发编程—并发流程控制与AQS原理及相关源码解析


Java并发编程

代码GitHub地址 github.com/imyiren/con…

  1. 刨根问底搞懂创建线程到底有几种方法?
  2. 如何正确得启动和停止一个线程 最佳实践与源码分析
  3. 多案例理解Object的wait,notify,notifyAll与Thread的sleep,yield,join等方法
  4. 了解线程属性,如何处理子线程异常
  5. 多线程安全和性能问题
  6. JMM(Java内存模型)在并发中的原理与应用
  7. 深入理解死锁问题及其解决方案
  8. 剖析线程池的使用与组成
  9. 带你一文搞懂ThreadLocal的用法以及内部原理
  10. J.U.C下Lock的分类及特点详解(结合案例和源码)
  11. J.U.C下各种Atomic类使用及CAS相关源码分析
  12. 结合源码分析ConcurrenthashMap与CopyOnWriteArrayList的原理
  13. 并发流程控制与AQS原理及相关源码解析

0. 主要内容

  • 文章分为两部分:
  1. 第一个部分主要讲并发流程控制的各大类的使用及案例
  2. 第二部分主要是先将AQS的组成及原理,然后结合CountDownLatch、Semaphore等分析源码逻辑

ps: 文章内容比较多

1. 并发流程控制

1.1 什么是并发流程控制

  • 并发流程控制,就是让线程之间相互配合完成任务,来满足业务逻辑
  • 如:让线程A等待线程B完成后再执行等策略

1.2 并发流程控制的工具

作用 说明
Semaphore 信号量:可以通过控制“许可”的数量,来保证线程间配合 线程只有拿到了许可才可以继续运行
CyclicBarrier 循环栅栏:线程会等待,直到足够多线程达到了规定数量,再执行下一步任务 适用于线程间相互等待处理结果就绪的场景
Phaser 和CyclicBarrier类似,但是计数可变 java7加入的新类
CountDownLatch 也是一个计数等待相关,数量地见到0时,触发动作 不可重复使用
Exchanger 让两个线程在合适时交换对象 适用于两个线程工作在同一个类的不同实例上时,用于交换数据
Condition 可以控制线程的等待和唤醒 是Object.wati()的升级版

2. CountDownLatch计数门闩

2.1 作用

  • 并发流程控制的工具,用于等待数量(我们设定的)足够后再执行某些任务

2.2 主要方法

  • CountDownLatch(int count):只有一个构造方法,参数count为需要倒数的值
  • await():调用此方法的线程会被挂起,它会等到count值为零的时候才继续执行
  • countdown():讲count减1,直到0,等待的线程会被唤醒

2.3 用法一:等待线程执行完毕

/**
 * @author yiren
 */
public class CountDownLatchExample01 {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger integer = new AtomicInteger(1);
        CountDownLatch latch = new CountDownLatch(5);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName()+ " produce ....");
                    TimeUnit.SECONDS.sleep(1);
                    integer.incrementAndGet();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    latch.countDown();
                }
            });
        }
        System.out.println(Thread.currentThread().getName() + " waiting....");
        latch.await();
        System.out.println(Thread.currentThread().getName() + " finished!");
        System.out.println(Thread.currentThread().getName() + " num: " +  integer.get());
        executorService.shutdown();
    }
}
复制代码
pool-1-thread-1 produce ....
pool-1-thread-2 produce ....
pool-1-thread-3 produce ....
main waiting....
pool-1-thread-4 produce ....
pool-1-thread-5 produce ....
main finished!
main num: 6
Process finished with exit code 0
复制代码

2.4 用法二:多等一

/**
 * @author yiren
 */
public class CountDownLatchExample02 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " ready!");
                try {
                    latch.await();
                    System.out.println(Thread.currentThread().getName()+ " produce ....");
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    latch.countDown();
                }
            });
        }
        Thread.sleep(10);
        System.out.println(Thread.currentThread().getName() + " ready!");
        latch.countDown();
        System.out.println(Thread.currentThread().getName() + " go!");
        executorService.shutdown();
    }
}
复制代码
pool-1-thread-1 ready!
pool-1-thread-4 ready!
pool-1-thread-3 ready!
pool-1-thread-2 ready!
pool-1-thread-5 ready!
main ready!
main go!
pool-1-thread-1 produce ....
pool-1-thread-2 produce ....
pool-1-thread-5 produce ....
pool-1-thread-3 produce ....
pool-1-thread-4 produce ....
Process finished with exit code 0
复制代码

2.4 注意

  • CountDownLatch不仅可以无限等待,还可以给参数,在指定的事件内如果等到就唤醒线程继续执行
boolean await(long timeout, TimeUnit unit)
复制代码
  • CountDownLatch不能重用,如果涉及重新计数,可以使用CyclicBarrier或者新创建CountDownLatch

3. Semaphore信号量

3.1 信号量作用

  • Semaphore可以用来限制或管理数量有限的资源使用情况
  • 信号量的租用是维护一个许可计数,线程可以获取许可,然后信号量减一;线程也可以释放许可,信号量就加一;如果信号量的许可颁发完了,其他线程想要获取,就需要等待,直到有另外的线程释放了许可。

3.2 信号量使用

  1. 初始化Semaphore指定许可数量
  2. 在需要获取许可的代码前面加上acquire()或者acquireUniterruptibly()方法
  3. 任务执行完成有调用release()释放许可

3.3 主要方法

  • Semaphore(int permits, boolean fair)这里设置许可数量,以及是否使用公平策略。
  • 如果传入true那么久吧等待线程放入到FIFO的队列里面。
  • aquire()请求许可,可以响应中断
  • aquireUnniterruptibly()请求许可不可中断
  • tryAcquire()看看现在有没有空闲的许可,如果有那就返回true;这个方法还可以设置等待时间给一个timeout,让线程等待一段时间。
  • release()释放许可

3.4 案例演示

/**
 * @author yiren
 */
public class SemaphoreExample01 {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3, true);
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 8; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName()+" start to get permit");
                    semaphore.acquire();
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " " + LocalDateTime.now() +" finished!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release();
                }
            });
        }
        executorService.shutdown();
    }
}
复制代码
pool-1-thread-1 start to get permit
pool-1-thread-4 start to get permit
pool-1-thread-3 start to get permit
pool-1-thread-2 start to get permit
pool-1-thread-5 start to get permit
pool-1-thread-6 start to get permit
pool-1-thread-7 start to get permit
pool-1-thread-8 start to get permit
pool-1-thread-3 2020-02-21T19:54:47.392 finished!
pool-1-thread-1 2020-02-21T19:54:47.392 finished!
pool-1-thread-4 2020-02-21T19:54:47.392 finished!
pool-1-thread-6 2020-02-21T19:54:49.396 finished!
pool-1-thread-2 2020-02-21T19:54:49.396 finished!
pool-1-thread-5 2020-02-21T19:54:49.396 finished!
pool-1-thread-8 2020-02-21T19:54:51.401 finished!
pool-1-thread-7 2020-02-21T19:54:51.401 finished!
Process finished with exit code 0
复制代码

3.5 注意点

  • 获取和释放的许可证必须一致,acquire和release都是可以传入数值的来确定获取和释放的数量。如果我们获取和释放不一致,就会容易导致程序bug。当然也不是绝对,除非有特殊业务需求,否则都获取释放设置为一样的
  • 注意在初始化Semaphore的时候设置公平性,一般设置为true会比较合理。如果插队情况比较严重的话,某些线程可能一直阻塞
  • 获取和释放许可对线程并不要求,线程A获取了可以线程B释放。

4. Condition接口

4.1 作用

  • 当线程A需要等待某个任务或者某个资源,就可以执行condition.await()方法,然后就会陷入阻塞状态。
  • 此时另一个线程B,去获取资源或者执行任务完成后,调用condition.signal()或者signalAll()方法,通知线程A,继续执行
  • 这个类似于object.wait()notify()notifyAll()
  • signal()方法如果遇到多个线程都在等待的时候,会去唤醒等待时间最长的那个
  • 在我们ReentrantLock中就可以直接新建Condition。看下面案例

4.2 案例演示

  • 普通用法
/**
 * @author yiren
 */
public class ConditionExample01 {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            task1();
        });
        Thread thread2 = new Thread(() -> {
            task2();
        });
        thread1.start();
        Thread.sleep(100);
        thread2.start();
    }
    private static void task1() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " start await()");
            condition.await();
            System.out.println(Thread.currentThread().getName() + " await finished!");
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    private static void task2() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " start signal()");
            Thread.sleep(1000);
            condition.signal();
            System.out.println(Thread.currentThread().getName() + " signal finished!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
复制代码
Thread-0 start await()
Thread-1 start signal()
Thread-1 signal finished!
Thread-0 await finished!
Process finished with exit code 0
复制代码
  • 生产者消费者模式
/**
 * @author yiren
 */
public class ConditionExample02 {
    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();
    public static void main(String[] args) {
        ConditionExample02 conditionDemo2 = new ConditionExample02();
        Producer producer = conditionDemo2.new Producer();
        Consumer consumer = conditionDemo2.new Consumer();
        producer.start();
        consumer.start();
    }
    class Consumer extends Thread {
        @Override
        public void run() {
            consume();
        }
        private void consume() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("队列空,等待数据");
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();
                    notFull.signalAll();
                    System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
                } finally {
                    lock.unlock();
                }
            }
        }
    }
    class Producer extends Thread {
        @Override
        public void run() {
            produce();
        }
        private void produce() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("队列满,等待有空余");
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(1);
                    notEmpty.signalAll();
                    System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}
复制代码
  • 以上使用两个Condition作为队列满和空的通知传递工具在生产者和消费者之间互通

4.3 注意点

  • 我们知道Lock可以看做synchronized的替代方案,而Condition就是用来替代object.wait/notify的,在用法上几乎一致。
  • 调用await()方法时必须持有Lock锁,否则会抛出异常,并且await()方法会释放当前持有的Lock锁,
  • 一个Lock锁可以有多个Condition更加灵活

5. CyclicBarrier循环栅栏

5.1 作用

  • CyclicBarrier循环栅栏和CountDownLatch很类似,都能阻塞一组线程
  • 当需要多个线程配合完成任务,并最后需要统一汇总时,我们就可以使用CyclicBarrier,当某个线程完成任务后,它先会等待,等到所有线程都执行好了任务,再一起继续执行剩下的任务
  • 比如:同时出去聚餐约在了公司,等大家到公司了一起走过去。
  • 但是注意CyclicBarrier是可以重复使用的,这个和CountDownLatch不同

5.2 案例

/**
 * @author yiren
 */
public class CyclicBarrierExample {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有人都到场了, 大家统一出发!");
            }
        });
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(i, cyclicBarrier)).start();
        }
    }
    static class Task implements Runnable {
        private int id;
        private CyclicBarrier cyclicBarrier;
        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            System.out.println("线程" + id + "现在前往集合地点");
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("线程" + id + "到了集合地点,开始等待其他人到达");
                cyclicBarrier.await();
                System.out.println("线程" + id + "出发了");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}
复制代码
线程0现在前往集合地点
线程2现在前往集合地点
线程3现在前往集合地点
线程1现在前往集合地点
线程4现在前往集合地点
线程5现在前往集合地点
线程6现在前往集合地点
线程7现在前往集合地点
线程8现在前往集合地点
线程9现在前往集合地点
线程3到了集合地点,开始等待其他人到达
线程9到了集合地点,开始等待其他人到达
线程8到了集合地点,开始等待其他人到达
线程4到了集合地点,开始等待其他人到达
线程5到了集合地点,开始等待其他人到达
所有人都到场了, 大家统一出发!
线程5出发了
线程3出发了
线程8出发了
线程4出发了
线程9出发了
线程1到了集合地点,开始等待其他人到达
线程6到了集合地点,开始等待其他人到达
线程0到了集合地点,开始等待其他人到达
线程7到了集合地点,开始等待其他人到达
线程2到了集合地点,开始等待其他人到达
所有人都到场了, 大家统一出发!
线程2出发了
线程1出发了
线程7出发了
线程0出发了
线程6出发了
Process finished with exit code 0
复制代码
  • 每五个人到了过后,就出发一批

5.3 CountDownLatch和CyclicBarrier`区别

  • 作用不同:CountDownLatch使用countDown()是用于事件的,而CyclicBarrier使用await()是用于线程的
  • 可重用性不同:CountDownLatch在倒数到0后不能再次重用,除非创建新对象;而CyclicBarrier是可以直接重用的

6. 深入AQS理解J.U.C的根基

6.1 AQS作用及其重要性

  • AQS在CountDownLatch等工具内都有使用,全称是:AbstractQueuedSynchronizer是一个抽象类
  • 锁和上面的线程并发控制类(Semaphore等)都有类似的地方。 其实他们底层都是使用了AQS作为基类的拓展
  • 正因为他们很多工作都类似,JDK就把这部分通用逻辑抽离了出来,提供给他们直接使用,使其不必关注很多深层次的细节,从而完成他们的功能。
  • 我们可以大致看一下我们锁用到的这些并发控制的工具类和锁的内部实现
  • `Semaphore``
public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        Sync(int permits) {
            setState(permits);
        }
  ......
复制代码
  • ReentrantLock
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
    ......
复制代码
  • CountDownLatch
public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {
            setState(count);
        }
    ......
复制代码
  • 由上源码我们可以看到,里面都有一个内部类,Sync继承自AbstractQueuedSynchronizer
  • 那么AQS是用来干些什么事情的呢?
  • J.U.C基本都是是基于AQS实现的,AQS是一个用于构建锁、同步器、线程协作工具类的框架供给子类使用,主要使用模板模式来设计。
  • 它主要工作就是管理线程的阻塞与唤醒,实现同步的管理,以及阻塞线程的队列管理工作

6.2 AQS的组成及内部原理

  • AbstractQueuedSynchronizer自JDK1.5加入,是基于FIFO等待队列实现的一个用于同步器的基础框架。
  • JDK1.8 继承AQS实现的类:

  • 我们可以看到,在可重入锁,读写锁,计数门闩等,信号量里面都是用了AQS的子类,接下来我们就学习一下AQS的内部原理
  1. AQS的三大部分
  • state:状态,
  • FIFO队列:线程竞争锁的管理队列
  • 获取和释放方法:需要工具类去实现的方法
  1. state:状态
/**
     * The synchronization state.
     */
    private volatile int state;
复制代码
  • 它的含义并不具体,根据实现的不同而不同,如:Semaphore内是剩余许可数量、CountDownLatch内是还需要倒数的数量,可看做一个计数器,只是不同类的作用及意义不用
protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
复制代码
  • 状态值的更新,是使用Unsafe的CAS完成
  • 在ReentrantLock中:state表示锁的占用情况,可重入的计数,每重入一次就加一,当要释放锁时,它的值就会变成0,表示不被任何线程占有。
  1. FIFO队列:
/**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;
    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;
复制代码
  • 这个队列是用来存放等待的线程的,AQS会对这个队列进行管理。当多个线程竞争锁时,没有拿到锁的,就会被翻到队列中,当前拿到锁的执行任务的线程结束,AQS就会从队列中选一个线程来占有这个锁。
  • AQS维护一个双向链表的等待队列,把等待线程都放到这个队列里面管理;队列头节点是当前拿到锁的线程;在AQS中保存了这个队列的头尾节点。
  1. 获取和释放的方法
  • 获取方法:
  • 获取操作会依赖state变量,经常会阻塞,如:获取不到锁的时候,获取不到许可的时候等
  • ReentrantLock中,就是获取锁。state+1
  • Semaphore中就是acquire获取许可,state-1,当state==0就会阻塞
  • CountDownLatch中就是await方法,就是等待state==0
  • 释放方法:
  • 释放操作不会阻塞
  • ReentrantLock中就是unlock方法调用release(1)对应state-1
  • Semaphore中就是realease,也是state-1
  • CountDownLatch中就是countDown方法,也是state-1
  • 一般情况下,实现类都会实现tryAcquiretryRelease相关方法,以对应各个类的需求

6.3 AQS的用法

  1. 指定协作逻辑,实现获取和释放方法
  2. 在内部写一个Sync类继承AQS
  3. 根据是否独占来决定重写的方法:独占使用tryAcquire/tryRelease、共享使用tryAcquireShared(int acquires)/tryReleaseShared(int releases),在主逻辑里面的获取释放相关方法中调用Sync的方法

7. AQS在CountDownLatch中的源码剖析

  • 下面我们以CountDownLatch为例分析源码:
  • 构造函数
  • 我们看到内部实现就是初始化一个Sync然后把计数值传入
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
复制代码
  • 我们可以看下面的CountDownlatchSync的实现,在构造方法创建的Sync传入的count调用了setState方法传入了AQSstate
  • CountDownLatch内部有一个继承AQS的Sync
private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
复制代码
  • CountDownLatchgetCount()方法
public long getCount() {
        return sync.getCount();
    }
复制代码
  • 我们可以看到getCount实际也是调用SyncgetCount()来获取state并返回
  • CountDownLatchcountDown()方法
public void countDown() {
        sync.releaseShared(1);
    }
复制代码
  • 我们看一看到它直接调用了AQSreleaseShared(1)
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
复制代码
  • releaseShared则是回去调用CountDownLatch中实现的tryReleaseShared
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
复制代码
  • 而在tryReleaseShared中则是主要对state的值做-1操作,如果state大于零可以获取到就减一并且用CAS并发更新值,如果最新值为0就返回true
  • 返回true过后就doReleaseShared释放锁,唤醒队列里面的等待线程。也就是调用了await()方法的线程
  • CountDownLatchawait()方法
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
复制代码
  • await则会调用AQS中的默认实现sync.acquireSharedInterruptibly(1);
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
复制代码
  • 而里面则是调用tryAcquireShared(arg) < 0看是否小于0,如果小于0就代表没有获取到锁,就调用doAcquireSharedInterruptibly(arg);入队
  • tryAcquireShared则是在CountDownLatch中的Sync实现的
protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
复制代码
  • 如果当前state为0了(也就是说计数已经到0了)就返回一个1就不会满住上面的acquireSharedInterruptibly方法中的条件,就会放行,如果不等于0就会返回-1,此时就会入队。调用doAcquireSharedInterruptibly方法
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码
  • 这个方法首先会把当前线程在addWaiter中包装成一个Node节点并添加到队列尾部;而这个Node节点就是FIFO队列的节点。
  • 然后就会进入循环,如果当前节点不是head,那么就会进入到后面的判断,其中重要的是parkAndCheckInterrupt,方法如下:
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
复制代码
  • 它会调用LockSupportpark并且此park方法就是封装了Unsafe的native方法park()来把线程挂起进入阻塞状态
public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
复制代码
  • 再往下就没意义了。我们只需要知道doAcquireSharedInterruptibly方法就是把当前线程放到阻塞队列中,并且把线程阻塞就OK了。
  • AQS在CountDownLatch中使用的一些点:
  • 调用CountDownLatchawait()时,便会尝试获取共享锁,开始时是获取不到锁的,于是就被阻塞
  • 可以获取到的条件就是计数器为0,也就是state==0的时候。
  • 只有每次调用countDown方法才会使得计数器减一,减到0时就回去唤醒阻塞中的线程。

8. AQS在Semaphore中的源码剖析

  • 由于上面讲得很细了,接下来就简略一些
  • Semaphorestate就是许可证的数量
  • 主要的操作就是acquire和release,也是借用Sync对state的操作来控制线程的阻塞与唤醒
public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
复制代码
public void release() {
        sync.releaseShared(1);
    }
复制代码
  • 先看下acquire调用的acquireSharedInterruptibly此方法在上面已经说过。
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
复制代码
  • 而在Semaphore中Sync有两个实现:NonfairSyncFairSync
  • 在FairSync中tryAcquireShared就会有hasQueuedPredecessors判断,如果不是头节点,那就返回-1,在acquireSharedInterruptibly方法中去调用doAcquireSharedInterruptibly入队并且阻塞线程
protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
复制代码
  • 而在NonfairSync中而是直接调用SyncnonfairTryAcquireShared
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
复制代码
final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
复制代码
  • 可以看到其中并没有对是否阻塞队列的头节点判断,直接去获取值,判断是会否许可足够。
  • release中则是调用AQS的releaseShared其也是调用SemaphoreSynctryReleaseShared来判断是否需要释放锁,去唤醒阻塞线程
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
复制代码
  • tryReleaseShared
protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
复制代码
  • 我们可以看到此处就是关于Semaphore的已获取许可的释放 把state加回去然后用CAS更新state

9. AQS在ReentrantLock中的应用

  • 源码就不分析了
  • ReentrantLock中,state主要是重入的次数,加锁的时候state+1 ,而在释放锁的时候,state-1然后判断当前的state==0
  • ReentrantLock中与AQS相关的有三个类:UnfairSyncFairSyncSync
  • 关于加锁和解锁的逻辑也是AQS中的acquire方法的逻辑(获取锁失败就会放入队列中)和release方法(调用子类的tryRelease来去掉头部,并且唤醒线程)
  • 而加锁解锁中的逻辑,主要是公平锁和非公平锁的区别,公平锁会去判断是否在队列头部,如果在才会去执行,而非公平锁则会抢锁。不会管你是不是在队列头部。
  • 相信在上面的源码分析过后,分析ReentrantLock是十分简单的。大家可以自行分析。



目录
相关文章
|
7天前
|
JSON Java Apache
非常实用的Http应用框架,杜绝Java Http 接口对接繁琐编程
UniHttp 是一个声明式的 HTTP 接口对接框架,帮助开发者快速对接第三方 HTTP 接口。通过 @HttpApi 注解定义接口,使用 @GetHttpInterface 和 @PostHttpInterface 等注解配置请求方法和参数。支持自定义代理逻辑、全局请求参数、错误处理和连接池配置,提高代码的内聚性和可读性。
|
8天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
5天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
7天前
|
存储 缓存 安全
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见。本文介绍了使用 `File.createTempFile` 方法和自定义创建临时文件的两种方式,详细探讨了它们的使用场景和注意事项,包括数据缓存、文件上传下载和日志记录等。强调了清理临时文件、确保文件名唯一性和合理设置文件权限的重要性。
19 2
|
8天前
|
Java UED
Java中的多线程编程基础与实践
【10月更文挑战第35天】在Java的世界中,多线程是提升应用性能和响应性的利器。本文将深入浅出地介绍如何在Java中创建和管理线程,以及如何利用同步机制确保数据一致性。我们将从简单的“Hello, World!”线程示例出发,逐步探索线程池的高效使用,并讨论常见的多线程问题。无论你是Java新手还是希望深化理解,这篇文章都将为你打开多线程的大门。
|
存储 Java
【Java 虚拟机原理】线程栈 | 栈帧 | 局部变量表 | 反汇编字节码文件 | Java 虚拟机指令手册 | 程序计数器
【Java 虚拟机原理】线程栈 | 栈帧 | 局部变量表 | 反汇编字节码文件 | Java 虚拟机指令手册 | 程序计数器
126 0
【Java 虚拟机原理】线程栈 | 栈帧 | 局部变量表 | 反汇编字节码文件 | Java 虚拟机指令手册 | 程序计数器
|
9天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
18天前
|
安全 Java
java 中 i++ 到底是否线程安全?
本文通过实例探讨了 `i++` 在多线程环境下的线程安全性问题。首先,使用 100 个线程分别执行 10000 次 `i++` 操作,发现最终结果小于预期的 1000000,证明 `i++` 是线程不安全的。接着,介绍了两种解决方法:使用 `synchronized` 关键字加锁和使用 `AtomicInteger` 类。其中,`AtomicInteger` 通过 `CAS` 操作实现了高效的线程安全。最后,通过分析字节码和源码,解释了 `i++` 为何线程不安全以及 `AtomicInteger` 如何保证线程安全。
java 中 i++ 到底是否线程安全?
|
5天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
25 9
|
8天前
|
Java
JAVA多线程通信:为何wait()与notify()如此重要?
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是实现线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件满足时被唤醒,从而确保数据一致性和同步。相比其他通信方式,如忙等待,这些方法更高效灵活。 示例代码展示了如何在生产者-消费者模型中使用这些方法实现线程间的协调和同步。
22 3

推荐镜像

更多