一天一个 JUC 工具类 -- 真工具类

简介: CountDownLatch CyclicBarrier ForkJoin Semaphore 使用方法和注意事项

真工具类

CountDownLatch

CountDownLatch是Java并发工具包(Java Util Concurrent)中的一个同步工具类,它的主要作用是允许一个或多个线程等待其他线程完成某个操作后再继续执行。它的实现原理是基于一个计数器,当计数器的值变为零时,等待线程将被唤醒。

public class CountDownLatch {
    
   private final Sync sync;

   public CountDownLatch(int count) {
    
       if (count < 0) throw new IllegalArgumentException("count < 0");
       this.sync = new Sync(count);
   }

   public void await() throws InterruptedException {
    
       sync.acquireSharedInterruptibly(1);
   }

   public void countDown() {
    
       sync.releaseShared(1);
   }

   private static final class Sync extends AbstractQueuedSynchronizer {
    
       Sync(int count) {
    
           setState(count);
       }

       protected int tryAcquireShared(int acquires) {
    
           return (getState() == 0) ? 1 : -1;
       }

       protected boolean tryReleaseShared(int releases) {
    
           // Decrement count; signal when transition to zero
           for (;;) {
    
               int c = getState();
               if (c == 0)
                   return false;
               int nextc = c-1;
               if (compareAndSetState(c, nextc))
                   return nextc == 0;
           }
       }
   }
}
实现原理
  1. CountDownLatch内部维护一个整数计数器(count),初始化为一个正整数。
  2. 当一个线程调用CountDownLatch的await()方法时,它会被阻塞,直到计数器的值变为零。
  3. 其他线程执行完需要等待的操作后,调用CountDownLatch的countDown()方法来减少计数器的值。当计数器的值减为零时,await()方法返回,等待的线程继续执行。

当使用CountDownLatch时,我们首先需要创建一个CountDownLatch对象,并指定计数器的初始值。计数器的值代表需要等待的线程数量。然后,在等待的线程中调用await()方法,该方法会阻塞当前线程,直到计数器的值变为零。其他执行需要等待的操作的线程,在完成后调用countDown()方法来减少计数器的值。一旦计数器的值减为零,所有等待的线程将被唤醒,继续执行后续操作。

