Java并发编程—并发流程控制与AQS原理及相关源码解析

简介: Java并发编程—并发流程控制与AQS原理及相关源码解析


Java并发编程

代码GitHub地址 github.com/imyiren/con…

  1. 刨根问底搞懂创建线程到底有几种方法?
  2. 如何正确得启动和停止一个线程 最佳实践与源码分析
  3. 多案例理解Object的wait,notify,notifyAll与Thread的sleep,yield,join等方法
  4. 了解线程属性,如何处理子线程异常
  5. 多线程安全和性能问题
  6. JMM(Java内存模型)在并发中的原理与应用
  7. 深入理解死锁问题及其解决方案
  8. 剖析线程池的使用与组成
  9. 带你一文搞懂ThreadLocal的用法以及内部原理
  10. J.U.C下Lock的分类及特点详解(结合案例和源码)
  11. J.U.C下各种Atomic类使用及CAS相关源码分析
  12. 结合源码分析ConcurrenthashMap与CopyOnWriteArrayList的原理
  13. 并发流程控制与AQS原理及相关源码解析

0. 主要内容

  • 文章分为两部分:
  1. 第一个部分主要讲并发流程控制的各大类的使用及案例
  2. 第二部分主要是先将AQS的组成及原理,然后结合CountDownLatch、Semaphore等分析源码逻辑

ps: 文章内容比较多

1. 并发流程控制

1.1 什么是并发流程控制

  • 并发流程控制,就是让线程之间相互配合完成任务,来满足业务逻辑
  • 如:让线程A等待线程B完成后再执行等策略

1.2 并发流程控制的工具

作用 说明
Semaphore 信号量:可以通过控制“许可”的数量,来保证线程间配合 线程只有拿到了许可才可以继续运行
CyclicBarrier 循环栅栏:线程会等待,直到足够多线程达到了规定数量,再执行下一步任务 适用于线程间相互等待处理结果就绪的场景
Phaser 和CyclicBarrier类似,但是计数可变 java7加入的新类
CountDownLatch 也是一个计数等待相关,数量地见到0时,触发动作 不可重复使用
Exchanger 让两个线程在合适时交换对象 适用于两个线程工作在同一个类的不同实例上时,用于交换数据
Condition 可以控制线程的等待和唤醒 是Object.wati()的升级版

2. CountDownLatch计数门闩

2.1 作用

  • 并发流程控制的工具,用于等待数量(我们设定的)足够后再执行某些任务

2.2 主要方法

  • CountDownLatch(int count):只有一个构造方法,参数count为需要倒数的值
  • await():调用此方法的线程会被挂起,它会等到count值为零的时候才继续执行
  • countdown():讲count减1,直到0,等待的线程会被唤醒

2.3 用法一:等待线程执行完毕

/**
 * @author yiren
 */
public class CountDownLatchExample01 {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger integer = new AtomicInteger(1);
        CountDownLatch latch = new CountDownLatch(5);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName()+ " produce ....");
                    TimeUnit.SECONDS.sleep(1);
                    integer.incrementAndGet();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    latch.countDown();
                }
            });
        }
        System.out.println(Thread.currentThread().getName() + " waiting....");
        latch.await();
        System.out.println(Thread.currentThread().getName() + " finished!");
        System.out.println(Thread.currentThread().getName() + " num: " +  integer.get());
        executorService.shutdown();
    }
}
复制代码
pool-1-thread-1 produce ....
pool-1-thread-2 produce ....
pool-1-thread-3 produce ....
main waiting....
pool-1-thread-4 produce ....
pool-1-thread-5 produce ....
main finished!
main num: 6
Process finished with exit code 0
复制代码

2.4 用法二:多等一

/**
 * @author yiren
 */
public class CountDownLatchExample02 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " ready!");
                try {
                    latch.await();
                    System.out.println(Thread.currentThread().getName()+ " produce ....");
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    latch.countDown();
                }
            });
        }
        Thread.sleep(10);
        System.out.println(Thread.currentThread().getName() + " ready!");
        latch.countDown();
        System.out.println(Thread.currentThread().getName() + " go!");
        executorService.shutdown();
    }
}
复制代码
pool-1-thread-1 ready!
pool-1-thread-4 ready!
pool-1-thread-3 ready!
pool-1-thread-2 ready!
pool-1-thread-5 ready!
main ready!
main go!
pool-1-thread-1 produce ....
pool-1-thread-2 produce ....
pool-1-thread-5 produce ....
pool-1-thread-3 produce ....
pool-1-thread-4 produce ....
Process finished with exit code 0
复制代码

