前面我已经讲解过了CountDownLatch和CyclicBarrier。本篇我们来讲解下Semaphore。
Semaphore是指信号量,在计算机的世界里信号量可以使用在数据竞争的场景中。在生活中交通信号灯可以比作现实世界中的Semaphore。Semaphore的作用就是允许或者禁止。比如说红灯禁止通行,绿灯允许通行。计算机世界里的Semaphore会持有多张许可证,举个例子有10张许可证,假设有20个线程同时请求信用量的许可证,那么只能是其中十个线程能够拿到许可证,执行代码。另外10个线程只能等拿到许可证的线程释放许可证。举个生活中的例子。比如在银行办理业务,假设大厅有3个窗口。办理业务的群众有10个。那么每次只能有3个群众能获得叫号的资格(对应计算机拿到许可证),如果有一个群众办理完业务了,那么窗口才能空余出来(对应计算机线程释放许可证),等待的群众才能获得叫号的资格。
我们用代码来模拟下该场景。假设只有3个窗口,有10个群众来办理业务,每个业务办理1~5秒钟不等
public class Bank { int[] windows = new int[3];//3个办事窗口 final boolean[] busy = new boolean[3];//办事窗口是否busy String[] persons = new String[10]; Semaphore semaphore = new Semaphore(3);//最多只有3张许可证 { for (int i = 0; i < 10; i++) { persons[i] = "person " + i; } } ExecutorService pool = Executors.newCachedThreadPool(); public void deal() { for (final String person : persons) { pool.execute(new Runnable() { public void run() { try { semaphore.acquire(); int index = 0; synchronized (busy) { for (int i = 0; i < 3; i++) { if (!busy[i]) {//如果该窗口空闲 index = i; busy[i] = true; System.out.println("请 " + person + " 到 " + index + " 号窗口办理业务"); break; } } } Random random = new Random(); int second = 1 + random.nextInt(5); TimeUnit.SECONDS.sleep(second); System.out.println(person + "在"+index+" 窗口办理完成 费时" + second + "秒"); synchronized (busy) { busy[index] = false; } } catch ( InterruptedException e ) { e.printStackTrace(); } finally { semaphore.release(); } } }); } } public static void main(String[] args) { Bank bank = new Bank(); bank.deal(); } }
输出结果如下
请 person 0 到 0 号窗口办理业务 请 person 1 到 1 号窗口办理业务 请 person 2 到 2 号窗口办理业务 person 1在1 窗口办理完成 费时1秒 请 person 3 到 1 号窗口办理业务 person 0在0 窗口办理完成 费时3秒 请 person 4 到 0 号窗口办理业务 person 2在2 窗口办理完成 费时3秒 请 person 5 到 2 号窗口办理业务 person 3在1 窗口办理完成 费时2秒 请 person 6 到 1 号窗口办理业务 person 4在0 窗口办理完成 费时3秒 请 person 7 到 0 号窗口办理业务 person 5在2 窗口办理完成 费时4秒 请 person 8 到 2 号窗口办理业务 person 6在1 窗口办理完成 费时5秒 请 person 9 到 1 号窗口办理业务 person 8在2 窗口办理完成 费时2秒 person 7在0 窗口办理完成 费时4秒 person 9在1 窗口办理完成 费时3秒
从打印结果我们可以看出,每次最多允许3个人办理业务,每当窗口空闲出来就会叫号。
接下来我们看看Semaphore源码
public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L; /** All mechanics via AbstractQueuedSynchronizer subclass */ private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { } }
Semaphore持有一个Sync对象,该对象是一个AQS。之前的并发系列我们讲过AQS。它是同步的核心。而Lock类也是持有Sync对象。我们可以把Semaphore看成是一个Lock。区别是Semaphore没有lock和unlock方法,取而代之的是acquire 和release方法
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
看到share字样,首先这把锁是共享锁。每次获取到了锁,许可证数量-1,如果许可证数量小于0,则获取锁失败,该线程等待
public void release() { sync.releaseShared(1); } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
release方法许可证+1,同时通知等待线程,尝试获取许可证
总结下CountDownLatch CyclicBarrier Semaphore的区别
CountDownLatch 适合做汇总相关的工作,比如开启多个线程爬取数据,在所有线程完成任务后,将数据做汇总。
CyclicBarrier适合于竞赛相关的场景,让多个线程在同一时间点执行任务。
Semaphore适用于资源有限的情况下的流量控制。