让我们通过示例代码来演示CountDownLatch的使用方式和注意事项:

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {

   public static void main(String[] args) {
       final int threadsToWait = 3;
       CountDownLatch latch = new CountDownLatch(threadsToWait);

       // 创建并启动多个执行任务的线程
       for (int i = 0; i < threadsToWait; i++) {
           Thread thread = new Thread(new WorkerThread(latch, "Worker-" + (i + 1)));
           thread.start();
       }

       try {
           // 等待所有线程执行完成
           latch.await();
           System.out.println("All worker threads have finished their tasks.");
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }

   static class WorkerThread implements Runnable {
       private final CountDownLatch latch;
       private final String name;

       public WorkerThread(CountDownLatch latch, String name) {
           this.latch = latch;
           this.name = name;
       }

       @Override
       public void run() {
           System.out.println(name + " is starting its task.");
           // 模拟任务执行时间
           try {
                 //todo
               Thread.sleep(2000);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           System.out.println(name + " has finished its task.");
           latch.countDown(); // 任务完成,计数器减一
       }
   }
}

在上面的示例中,我们创建了3个WorkerThread线程,它们模拟了执行任务的过程。每个线程在任务完成后调用countDown()方法,减少CountDownLatch的计数器。在主线程中,我们调用latch.await()来等待所有的WorkerThread线程执行完成。

注意事项:

  1. CountDownLatch的计数器只能减少,一旦计数器的值为零,就无法重置。如果需要多次使用,请考虑使用CyclicBarrier。
  2. 确保在等待的线程调用await()方法之前,计数器的值已经正确设置,否则会导致等待的线程永远阻塞。
  3. 使用CountDownLatch时,需要确保计数器的值足够合理,以免出现线程永远等待或等待时间过长的情况。

CyclicBarrier

CyclicBarrier是Java并发工具包(Java Util Concurrent)中的一个同步工具类,它的主要作用是允许一组线程互相等待,直到所有线程都到达一个共同的屏障点,然后再一起继续执行。CyclicBarrier可以用于将多个线程分阶段地进行同步,每个阶段的线程都必须等待其他线程完成后才能继续执行。

实现原理

以下是CyclicBarrier的关键源码片段:

public class CyclicBarrier {
    
   private final Sync sync;

   public CyclicBarrier(int parties) {
    
       this.sync = new Sync(parties);
   }

   public int await() throws InterruptedException, BrokenBarrierException {
    
       return sync.innerAwait(false, 0L);
   }

   private static class Sync extends AbstractQueuedSynchronizer {
    
       // 等待的线程数量
       private final int parties;
       // 表示所有线程都到达屏障点的标志
       private volatile int count;

       Sync(int parties) {
    
           setState(parties);
           this.parties = parties;
           this.count = parties;
       }

       int innerAwait(boolean timed, long nanos)
               throws InterruptedException, BrokenBarrierException {
    
           // 等待线程被中断或超时时抛出异常
           if (Thread.interrupted()) {
    
               throw new InterruptedException();
           }

           int index = --count;
           if (index == 0) {
     // 如果是最后一个到达的线程
               // 唤醒所有等待的线程,重置计数器
               releaseShared(0);
               return 0;
           }

           for (;;) {
    
               // 等待其他线程到达屏障点
               if (timed && nanos <= 0L) {
    
                   throw new TimeoutException();
               }
               if (timed) {
    
                   // 阻塞一段时间,等待其他线程到达屏障点
                   LockSupport.parkNanos(this, nanos);
               } else {
    
                   LockSupport.park(this);
               }

               if (Thread.interrupted()) {
    
                   throw new InterruptedException();
               }

               // 计数器值为0,表示所有线程都到达屏障点
               if (getState() == 0) {
    
                   throw new BrokenBarrierException();
               }

               // 如果计数器值被重置,表示有线程被中断或超时
               if (getState() != parties) {
    
                   throw new InterruptedException();
               }
           }
       }
   }
}
  1. CyclicBarrier内部维护了一个计数器(count),和一个屏障(barrier)。
  2. 当线程调用CyclicBarrier的await()方法时,它会等待其他线程也调用了await()方法。
  3. 每次调用await()方法,计数器的值减一。当计数器的值减为零时,所有等待的线程将被唤醒,同时执行后续操作,并将计数器的值重置为初始值。
  4. 在所有线程到达屏障点后,CyclicBarrier可以选择执行一个指定的任务(Runnable),然后唤醒所有等待的线程继续执行后续操作。

当我们有一组线程需要同时执行某个任务,但需要确保所有线程都已经就绪后再开始执行,这时可以使用CyclicBarrier来实现线程的同步。

假设有一个多线程爬虫程序,需要同时启动多个线程并发爬取不同的网页内容,然后将爬取到的数据进行合并处理。为了保证所有线程都已经完成爬取工作后再进行数据处理,我们可以使用CyclicBarrier。

下面是一个简单的示例代码来演示CyclicBarrier的使用方式:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class WebCrawlerExample {
    

   public static void main(String[] args) {
    
       final int numOfThreads = 5;
       CyclicBarrier barrier = new CyclicBarrier(numOfThreads, new MergeTask());

       for (int i = 0; i < numOfThreads; i++) {
    
           Thread thread = new Thread(new WebCrawlerTask(barrier, "Thread-" + (i + 1)));
           thread.start();
       }
   }

   static class WebCrawlerTask implements Runnable {
    
       private final CyclicBarrier barrier;
       private final String name;

       public WebCrawlerTask(CyclicBarrier barrier, String name) {
    
           this.barrier = barrier;
           this.name = name;
       }

       @Override
       public void run() {
    
           System.out.println(name + " is crawling web pages...");
           // 模拟爬取网页的时间
           try {
    
               Thread.sleep(2000);
           } catch (InterruptedException e) {
    
               e.printStackTrace();
           }
           System.out.println(name + " has finished crawling web pages.");

           try {
    
               // 等待其他线程都完成爬取工作
               barrier.await();
           } catch (InterruptedException | BrokenBarrierException e) {
    
               e.printStackTrace();
           }
       }
   }

   static class MergeTask implements Runnable {
    
       @Override
       public void run() {
    
           System.out.println("All threads have finished crawling web pages.");
           System.out.println("Now merging the data...");
           // 进行数据合并处理
           System.out.println("Data merge complete!");
       }
   }
}

在上面的示例中,我们创建了5个WebCrawlerTask线程,它们模拟了爬取网页的过程。每个线程在完成爬取后,调用await()方法等待其他线程完成。当所有线程都完成爬取后,CyclicBarrier会唤醒所有等待的线程,并执行MergeTask任务来进行数据合并处理。

通过CyclicBarrier,我们可以很方便地实现线程的同步,保证多个线程同时开始执行或完成任务。这在并发爬虫、并发计算等场景中非常有用。

注意事项
  1. CyclicBarrier的计数器可以重复使用,一旦计数器的值为零,就会被重置为初始值。这使得CyclicBarrier可以在不同阶段多次使用。
  2. 确保在所有线程到达屏障点前调用await()方法,否则可能导致部分线程永远等待。
  3. 在使用CyclicBarrier时,需要确保计数器的值足够合理,以免出现线程永远等待或等待时间

ForkJoin

Fork/Join是Java并发工具包(Java Util Concurrent)中的一个并行执行框架,它提供了一种简单且高效的方式来处理分治算法并行化。Fork/Join框架的实现原理是基于"工作窃取"(Work-Stealing)算法,通过利用多个线程来完成任务的拆分与合并,从而实现任务的并行处理。

实现原理

Fork/Join框架的实现原理可以概括为以下几个关键点:

  1. 任务分割:当一个大任务需要并行处理时,Fork/Join框架会将大任务切分成多个小任务,每个小任务都可以独立执行。
  2. 工作窃取:每个线程都有自己的任务队列,当线程的任务队列为空时,它可以从其他线程的任务队列中"窃取"一个任务来执行。这样做的目的是为了保证线程始终有任务可执行,避免线程因为任务耗尽而空闲。
  3. 任务合并:当小任务执行完成后,Fork/Join框架会将结果合并,得到最终的结果。

Fork/Join框架中主要有两个核心类:ForkJoinTask和ForkJoinPool。

  • ForkJoinTask:是一个抽象类,表示可以由Fork/Join框架执行的任务。我们可以通过继承ForkJoinTask并实现compute()方法来定义自己的任务。
  • ForkJoinPool:是一个线程池,用于执行ForkJoinTask。它管理线程的创建和销毁,并调度任务的执行。

以下是Fork/Join框架的实现的一个 demo:

import java.util.concurrent.RecursiveTask;

public class MyTask extends RecursiveTask<Integer> {
    
   private final int threshold = 100;
   private int[] array;
   private int start;
   private int end;

   public MyTask(int[] array, int start, int end) {
    
       this.array = array;
       this.start = start;
       this.end = end;
   }

   @Override
   protected Integer compute() {
    
       if (end - start <= threshold) {
    
           // 如果任务足够小,直接执行任务
           int sum = 0;
           for (int i = start; i < end; i++) {
    
               sum += array[i];
           }
           return sum;
       } else {
    
           // 任务太大,拆分成两个子任务
           int mid = (start + end) / 2;
           MyTask leftTask = new MyTask(array, start, mid);
           MyTask rightTask = new MyTask(array, mid, end);

           // 并行执行子任务
           leftTask.fork();
           rightTask.fork();

           // 等待子任务执行完成,并得到结果
           int leftResult = leftTask.join();
           int rightResult = rightTask.join();

           // 合并子任务的结果
           return leftResult + rightResult;
       }
   }
}

在上面的示例中,我们定义了一个MyTask类继承自RecursiveTask,用于实现一个简单的求和任务。在compute()方法中,如果任务足够小(即元素数量小于等于threshold),直接执行任务并返回结果;否则,拆分任务成两个子任务,分别进行递归调用,并将子任务的结果进行合并。

使用注意事项
  1. Fork/Join框架适用于一些可以被递归拆分的问题,如归并排序、快速排序等分治算法。对于其他类型的任务,可能并不适合使用Fork/Join框架。
  2. 在使用Fork/Join框架时,需要根据具体情况合理设置拆分任务的阈值(threshold),以免过小导致任务频繁拆分和合并,过大导致任务无法充分并行执行。
  3. Fork/Join框架在处理大规模数据和递归深度时,可能导致栈溢出。为了避免这种情况,可以考虑使用invokeAll()方法代替fork()和join(),因为invokeAll()在内部使用了更高效的方式来执行任务。
  4. 在Fork/Join框架中,任务之间的调度和执行是由线程池(ForkJoinPool)控制的,因此对线程池的配置也需要慎重考虑,以充分利用计算资源。

总体而言,Fork/Join框架是一个强大且高效的并行执行框架,能够帮助我们处理一些复杂且耗时的任务,实现并行化计算。在合适的场景下,合理使用Fork/Join框架可以大大提高程序的性能。

Semaphore

Semaphore是Java并发工具包(Java Util Concurrent)中的一个同步工具类,它可以用来控制对共享资源的访问数量。Semaphore内部维护了一组许可证(permits),线程可以通过acquire()方法获取许可证,如果许可证数量大于0,则线程可以继续执行;如果许可证数量等于0,则线程会被阻塞,直到有其他线程释放许可证。

实现原理

Semaphore的实现原理可以概括为以下几个关键点:

  1. 许可证数量:Semaphore内部维护了一个整数,表示可用的许可证数量。在构造Semaphore对象时,我们可以指定许可证的初始数量。
  2. 获取许可证:当一个线程调用acquire()方法时,它会尝试获取一个许可证。如果许可证数量大于0,则线程可以继续执行,并将许可证数量减一;如果许可证数量等于0,则线程会被阻塞,直到有其他线程释放许可证。
  3. 释放许可证:当一个线程调用release()方法时,它会释放一个许可证。许可证数量加一,并唤醒一个等待的线程(如果有的话)。

Semaphore使用了内置的原子操作,因此它是线程安全的。

以下是Semaphore的关键源码片段:

public class Semaphore {
    
   private final Sync sync;

   public Semaphore(int permits) {
    
       this.sync = new NonfairSync(permits);
   }

   public void acquire() throws InterruptedException {
    
       sync.acquireSharedInterruptibly(1);
   }

   public void release() {
    
       sync.releaseShared(1);
   }

   abstract static class Sync extends AbstractQueuedSynchronizer {
    
       Sync(int permits) {
    
           setState(permits);
       }

       final int getPermits() {
    
           return getState();
       }

       final int nonfairTryAcquireShared(int acquires) {
    
           for (;;) {
    
               int available = getState();
               int remaining = available - acquires;
               if (remaining < 0 || compareAndSetState(available, remaining)) {
    
                   return remaining;
               }
           }
       }

       protected final boolean tryReleaseShared(int releases) {
    
           for (;;) {
    
               int current = getState();
               int next = current + releases;
               if (next < current) // overflow
                   throw new Error("Maximum permit count exceeded");
               if (compareAndSetState(current, next))
                   return true;
           }
       }
   }

   static final class NonfairSync extends Sync {
    
       NonfairSync(int permits) {
    
           super(permits);
       }

       protected int tryAcquireShared(int acquires) {
    
           return nonfairTryAcquireShared(acquires);
       }
   }
}

在上面的示例中,我们可以看到Semaphore的实现是基于AQS(AbstractQueuedSynchronizer)的共享模式。Semaphore内部维护了一个整数(许可证数量),通过CAS操作来获取和释放许可证。

当我们有多个线程需要同时访问一组有限的资源时,可以使用Semaphore来进行资源的控制和同步。Semaphore允许我们指定资源的数量,每个线程在访问资源前需要获取一个许可证,如果许可证数量大于0,则允许访问资源;如果许可证数量等于0,则线程需要等待其他线程释放许可证后才能继续执行。

让我们通过一个简单的示例代码来演示Semaphore的使用方式。假设我们有一个连接池,里面有5个数据库连接,多个线程需要从连接池中获取连接执行数据库操作。我们希望限制最多只有5个线程同时访问连接池中的连接。

import java.util.concurrent.Semaphore;

public class ConnectionPoolExample {
    

   public static void main(String[] args) {
    
       final int numOfThreads = 10;
       final int numOfConnections = 5;

       ConnectionPool connectionPool = new ConnectionPool(numOfConnections);

       for (int i = 0; i < numOfThreads; i++) {
    
           Thread thread = new Thread(new DatabaseTask(connectionPool, "Thread-" + (i + 1)));
           thread.start();
       }
   }

   static class ConnectionPool {
    
       private final Semaphore semaphore;
       private final int numOfConnections;

       public ConnectionPool(int numOfConnections) {
    
           this.semaphore = new Semaphore(numOfConnections);
           this.numOfConnections = numOfConnections;
       }

       public void getConnection() throws InterruptedException {
    
           semaphore.acquire();
           System.out.println(Thread.currentThread().getName() + " has obtained a connection.");
       }

       public void releaseConnection() {
    
           semaphore.release();
           System.out.println(Thread.currentThread().getName() + " has released a connection.");
       }
   }

   static class DatabaseTask implements Runnable {
    
       private final ConnectionPool connectionPool;
       private final String name;

       public DatabaseTask(ConnectionPool connectionPool, String name) {
    
           this.connectionPool = connectionPool;
           this.name = name;
       }

       @Override
       public void run() {
    
           try {
    
               connectionPool.getConnection();
               // 模拟数据库操作
               Thread.sleep(2000);
               connectionPool.releaseConnection();
           } catch (InterruptedException e) {
    
               e.printStackTrace();
           }
       }
   }
}

在上面的示例中,我们创建了一个ConnectionPool类,它包含一个Semaphore来限制连接池中的最大连接数。每个线程在执行数据库操作前,需要先从连接池中获取一个连接(调用getConnection()方法),然后进行模拟的数据库操作(这里用Thread.sleep()模拟),操作完成后释放连接(调用releaseConnection()方法)。

在运行上述代码时,我们会观察到输出中最多同时有5个线程在获取和使用连接,这样就实现了对连接池资源的合理调度和限制。

通过Semaphore,我们可以很方便地控制并发访问资源的数量,从而避免资源的过度竞争和浪费,提高程序的执行效率。在实际开发中,Semaphore广泛用于限流、线程池管理等场景。

下面是一些使用Semaphore时需要注意的问题:

  1. 初始化许可证数量: 在创建Semaphore对象时,需要根据实际情况合理设置初始许可证数量。如果设置的数量过小,可能导致线程无法获取足够的许可证而被阻塞;如果设置的数量过大,可能导致线程竞争许可证时出现过度的并发,从而导致性能下降。
  2. 获取和释放许可证的正确顺序: 使用Semaphore时,确保在获取许可证之后及时释放许可证。一旦线程获取了许可证,不要忘记在合适的地方释放它。否则,可能会导致许可证数量出现异常,影响其他线程的执行。
  3. 异常处理: 在调用Semaphore的acquire()和release()方法时,需要注意异常处理。特别是在获取许可证时,可能会出现InterruptedException,需要及时处理中断异常,确保程序的健壮性。
  4. 合理设置超时时间: 在调用acquire()方法时,可以选择传入超时时间,以防止线程因为获取不到许可证而一直阻塞。合理设置超时时间可以避免线程长时间等待,增加程序的响应性。
  5. 避免死锁: 当在使用Semaphore的同时还有其他锁或同步机制时,需要特别注意避免死锁的情况。死锁可能会在多个线程相互等待对方释放资源时发生,因此需要谨慎设计并发控制策略,以避免死锁的发生。
  6. 避免资源泄露: 使用Semaphore后,需要确保所有获取到许可证的线程在使用资源后及时释放许可证,否则会导致许可证数量异常,从而影响其他线程的执行。
  7. 性能考虑: 使用Semaphore会涉及到线程的阻塞和唤醒,这会产生一定的性能开销。在高并发场景下,需要综合考虑许可证数量和线程的调度,以避免出现性能瓶颈。
相关文章
|
2天前
|
JSON 网络协议 C#
C# 工具类
C# 工具类
21 1
|
2天前
|
Java
JavaMap工具类(MapUtils)
JavaMap工具类(MapUtils)
|
2天前
JsonUtil工具类
JsonUtil工具类
11 0
|
10月前
|
存储 安全 算法
一天一个 JUC 工具类 -- 并发集合
使用JUC工具包中的并发集合,我们可以避免手动处理锁和同步的复杂性,从而降低出现线程安全问题的概率。这些并发集合通过内部采用高效的算法和数据结构来优化并发操作,从而提供更好的性能和扩展性。
|
5月前
工具类-HttpClientUtil
工具类-HttpClientUtil
29 0
|
9月前
|
Java
数字处理工具类
数字处理工具类
46 0
|
11月前
SharePreference工具类
SharePreference工具类
|
12月前
|
Java API 索引
【JUC基础】08. 三大工具类
JUC包中包含了三个非常实用的工具类:CountDownLatch(倒计数器),CyclicBarrier(循环栅栏),Semaphore(信号量)。
106 0
|
SpringCloudAlibaba 前端开发 Java
JUC系列(四) callable与 常用的工具类
在多线程工作中常用的一些辅助类
JUC系列(四) callable与 常用的工具类
RedisUtils 工具类
RedisUtils 工具类
96 0