同步工具类
同步工具类可以是任何一个对象,只要它根据自身状态来协调线程的控制流。阻塞队列可以作为同步控制类,其他类型的同步工具类还包括 信号量(Semaphore)
、栅栏(Barrier)
和 闭锁(Latch)
。下面我们就来一起认识一下这些工具类
Semaphore
Semaphore 翻译过来就是 信号量
,信号量是什么?它其实就是一种信号,在操作系统中,也有信号量的这个概念,在进程间通信的时候,我们就会谈到信号量进行通信。还有在 Linux 操作系统采取中断时,也会向进程发出中断信号,根据进程的种类和信号的类型判断是否应该结束进程。
在 Java 中,Semaphore(信号量)
是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
Semaphore 管理着一组许可(permit)
,许可的初始数量由构造函数来指定。在获取某个资源之前,应该先从信号量获取许可(permit)
,以确保资源是否可用。当线程完成对资源的操作后,会把它放在池中并向信号量返回一个许可,从而允许其他线程访问资源,这叫做释放许可。如果没有许可的话,那么 acquire
将会阻塞直到有许可(中断或者操作超时)为止。release
方法将返回一个许可信号量。
Semaphore 可以用来实现流量控制,例如常用的数据库连接池,线程请求资源时,如果数据库连接池为空则阻塞线程,直接返回失败,如果连接池不为空时解除阻塞。
CountDownLatch
闭锁(Latch)
是一种同步工具类,它可以延迟线程的进度以直到其到达终止状态。闭锁的作用相当于是一扇门,在闭锁达到结束状态前,门是一直关着的,没有任何线程能够通过。当闭锁到达结束状态后,这扇门会打开并且允许任何线程通过,然后就一直保持打开状态。
CountDownLatch
就是闭锁的一种实现。它可以使一个或者多个线程等待一组事件的发生。闭锁有一个计数器,闭锁需要对计数器进行初始化,表示需要等待的次数,闭锁在调用 await
处进行等待,其他线程在调用 countDown 把闭锁 count 次数进行递减,直到递减为 0 ,唤醒 await。如下代码所示
public class TCountDownLatch { public static void main(String[] args) { CountDownLatch latch = new CountDownLatch(5); Increment increment = new Increment(latch); Decrement decrement = new Decrement(latch); new Thread(increment).start(); new Thread(decrement).start(); try { Thread.sleep(6000); } catch (InterruptedException e) { e.printStackTrace(); } } } class Decrement implements Runnable { CountDownLatch countDownLatch; public Decrement(CountDownLatch countDownLatch){ this.countDownLatch = countDownLatch; } @Override public void run() { try { for(long i = countDownLatch.getCount();i > 0;i--){ Thread.sleep(1000); System.out.println("countdown"); this.countDownLatch.countDown(); } } catch (InterruptedException e) { e.printStackTrace(); } } } class Increment implements Runnable { CountDownLatch countDownLatch; public Increment(CountDownLatch countDownLatch){ this.countDownLatch = countDownLatch; } @Override public void run() { try { System.out.println("await"); countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Waiter Released"); } }
Future
我们常见的创建多线程的方式有两种,一种是继承 Thread 类,一种是实现 Runnable 接口。这两种方式都没有返回值。相对的,创建多线程还有其他三种方式,那就是使用 Callable
接口、 Future
接口和 FutureTask
类。Callable 我们之前聊过,这里就不再描述了,我们主要来描述一下 Future 和 FutureTask 接口。
Future 就是对具体的 Runnable 或者 Callable 任务的执行结果进行一系列的操作,必要时可通过 get
方法获取执行结果,这个方法会阻塞直到执行结束。Future 中的主要方法有
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
cancel(boolean mayInterruptIfRunning)
: 尝试取消任务的执行。如果任务已经完成、已经被取消或者由于某些原因而无法取消,那么这个尝试会失败。如果取消成功,或者在调用 cancel 时此任务尚未开始,那么此任务永远不会执行。如果任务已经开始,那么 mayInterruptIfRunning 参数会确定是否中断执行任务以便于尝试停止该任务。这个方法返回后,会对isDone
的后续调用也返回 true,如果 cancel 返回 true,那么后续的调用isCancelled
也会返回 true。boolean isCancelled()
:如果此任务在正常完成之前被取消,则返回 true。boolean isDone()
:如果任务完成,返回 true。V get() throws InterruptedException, ExecutionException
:等待必要的计算完成,然后检索其结果V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
:必要时最多等待给定时间以完成计算,然后检索其结果。
因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。
FutureTask
FutureTask 实现了 RunnableFuture
接口,RunnableFuture 接口是什么呢?
RunnableFuture 接口又继承了 Runnable
接口和 Future
接口。纳尼?在 Java 中不是只允许单继承么,是的,单继承更多的是说的类与类之间的继承关系,子类继承父类,扩展父类的接口,这个过程是单向的,就是为了解决多继承引起的过渡引用问题。而接口之间的继承是接口的扩展,在 Java 编程思想中也印证了这一点
对 RunnableFuture 接口的解释是:成功执行的 run 方法会使 Future 接口的完成并允许访问其结果。所以它既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。
FutureTask 也可以用作闭锁,它可以处于以下三种状态
- 等待运行
- 正在运行
- 运行完成
FutureTask 在 Executor
框架中表示异步任务,此外还可以表示一些时间较长的计算,这些计算可以在使用计算结果之前启动。
FutureTask 具体的源码我后面会单独出文章进行描述。
Barrier
我们上面聊到了通过闭锁来启动一组相关的操作,使用闭锁来等待一组事件的执行。闭锁是一种一次性对象,一旦进入终止状态后,就不能被 重置
。
Barrier
的特点和闭锁也很类似,它也是阻塞一组线程直到某个事件发生。栅栏与闭锁的区别在于,所有线程必须同时到达栅栏的位置,才能继续执行,就像我们上面操作系统给出的这幅图一样。
ABCD 四条线程,必须同时到达 Barrier,然后 手牵手
一起走过幸福的殿堂。
当线程到达 Barrier 的位置时会调用 await
方法,这个方法会阻塞直到所有线程都到达 Barrier 的位置,如果所有线程都到达 Barrier 的位置,那么 Barrier 将会打开使所有线程都被释放,而 Barrier 将被重置以等待下次使用。如果调用 await 方法导致超时,或者 await 阻塞的线程被中断,那么 Barrier 就被认为被打破,所有阻塞的 await 都会抛出 BrokenBarrierException
。如果成功通过栅栏后,await 方法返回一个唯一索引号,可以利用这些索引号选举一个新的 leader,来处理一下其他工作。
public class TCyclicBarrier { public static void main(String[] args) { Runnable runnable = () -> System.out.println("Barrier 1 开始..."); Runnable runnable2 = () -> System.out.println("Barrier 2 开始..."); CyclicBarrier barrier1 = new CyclicBarrier(2,runnable); CyclicBarrier barrier2 = new CyclicBarrier(2,runnable2); CyclicBarrierRunnable b1 = new CyclicBarrierRunnable(barrier1,barrier2); CyclicBarrierRunnable b2 = new CyclicBarrierRunnable(barrier1,barrier2); new Thread(b1).start(); new Thread(b2).start(); } } class CyclicBarrierRunnable implements Runnable { CyclicBarrier barrier1; CyclicBarrier barrier2; public CyclicBarrierRunnable(CyclicBarrier barrier1,CyclicBarrier barrier2){ this.barrier1 = barrier1; this.barrier2 = barrier2; } @Override public void run() { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "等待 barrier1" ); barrier1.await(); Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "等待 barrier2" ); barrier2.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 做完了!"); } }
Exchanger
与 Barrier 相关联的还有一个工具类就是 Exchanger
, Exchanger 是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。
它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange 方法交换数据, 如果第一个线程先执行 exchange方法,它会一直等待第二个线程也执行 exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。因此使用Exchanger 的重点是成对的线程使用 exchange() 方法,当有一对线程达到了同步点,就会进行交换数据。因此该工具类的线程对象是成对的。
下面通过一段例子代码来讲解一下
public class TExchanger { public static void main(String[] args) { Exchanger exchanger = new Exchanger(); ExchangerRunnable exchangerRunnable = new ExchangerRunnable(exchanger,"A"); ExchangerRunnable exchangerRunnable2 = new ExchangerRunnable(exchanger,"B"); new Thread(exchangerRunnable).start(); new Thread(exchangerRunnable2).start(); } } class ExchangerRunnable implements Runnable { Exchanger exchanger; Object object; public ExchangerRunnable(Exchanger exchanger,Object object){ this.exchanger = exchanger; this.object = object; } @Override public void run() { Object previous = object; try { object = this.exchanger.exchange(object); System.out.println( Thread.currentThread().getName() + "改变前是" + previous + "改变后是" + object); } catch (InterruptedException e) { e.printStackTrace(); } } }
总结
本篇文章我们从同步容器类入手,主要讲了 fail-fast
和 fail-safe
机制,这两个机制在并发编程中非常重要。然后我们从操作系统的角度,聊了聊操作系统层面实现安全性的几种方式,然后从操作系统 -> 并发我们聊了聊 Java 中的并发工具包有哪些,以及构建并发的几种工具类。
如果这篇文章帮助到你,小伙伴们不妨点赞、再看、转发,三连是让我继续更文的最大动力!