【Java并发编程 十】JUC并发包下的工具类

简介: 【Java并发编程 十】JUC并发包下的工具类

JUC并发包下有四个并发工具类,闭锁CountDownlatch栅栏CyclicBarrier信号量Semaphore交换器Exchanger

  • CountDownlatch通常用于主线程等待其他任务线程执行完毕的场景,类似于Join
  • CyclicBarrier主要阻塞当前线程,等待其他线程(大家无论谁先跑到A点,必须要等其他线程也到达了A点,大家才能继续)。
  • 信号量Semaphore可以用来控制同时访问特定资源的线程数量(比如100个线程只能有10个线程可以获得MySQL连接)。
  • 交换器Exchanger很少用,只适用于两个线程在同步点交换数据的场景

接下来分别详细的介绍下这四种工具类。

CountDownlatch

CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待CountDownLatch也叫闭锁,使得一(多)个主线程必须等待其他线程完成操作后再执行。

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

CountDownLatch内部维护一个Sync 类,包含计数器(继承自AQS的state),主线程先执行await方法,如果此时计数器大于0,则阻塞等待。当一个线程完成任务后,计数器值减1。直到计数器为0时,表示所有的线程已经完成任务,等待的主线程被唤醒继续执行

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
    public static void main(String[] args) {
        List<String> list1 = Arrays.asList("AAA", "BBB", "CCC");
        List<String> list2 = Arrays.asList("DDD", "EEE", "FFF");
        List<String> list3 = Arrays.asList("GGG", "HHH", "III");
        CountDownLatch countDownLatch = new CountDownLatch(3);
        new Thread(()->{
            for (String string: list1) {
                System.out.println(Thread.currentThread().getName() + ":" + string);
            }
            countDownLatch.countDown();  //锁减去1
        }).start();
        new Thread(()->{
            for (String string: list2) {
                System.out.println(Thread.currentThread().getName() + ":" + string);
            }
            countDownLatch.countDown(); //锁减去1
        }).start();
        new Thread(()->{
            for (String string: list3) {
                System.out.println(Thread.currentThread().getName() + ":" + string);
            }
            countDownLatch.countDown(); //锁减去1,为0
        }).start();
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Print Task Finish!");
    }
}

返回结果为:

Thread-0:AAA
Thread-0:BBB
Thread-0:CCC
Thread-1:DDD
Thread-1:EEE
Thread-1:FFF
Thread-2:GGG
Thread-2:HHH
Thread-2:III
Print Task Finish!

使用三个线程来打印三个List,三个线程任务都完成得时候才允许主线程继续允许输出Print Task Finish!

CyclicBarrier

阻塞当前线程,等待其他线程。等待其它线程,且会阻塞自己当前线程,所有线程必须同时到达栅栏位置后才能继续执行;所有线程到达栅栏处,可以触发执行另外一个预先设置的线程

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
    public static void main(String[] args) throws InterruptedException {
        new CyclicBarrierDemo().go();
    }
    private void go() throws InterruptedException {
        //初始化栅栏得参与者数为3
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        new Thread(new Task(cyclicBarrier), "Thread1").start();
        Thread.sleep(1000);
        new Thread(new Task(cyclicBarrier), "Thread2").start();
        Thread.sleep(1000);
        new Thread(new Task(cyclicBarrier), "Thread3").start();
    }
    class Task implements Runnable{
        private CyclicBarrier cyclicBarrier;
        public Task(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            System.out.println("线程" + Thread.currentThread().getName()
                    + "已经送达"
                    + System.currentTimeMillis());
            try {
                cyclicBarrier.await();  //栅栏唤醒,拦住执行线程
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程" + Thread.currentThread().getName()
                    + "开始处理"
                    + System.currentTimeMillis());
        }
    }
}

返回结果为:

线程Thread1已经送达1614786037856
线程Thread2已经送达1614786038856
线程Thread3已经送达1614786039857
线程Thread3开始处理1614786039857
线程Thread1开始处理1614786039857
线程Thread2开始处理1614786039857

可以看到只有三个线程都到达了,才开始处理,CyclicBarrier和CountDownLatch的区别是:

  • CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待
  • CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier

可以看出二者场景不同。

Semaphore

Semaphore也叫信号量,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。

  • 访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。
  • 访问资源后,使用release释放许可。
  • Semaphore和ReentrantLock类似,获取许可有公平策略和非公平许可策略,默认情况下使用非公平策略。

信号量Semaphore得应用场景:Semaphore可以用来做流量分流,特别是对公共资源有限的场景,比如数据库连接。假设有这个的需求,读取几万个文件的数据到数据库中,由于文件读取是IO密集型任务,可以启动几十个线程并发读取,但是数据库连接数只有10个,这时就必须控制最多只有10个线程能够拿到数据库连接进行操作。这个时候,就可以使用Semaphore做流量控制

