CountDownLatch 闭锁源码分析

简介: 闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态【CPJ 3.4.2】。闭锁的作用相当于一扇门∶ 在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。

功能简介


闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态【CPJ 3.4.2】。闭锁的作用相当于一扇门∶  在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如∶


  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示"资源R已经被初始化",而所有需要 R 的操作都必须先在这个闭锁上等待。


  • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S 时,将首先在S依赖的其他服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖 S 的服务才能继续执行。


  • 等待直到某个操作的所有参与者(例如,在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况中,当所有玩家都准备就绪时,闭锁将到达结束状态。


CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而 await方法等待计数器达到零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么 await 会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。


使用案例


TestHarness 中给出了闭锁的两种常见用法。TestHarness 创建一定数量的线程,利用它们并发地执行指定的任务。它使用两个闭锁,分别表示"起始门(Starting Gate)"和"结束门(Ending Gate)"。起始门计数器的初始值为1,而结束门计数器的初始值为工作线程的数量。每个工作线程首先要做的值就是在启动门上等待,从而确保所有线程都就绪后才开始执行。而每个线程要做的最后一件事情是将调用结束门的 countDown 方法减1,这能使主线程高效地等待直到所有工作线程都执行完成,因此可以统计所消耗的时间。


public class TestHarness {
    public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(() -> {
                try {
                    startGate.await();
                    try {
                        task.run();
                    } finally {
                        endGate.countDown();
                    }
                } catch (InterruptedException ignored) {
                }
            });
            t.start();
        }
        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end - start;
    }
    public static void main(String[] args) throws InterruptedException {
        TestHarness testHarness = new TestHarness();
        AtomicInteger num = new AtomicInteger(0);
        long time = testHarness.timeTasks(10, () -> System.out.println(num.incrementAndGet()));
        System.out.println("cost time: " + time + "ms");
    }
}
//输出结果
1
10
9
8
7
5
6
4
3
2
cost time: 2960900ms


为什么要在 TestHarness 中使用闭锁,而不是在线程创建后就立即启动? 或许,我们希望测试 n 个线程并发执行某个任务时需要的时间。如果在创建线程后立即启动它们,那么先启动的线程将"领先"后启动的线程,并且活跃线程数量会随着时间的推移而增加或减少,竞争程度也在不断发生变化。启动门将使得主线程能够问时释放所有工作线程,而结束门则使主线程能够等待最后一个线程执行完成,而不是顺序地等待每个线程执行完成。


使用总结


CountDownLatch 是一次性的,计算器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch 使用完毕后,它不能再次被使用。


源码分析


代码分析


CountDownLatch 在底层还是采用 AbstractQueuedSynchronizer 实现.

CountDownLatch startGate = **new **CountDownLatch(1);


我们先看它的构造方法, 创建了一个 sync 对象.


public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}


SyncAbstractQueuedSynchronizer 的一个实现, 按照字面意思我们可以猜到它是公平方式实现


private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
    // 构造方法
    Sync(int count) {
        setState(count);
    }
    // 获取资源数
    int getCount() {
        return getState();
    }
    // 获取锁
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    // 释放锁
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            // CAS 解锁
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}


在 await 方法中如果存在计算值, 那么当前线程将进入 AQS 队列生成 Node 节点, 线程进入阻塞状态


public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}


其实主要是获取共享锁


public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}


CountDownLatch.Sync 实现了 tryAcquireShared 方法 ,如果 getState() == 0 返回 1 , 否则返回 -1. 也就是说创建 CountDownLatch 实例后再执行 await 方法将继续调用 doAcquireSharedInterruptibly(arg);


// 是否可获取共享锁
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}
// 尝试获取锁, 或者入队
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


在 countDown 方法如果存在等待的线程, 将对其进行唤醒. 或者减少 CountDownLatch 资源数.


public void countDown() {
    sync.releaseShared(1);
}


通过 releaseShared 对共享锁进行解锁


public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}


最终会调用 doReleaseShared 唤醒 AQS 中的头节点


private void doReleaseShared() {
    /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}


详细流程如下图


源码流程图


image.png

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/3d2af84a954c463ea266d14234688c60~tplv-k3u1fbpfcp-zoom-in-crop-mark:1304:0:0:0.awebp


参考资料


  • 《Java 并发编程实战》



相关文章
|
8月前
|
设计模式 Java
CountDownLatch和CyclicBarrier源码详解
我现在有个场景:现在我有50个任务,这50个任务在完成之后,才能执行下一个函数,要是你,你怎么设计?可以用JDK给我们提供的线程工具类,CountDownLatch和CyclicBarrier都可以完成这个需求。基于AQS实现,会将构造CountDownLatch的入参传递至statecountDown()就是在利用CAS将state减1,await)实际就是让头节点一直在等待state为0时,释放所有等待的线程。
76 1
|
5月前
|
Java 数据库 开发者
CountDownLatch、CyclicBarrier和Semaphore原理和区别
CountDownLatch、CyclicBarrier和Semaphore
|
8月前
多线程并发之CountDownLatch(闭锁)使用详解
多线程并发之CountDownLatch(闭锁)使用详解
434 0
|
安全 Dubbo Java
JUC系列(七)| JUC三大常用工具类CountDownLatch、CyclicBarrier、Semaphore
JUC系列(七)| JUC三大常用工具类CountDownLatch、CyclicBarrier、Semaphore
476 0
JUC系列(七)| JUC三大常用工具类CountDownLatch、CyclicBarrier、Semaphore
面试官:说说CountDownLatch,CyclicBarrier,Semaphore的原理?
CountDownLatch适用于在多线程的场景需要等待所有子线程全部执行完毕之后再做操作的场景。
|
Java
Java并发编程之CountDownLatch闭锁
Java并发编程之CountDownLatch闭锁
143 0