2.4 注意

  • CountDownLatch不仅可以无限等待,还可以给参数,在指定的事件内如果等到就唤醒线程继续执行
boolean await(long timeout, TimeUnit unit)
复制代码
  • CountDownLatch不能重用,如果涉及重新计数,可以使用CyclicBarrier或者新创建CountDownLatch

3. Semaphore信号量

3.1 信号量作用

  • Semaphore可以用来限制或管理数量有限的资源使用情况
  • 信号量的租用是维护一个许可计数,线程可以获取许可,然后信号量减一;线程也可以释放许可,信号量就加一;如果信号量的许可颁发完了,其他线程想要获取,就需要等待,直到有另外的线程释放了许可。

3.2 信号量使用

  1. 初始化Semaphore指定许可数量
  2. 在需要获取许可的代码前面加上acquire()或者acquireUniterruptibly()方法
  3. 任务执行完成有调用release()释放许可

3.3 主要方法

  • Semaphore(int permits, boolean fair)这里设置许可数量,以及是否使用公平策略。
  • 如果传入true那么久吧等待线程放入到FIFO的队列里面。
  • aquire()请求许可,可以响应中断
  • aquireUnniterruptibly()请求许可不可中断
  • tryAcquire()看看现在有没有空闲的许可,如果有那就返回true;这个方法还可以设置等待时间给一个timeout,让线程等待一段时间。
  • release()释放许可

3.4 案例演示

/**
 * @author yiren
 */
public class SemaphoreExample01 {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3, true);
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 8; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName()+" start to get permit");
                    semaphore.acquire();
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " " + LocalDateTime.now() +" finished!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release();
                }
            });
        }
        executorService.shutdown();
    }
}
复制代码
pool-1-thread-1 start to get permit
pool-1-thread-4 start to get permit
pool-1-thread-3 start to get permit
pool-1-thread-2 start to get permit
pool-1-thread-5 start to get permit
pool-1-thread-6 start to get permit
pool-1-thread-7 start to get permit
pool-1-thread-8 start to get permit
pool-1-thread-3 2020-02-21T19:54:47.392 finished!
pool-1-thread-1 2020-02-21T19:54:47.392 finished!
pool-1-thread-4 2020-02-21T19:54:47.392 finished!
pool-1-thread-6 2020-02-21T19:54:49.396 finished!
pool-1-thread-2 2020-02-21T19:54:49.396 finished!
pool-1-thread-5 2020-02-21T19:54:49.396 finished!
pool-1-thread-8 2020-02-21T19:54:51.401 finished!
pool-1-thread-7 2020-02-21T19:54:51.401 finished!
Process finished with exit code 0
复制代码

3.5 注意点

  • 获取和释放的许可证必须一致,acquire和release都是可以传入数值的来确定获取和释放的数量。如果我们获取和释放不一致,就会容易导致程序bug。当然也不是绝对,除非有特殊业务需求,否则都获取释放设置为一样的
  • 注意在初始化Semaphore的时候设置公平性,一般设置为true会比较合理。如果插队情况比较严重的话,某些线程可能一直阻塞
  • 获取和释放许可对线程并不要求,线程A获取了可以线程B释放。

4. Condition接口

4.1 作用

  • 当线程A需要等待某个任务或者某个资源,就可以执行condition.await()方法,然后就会陷入阻塞状态。
  • 此时另一个线程B,去获取资源或者执行任务完成后,调用condition.signal()或者signalAll()方法,通知线程A,继续执行
  • 这个类似于object.wait()notify()notifyAll()
  • signal()方法如果遇到多个线程都在等待的时候,会去唤醒等待时间最长的那个
  • 在我们ReentrantLock中就可以直接新建Condition。看下面案例

4.2 案例演示

  • 普通用法
/**
 * @author yiren
 */