package com.company;
import java.util.concurrent.*;
public class ThreadTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        //只能5个线程同时访问
        Semaphore semaphore = new Semaphore(5);
        //模拟20个客户端访问
        for (int i = 0; i < 20; i++) {
            final int NO = i;
            pool.execute(()->{
                try {
                    //获取许可
                    semaphore.acquire();
                    System.out.println("Accessing: " + NO);
                    Thread.sleep(5000);
                    //访问完毕后释放
                    System.out.println("每隔5秒释放出5个信号量供线程使用" );
                    Thread.sleep(5000);
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        //退出线程池
        pool.shutdown();
    }
}

返回结果如下:

Accessing: 1
Accessing: 4
Accessing: 0
Accessing: 2
Accessing: 3
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
Accessing: 5
Accessing: 8
Accessing: 7
Accessing: 6
Accessing: 9
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
Accessing: 10
Accessing: 11
Accessing: 12
Accessing: 13
Accessing: 14
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
Accessing: 16
Accessing: 17
Accessing: 15
Accessing: 18
Accessing: 19
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用
每隔5秒释放出5个信号量供线程使用

semaphore.acquire();表示开启信号量限制,在限制被解除前,一次只能有5个线程能活动,即使开启了20个线程。信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可

Exchanger类

Exchanger(交换者)是一个用于线程间数据交换协作的工具类。它提供一个同步点,在这个同步点多个线程间两两之间线程可以交换彼此的数据。这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方

public static void main(String[] args)   {
       public static void main(String[] args)   {
        Exchanger<String> exchanger = new Exchanger<>();
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        threadPool.execute(()->{
            try {
                //女生对男生说的话
                TimeUnit.SECONDS.sleep(5);
                System.out.println("Girl快一点");
                String girl = exchanger.exchange("hi girl");
                System.out.println("girl said: " + girl);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        threadPool.execute(()->{
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println("BOY慢一点");
                //男生对女生说的话
                String boy = exchanger.exchange("hi boy");
                System.out.println("boy said:" + boy);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        threadPool.shutdown();
    }

返回结果为:

BOY慢一点
Girl快一点
//获得了交换打印值
girl said: hi boy
boy said:hi girl

可以看的出在exchange相交之前,BOY慢一点打印完之后没有执行hi girl,而是等待Girl快一点所属线程也执行到了exchanger 点之后两个线程才一起打印。

相关文章
|
2天前
|
Java 数据库连接 数据库
如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面
本文介绍了如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面。通过合理配置初始连接数、最大连接数和空闲连接超时时间,确保系统性能和稳定性。文章还探讨了同步阻塞、异步回调和信号量等并发控制策略,并提供了异常处理的最佳实践。最后,给出了一个简单的连接池示例代码,并推荐使用成熟的连接池框架(如HikariCP、C3P0)以简化开发。
8 2
|
3天前
|
缓存 Java 调度
Java中的多线程编程:从基础到实践
【10月更文挑战第24天】 本文旨在为读者提供一个关于Java多线程编程的全面指南。我们将从多线程的基本概念开始,逐步深入到Java中实现多线程的方法,包括继承Thread类、实现Runnable接口以及使用Executor框架。此外,我们还将探讨多线程编程中的常见问题和最佳实践,帮助读者在实际项目中更好地应用多线程技术。
10 3
|
5天前
|
监控 安全 Java
Java多线程编程的艺术与实践
【10月更文挑战第22天】 在现代软件开发中,多线程编程是一项不可或缺的技能。本文将深入探讨Java多线程编程的核心概念、常见问题以及最佳实践,帮助开发者掌握这一强大的工具。我们将从基础概念入手,逐步深入到高级主题,包括线程的创建与管理、同步机制、线程池的使用等。通过实际案例分析,本文旨在提供一种系统化的学习方法,使读者能够在实际项目中灵活运用多线程技术。
|
6天前
|
存储 安全 Java
Java编程中的对象序列化与反序列化
【10月更文挑战第22天】在Java的世界里,对象序列化和反序列化是数据持久化和网络传输的关键技术。本文将带你了解如何在Java中实现对象的序列化与反序列化,并探讨其背后的原理。通过实际代码示例,我们将一步步展示如何将复杂数据结构转换为字节流,以及如何将这些字节流还原为Java对象。文章还将讨论在使用序列化时应注意的安全性问题,以确保你的应用程序既高效又安全。
|
3天前
|
缓存 安全 Java
Java中的多线程编程:从基础到实践
【10月更文挑战第24天】 本文将深入探讨Java中的多线程编程,包括其基本原理、实现方式以及常见问题。我们将从简单的线程创建开始,逐步深入了解线程的生命周期、同步机制、并发工具类等高级主题。通过实际案例和代码示例,帮助读者掌握多线程编程的核心概念和技术,提高程序的性能和可靠性。
8 2
|
4天前
|
Java
Java中的多线程编程:从基础到实践
本文深入探讨Java多线程编程,首先介绍多线程的基本概念和重要性,接着详细讲解如何在Java中创建和管理线程,最后通过实例演示多线程的实际应用。文章旨在帮助读者理解多线程的核心原理,掌握基本的多线程操作,并能够在实际项目中灵活运用多线程技术。
|
4天前
|
Java 程序员 开发者
Java编程中的异常处理艺术
【10月更文挑战第24天】在Java的世界里,代码就像一场精心编排的舞蹈,每一个动作都要精准无误。但就像最完美的舞者也可能踩错一个步伐一样,我们的程序偶尔也会遇到意外——这就是所谓的异常。本文将带你走进Java的异常处理机制,从基本的try-catch语句到高级的异常链追踪,让你学会如何优雅地处理这些不请自来的“客人”。
|
5天前
|
Java 数据处理 开发者
Java多线程编程的艺术:从入门到精通####
【10月更文挑战第21天】 本文将深入探讨Java多线程编程的核心概念,通过生动实例和实用技巧,引导读者从基础认知迈向高效并发编程的殿堂。我们将一起揭开线程管理的神秘面纱,掌握同步机制的精髓,并学习如何在实际项目中灵活运用这些知识,以提升应用性能与响应速度。 ####
20 3
|
4天前
|
设计模式 SQL 安全
Java编程中的单例模式深入解析
【10月更文挑战第24天】在软件工程中,单例模式是设计模式的一种,它确保一个类只有一个实例,并提供一个全局访问点。本文将探讨如何在Java中使用单例模式,并分析其优缺点以及适用场景。
8 0
|
5天前
|
存储 Java
在Java编程的世界里,标识符命名是一项基础且至关重要的技能
在Java编程的世界里,标识符命名是一项基础且至关重要的技能
8 0