【Java并发编程 十】JUC并发包下的工具类

简介: 【Java并发编程 十】JUC并发包下的工具类

JUC并发包下有四个并发工具类,闭锁CountDownlatch栅栏CyclicBarrier信号量Semaphore交换器Exchanger

  • CountDownlatch通常用于主线程等待其他任务线程执行完毕的场景,类似于Join
  • CyclicBarrier主要阻塞当前线程,等待其他线程(大家无论谁先跑到A点,必须要等其他线程也到达了A点,大家才能继续)。
  • 信号量Semaphore可以用来控制同时访问特定资源的线程数量(比如100个线程只能有10个线程可以获得MySQL连接)。
  • 交换器Exchanger很少用,只适用于两个线程在同步点交换数据的场景

接下来分别详细的介绍下这四种工具类。

CountDownlatch

CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待CountDownLatch也叫闭锁,使得一(多)个主线程必须等待其他线程完成操作后再执行。

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

CountDownLatch内部维护一个Sync 类,包含计数器(继承自AQS的state),主线程先执行await方法,如果此时计数器大于0,则阻塞等待。当一个线程完成任务后,计数器值减1。直到计数器为0时,表示所有的线程已经完成任务,等待的主线程被唤醒继续执行

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
    public static void main(String[] args) {
        List<String> list1 = Arrays.asList("AAA", "BBB", "CCC");
        List<String> list2 = Arrays.asList("DDD", "EEE", "FFF");
        List<String> list3 = Arrays.asList("GGG", "HHH", "III");
        CountDownLatch countDownLatch = new CountDownLatch(3);
        new Thread(()->{
            for (String string: list1) {
                System.out.println(Thread.currentThread().getName() + ":" + string);
            }
            countDownLatch.countDown();  //锁减去1
        }).start();
        new Thread(()->{
            for (String string: list2) {
                System.out.println(Thread.currentThread().getName() + ":" + string);
            }
            countDownLatch.countDown(); //锁减去1
        }).start();
        new Thread(()->{
            for (String string: list3) {
                System.out.println(Thread.currentThread().getName() + ":" + string);
            }
            countDownLatch.countDown(); //锁减去1,为0
        }).start();
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Print Task Finish!");
    }
}

返回结果为:

Thread-0:AAA
Thread-0:BBB
Thread-0:CCC
Thread-1:DDD
Thread-1:EEE
Thread-1:FFF
Thread-2:GGG
Thread-2:HHH
Thread-2:III
Print Task Finish!

使用三个线程来打印三个List,三个线程任务都完成得时候才允许主线程继续允许输出Print Task Finish!

CyclicBarrier

阻塞当前线程,等待其他线程。等待其它线程,且会阻塞自己当前线程,所有线程必须同时到达栅栏位置后才能继续执行;所有线程到达栅栏处,可以触发执行另外一个预先设置的线程

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
    public static void main(String[] args) throws InterruptedException {
        new CyclicBarrierDemo().go();
    }
    private void go() throws InterruptedException {
        //初始化栅栏得参与者数为3
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        new Thread(new Task(cyclicBarrier), "Thread1").start();
        Thread.sleep(1000);
        new Thread(new Task(cyclicBarrier), "Thread2").start();
        Thread.sleep(1000);
        new Thread(new Task(cyclicBarrier), "Thread3").start();
    }
    class Task implements Runnable{
        private CyclicBarrier cyclicBarrier;
        public Task(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            System.out.println("线程" + Thread.currentThread().getName()
                    + "已经送达"
                    + System.currentTimeMillis());
            try {
                cyclicBarrier.await();  //栅栏唤醒,拦住执行线程
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程" + Thread.currentThread().getName()
                    + "开始处理"
                    + System.currentTimeMillis());
        }
    }
}

返回结果为:

线程Thread1已经送达1614786037856
线程Thread2已经送达1614786038856
线程Thread3已经送达1614786039857
线程Thread3开始处理1614786039857
线程Thread1开始处理1614786039857
线程Thread2开始处理1614786039857

可以看到只有三个线程都到达了,才开始处理,CyclicBarrier和CountDownLatch的区别是:

  • CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待
  • CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier

可以看出二者场景不同。

Semaphore

Semaphore也叫信号量,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。

  • 访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。
  • 访问资源后,使用release释放许可。
  • Semaphore和ReentrantLock类似,获取许可有公平策略和非公平许可策略,默认情况下使用非公平策略。