public class ConditionExample01 {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            task1();
        });
        Thread thread2 = new Thread(() -> {
            task2();
        });
        thread1.start();
        Thread.sleep(100);
        thread2.start();
    }
    private static void task1() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " start await()");
            condition.await();
            System.out.println(Thread.currentThread().getName() + " await finished!");
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    private static void task2() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " start signal()");
            Thread.sleep(1000);
            condition.signal();
            System.out.println(Thread.currentThread().getName() + " signal finished!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
复制代码
Thread-0 start await()
Thread-1 start signal()
Thread-1 signal finished!
Thread-0 await finished!
Process finished with exit code 0
复制代码
  • 生产者消费者模式
/**
 * @author yiren
 */
public class ConditionExample02 {
    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();
    public static void main(String[] args) {
        ConditionExample02 conditionDemo2 = new ConditionExample02();
        Producer producer = conditionDemo2.new Producer();
        Consumer consumer = conditionDemo2.new Consumer();
        producer.start();
        consumer.start();
    }
    class Consumer extends Thread {
        @Override
        public void run() {
            consume();
        }
        private void consume() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("队列空,等待数据");
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();
                    notFull.signalAll();
                    System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
                } finally {
                    lock.unlock();
                }
            }
        }
    }
    class Producer extends Thread {
        @Override
        public void run() {
            produce();
        }
        private void produce() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("队列满,等待有空余");
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(1);
                    notEmpty.signalAll();
                    System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}
复制代码
  • 以上使用两个Condition作为队列满和空的通知传递工具在生产者和消费者之间互通

4.3 注意点

  • 我们知道Lock可以看做synchronized的替代方案,而Condition就是用来替代object.wait/notify的,在用法上几乎一致。
  • 调用await()方法时必须持有Lock锁,否则会抛出异常,并且await()方法会释放当前持有的Lock锁,
  • 一个Lock锁可以有多个Condition更加灵活

5. CyclicBarrier循环栅栏

5.1 作用

  • CyclicBarrier循环栅栏和CountDownLatch很类似,都能阻塞一组线程
  • 当需要多个线程配合完成任务,并最后需要统一汇总时,我们就可以使用CyclicBarrier,当某个线程完成任务后,它先会等待,等到所有线程都执行好了任务,再一起继续执行剩下的任务
  • 比如:同时出去聚餐约在了公司,等大家到公司了一起走过去。
  • 但是注意CyclicBarrier是可以重复使用的,这个和CountDownLatch不同

5.2 案例

/**
 * @author yiren
 */
public class CyclicBarrierExample {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有人都到场了, 大家统一出发!");
            }
        });
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(i, cyclicBarrier)).start();
        }
    }
    static class Task implements Runnable {
        private int id;
        private CyclicBarrier cyclicBarrier;
        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            System.out.println("线程" + id + "现在前往集合地点");
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("线程" + id + "到了集合地点,开始等待其他人到达");
                cyclicBarrier.await();
                System.out.println("线程" + id + "出发了");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}
复制代码
线程0现在前往集合地点
线程2现在前往集合地点
线程3现在前往集合地点
线程1现在前往集合地点
线程4现在前往集合地点
线程5现在前往集合地点
线程6现在前往集合地点
线程7现在前往集合地点
线程8现在前往集合地点
线程9现在前往集合地点
线程3到了集合地点,开始等待其他人到达
线程9到了集合地点,开始等待其他人到达
线程8到了集合地点,开始等待其他人到达
线程4到了集合地点,开始等待其他人到达
线程5到了集合地点,开始等待其他人到达
所有人都到场了, 大家统一出发!
线程5出发了
线程3出发了
线程8出发了
线程4出发了
线程9出发了
线程1到了集合地点,开始等待其他人到达
线程6到了集合地点,开始等待其他人到达
线程0到了集合地点,开始等待其他人到达
线程7到了集合地点,开始等待其他人到达
线程2到了集合地点,开始等待其他人到达
所有人都到场了, 大家统一出发!
线程2出发了
线程1出发了
线程7出发了
线程0出发了
线程6出发了
Process finished with exit code 0
复制代码
  • 每五个人到了过后,就出发一批

