Java源码分析之CountDownLatch

简介:         一、CountDownLatch介绍       CountDownLatch是一种同步手段,允许一个或者更多的线程等待,直到在其他线程正在执行的一组操作完成。给定count数目后CountDownLatch被初始化。

        一、CountDownLatch介绍

       CountDownLatch是一种同步手段,允许一个或者更多的线程等待,直到在其他线程正在执行的一组操作完成。给定count数目后CountDownLatch被初始化。await()方法阻塞,直到由于调用countDown()方法,当前count值达到0,之后所有等待线程被释放,而任何后续await()方法的调用会立即返回。这个是只有一次的现场,即count值无法被重设。如果你需要一个能够重设count值的版本,不妨考虑使用CyclicBarrier。

        二、CountDownLatch应用

        CountDownLatch是一个通用的同步工具,可用于许多目的。一个用count值为1来初始化的CountDownLatch可用作一个开关或者门闩:所有的线程调用await()方法等待一个线程调用countDown()方法后把门打开。一个用count值为N来初始化的CountDownLatch可用作使得一个线程等待,直到N个线程完成各自的事情,或者一些action被完成N次等。CountDownLatch一个有用的特性是:它不需要线程调用countDown()方法等待计数达到零在继续之前,它只是阻止任何线程继续过去一个等待,直到所有线程可以通过。

        1、示例应用一

        这里有一对类,一组工作线程使用两个countdown latches的示例:

        第一个是启动信号,阻止worker线程工作直到driver准备好;

        第二个是完成信号,允许driver等到所有的workers工作完成。

       代码如下:

package com.pengli.jdk;

import java.util.concurrent.CountDownLatch;

public class TestCountDownLatch {

	class Driver {
		void main() throws InterruptedException {
			CountDownLatch startSignal = new CountDownLatch(1);
			CountDownLatch doneSignal = new CountDownLatch(20);

			for (int i = 0; i < 20; ++i) // create and start threads
				new Thread(new Worker(startSignal, doneSignal)).start();

			doSomethingElse(); // don't let run yet
			startSignal.countDown(); // let all threads proceed
			doSomethingElse();
			doneSignal.await(); // wait for all to finish
		}

		void doSomethingElse() {
			// ...
		}
	}

	class Worker implements Runnable {
		private final CountDownLatch startSignal;
		private final CountDownLatch doneSignal;

		Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
			this.startSignal = startSignal;
			this.doneSignal = doneSignal;
		}

		public void run() {
			try {
				startSignal.await();
				doWork();
				doneSignal.countDown();
			} catch (InterruptedException ex) {
			} // return;
		}

		void doWork() {
			// ...
		}
	}
}

        2、示例应用二

        另一个典型用法是将一个问题分成n份,在一个线程中定义并执行一份,并在latch中count down,然后将所有的线程放入一个队列。当所有的部分完成,协调线程将会通过await()方法,继续处理。当线程必须以这种方式反复count down时,使用CyclicBarrier。

        代码如下:

package com.pengli.jdk;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class TestCountDownLatch2 {

	class Driver2 { // ...
		void main() throws InterruptedException {
			CountDownLatch doneSignal = new CountDownLatch(20);
			Executor e = Executors.newFixedThreadPool(20);

			for (int i = 0; i < 20; ++i) // create and start threads
				e.execute(new WorkerRunnable(doneSignal, i));

			doneSignal.await(); // wait for all to finish
		}
	}

	class WorkerRunnable implements Runnable {
		private final CountDownLatch doneSignal;
		private final int i;

		WorkerRunnable(CountDownLatch doneSignal, int i) {
			this.doneSignal = doneSignal;
			this.i = i;
		}

		public void run() {
			doWork(i);
			doneSignal.countDown();
		}

		void doWork(int i) {
			// ...
		}
	}
}
        三、CountDownLatch实现分析

        1、Sync

        在CountDownLatch内部,有一个Sync的同步器,它继承自java.util.concurrent包中各种同步工具共用的AbstractQueuedSynchronizer,其实现如下:

    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
        关于AbstractQueuedSynchronizer,有其它的文章进行专门的介绍。这里只分析下Sync的实现。其有一个需要入参int count的构造函数,设置AbstractQueuedSynchronizer的state。并覆写了tryAcquireShared()和tryReleaseShared()方法,其中tryReleaseShared()方法用于CountDownLatch的countDown()方法,这个tryReleaseShared()方法的逻辑如下:

        在一个for循环内,首先通过getState()获取state值,如果为0,直接返回false,否则取state-1,并尝试CAS操作,修改state状态,并且state等于0,返回true,否则返回false。

        tryAcquireShared()方法更简单,判断state(即count值),如果等于0,返回1,否则返回-1.

        2、countDown()

        countDown()方法的实现很简单,如下:

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1);
    }
        其核心处理就是减少latch中count值,如果cout值为0,释放所有的等待线程。它调用的是sync的releaseShared()方法,而这个方法是在AbstractQueuedSynchronizer中实现的,如下:

    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
        先调用tryReleaseShared()方法,即上述Sync的同名方法,并且如果返回true的话,继续调用doReleaseShared()方法,返回true,否则返回false。即如果修改后state(即count)值为正,不做其他处理,否则调用doReleaseShared()方法。

        3、await()

         await()方法的核心作用是,让当前线程阻塞,直到latch的count值更改为0,或者当前线程被interrupted。如果count值为0,则await()方法直接返回。代码如下:

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>If the current count is zero then this method returns immediately.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
        还是借助的sync,调用的其acquireSharedInterruptibly()方法,这个方法是在Sync的父类中实现的,代码如下:

    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
        先调用Sync的tryAcquireShared()方法,如果返回值为负值,则调用doAcquireSharedInterruptibly()方法。上面讲到了,如果state(即count)值为0,则返回1,方法直接返回,否则进入doAcquireSharedInterruptibly()方法,实现阻塞。

        doReleaseShared()和doAcquireSharedInterruptibly()方法的介绍参见AbstractQueuedSynchronizer的分析文章。






