AbstractQueuedSynchronizer理解(CountDownLatch)

简介: AbstractQueuedSynchronizer理解(CountDownLatch)

本文分析一下CountDownLatch是如何运用AQS的

CountDownLatch是什么

CountDownLatch顾名思义它是一个Latch(门闩),它是用一个计数器实现的,初始状态计数器的数值等于线程数,每当有线程完成任务后,计数器就会减一。当state为0时,锁就会被释放,凡是之前因抢占锁而等待的线程这时候就会被唤醒继续抢占锁。

CountDownLatch小栗子

public static void main(String[] args) throws InterruptedException{
    int threadSize = 3;
    CountDownLatch doneSignal = new CountDownLatch(threadSize);

    for (int i = 1; i <= threadSize; i++) {
        final int threadNum = i;
        new Thread(() -> {
            System.out.println("thread" + threadNum + ":start");

            try {
                Thread.sleep(1000 * threadNum);
            } catch (InterruptedException e) {
                System.out.println("thread" + threadNum + ":exception");
            }

            doneSignal.countDown();
            System.out.println("thread" + threadNum + ":complete");
        }).start();
    }

    System.out.println("main thread:await");
    doneSignal.await();
    System.out.println("main thread:go on");
}

例子中主线程启动了三条子线程,睡眠一段时间,此时主线程在等待所有子线程结束后才会继续执行下去;
看一下输出结果:

main thread:await
thread1:start
thread2:start
thread3:start
thread1:complete
thread2:complete
thread3:complete
main thread:go on

Process finished with exit code 0

CountDownLatch原理分析

既然CountDownLatch也是AQS的一种使用方式,我们看一下它的内部类Syc是怎么实现AQS的:

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
    
    //构造函数,初始化同步状态state的值,即线程个数
    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    //这里重写了方法,在共享模式下,告诉调用者是否可以抢占state锁了,正数代表可以,负数代表否定;当state为0时返回正数
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    //共享模式下释放锁
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            //state为0时说明没有什么可释放
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                //CAS对state操作成功后返回state值是否为0,为0则释放成功
                return nextc == 0;
        }
    }
}

看完了重写的AQS同步器后,我们了解了CountDownLatch对state锁的描述。接下来先看主线程调用的await方法,在await方法里调用了AQS的acquireSharedInterruptibly:

//在共享模式下尝试抢占锁
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //线程中断抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //尝试抢占前先查询一下是否可以抢占,如果返回值大于0程序往下执行,小于0则等待
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}


private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //在Reentrant解析中我们看过,往队列中新增node(共享模式)
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                //如果当前node的前继时head,马上尝试抢占锁
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //如果state==0即允许往下执行,重新设置head并往下传播信号
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    //得到往下执行的允许
                    return;
                }
            }
            //以下都跟Reentrant一样
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    //将当前node设置为head,清空node的thread、prev
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    //如果propagate大于0,或者原来head的等待状态小于0或者现在head的等待状态小于0
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        //准备唤醒下一个节点
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                //如果head的状态为SIGNAL,更改状态为0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                //唤醒后继节点
                unparkSuccessor(h);
            }
            //如果head状态为0,更改状态为PROPAGATE
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        //如果head没有改变,结束当前loop,如果遇到head被别的线程改变,继续loop
        if (h == head)                   // loop if head changed
            break;
    }
}

释放锁的信号一直向后传播,直到所有node被唤醒并继续执行,那第一个信号时何时发起的呢?我们来看一下CountDownLatch的countDown方法,该方法调用了sync的releaseShared方法:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        //如果同步状态state为0时,调用doReleaseShared,在这里就发出了第一个唤醒所有等待node的信号,然后信号自动往后传播
        doReleaseShared();
        return true;
    }
    return false;
}

总结

CountDownLatch在调用await的时候判断state释放为0,如果大于0则阻塞当前线程,将当前线程的node添加到队列中等待;在调用countDown时当遇到state减到0时,发出释放共享锁的信号,从头节点的后记节点开始往后传递信号,将队列等待的线程逐个唤醒并继续往下执行;
在这里state跟Reentrant的state独占锁含义不同,state的含义是由AQS的子类去描述的。

相关文章
|
SQL 分布式计算 Oracle
使用Sqoop从Oracle数据库导入数据
使用Sqoop从Oracle数据库导入数据
使用Sqoop从Oracle数据库导入数据
|
XML JSON Java
Spring Boot 返回 XML 数据,一分钟搞定!
Spring核心技术 67 篇文章13 订阅 订阅专栏 Spring Boot 返回 XML 数据,前提必须已经搭建了 Spring Boot 项目,所以这一块代码就不贴了,可以点击查看之前分享的 Spring Boot 返回 JSON 数据,一分钟搞定!。
Spring Boot 返回 XML 数据,一分钟搞定!
|
安全 Linux 编译器
探索Linux内核的奥秘:从零构建操作系统####
本文旨在通过深入浅出的方式,带领读者踏上一段从零开始构建简化版Linux操作系统的旅程。我们将避开复杂的技术细节,以通俗易懂的语言,逐步揭开Linux内核的神秘面纱,探讨其工作原理、核心组件及如何通过实践加深理解。这既是一次对操作系统原理的深刻洞察,也是一场激发创新思维与实践能力的冒险。 ####
|
12月前
|
监控 Java Shell
监控堆外第三方监控工具Zabbix
监控堆外第三方监控工具Zabbix
288 5
|
负载均衡 Java API
Spring Cloud Gateway 详解:构建高效的API网关解决方案
Spring Cloud Gateway 详解:构建高效的API网关解决方案
558 0
|
索引 Python
【Pandas】Pandas Dataframe 常用用法
Pandas DataFrame的常用操作示例,包括筛选数据、索引操作、合并DataFrame、设置和排序索引、文本处理、列重命名、处理缺失值、排序以及删除满足特定条件的行等技巧。
388 0
|
存储 小程序 API
支付宝小程序:揭秘如何以低成本撬动商业价值的杠杆
【8月更文挑战第27天】支付宝小程序是阿里巴巴打造的一款轻量级应用平台,它降低了开发成本和技术门槛,简化了开发流程。用户无需下载安装即可享受快捷服务,提升了用户体验。依托支付宝庞大的用户基础,小程序能迅速触及潜在用户,降低推广成本。它不仅支持基本功能,还能无缝集成支付宝的核心服务如支付、信用等,在电商、金融等多个领域展现出独特优势。尽管存在功能限制等问题,但支付宝小程序已成为实现商业价值的重要工具。
323 1
|
Kubernetes jenkins 持续交付
在K8S中,Jenkins如何集成K8S集群?
在K8S中,Jenkins如何集成K8S集群?
|
JSON 数据挖掘 API
京东商品详情API:解锁电商数据的金钥匙
**《京东商品详情API:解锁电商数据》** 本文介绍如何通过京东API获取商品详情,包括注册成为开发者、获取App Key和App Secret、申请API权限、理解`jd.item.get`接口、构建HTTP请求、解析JSON响应。应用场景广泛,如电商平台、比价、数据分析和移动应用。注意调用限制、数据安全和合规性。示例代码展示了Python调用流程。利用此API可增强电商竞争力。请按最新文档和政策执行。
|
前端开发 JavaScript Java
基于JavaWeb机票订购系统(含前后台)(Java+spring+jsp+bootstrap+mysql)
基于JavaWeb机票订购系统(含前后台)(Java+spring+jsp+bootstrap+mysql)
237 3