前言
目前正在出一个Java多线程专题长期系列教程,从入门到进阶含源码解读, 篇幅会较多, 喜欢的话,给个关注❤️ ~
Java提供了一些非常好用的并发工具类,不需要我们重复造轮子,本节我们讲解CountDownLatch,一起来看下吧~
CountDownLatch
首先我们来看下这玩意是干啥用的。CountDownLatch同样的也是java.util.concurrent并发包下的工具类,通常我们会叫它是并发计数器,这个计数不是记12345,主要的使用场景是当一个任务被拆分成多个子任务时,需要等待子任务全部完成后,不然会阻塞线程,每完成一个任务计数器会-1,直到没有。这个有点类似go语言中的的sync.WaitGroup。
废话不多说,我们通过例子带大家快速入门, 在这之前,还需给大家补充一下它的常用方法~
public CountDownLatch(int count) {...}构造函数void await()是当前线程等待直到锁存储器计到0,或者线程被中断boolean await(long timeout, TimeUnit unit)是当前线程等待直到锁存储器计到0,或者线程被中断, 如果为0返回true, 可以指定等待的超时时间countDown()递减锁存器的计数,如果到0则释放所有等待的线程`getCount()获取锁存器的计数
下面我们看下具体的使用:
public class CountDownLaunchTest { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(10); IntStream.range(0, 10).forEach(i -> { new Thread(() -> { try { Thread.sleep(2000); System.out.println("worker ------> " + i); } catch (InterruptedException e) { e.printStackTrace(); }finally { countDownLatch.countDown(); } }).start(); }); countDownLatch.await(); System.out.println("completed !"); } } 复制代码
时间输出:
worker ------> 1 worker ------> 4 worker ------> 5 worker ------> 7 worker ------> 8 worker ------> 0 worker ------> 2 worker ------> 3 worker ------> 9 worker ------> 6 completed ! 进程已结束,退出代码0 复制代码
可以看到任务没有完全结束之前,主线程是阻塞状态
源码剖析
首先看下构造函数
private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } 复制代码
这个sync有没有很熟悉,这里又遇到了CAS,几乎涉及到多线程的实现类都会有
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } 复制代码
countDown
首先在构造函数中初始化状态,对应的setState(count);, 其实它的底层实现就是依赖AQS。CountDownLatch主要有两个方法一个是countDown一个是await,下面我们就来看下是如何实现的。
public void countDown() { sync.releaseShared(1); } 复制代码
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } 复制代码
tryReleaseShared()方法的实现在countDownLatch,自旋操作判断值是否为0,为0说明都执行完了,之前说的递减就是在这完成的,就会走到doReleaseShared也就是释放操作。有想过为啥c==0 返回false吗❓可以回顾上一步操作if (tryReleaseShared)才会去doReleaseShared,也就是任务全部执行完才会去释放,释放的过程其实是一个队列去完成的。
protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } 复制代码
doReleaseShared是`AbstractQueuedSynchronizer'的内部方法
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } 复制代码
这个方法之前给大家讲过,其实就是释放锁的操作。可以看到在这里只唤醒了头节点的后继节点,然后就返回了,为啥是后继节点,继续看unparkSuccessor
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 后继节点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } 复制代码
那么剩余的其它线程怎么去释放呢?
await
再看下await(),同样的也调用了内部方法acquireSharedInterruptibly
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } 复制代码
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // CountDownLatch protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } 复制代码
重点在 doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 以共享模式添加到等待队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 返回前一个节点 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; failed = false; return; } } // 检查并更新未能获取的节点的状态。如果线程应该阻塞,则返回 true if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { // 失败就取消 if (failed) cancelAcquire(node); } } 复制代码
结束语
下节给大家讲下CyclicBarrier,跟CountDownLatch有点类似 ~