目录
打赏
0
0
0
0
23
分享
相关文章
基于Java+Springboot+Vue开发的鲜花商城管理系统源码+运行
基于Java+Springboot+Vue开发的鲜花商城管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的鲜花商城管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。技术学习共同进步
268 7
JUC并发—1.Java集合包底层源码剖析
本文主要对JDK中的集合包源码进行了剖析。
智慧工地源码,Java语言开发,微服务架构,支持分布式和集群部署,多端覆盖
智慧工地是“互联网+建筑工地”的创新模式,基于物联网、移动互联网、BIM、大数据、人工智能等技术,实现对施工现场人员、设备、材料、安全等环节的智能化管理。其解决方案涵盖数据大屏、移动APP和PC管理端,采用高性能Java微服务架构,支持分布式与集群部署,结合Redis、消息队列等技术确保系统稳定高效。通过大数据驱动决策、物联网实时监测预警及AI智能视频监控,消除数据孤岛,提升项目可控性与安全性。智慧工地提供专家级远程管理服务,助力施工质量和安全管理升级,同时依托可扩展平台、多端应用和丰富设备接口,满足多样化需求,推动建筑行业数字化转型。
116 5
|
3月前
|
【源码】【Java并发】【ThreadLocal】适合中学者体质的ThreadLocal源码阅读
前言 下面,跟上主播的节奏,马上开始ThreadLocal源码的阅读( ̄▽ ̄)" 内部结构 如下图所示,我们可以知道,每个线程,都有自己的threadLocals字段,指向ThreadLocalMap
424 81
【源码】【Java并发】【ThreadLocal】适合中学者体质的ThreadLocal源码阅读
智慧班牌源码,采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署
智慧班牌系统是一款基于信息化与物联网技术的校园管理工具,集成电子屏显示、人脸识别及数据交互功能,实现班级信息展示、智能考勤与家校互通。系统采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署与私有化定制。核心功能涵盖信息发布、考勤管理、教务处理及数据分析,助力校园文化建设与教学优化。其综合性和可扩展性有效打破数据孤岛,提升交互体验并降低管理成本,适用于日常教学、考试管理和应急场景,为智慧校园建设提供全面解决方案。
301 70
家政系统源码,java版本
这是一款基于SpringBoot后端框架、MySQL数据库及Uniapp移动端开发的家政预约上门服务系统。
家政系统源码,java版本
Java 集合面试题从数据结构到 HashMap 源码剖析详解及长尾考点梳理
本文深入解析Java集合框架,涵盖基础概念、常见集合类型及HashMap的底层数据结构与源码实现。从Collection、Map到Iterator接口,逐一剖析其特性与应用场景。重点解读HashMap在JDK1.7与1.8中的数据结构演变,包括数组+链表+红黑树优化,以及put方法和扩容机制的实现细节。结合订单管理与用户权限管理等实际案例,展示集合框架的应用价值,助你全面掌握相关知识,轻松应对面试与开发需求。
112 3
Java基于SaaS模式多租户ERP系统源码
ERP,全称 Enterprise Resource Planning 即企业资源计划。是一种集成化的管理软件系统,它通过信息技术手段,将企业的各个业务流程和资源管理进行整合,以提高企业的运营效率和管理水平,它是一种先进的企业管理理念和信息化管理系统。 适用于小微企业的 SaaS模式多租户ERP管理系统, 采用最新的技术栈开发, 让企业简单上云。专注于小微企业的应用需求,如企业基本的进销存、询价,报价, 采购、销售、MRP生产制造、品质管理、仓库库存管理、财务应收付款, OA办公单据、CRM等。
176 23
|
3月前
|
【源码】【Java并发】【ReentrantLock】适合中学者体质的ReentrantLock源码阅读
因为本文说的是ReentrantLock源码,因此会默认,大家对AQS有基本的了解(比如同步队列、条件队列大概> 长啥样?)。 不懂AQS的小朋友们,你们好呀!也欢迎先看看这篇
106 13
【源码】【Java并发】【ReentrantLock】适合中学者体质的ReentrantLock源码阅读
|
3月前
|
【源码】【Java并发】【ConcurrentHashMap】适合中学体质的ConcurrentHashMap
本文深入解析了ConcurrentHashMap的实现原理,涵盖JDK 7与JDK 8的区别、静态代码块、构造方法、put/get/remove核心方法等。JDK 8通过Node数组+链表/红黑树结构优化并发性能,采用CAS和synchronized实现高效锁机制。文章还详细讲解了hash计算、表初始化、扩容协助及计数更新等关键环节,帮助读者全面掌握ConcurrentHashMap的工作机制。
99 6
【源码】【Java并发】【ConcurrentHashMap】适合中学体质的ConcurrentHashMap

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问