Java并发系列之六 CyclicBarrier源码解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: Java并发系列之六 CyclicBarrier源码解析

CyclicBarrier概述



CyclicBarrier字面意思是可循环使用的线程屏障。


CyclicBarrier的功能和CountDownLatch功能有点相似。都能实现线程间相互等待,直到线程做完某些任务,唤醒等待线程。那么既然他们功能类似,提供一种解决方案不就行了吗,为什么还要再提供一个呢。原因是他们的侧重点其实还不一样。在CountDownLatch中我们把线程归类为两种,一类是工作线程,一类是阻塞线程。工作线程执行完任务,可以调用countDown()方法,释放共享锁,阻塞线程则是通过await()方法 自旋获取共享锁,注意这里工作线程是不会阻塞的。在CyclicBarrier中其实只有一类线程,那就是工作线程,假设有5个工作线程,工作线程执行完任务,会判断其他4个工作线程是否执行结束,如果还有线程没有执行完,那么工作线程会阻塞,等待其他线程结束完。当其他线程都执行完了才会执行下一步,CyclicBarrier会阻塞工作线程。而且CyclicBarrier内部使用的是ReentrantLock和Condition。我们知道Condition可以让线程阻塞,并且放入到Condition的单链表中。

public class CyclicBarrierUsage {
    public static void main(String[] args) {
        int N = 5;//一共有五个线程,如果线程都执行完了
        CyclicBarrier cyclicBarrier = new CyclicBarrier(N, new Runnable() {//
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " --- after run ");
            }
        });
        for (int i = 0; i < N; i++) {//五个工作线程
            new Thread() {
                @Override
                public void run() {
                    super.run();
                    try {
                        TimeUnit.SECONDS.sleep(1);//模拟工作
                        System.out.println(Thread.currentThread().getName() + " --- completed ");
                        cyclicBarrier.await();//执行完工作 等待其他线程完成
                        System.out.println(Thread.currentThread().getName() + " --- run again ");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }.start();
        }
    }
}

输出结果


Thread-2 — completed


Thread-1 — completed


Thread-0 — completed


Thread-4 — completed


Thread-3 — completed


Thread-3 — after run


Thread-3 — run again


Thread-2 — run again


Thread-0 — run again


Thread-4 — run again


Thread-1 — run again


构建CyclicBarrier的时候我们往构造函数传递了一个Runnable对象,这个Runnable会在线程屏障点到达的时候,被刚好到达屏障点的那个线程执行。所以它是在工作线程中执行的。如果我们在Android开发中,需要在UI线程中执行Runnable还需要转到UI线程执行


源码解析


构造函数

 public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;//线程个数
        this.count = parties;
        this.barrierCommand = barrierAction;//到达屏障点 需要执行的任务
    }
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

成员变量

/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();

通过成员变量我们可以知道内部使用的就是ReentrantLock和Condition


await()

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

接下来我们详细讲解下dowait(),基本上精华都在里面了

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();//独占锁上锁
        try {
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            int index = --count;//线程获取到锁了,count-1
            if (index == 0) {  // index=0表示所有线程都执行过了,触发屏障
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();//执行Runnable
                    ranAction = true;
                    nextGeneration();//nextGeneration会调用trip.signalAll(),唤醒所有等待在trip上的线程
                    return 0;
                } finally {
                    if (!ranAction)//如果抛异常了 唤醒所有等待在trip上的线程
                        breakBarrier();
                }
            }
            // 如果index!=0表示还有其他线程没有执行过,那么调用trip.await(),让当前线程阻塞
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }


相关文章
|
11天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
40 2
|
3天前
|
数据采集 存储 Web App开发
Java爬虫:深入解析商品详情的利器
在数字化时代,信息处理能力成为企业竞争的关键。本文探讨如何利用Java编写高效、准确的商品详情爬虫,涵盖爬虫技术概述、Java爬虫优势、开发步骤、法律法规遵守及数据处理分析等内容,助力电商领域市场趋势把握与决策支持。
|
7天前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
12天前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
70 0
|
1月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
57 0
|
1月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
62 0
|
1月前
|
安全 Java 程序员
Collection-Stack&Queue源码解析
Collection-Stack&Queue源码解析
84 0
|
24天前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
41 3
|
1月前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
57 5

推荐镜像

更多
下一篇
无影云桌面