一,线程之间的通信
1,BlockingQueue
这个主要就是通过这个阻塞队列实现,其CLH中的链表中的结点的状态为waitStatus为CONDITION:-2,为一个条件等待状态。之前在这篇https://blog.csdn.net/zhenghuishengq/article/details/125710294对BlockingQueue的源码有过具体的分析,主要是同步等待队列和这个条件等待队列的结合使用。
2,CountDownLatch
通过这个计数器实现,所有线程在通过这个countDown减法,每个线程执行完减1,然后进入这个等待状态,当这个值减为0时,所有的线程才能开始同时运行。
3,Semaphore
作用就是控制访问特定资源的线程数目,底层通过这个AQS实现,主要用于对这个线程的个数的控制,底层也是通过这个CLH队列实现,链表中结点的状态为-3,是一个广播状态。
4,CyclicBarrier
栅栏屏障,和countDownLatch的实现方式不同,所有线程都用加法实现,每个线程到达同一起跑线时,个数+1,然后进入这个等待状态,当全部线程到达同一起点时,那么所有线程才能同时开启。
二,Semaphore
1,概述
信号量,作用就是控制访问特定资源的线程数目,即主要可以用来限流等操作。如Hystrix里面就有用到这个组件。
//创建一个信号量,state的值为2 Semaphore semaphore = new Semaphore(2); //获取公共资源 semaphore.acquire(); //如果获取资源需要等待的时间过长,可以尝试获取 semaphore.tryAcquire(500,TimeUnit.MILLISECONDS) //释放公共资源 semaphore.release();
其含义如下,就是每次最多只允许两个线程过去
2,源码分析
这个Semaphore的底层实现和这个 ReentrantLock 的底层实现都差不多,里面主要是通过这个AQS实现,并且定义了一个Sync的类,实现了公平和非公平的两种方式去获取这个信号量。不过这个ReentrantLock 这个是一个独占模式,这个Semaphore是一个共享模式实现
public class Semaphore implements java.io.Serializable{ abstract static class Sync extends AbstractQueuedSynchronizer { ... } static final class NonfairSync extends Sync { ... } static final class FairSync extends Sync{ ... } }
默认这个信号量是一个非公平锁的实现
public Semaphore(int permits) { sync = new NonfairSync(permits); }
这个获取资源有两种实现,一个是无参构造,一个是有一个参数构造。
无参构造,默认数量为1
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
有参构造,这个值可以提供在外部传参
public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
接下来主要讲解这个acquireSharedInterruptibly方法,
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //在获取信号的时候,不能发生中断,如果发生中断,那么就会直接抛出异常 if (Thread.interrupted()) throw new InterruptedException(); //共享模式 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
公平锁
static final class FairSync extends Sync { protected int tryAcquireShared(int acquires) { //自旋 for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; //通过这个cas的算法来获取这个信号量 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
如果这个信号的中剩余的数量小于这个要获取的数量,那么会走这个doAcquireSharedInterruptibly逻辑。就是在这个CLH的同步阻塞队列里面有具体的操作
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //共享结点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { //自旋 for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
后面的这个源码和这个ReentrantLock的源码差不多,可以去看另外一篇https://blog.csdn.net/zhenghuishengq/article/details/125648495?spm=1001.2014.3001.5501
3,总结
其底层和这个ReentrantLock的实现差不多,只不过这个CLH同步等待队列的使用和这个ReentrantLock的不太一样。首先这个同步状态器是一个共享的,不像ReentrantLock里面时独占的;其次这个用法就是比如说同步状态器里面的state的值为5,那么就是说有5个信号量,那么允许这个链表中的可以有五个结点可以获取这个信号量,这个同步状态器里面每次被获取一个值,这个state就会减1,直到为0,然后这个链表中的结点就会去唤醒下一个结点,只要这个state这个状态不为0,那么这个结点就一直会唤醒下一个结点。并且每个结点的这个 waitStatues 的值为-3,为一个传播状态。
三,CountDownLatch
CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。
1,基本使用
例如下面这个场景,会保证10个线程同时创建完再工作,即可以模拟一个并发的过程。
//线程操作工具 CountDownLatch countDownLatch = new CountDownLatch(1); for(int i=0;i<10;i++){ new Thread(()->{ try { //线程全部创建成功则开始全部往下走 //因为for循环需要一点的时间,因此通过这个等待,保证线程全部创建完 countDownLatch.await(); for(int j=0;j<1000;j++){ try { lock.lock(); total++; } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } //等上面s所有的线程全部创建完 Thread.sleep(1000); //让线程开启 countDownLatch.countDown(); Thread.sleep(2000); System.out.println(total);
2,源码分析
其底层也是用了这个AQS的方式实现,构造方法需要一个整型的参数
public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer {...} //构造方法 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
这个state就是这个同步状态器里面的这个state,表示外面一次性可以访问的这个,与此同时这里面没有公平锁和非公平锁的概念,并且里面的链表长度是一个固定长度。
Sync(int count) { setState(count); } protected final void setState(int newState) { state = newState; }
其**await()**等待方法如下,;
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //向队列中增加节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
其**countDown()**等待方法如下,每个线程在同步状态器中获取一个资源,那么这个同步状态器中的state就会减1,直到减为0
public void countDown() { sync.releaseShared(1); }
protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。
四,CyclicBarrier
栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。就是说所有的线程达到这个同一起跑线的时候,才会开始执行。
CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() { public void run() { System.out.println("所有特工到达屏障,准备开始执行秘密任务"); } }); for (int i = 0; i < 10; i++) { new Thread(new CyclicBarrierRunner(cyclicBarrier, i)).start(); } cyclicBarrier.await(); System.out.println("全部到达屏障....1"); Thread.sleep(5000); for (int i = 0; i < 10; i++) { new Thread(new CyclicBarrierRunner(cyclicBarrier, i)).start(); } cyclicBarrier.await(); System.out.println("全部到达屏障....1");
底层原理和这个CountDownLatch都差不多,这个CountDownLatch使用的是减法,这个CyclicBarrier使用的是一个加法,并且这个CountDownLatch只能使用一次,而CyclicBarrier可以使用多次。