2023年Java核心技术第十一篇(篇篇万字精讲)

简介: 2023年Java核心技术第十一篇(篇篇万字精讲)

二十一 . Java并发包提供了哪些并发工具类



在前面的篇幅中我们进行了,线程,锁,等并发编程的基础元素的讲解。


21.1 典型回答


我们通常所说的并发包也就是 java.util.concurrent 及其子包,集中了 Java 并发的各种基础

工具类。


具体主要包括几个方面:


提供了比 synchronized 更加高级的各种同步结构,包括 CountDownLatch、

CyclicBarrier、Semaphore 等,可以实现更加丰富的多线程操作,比如利用Semap作为资源控制器,限制同时进行工作的线程数量。


21.1 .1  CountDownLatch


创建一个CountDownLatch对象,并将其计数器初始化为3:

CountDownLatch countDownLatch = new CountDownLatch(3);

创建一个InterfaceRequestThread类,继承自Thread类,用于发起接口请求并倒计时:


public class InterfaceRequestThread extends Thread {
    private final CountDownLatch countDownLatch;
    public InterfaceRequestThread(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
    @Override
    public void run() {
        // 发起接口请求操作
        System.out.println("请求接口数据...");
        try {
            Thread.sleep(2000); // 模拟接口请求耗时
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("接口数据请求完成。");
        countDownLatch.countDown(); // 计数器减一
    }
}


主线程中创建多个InterfaceRequestThread线程,并启动它们:

// 创建CountDownLatch对象
CountDownLatch countDownLatch = new CountDownLatch(3);
// 创建三个InterfaceRequestThread线程,并启动它们
for (int i = 0; i < 3; i++) {
    Thread thread = new InterfaceRequestThread(countDownLatch);
    thread.start();
}

在主线程中调用await()方法等待计数器达到零,然后进行下一步处理:


try {
    countDownLatch.await(); // 等待计数器达到零
    System.out.println("所有接口数据请求完成,开始进行下一步处理...");
    // 进行下一步操作,例如数据处理等
} catch (InterruptedException e) {
    e.printStackTrace();
}

运行时,会观察到每个InterfaceRequestThread线程都会发起接口请求,并在请求完成后将CountDownLatch的计数器减一。当所有线程的请求都完成后,主线程就会从await()方法中返回,并输出"所有接口数据请求完成,开始进行下一步处理...",表示可以开始下一步操作了。


通过CountDownLatch,我们实现了多个线程之间的协作与同步,确保所有接口数据都请求完毕后再进行下一步操作。这在需要等待其他线程完成某项操作后再继续执行的场景中非常有用。


21.1 .2  CyclicBarrier


CyclicBarrier 是 Java 提供的一种同步工具类,它可以用于多个线程之间相互等待,直到所有线程都达到一个共同的屏障点,然后同时继续执行。


CyclicBarrier 的主要特点如下:


  • 它可以接收一个整数作为构造函数参数,该整数表示需要等待的线程数量。
  • 每个线程在达到屏障点时会调用 await() 方法,此时该线程会被阻塞,直到所有线程都调用了 await() 方法。
  • 当所有线程都调用了 await() 方法后,所有线程会同时被释放,可以继续执行后续操作。
  • CyclicBarrier 的计数器可以重置,因此它可以被重复使用。


21.1 .2.1 例子:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
    public static void main(String[] args) {
        int numberOfThreads = 3;
        Runnable barrierAction = () -> System.out.println("All threads have reached the barrier");
        CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, barrierAction);
        for (int i = 0; i < numberOfThreads; i++) {
            Thread thread = new Thread(new Worker(barrier));
            thread.start();
        }
    }
    static class Worker implements Runnable {
        private final CyclicBarrier barrier;
        Worker(CyclicBarrier barrier) {
            this.barrier = barrier;
        }
        @Override
        public void run() {
            try {
                System.out.println("Thread started");
                // 模拟线程的工作
                Thread.sleep(1000);
                System.out.println("Thread finished work and waiting at the barrier");
                barrier.await();
                System.out.println("Thread released from the barrier and continuing its work");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}


21.1 .2.2  输出结果:

Thread started
Thread finished work and waiting at the barrier
Thread started
Thread finished work and waiting at the barrier
Thread started
Thread finished work and waiting at the barrier
All threads have reached the barrier
Thread released from the barrier and continuing its work
Thread released from the barrier and continuing its work
Thread released from the barrier and continuing its work


我们创建了一个 CyclicBarrier 对象,并指定等待的线程数量为 3,同时传入一个 barrierAction,当所有线程都达到屏障点时,会执行该 barrierAction。


每个线程在 run() 方法中模拟一些工作,并调用 await() 方法等待其他线程。当所有线程都调用了 await() 方法后,它们会同时被释放,继续执行后续操作。


在输出结果中,可以看到三个线程同时执行到了屏障点,并输出了 "All threads have reached the barrier" 的信息,然后它们继续执行剩余的工作。


通过使用 CyclicBarrier,可以实现多个线程之间的同步,让它们在某个点上进行协调,等待彼此达到同一个状态,然后再继续执行后续操作。这在需要线程间相互等待的场景下非常有用,例如并行计算中将计算结果合并,多个线程同时开始某个任务等等。


21.1.2.3 详细解释:屏障点


屏障点是CyclicBarrier对象的构造函数中指定的等待线程数量达到了设定值。也就是说,当3个线程都调用了await()方法后,它们会在CyclicBarrier对象的屏障点处等待。


具体来说,在该例子中,CyclicBarrier对象的构造函数参数为3,表示需要等待3个线程。当第一个线程调用await()方法时,它会被阻塞,直到所有3个线程都调用了await()方法。当第二个和第三个线程都调用了await()方法后,所有线程都达到了屏障点并等待。


一旦所有线程都达到屏障点,屏障点就会打开,所有线程都会被释放并继续执行后续操作。


21.1.3  Semaphore


Semaphore(信号量)是一种并发控制机制,用于限制同时访问某个共享资源的线程数量。它可以用于控制对临界区的访问,或者限制对有限资源的并发访问。


Semaphore 主要有两个操作:acquire() 和 release()。


  • acquire(): 当一个线程需要访问被 Semaphore 保护的资源时,它调用 acquire() 方法进行获取信号量。如果信号量计数器大于0,该线程可以继续执行,同时信号量计数器减1。如果信号量计数器为0,那么该线程将被阻塞,直到有其他线程调用 release() 方法释放信号量。
  • release(): 当一个线程访问完被 Semaphore 保护的资源后,它调用 release() 方法释放信号量。该操作会使信号量计数器增加1,并且唤醒一个或多个被阻塞的线程,允许它们继续执行。

Semaphore 的计数器可以初始化为一个非负整数。如果计数器初始化为1,那么 Semaphore 就变成了互斥锁的功能,也被称为二元信号量。


21.1.3.1 使用 Semaphore例子:

import java.util.concurrent.Semaphore;
public class SemaphoreExample {
    public static void main(String[] args) {
        int numberOfPermits = 3; // 允许同时访问资源的线程数量
        Semaphore semaphore = new Semaphore(numberOfPermits);
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(new Worker(semaphore));
            thread.start();
        }
    }
    static class Worker implements Runnable {
        private final Semaphore semaphore;
        Worker(Semaphore semaphore) {
            this.semaphore = semaphore;
        }
        @Override
        public void run() {
            try {
                System.out.println("Thread acquiring semaphore");
                semaphore.acquire();
                System.out.println("Thread acquired semaphore and performing its task");
                Thread.sleep(2000);
                System.out.println("Thread released semaphore");
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


21.1.3.2 输出结果:

Thread acquiring semaphore
Thread acquiring semaphore
Thread acquiring semaphore
Thread acquired semaphore and performing its task
Thread acquired semaphore and performing its task
Thread acquired semaphore and performing its task
Thread releasing semaphore
Thread releasing semaphore
Thread releasing semaphore


我们创建了一个 Semaphore 对象,并指定允许同时访问资源的线程数量为3。然后,我们创建了5个线程,并在每个线程中进行一些任务。


每个线程在运行时首先调用 acquire() 方法来获取信号量。如果有可用的许可证(计数器大于0),线程将继续执行,并且信号量计数器减1。如果没有可用的许可证(计数器为0),线程将被阻塞,直到有其他线程释放信号量。


在线程执行完任务后,它调用 release() 方法来释放信号量,使得其他等待的线程能够继续执行。


需要注意的是,在获取和释放信号量时要确保正确使用,以免导致死锁或访问冲突的问题。因此,在编写使用 Semaphore 的程序时,需要仔细设计和管理信号量的获取和释放操作,以确保线程间的正确协调和资源的正确使用。


21.1.3.3 安全容器和队列


各种线程安全的容器,比如常见的ConcurrentHashMap,有序的ConcurrentSkipListMap,或者通过类似快照机制,实现线程安全的动态数组CopyOnWriteArrayList。


以及并发队列实现,如:BlockingQueue实现,比较典型的ArrayBlockingQueue,SynchronousQueue或对特定场景的PriorityBlockingQueue等这些在前面的篇幅中有详细解释。


21.1.3.4 Executor框架


强大的Executor框架,可以创建各种不同类型的线程池,调度任务运行等,绝大部分情况,不需要自己从头进行实现线程池和任务调度器。


需要处理多个并发任务时,可以使用Executor框架来创建不同类型的线程池和调度任务运行。这样可以大大简化多线程编程的难度,并且可以有效地管理线程资源。下面是几个使用Executor框架创建线程池的实例:


21.1.3.4.1. 创建FixedThreadPool线程池:


FixedThreadPool线程池是一种固定大小的线程池,它能够同时执行指定数量的任务,如果有新任务提交时,它会将其放入任务队列等待执行。以下代码展示了如何创建一个FixedThreadPool线程池:


ExecutorService executor = Executors.newFixedThreadPool(4); // 创建一个大小为4的线程池
executor.execute(new MyTask()); // 提交任务至线程池


例子:


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
    public static void main(String[] args) {
        // 创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(4); // 创建一个大小为4的线程池
        // 提交任务至线程池
        for (int i = 1; i <= 10; i++) {
            MyTask task = new MyTask(i);
            executor.execute(task); // 提交任务至线程池
        }
        // 关闭线程池
        executor.shutdown();
    }
}
class MyTask implements Runnable {
    private int taskId;
    public MyTask(int taskId) {
        this.taskId = taskId;
    }
    @Override
    public void run() {
        System.out.println("Task " + taskId + " is running."); // 执行具体的任务逻辑
        try {
            Thread.sleep(1000); // 模拟任务执行时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Task " + taskId + " is complete.");
    }
}


我们创建了一个大小为4的FixedThreadPool线程池,并提交了10个任务到线程池中。


每个任务由MyTask类表示,该类实现了Runnable接口,并重写了run方法。在run方法中,我们定义了任务的具体逻辑。在本例中,每个任务打印出任务标识,并模拟了一秒钟的执行时间。


通过循环提交多个任务到线程池中,线程池会同时执行最多4个任务。如果有新的任务提交时,它会将其放入任务队列等待执行,直到有空闲的线程可用。


最后,我们调用executor.shutdown()方法关闭线程池,在所有任务执行完毕后,线程池停止接受新的任务并逐渐关闭。


可以利用FixedThreadPool线程池来并发执行多个任务,提高程序的效率和性能。


当使用FixedThreadPool线程池时,可以提交多个任务并让线程池同时执行指定数量的任务。


21.1.3.4.2. 创建CachedThreadPool线程池:


CachedThreadPool线程池是一种自动管理大小的线程池,它会根据任务量自动扩展线程数,如果有多个任务同时提交,CachedThreadPool线程池会启动多个线程来执行这些任务。以下代码展示了如何创建一个CachedThreadPool线程池:


ExecutorService executor = Executors.newCachedThreadPool(); // 创建一个自动管理线程数的线程池
executor.execute(new MyTask()); // 提交任务至线程池


详细进行例子讲解:


当使用CachedThreadPool线程池时,它会自动管理线程数,根据任务量的大小来动态调整线程数量。以下是一个详细的示例,展示了如何创建和应用CachedThreadPool线程池:


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
    public static void main(String[] args) {
        // 创建线程池
        ExecutorService executor = Executors.newCachedThreadPool(); // 创建一个自动管理线程数的线程池
        // 提交任务至线程池
        for (int i = 1; i <= 10; i++) {
            MyTask task = new MyTask(i);
            executor.execute(task); // 提交任务至线程池
        }
        // 关闭线程池
        executor.shutdown();
    }
}
class MyTask implements Runnable {
    private int taskId;
    public MyTask(int taskId) {
        this.taskId = taskId;
    }
    @Override
    public void run() {
        System.out.println("Task " + taskId + " is running."); // 执行具体的任务逻辑
        try {
            Thread.sleep(1000); // 模拟任务执行时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Task " + taskId + " is complete.");
    }
}


我们创建了一个CachedThreadPool线程池,并提交了10个任务到线程池中。


每个任务由MyTask类表示,该类实现了Runnable接口,并重写了run方法。在run方法中,我们定义了任务的具体逻辑。在本例中,每个任务打印出任务标识,并模拟了一秒钟的执行时间。


通过循环提交多个任务到线程池中,CachedThreadPool线程池会自动根据任务量的大小来动态调整线程数量。如果有多个任务同时提交,它会启动多个线程来执行这些任务。


最后,我们调用executor.shutdown()方法关闭线程池,在所有任务执行完毕后,线程池停止接受新的任务并逐渐关闭。


这样,你可以利用CachedThreadPool线程池来处理大量的短期任务,让线程池动态适应任务的变化,提高程序的效率和性能。


21.1.3.4.3. 创建ScheduledThreadPool线程池:


ScheduledThreadPool线程池是一种定时执行任务的线程池,它可以让任务在指定时间或周期性地执行。以下代码展示了如何创建一个ScheduledThreadPool线程池:


ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); // 创建一个大小为2的定时任务线程池
executor.schedule(new MyTask(), 10, TimeUnit.SECONDS); // 10秒后执行MyTask任务
executor.scheduleAtFixedRate(new MyTask(), 1, 3, TimeUnit.SECONDS); // 1秒后开始执行MyTask任务,每隔3秒重复执行一次


详细进行例子讲解:


当使用ScheduledThreadPool线程池时,它可以定时执行任务,让任务在指定的时间或周期性地执行。以下是一个详细的示例,展示了如何创建和应用ScheduledThreadPool线程池:


import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Main {
    public static void main(String[] args) {
        // 创建线程池
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); // 创建一个大小为2的定时任务线程池
        // 使用schedule方法,在10秒后执行任务
        executor.schedule(new MyTask(), 10, TimeUnit.SECONDS);
        // 使用scheduleAtFixedRate方法,在1秒后开始执行任务,并且每隔3秒重复执行一次
        executor.scheduleAtFixedRate(new MyTask(), 1, 3, TimeUnit.SECONDS);
        // 关闭线程池
        // executor.shutdown();
    }
}
class MyTask implements Runnable {
    @Override
    public void run() {
        System.out.println("Task is running."); // 执行具体的任务逻辑
        try {
            Thread.sleep(1000); // 模拟任务执行时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Task is complete.");
    }
}


我们创建了一个ScheduledThreadPool线程池,大小为2。


首先,我们使用executor.schedule()方法,安排一个任务在10秒后执行。这意味着10秒钟后,线程池会执行MyTask任务的run()方法。


然后,我们使用executor.scheduleAtFixedRate()方法,安排一个任务在1秒后开始执行,并且每隔3秒重复执行一次。这意味着任务将会以固定的时间间隔来执行。


每个任务由MyTask类表示,该类实现了Runnable接口,并重写了run方法。在run方法中,我们定义了任务的具体逻辑。在本例中,每个任务打印输出一条信息,并模拟了一秒钟的执行时间。


我们注释掉了关闭线程池的代码。如果需要,在所有任务执行完毕后,可以调用executor.shutdown()方法来关闭线程池。


这样可以利用ScheduledThreadPool线程池来安排和执行定时任务,让任务在指定的时间或周期性地执行,从而满足需求。


这些是Executor框架创建线程池的例子。


实际上,Executor框架还提供了很多其他类型的线程池和任务调度器,可以根据需要选择适合自己的线程池和任务调度器来处理并发任务。


22.2多线程我们一般想要达到目的:


1.利用多线程提高程序的扩展能力,以达到业务对吞吐量的要求。

2.协调线程间调度,交互,以完成业务逻辑。

3.线程间传递数据和状态,同样是实现业务逻辑的需要。


相关文章
|
9天前
|
监控 Java 物联网
Java串口通信技术探究1:深入理解RXTX库
Java串口通信技术探究1:深入理解RXTX库
23 2
|
2天前
|
Kubernetes Java 调度
Java容器技术:Docker与Kubernetes
Java容器技术:Docker与Kubernetes
12 0
|
2天前
|
存储 安全 Java
深入理解Java字节码与反编译技术
深入理解Java字节码与反编译技术
10 0
|
2天前
|
监控 Java Maven
揭秘Java Agent技术:解锁Java工具开发的新境界
作为JDK提供的关键机制,Java Agent技术不仅为Java工具的开发者提供了一个强大的框架,还为性能监控、故障诊断和动态代码修改等领域带来了革命性的变革。本文旨在全面解析Java Agent技术的应用场景以及实现方式,特别是静态加载模式和动态加载模式这两种关键模式。
17 0
|
9天前
|
存储 缓存 前端开发
Java串口通信技术探究3:RXTX库线程 优化系统性能的SerialPortEventListener类
Java串口通信技术探究3:RXTX库线程 优化系统性能的SerialPortEventListener类
30 3
|
9天前
|
安全 IDE Java
Java串口通信技术探究2:RXTX库单例测试及应用
Java串口通信技术探究2:RXTX库单例测试及应用
25 4
|
9天前
|
存储 前端开发 安全
13:会话跟踪技术Session的深度应用与实践-Java Web
13:会话跟踪技术Session的深度应用与实践-Java Web
23 3
|
9天前
|
存储 前端开发 搜索推荐
12:会话跟踪技术Cookie的深度应用与实践-Java Web
12:会话跟踪技术Cookie的深度应用与实践-Java Web
22 4
|
10天前
|
供应链 Java API
Java 8新特性解析及应用区块链技术在供应链管理中的应用与挑战
【4月更文挑战第30天】本文将深入探讨Java 8的新特性,包括Lambda表达式、Stream API和Optional类等。通过对这些新特性的详细解析和应用实例,帮助读者更好地理解和掌握Java 8的新技术。
|
11天前
|
存储 安全 Java
【亮剑】`ConcurrentHashMap`是Java中线程安全的哈希表,采用锁定分离技术提高并发性能
【4月更文挑战第30天】`ConcurrentHashMap`是Java中线程安全的哈希表,采用锁定分离技术提高并发性能。数据被分割成多个Segment,每个拥有独立锁,允许多线程并发访问不同Segment。当写操作发生时,计算键的哈希值定位Segment并获取其锁;读操作通常无需锁定。内部会根据负载动态调整Segment,减少锁竞争。虽然使用不公平锁,但Java 8及以上版本提供了公平锁选项。理解其工作原理对开发高性能并发应用至关重要。