信号量Semaphore得应用场景:Semaphore可以用来做流量分流,特别是对公共资源有限的场景,比如数据库连接。假设有这个的需求,读取几万个文件的数据到数据库中,由于文件读取是IO密集型任务,可以启动几十个线程并发读取,但是数据库连接数只有10个,这时就必须控制最多只有10个线程能够拿到数据库连接进行操作。这个时候,就可以使用Semaphore做流量控制

package com.company;
import java.util.concurrent.*;
public class ThreadTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        //只能5个线程同时访问
        Semaphore semaphore = new Semaphore(5);
        //模拟20个客户端访问
        for (int i = 0; i < 20; i++) {
            final int NO = i;
            pool.execute(()->{
                try {
                    //获取许可
                    semaphore.acquire();
                    System.out.println("Accessing: " + NO);
                    Thread.sleep(5000);
                    //访问完毕后释放
                    System.out.println("每隔5秒释放出5个信号量供线程使用" );
                    Thread.sleep(5000);
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        //退出线程池
        pool.shutdown();
    }
}

返回结果如下:

Accessing: 1
Accessing: 4
Accessing: 0
Accessing: 2
Accessing: 3
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
Accessing: 5
Accessing: 8
Accessing: 7
Accessing: 6
Accessing: 9
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
Accessing: 10
Accessing: 11
Accessing: 12
Accessing: 13
Accessing: 14
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
Accessing: 16
Accessing: 17
Accessing: 15
Accessing: 18
Accessing: 19
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用

semaphore.acquire();表示开启信号量限制,在限制被解除前,一次只能有5个线程能活动,即使开启了20个线程。信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可

Exchanger类

Exchanger(交换者)是一个用于线程间数据交换协作的工具类。它提供一个同步点,在这个同步点多个线程间两两之间线程可以交换彼此的数据。这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方

public static void main(String[] args)   {
       public static void main(String[] args)   {
        Exchanger<String> exchanger = new Exchanger<>();
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        threadPool.execute(()->{
            try {
                //女生对男生说的话
                TimeUnit.SECONDS.sleep(5);
                System.out.println("Girl快一点");
                String girl = exchanger.exchange("hi girl");
                System.out.println("girl said: " + girl);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        threadPool.execute(()->{
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println("BOY慢一点");
                //男生对女生说的话
                String boy = exchanger.exchange("hi boy");
                System.out.println("boy said:" + boy);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        threadPool.shutdown();
    }

返回结果为:

BOY慢一点
Girl快一点
//获得了交换打印值
girl said: hi boy
boy said:hi girl

可以看的出在exchange相交之前,BOY慢一点打印完之后没有执行hi girl,而是等待Girl快一点所属线程也执行到了exchanger 点之后两个线程才一起打印。

相关文章
|
15天前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
21 0
|
17天前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
1天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
24 12
|
14天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
14天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
38 3
|
19天前
|
开发框架 安全 Java
Java 反射机制:动态编程的强大利器
Java反射机制允许程序在运行时检查类、接口、字段和方法的信息,并能操作对象。它提供了一种动态编程的方式,使得代码更加灵活,能够适应未知的或变化的需求,是开发框架和库的重要工具。
35 2
|
20天前
|
安全 Java 开发者
Java中的多线程编程:从基础到实践
本文深入探讨了Java多线程编程的核心概念和实践技巧,旨在帮助读者理解多线程的工作原理,掌握线程的创建、管理和同步机制。通过具体示例和最佳实践,本文展示了如何在Java应用中有效地利用多线程技术,提高程序性能和响应速度。
54 1
|
4月前
|
安全 Java 调度
解锁Java并发编程高阶技能:深入剖析无锁CAS机制、揭秘魔法类Unsafe、精通原子包Atomic,打造高效并发应用
【8月更文挑战第4天】在Java并发编程中,无锁编程以高性能和低延迟应对高并发挑战。核心在于无锁CAS(Compare-And-Swap)机制,它基于硬件支持,确保原子性更新;Unsafe类提供底层内存操作,实现CAS;原子包java.util.concurrent.atomic封装了CAS操作,简化并发编程。通过`AtomicInteger`示例,展现了线程安全的自增操作,突显了这些技术在构建高效并发程序中的关键作用。
75 1
|
27天前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
1月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####