Java Review - 并发编程_ CountDownLatch原理&源码剖析

简介: Java Review - 并发编程_ CountDownLatch原理&源码剖析

cba922bb0f354216abcff72d9dbe4d11.png195d03d17afc4a928bc581f313b01dfe.png

Pre


每日一博 - CountDownLatch使用场景分析以及源码分析


在日常开发中经常会遇到需要在主线程中开启多个线程去并行执行任务,并且主线程需要等待所有子线程执行完毕后再进行汇总的场景。


在CountDownLatch出现之前一般都使用线程的join()方法来实现这一点,但是join方法不够灵活,不能够满足不同场景的需要,所以JDK开发组提供了CountDownLatch这个类,使用CountDownLatch会更优雅.


小Demo

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/12/19 10:46
 * @mark: show me the code , change the world
 */
public class CountDownLatchTest {
    // 创建一个CountDownLatch实例
    private static volatile CountDownLatch countDownLatch = new CountDownLatch(2);
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + " 模拟业务运行");
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 业务运行Over");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                // 子线程执行结束,减1
                countDownLatch.countDown();
            }
        });
        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + " 模拟业务运行");
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName() + " 业务运行Over");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                // 子线程执行结束,减1
                countDownLatch.countDown();
            }
        });
        // 等待子线程执行执行结束  返回
        countDownLatch.await();
        System.out.println( "子线程业务运行Over,主线程继续工作");
        executorService.shutdown();
    }
}


cba922bb0f354216abcff72d9dbe4d11.png


如上代码中,


创建了一个CountDownLatch实例,因为有两个子线程所以构造函数的传参为2。


主线程调用countDownLatch.await()方法后会被阻塞。


子线程执行完毕后调用countDownLatch.countDown()方法让countDownLatch内部的计数器减1


所有子线程执行完毕并调用countDown()方法后计数器会变为0,这时候主线程的await()方法才会返回。


CountDownLatch VS join方法


调用一个子线程的join()方法后,该线程会一直被阻塞直到子线程运行完毕


而CountDownLatch则使用计数器来允许子线程运行完毕或者在运行中递减计数,也就是CountDownLatch可以在子线程运行的任何时候让await方法返回而不一定必须等到线程结束


另外,使用线程池来管理线程时一般都是直接添加Runable到线程池,这时候就没有办法再调用线程的join方法了,就是说countDownLatch相比join方法让我们对线程同步有更灵活的控制


类图关系


0e720dd842224ce68a68d10c4ded0fbf.png


从类图可以看出,CountDownLatch是使用AQS实现的。

通过下面的构造函数, 实际上是把计数器的值赋给了AQS的状态变量state,也就是这里使用AQS的状态值来表示计数器值。


01d3d58595cb46238cff24b3faf45a89.png

 /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
 Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }


核心方法&源码解析


接下来分析CountDownLatch中的几个重要的方法,看它们是如何调用AQS来实现功能的。


void await()


当线程调用CountDownLatch对象的await方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回


当所有线程都调用了CountDownLatch对象的countDown方法后,也就是计数器的值为0时


其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程就会抛出InterruptedException异常,然后返回

   public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

await()方法委托sync调用了AQS的acquireSharedInterruptibly方法

  // AQS获取共享资源时响应中断的方法
   public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
         // 响应中断 
        if (Thread.interrupted())
            throw new InterruptedException();
       // 查看当前计数器是否为0 ,为0 直接返回,否则进入AQS队列等待  
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }


6c50096d58064cd08aec3f1719d04c53.png


86f95847a1804491aee2adffa5b26130.png

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }


由如上代码可知,该方法的特点是线程获取资源时可以被中断,并且获取的资源是共享资源。


acquireSharedInterruptibly首先判断当前线程是否已被中断,若是则抛出异常,否则调用sync实现的tryAcquireShared方法查看当前状态值(计数器值)是否为0,是则当前线程的await()方法直接返回,否则调用AQS的doAcquireSharedInterruptibly方法让当前线程阻塞。


另外可以看到,这里tryAcquireShared传递的arg参数没有被用到,调用tryAcquireShared的方法仅仅是为了检查当前状态值是不是为0,并没有调用CAS让当前状态值减1。


boolean await(long timeout, TimeUnit unit)


当线程调用了CountDownLatch对象的该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回


当所有线程都调用了CountDownLatch对象的countDown方法后,也就是计数器值为0时,这时候会返回true


设置的timeout时间到了,因为超时而返回false


其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程会抛出InterruptedException异常,然后返回

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

void countDown()


线程调用该方法后,计数器的值递减,递减后如果计数器值为0则唤醒所有因调用await方法而被阻塞的线程,否则什么都不做

下面看下countDown()方法是如何调用AQS的方法的。


 /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
      // 委托调用AQS的releaseShared
        sync.releaseShared(1);
    }

