正文
CountDownLatch
Java的concurrent包里面的CountDownLatch其实可以把它看作一个计数器,只不过这个计数器的操作是原子操作,同时只能有一个线程去操作这个计数器,也就是同时只能有一个线程去减这个计数器里面的值。
你可以向CountDownLatch对象设置一个初始的数字(大于0)作为计数值,任何调用这个对象上的await()方法都会阻塞,直到这个计数器的计数值被其他的线程减为0为止。
CountDownLatch的一个非常典型的应用场景是:有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。假如我们这个想要继续往下执行的任务调用一个CountDownLatch对象的await()方法,其他的任务执行完自己的任务后调用同一个CountDownLatch对象上的countDown()方法,这个调用await()方法的任务将一直阻塞等待,直到这个CountDownLatch对象的计数值减到0为止。
CountDownLatch使用
package com.xiaojie.aqs; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; /** * @author xiaojie * @version 1.0 * @description: 模拟多线程请求商品信息,等待所有的数据执行完毕之后,同时返回数据, * 看着有点类似 CyclicBarrier * @date 2022/1/9 0:00 */ public class CountDownDemo { public static void main(String[] args) { CountDownLatch countDownLatch = new CountDownLatch(1); Thread t1 = new Thread(() -> { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("商品基本信息"); }, "t1线程"); t1.start(); Thread t2 = new Thread(() -> { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("商品详情信息"); }, "t2线程"); t2.start(); Thread t3 = new Thread(() -> { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("商品配送信息"); }, "t3线程"); t3.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("唤醒线程。。。。。。。"); countDownLatch.countDown(); } }
CountDownLatch源码分析
以下源码分析基于JDK17,与JDK8有所不同,但思想是一致的
CountDownLatch有一个Sync的内部类,该类继承了AbstractQueuedSynchronizer(AQS),主要方法有
public void await() {} //阻塞当前线程 public boolean await(long timeout, TimeUnit unit)//有阻塞时间的阻塞方法 public void countDown() {}//唤醒阻塞线程
构造函数方法解读
初始化CountDownLatch(int)这个数值必须是不能小于0的数值,当然可以为0,但是如果是0的话那就不能阻塞线程了,而没有实际意义。该值对应AQS中state的值。
await()方法解读
tryAcquireShared()方法解读
1处代码,表示如果state值为0则返回1如果不是0则为-1。而构造函数中这个数值只有大于0才有意义。
2处代码,表示将请求的线程放入AQS的等待队列中(双向链表
这个代码的意思就是,如果调用了await(),就将当前线程放进AQS的等待队列,让线程阻塞。
countDown()方法解读
tryReleaseShared(1)方法解读
如果要唤醒AQS队列中的线程,只需要唤醒最前面线程即可。上面的方法通过CAS修改状态值为0。
signalNext()方法最终会调用到LockSupport.unpark(s.waiter)方法
Semaphore
Semaphore是计数信号量。Semaphore管理一系列许可。每个acquire方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个release方法增加一个许可,这可能会释放一个阻塞的acquire方法。然而,其实并没有实际的许可这个对象,Semaphore只是维持了一个可获得许可证的数量。Semaphore应用场景,比如接口限流。
Semaphore使用
package com.xiaojie.aqs; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author xiaojie * @version 1.0 * @description: Semaphore代码简单使用 * @date 2022/1/9 14:20 */ public class SemaphoreDemo { public static void main(String[] args) { Semaphore semaphore = new Semaphore(5, false); ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i=0;i<10;i++){ executorService.execute(()->{ try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"running........"); }); } semaphore.release(); //释放一个资源 executorService.shutdown(); } }
CyclicBarrier源码分析
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); //加锁 try { final CyclicBarrier.Generation g = generation; if (g.broken) //检查当前屏障是否被打破,如果破坏了抛出异常 throw new BrokenBarrierException(); if (Thread.interrupted()) { //判断线程是否中断,如果中断,破坏栅栏执行trip.signalAll()方法唤醒所有的线程,并且抛出中断异常 breakBarrier(); throw new InterruptedException(); } int index = --count; //计数器数值减1 if (index == 0) { // 如果减到0需要唤醒所有的线程并转换到下一代 Runnable command = barrierCommand; if (command != null) { try { command.run(); } catch (Throwable ex) { breakBarrier(); throw ex; } } nextGeneration();//唤醒所有的线程并且进入下一代 return 0; } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed)//判断是否有等待超时时间 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); //设置阻塞等待时间 } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) //打破栅栏抛出异常 throw new BrokenBarrierException(); if (g != generation) return index; //如果线程因为换代操作而被唤醒返回计数器的值 if (timed && nanos <= 0L) {//如果等待超时破坏栅栏,唤醒所有的线程 breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock();//释放锁 } }
说明:
private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition();
CyclicBarrier是通过Condition来实现的,通过condition.signalAll()方法来唤醒线程,而Condition是基于单向链表来实现的,如果需要唤醒线程就将线程加入到AQS的等待队列(双向链表)中,以此来唤醒线程。