5.3 CountDownLatch和CyclicBarrier`区别

  • 作用不同:CountDownLatch使用countDown()是用于事件的,而CyclicBarrier使用await()是用于线程的
  • 可重用性不同:CountDownLatch在倒数到0后不能再次重用,除非创建新对象;而CyclicBarrier是可以直接重用的

6. 深入AQS理解J.U.C的根基

6.1 AQS作用及其重要性

  • AQS在CountDownLatch等工具内都有使用,全称是:AbstractQueuedSynchronizer是一个抽象类
  • 锁和上面的线程并发控制类(Semaphore等)都有类似的地方。 其实他们底层都是使用了AQS作为基类的拓展
  • 正因为他们很多工作都类似,JDK就把这部分通用逻辑抽离了出来,提供给他们直接使用,使其不必关注很多深层次的细节,从而完成他们的功能。
  • 我们可以大致看一下我们锁用到的这些并发控制的工具类和锁的内部实现
  • `Semaphore``
public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        Sync(int permits) {
            setState(permits);
        }
  ......
复制代码
  • ReentrantLock
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
    ......
复制代码
  • CountDownLatch
public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {
            setState(count);
        }
    ......
复制代码
  • 由上源码我们可以看到,里面都有一个内部类,Sync继承自AbstractQueuedSynchronizer
  • 那么AQS是用来干些什么事情的呢?
  • J.U.C基本都是是基于AQS实现的,AQS是一个用于构建锁、同步器、线程协作工具类的框架供给子类使用,主要使用模板模式来设计。
  • 它主要工作就是管理线程的阻塞与唤醒,实现同步的管理,以及阻塞线程的队列管理工作

6.2 AQS的组成及内部原理

  • AbstractQueuedSynchronizer自JDK1.5加入,是基于FIFO等待队列实现的一个用于同步器的基础框架。
  • JDK1.8 继承AQS实现的类:

  • 我们可以看到,在可重入锁,读写锁,计数门闩等,信号量里面都是用了AQS的子类,接下来我们就学习一下AQS的内部原理
  1. AQS的三大部分
  • state:状态,
  • FIFO队列:线程竞争锁的管理队列
  • 获取和释放方法:需要工具类去实现的方法
  1. state:状态
/**
     * The synchronization state.
     */
    private volatile int state;
复制代码
  • 它的含义并不具体,根据实现的不同而不同,如:Semaphore内是剩余许可数量、CountDownLatch内是还需要倒数的数量,可看做一个计数器,只是不同类的作用及意义不用
protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
复制代码
  • 状态值的更新,是使用Unsafe的CAS完成
  • 在ReentrantLock中:state表示锁的占用情况,可重入的计数,每重入一次就加一,当要释放锁时,它的值就会变成0,表示不被任何线程占有。
  1. FIFO队列:
/**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;
    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;
复制代码
  • 这个队列是用来存放等待的线程的,AQS会对这个队列进行管理。当多个线程竞争锁时,没有拿到锁的,就会被翻到队列中,当前拿到锁的执行任务的线程结束,AQS就会从队列中选一个线程来占有这个锁。
  • AQS维护一个双向链表的等待队列,把等待线程都放到这个队列里面管理;队列头节点是当前拿到锁的线程;在AQS中保存了这个队列的头尾节点。
  1. 获取和释放的方法
  • 获取方法:
  • 获取操作会依赖state变量,经常会阻塞,如:获取不到锁的时候,获取不到许可的时候等
  • ReentrantLock中,就是获取锁。state+1
  • Semaphore中就是acquire获取许可,state-1,当state==0就会阻塞
  • CountDownLatch中就是await方法,就是等待state==0
  • 释放方法:
  • 释放操作不会阻塞
  • ReentrantLock中就是unlock方法调用release(1)对应state-1
  • Semaphore中就是realease,也是state-1
  • CountDownLatch中就是countDown方法,也是state-1
  • 一般情况下,实现类都会实现tryAcquiretryRelease相关方法,以对应各个类的需求

6.3 AQS的用法

  1. 指定协作逻辑,实现获取和释放方法
  2. 在内部写一个Sync类继承AQS
  3. 根据是否独占来决定重写的方法:独占使用tryAcquire/tryRelease、共享使用tryAcquireShared(int acquires)/tryReleaseShared(int releases),在主逻辑里面的获取释放相关方法中调用Sync的方法

7. AQS在CountDownLatch中的源码剖析

  • 下面我们以CountDownLatch为例分析源码:
  • 构造函数
  • 我们看到内部实现就是初始化一个Sync然后把计数值传入
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
复制代码
  • 我们可以看下面的CountDownlatchSync的实现,在构造方法创建的Sync传入的count调用了setState方法传入了AQSstate
  • CountDownLatch内部有一个继承AQS的Sync
private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
复制代码
  • CountDownLatchgetCount()方法
public long getCount() {
        return sync.getCount();
    }
复制代码
  • 我们可以看到getCount实际也是调用SyncgetCount()来获取state并返回
  • CountDownLatchcountDown()方法
public void countDown() {
        sync.releaseShared(1);
    }
复制代码
  • 我们看一看到它直接调用了AQSreleaseShared(1)
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
复制代码
  • releaseShared则是回去调用CountDownLatch中实现的tryReleaseShared
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
复制代码
  • 而在tryReleaseShared中则是主要对state的值做-1操作,如果state大于零可以获取到就减一并且用CAS并发更新值,如果最新值为0就返回true
  • 返回true过后就doReleaseShared释放锁,唤醒队列里面的等待线程。也就是调用了await()方法的线程
  • CountDownLatchawait()方法
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
复制代码
  • await则会调用AQS中的默认实现sync.acquireSharedInterruptibly(1);
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
复制代码
  • 而里面则是调用tryAcquireShared(arg) < 0看是否小于0,如果小于0就代表没有获取到锁,就调用doAcquireSharedInterruptibly(arg);入队
  • tryAcquireShared则是在CountDownLatch中的Sync实现的
protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
复制代码
  • 如果当前state为0了(也就是说计数已经到0了)就返回一个1就不会满住上面的acquireSharedInterruptibly方法中的条件,就会放行,如果不等于0就会返回-1,此时就会入队。调用doAcquireSharedInterruptibly方法
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码
  • 这个方法首先会把当前线程在addWaiter中包装成一个Node节点并添加到队列尾部;而这个Node节点就是FIFO队列的节点。
  • 然后就会进入循环,如果当前节点不是head,那么就会进入到后面的判断,其中重要的是parkAndCheckInterrupt,方法如下:
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
复制代码
  • 它会调用LockSupportpark并且此park方法就是封装了Unsafe的native方法park()来把线程挂起进入阻塞状态
public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
复制代码
  • 再往下就没意义了。我们只需要知道doAcquireSharedInterruptibly方法就是把当前线程放到阻塞队列中,并且把线程阻塞就OK了。
  • AQS在CountDownLatch中使用的一些点:
  • 调用CountDownLatchawait()时,便会尝试获取共享锁,开始时是获取不到锁的,于是就被阻塞
  • 可以获取到的条件就是计数器为0,也就是state==0的时候。
  • 只有每次调用countDown方法才会使得计数器减一,减到0时就回去唤醒阻塞中的线程。

8. AQS在Semaphore中的源码剖析

  • 由于上面讲得很细了,接下来就简略一些
  • Semaphorestate就是许可证的数量
  • 主要的操作就是acquire和release,也是借用Sync对state的操作来控制线程的阻塞与唤醒
public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
复制代码
public void release() {
        sync.releaseShared(1);
    }
复制代码
  • 先看下acquire调用的acquireSharedInterruptibly此方法在上面已经说过。
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
复制代码
  • 而在Semaphore中Sync有两个实现:NonfairSyncFairSync
  • 在FairSync中tryAcquireShared就会有hasQueuedPredecessors判断,如果不是头节点,那就返回-1,在acquireSharedInterruptibly方法中去调用doAcquireSharedInterruptibly入队并且阻塞线程
protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
复制代码
  • 而在NonfairSync中而是直接调用SyncnonfairTryAcquireShared
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
复制代码
final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
复制代码
  • 可以看到其中并没有对是否阻塞队列的头节点判断,直接去获取值,判断是会否许可足够。
  • release中则是调用AQS的releaseShared其也是调用SemaphoreSynctryReleaseShared来判断是否需要释放锁,去唤醒阻塞线程
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
复制代码
  • tryReleaseShared
protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
复制代码
  • 我们可以看到此处就是关于Semaphore的已获取许可的释放 把state加回去然后用CAS更新state

9. AQS在ReentrantLock中的应用

  • 源码就不分析了
  • ReentrantLock中,state主要是重入的次数,加锁的时候state+1 ,而在释放锁的时候,state-1然后判断当前的state==0
  • ReentrantLock中与AQS相关的有三个类:UnfairSyncFairSyncSync
  • 关于加锁和解锁的逻辑也是AQS中的acquire方法的逻辑(获取锁失败就会放入队列中)和release方法(调用子类的tryRelease来去掉头部,并且唤醒线程)
  • 而加锁解锁中的逻辑,主要是公平锁和非公平锁的区别,公平锁会去判断是否在队列头部,如果在才会去执行,而非公平锁则会抢锁。不会管你是不是在队列头部。
  • 相信在上面的源码分析过后,分析ReentrantLock是十分简单的。大家可以自行分析。



目录
相关文章
|
6天前
|
Java
Java中ReentrantLock释放锁代码解析
Java中ReentrantLock释放锁代码解析
23 8
|
7天前
|
安全 Java 开发者
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第9天】本文将深入探讨Java并发编程的核心概念,包括线程安全和性能优化。我们将详细解析Java中的同步机制,包括synchronized关键字、Lock接口以及并发集合等,并探讨它们如何影响程序的性能。此外,我们还将讨论Java内存模型,以及它如何影响并发程序的行为。最后,我们将提供一些实用的并发编程技巧和最佳实践,帮助开发者编写出既线程安全又高效的Java程序。
20 3
|
8天前
|
Java
Java 并发编程:深入理解线程池
【4月更文挑战第8天】本文将深入探讨 Java 中的线程池技术,包括其工作原理、优势以及如何使用。线程池是 Java 并发编程的重要工具,它可以有效地管理和控制线程的执行,提高系统性能。通过本文的学习,读者将对线程池有更深入的理解,并能在实际开发中灵活运用。
|
5天前
|
安全 算法 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第11天】 在Java中,高效的并发编程是提升应用性能和响应能力的关键。本文将探讨Java并发的核心概念,包括线程安全、锁机制、线程池以及并发集合等,同时提供实用的编程技巧和最佳实践,帮助开发者在保证线程安全的前提下,优化程序性能。我们将通过分析常见的并发问题,如竞态条件、死锁,以及如何利用现代Java并发工具来避免这些问题,从而构建更加健壮和高效的多线程应用程序。
|
1天前
|
设计模式 运维 安全
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第15天】在Java开发中,多线程编程是提升应用程序性能和响应能力的关键手段。然而,它伴随着诸多挑战,尤其是在保证线程安全的同时如何避免性能瓶颈。本文将探讨Java并发编程的核心概念,包括同步机制、锁优化、线程池使用以及并发集合等,旨在为开发者提供实用的线程安全策略和性能优化技巧。通过实例分析和最佳实践的分享,我们的目标是帮助读者构建既高效又可靠的多线程应用。
|
3天前
|
Java 编译器
Java并发编程中的锁优化策略
【4月更文挑战第13天】 在Java并发编程中,锁是一种常见的同步机制,用于保证多个线程之间的数据一致性。然而,不当的锁使用可能导致性能下降,甚至死锁。本文将探讨Java并发编程中的锁优化策略,包括锁粗化、锁消除、锁降级等方法,以提高程序的执行效率。
11 4
|
4天前
|
Java 调度 开发者
Java 21时代的标志:虚拟线程带来的并发编程新境界
Java 21时代的标志:虚拟线程带来的并发编程新境界
14 0
|
4天前
|
Java
Java 15 神秘登场:隐藏类解析未知领域
Java 15 神秘登场:隐藏类解析未知领域
10 0
|
4天前
|
安全 Java 编译器
接口之美,内部之妙:深入解析Java的接口与内部类
接口之美,内部之妙:深入解析Java的接口与内部类
24 0
接口之美,内部之妙:深入解析Java的接口与内部类
|
5天前
|
运维 NoSQL 算法
Java开发-深入理解Redis Cluster的工作原理
综上所述,Redis Cluster通过数据分片、节点发现、主从复制、数据迁移、故障检测和客户端路由等机制,实现了一个分布式的、高可用的Redis解决方案。它允许数据分布在多个节点上,提供了自动故障转移和读写分离的功能,适用于需要大规模、高性能、高可用性的应用场景。
15 0

推荐镜像

更多