AQS的方法

  /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
      // 调用syn实现的tryReleaseShared
        if (tryReleaseShared(arg)) {
          // AQS释放资源的方法
            doReleaseShared();
            return true;
        }
        return false;
    }


1545c4ca401c4e4bbb4ada8aedbe8b3f.png

在如上代码中,releaseShared首先调用了sync实现的AQS的tryReleaseShared方法,代码如下


980d6302f97542539533e9ab562f3524.png


       protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            // 循环进行CAS, 直到当前线程成功弯沉CAS使计数器值(状态值state)减一 并更新state
            for (;;) {
                int c = getState();
                // 1 如果状态值为0 ,则直接返回
                if (c == 0)
                    return false;
               // 2 使用CAS让计数器减1     
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

如上代码


首先获取当前状态值(计数器值)。


代码(1)判断如果当前状态值为0则直接返回false,从而countDown()方法直接返回


否则执行代码(2)使用CAS将计数器值减1,CAS失败则循环重试,否则如果当前计数器值为0则返回true,返回true说明是最后一个线程调用的countdown方法,那么该线程除了让计数器值减1外,还需要唤醒因调用CountDownLatch的await方法而被阻塞的线程,具体是调用AQS的doReleaseShared方法来激活阻塞的线程


这里代码(1)貌似是多余的,其实不然,之所以添加代码(1)是为了防止当计数器值为0后,其他线程又调用了countDown方法,如果没有代码(1),状态值就可能会变成负数。


long getCount()


获取当前计数器的值,也就是AQS的state的值,一般在测试时使用该方法

  /**
     * Returns the current count.
     *
     * <p>This method is typically used for debugging and testing purposes.
     *
     * @return the current count
     */
    public long getCount() {
        return sync.getCount();
    }


在其内部还是调用了AQS的getState方法来获取state的值(计数器当前值)


小结


CountDownLatch是使用AQS实现的。使用AQS的状态变量state来存放计数器的值。


首先在初始化CountDownLatch时设置状态值(计数器值),当多个线程调用countdown方法时实际是原子性递减AQS的状态值。


当线程调用await方法后当前线程会被放入AQS的阻塞队列等待计数器为0再返回。其他线程调用countdown方法让计数器值递减1,当计数器值变为0时,当前线程还要调用AQS的doReleaseShared方法来激活由于调用await()方法而被阻塞的线程。

相关文章
|
1月前
|
Java 开发者
Java并发编程:CountDownLatch实战解析
Java并发编程:CountDownLatch实战解析
365 100
|
3月前
|
监控 Java API
现代 Java IO 高性能实践从原理到落地的高效实现路径与实战指南
本文深入解析现代Java高性能IO实践,涵盖异步非阻塞IO、操作系统优化、大文件处理、响应式网络编程与数据库访问,结合Netty、Reactor等技术落地高并发应用,助力构建高效可扩展的IO系统。
123 0
|
4月前
|
存储 缓存 Java
我们来详细讲一讲 Java NIO 底层原理
我是小假 期待与你的下一次相遇 ~
169 2
|
3月前
|
人工智能 安全 Java
Go与Java泛型原理简介
本文介绍了Go与Java泛型的实现原理。Go通过单态化为不同类型生成函数副本,提升运行效率;而Java则采用类型擦除,将泛型转为Object类型处理,保持兼容性但牺牲部分类型安全。两种机制各有优劣,适用于不同场景。
110 24
|
3月前
|
存储 缓存 安全
深入讲解 Java 并发编程核心原理与应用案例
本教程全面讲解Java并发编程,涵盖并发基础、线程安全、同步机制、并发工具类、线程池及实际应用案例,助你掌握多线程开发核心技术,提升程序性能与响应能力。
165 0
|
安全 Java
Java并发编程笔记之CopyOnWriteArrayList源码分析
并发包中并发List只有CopyOnWriteArrayList这一个,CopyOnWriteArrayList是一个线程安全的ArrayList,对其进行修改操作和元素迭代操作都是在底层创建一个拷贝数组(快照)上进行的,也就是写时拷贝策略。
19633 0
|
Java 安全
Java并发编程笔记之读写锁 ReentrantReadWriteLock 源码分析
我们知道在解决线程安全问题上使用 ReentrantLock 就可以,但是 ReentrantLock 是独占锁,同时只有一个线程可以获取该锁,而实际情况下会有写少读多的场景,显然 ReentrantLock 满足不了需求,所以 ReentrantReadWriteLock 应运而生,ReentrantReadWriteLock 采用读写分离,多个线程可以同时获取读锁。
3248 0
|
Java
Java并发编程笔记之FutureTask源码分析
FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
4380 0
|
Java 调度 API
Java并发编程笔记之Timer源码分析
timer在JDK里面,是很早的一个API了。具有延时的,并具有周期性的任务,在newScheduledThreadPool出来之前我们一般会用Timer和TimerTask来做,但是Timer存在一些缺陷,为什么这么说呢?   Timer只创建唯一的线程来执行所有Timer任务。
3112 0