Semaphore
基本使用
[ˈsɛməˌfɔr] 信号量,用来限制能同时访问共享资源的线程上限。
public static void main(String[] args) { // 1. 创建 semaphore 对象 Semaphore semaphore = new Semaphore(3); // 2. 10个线程同时运行 for (int i = 0; i < 10; i++) { new Thread(() -> { // 3. 获取许可 try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("running..."); sleep(1); log.debug("end..."); } finally { // 4. 释放许可 semaphore.release(); } }).start(); } }
输出
07:35:15.485 c.TestSemaphore [Thread-2] - running... 07:35:15.485 c.TestSemaphore [Thread-1] - running... 07:35:15.485 c.TestSemaphore [Thread-0] - running... 07:35:16.490 c.TestSemaphore [Thread-2] - end... 07:35:16.490 c.TestSemaphore [Thread-0] - end... 07:35:16.490 c.TestSemaphore [Thread-1] - end... 07:35:16.490 c.TestSemaphore [Thread-3] - running... 07:35:16.490 c.TestSemaphore [Thread-5] - running... 07:35:16.490 c.TestSemaphore [Thread-4] - running... 07:35:17.490 c.TestSemaphore [Thread-5] - end... 07:35:17.490 c.TestSemaphore [Thread-4] - end... 07:35:17.490 c.TestSemaphore [Thread-3] - end... 07:35:17.490 c.TestSemaphore [Thread-6] - running... 07:35:17.490 c.TestSemaphore [Thread-7] - running... 07:35:17.490 c.TestSemaphore [Thread-9] - running... 07:35:18.491 c.TestSemaphore [Thread-6] - end... 07:35:18.491 c.TestSemaphore [Thread-7] - end... 07:35:18.491 c.TestSemaphore [Thread-9] - end... 07:35:18.491 c.TestSemaphore [Thread-8] - running... 07:35:19.492 c.TestSemaphore [Thread-8] - end...
限制对共享资源的使用
semaphore 实现
使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数
用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好
Semaphore 实现简单连接池相对于使用"享元模式"(使用 wait 和 notify)的实现在性能和可读性方面更好的原因主要有以下几点:
- 简洁的接口:Semaphore 提供了 acquire 和 release 方法来获取和释放资源,这样可以更直观地控制资源的访问。而"享元模式"下的实现需要手动管理线程的等待和唤醒,使用 wait 和 notify 的机制更为复杂,可读性较差。
- 并发控制:Semaphore 可以灵活地控制并发线程的数量,通过控制许可证的数量来限制同时访问资源的线程数量。而"享元模式"中的 wait 和 notify 机制需要手动管理线程的等待和唤醒,容易出现死锁和同步问题。
- 性能优化:Semaphore 可以通过设置初始许可证数量、公平性等参数来进行性能优化,而"享元模式"中的 wait 和 notify 更多地依赖于程序员手动编写的同步逻辑,容易出现性能瓶颈和难以调试的问题。
- 可维护性:Semaphore 提供了一个一致的、标准的接口,易于理解和维护。而"享元模式"下的实现需要程序员手动管理线程的等待和唤醒,代码复杂度高,可维护性差。
因此,Semaphore 实现简单连接池在性能和可读性上更优,它提供了更直观、简洁和安全的方式来管理并发访问资源。同时,Semaphore 对并发的控制更为灵活,使得整个连接池的管理更加高效和可靠。
class Pool { // 1. 连接池大小 private final int poolSize; // 2. 连接对象数组 private Connection[] connections; // 3. 连接状态数组 0 表示空闲, 1 表示繁忙 private AtomicIntegerArray states; private Semaphore semaphore; // 4. 构造方法初始化 public Pool(int poolSize) { this.poolSize = poolSize; // 让许可数与资源数一致 this.semaphore = new Semaphore(poolSize); this.connections = new Connection[poolSize]; this.states = new AtomicIntegerArray(new int[poolSize]); for (int i = 0; i < poolSize; i++) { connections[i] = new MockConnection("连接" + (i+1)); } } // 5. 借连接 public Connection borrow() {// t1, t2, t3 // 获取许可 try { semaphore.acquire(); // 没有许可的线程,在此等待 } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < poolSize; i++) { // 获取空闲连接 if(states.get(i) == 0) { if (states.compareAndSet(i, 0, 1)) { log.debug("borrow {}", connections[i]); return connections[i]; } } } // 不会执行到这里 return null; } // 6. 归还连接 public void free(Connection conn) { for (int i = 0; i < poolSize; i++) { if (connections[i] == conn) { states.set(i, 0); log.debug("free {}", conn); semaphore.release(); break; } } } }
Semaphore 原理
加锁解锁流程
Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一
刚开始,permits(state)为 3,这时 5 个线程来获取资源
public Semaphore(int permits) { sync = new NonfairSync(permits); } static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; // 核心在这里将state设置进来 Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列
park 阻塞
以Thread1为例:
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 然后又调用 tryAcquireShared if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) // 以我们当前的例子来说,其实是符合的,3-1 = 2,返回2 不小于0 所以加锁成功 // 同理 thread 1 2 3都是一样的 return remaining; } } // 如果竞争失败了呢? private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 和前面一样的流程,先设置头结点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 尝试再获取一次 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 然后走到这里来 park if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
这时 Thread-4 释放了 permits,状态如下
public void release() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 释放了之后进入 doReleaseShared方法 doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // unpark唤醒 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } // 唤醒了以后 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 和前面一样的流程,先设置头结点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 尝试再获取一次 // 唤醒了之后,再次for循环就能到这里 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 然后走到这里来 park if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); // 唤醒了以后重新设置头结点即可 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
CountdownLatch
用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值,await() 用来等待计数归零(等待归零,只有归零了才能继续运行),countDown() 用来让计数减一
public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(3); new Thread(() -> { log.debug("begin..."); sleep(1); latch.countDown(); log.debug("end...{}", latch.getCount()); }).start(); new Thread(() -> { log.debug("begin..."); sleep(2); latch.countDown(); log.debug("end...{}", latch.getCount()); }).start(); new Thread(() -> { log.debug("begin..."); sleep(1.5); latch.countDown(); log.debug("end...{}", latch.getCount()); }).start(); log.debug("waiting..."); latch.await(); log.debug("wait end..."); }
输出
18:44:00.778 c.TestCountDownLatch [main] - waiting... 18:44:00.778 c.TestCountDownLatch [Thread-2] - begin... 18:44:00.778 c.TestCountDownLatch [Thread-0] - begin... 18:44:00.778 c.TestCountDownLatch [Thread-1] - begin... 18:44:01.782 c.TestCountDownLatch [Thread-0] - end...2 18:44:02.283 c.TestCountDownLatch [Thread-2] - end...1 18:44:02.782 c.TestCountDownLatch [Thread-1] - end...0 18:44:02.782 c.TestCountDownLatch [main] - wait end...
源码
public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } // 这里面去判断获得锁的条件 等不等于0,等于0就相当于获得了这个锁 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 如果其他线程调用 realease protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) // 什么时候 成为0了,就唤醒被阻塞的线程 return nextc == 0; } } } ... }
可以配合线程池使用,改进如下
在这里说明一下,其实join也可以达到这样的效果,但是join是属于比较底层的api,而countDownlatch属于比较高级的api。
public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(3); ExecutorService service = Executors.newFixedThreadPool(4); service.submit(() -> { log.debug("begin..."); sleep(1); latch.countDown(); log.debug("end...{}", latch.getCount()); }); service.submit(() -> { log.debug("begin..."); sleep(1.5); latch.countDown(); log.debug("end...{}", latch.getCount()); }); service.submit(() -> { log.debug("begin..."); sleep(2); latch.countDown(); log.debug("end...{}", latch.getCount()); }); service.submit(()->{ try { log.debug("waiting..."); latch.await(); log.debug("wait end..."); } catch (InterruptedException e) { e.printStackTrace(); } }); }
输出
18:52:25.831 c.TestCountDownLatch [pool-1-thread-3] - begin... 18:52:25.831 c.TestCountDownLatch [pool-1-thread-1] - begin... 18:52:25.831 c.TestCountDownLatch [pool-1-thread-2] - begin... 18:52:25.831 c.TestCountDownLatch [pool-1-thread-4] - waiting... 18:52:26.835 c.TestCountDownLatch [pool-1-thread-1] - end...2 18:52:27.335 c.TestCountDownLatch [pool-1-thread-2] - end...1 18:52:27.835 c.TestCountDownLatch [pool-1-thread-3] - end...0 18:52:27.835 c.TestCountDownLatch [pool-1-thread-4] - wait end...
应用之同步等待多线程准备完毕
以经典的王者荣耀等待为例子,准备阶段,必须要十个人都准备好了,游戏才能开始,那么就这样来模拟
AtomicInteger num = new AtomicInteger(0); ExecutorService service = Executors.newFixedThreadPool(10, (r) -> { return new Thread(r, "t" + num.getAndIncrement()); }); CountDownLatch latch = new CountDownLatch(10); String[] all = new String[10]; Random r = new Random(); for (int j = 0; j < 10; j++) { int x = j; service.submit(() -> { for (int i = 0; i <= 100; i++) { try { // 因为加的是随机输出,所以会出现差异值 Thread.sleep(r.nextInt(100)); } catch (InterruptedException e) { } all[x] = Thread.currentThread().getName() + "(" + (i + "%") + ")"; System.out.print("\r" + Arrays.toString(all)); } latch.countDown(); }); } latch.await(); System.out.println("\n游戏开始..."); service.shutdown();
中间输出
[t0(52%), t1(47%), t2(51%), t3(40%), t4(49%), t5(44%), t6(49%), t7(52%), t8(46%), t9(46%)]
最后输出
[t0(100%), t1(100%), t2(100%), t3(100%), t4(100%), t5(100%), t6(100%), t7(100%), t8(100%), t9(100%)] 游戏开始...
应用之同步等待多个远程调用结束
@RestController public class TestCountDownlatchController { @GetMapping("/order/{id}") public Map<String, Object> order(@PathVariable int id) { HashMap<String, Object> map = new HashMap<>(); map.put("id", id); map.put("total", "2300.00"); sleep(2000); return map; } @GetMapping("/product/{id}") public Map<String, Object> product(@PathVariable int id) { HashMap<String, Object> map = new HashMap<>(); if (id == 1) { map.put("name", "小爱音箱"); map.put("price", 300); } else if (id == 2) { map.put("name", "小米手机"); map.put("price", 2000); } map.put("id", id); sleep(1000); return map; } @GetMapping("/logistics/{id}") public Map<String, Object> logistics(@PathVariable int id) { HashMap<String, Object> map = new HashMap<>(); map.put("id", id); map.put("name", "中通快递"); sleep(2500); return map; } private void sleep(int millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }
rest 远程调用 CountDownLatch
RestTemplate restTemplate = new RestTemplate(); log.debug("begin"); ExecutorService service = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(4); service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1); latch.CountDown(); log.debug(r); }); service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1); latch.CountDown(); log.debug(r); }); = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2); latch.CountDown(); log.debug(r); }); = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1); latch.CountDown(); log.debug(r); }); latch.await(); log.debug("执行完毕"); service.shutdown();
rest 远程调用 future 带返回值
RestTemplate restTemplate = new RestTemplate(); log.debug("begin"); ExecutorService service = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(4); Future<Map<String,Object>> f1 = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1); return r; }); Future<Map<String, Object>> f2 = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1); return r; }); Future<Map<String, Object>> f3 = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2); return r; }); Future<Map<String, Object>> f4 = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1); return r; }); System.out.println(f1.get()); System.out.println(f2.get()); System.out.println(f3.get()); System.out.println(f4.get()); log.debug("执行完毕"); service.shutdown();
执行结果会比同步快非常多
但是需要注意的是,没有返回结果的时候,用countdownlatch比较合适,如果有返回值结果的话,还有用future
CyclicBarrier
public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(3); ExecutorService service = Executors.newFixedThreadPool(5); service.submit(() -> { log.debug("task1 start..."); sleep(1); latch.countDown(); }); service.submit(() -> { log.debug("task2 start..."); sleep(2); latch.countDown(); }); try{ latch.await(); }catch(InterruptedException e){ e.printStackTrace(); } log.debug("task1 task2 finish ..."); service.shutdown(); }
但是目前的需求是 task1 task2 被反复的运行三遍
最简单的办法就是将上述的代码放置到for循环中去
public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(5); for(int i = 0 ; i < 3 ; i++){ CountDownLatch latch = new CountDownLatch(3); service.submit(() -> { log.debug("task1 start..."); sleep(1); latch.countDown(); }); service.submit(() -> { log.debug("task2 start..."); sleep(2); latch.countDown(); }); try{ latch.await(); }catch(InterruptedException e){ e.printStackTrace(); } } log.debug("task1 task2 finish ..."); service.shutdown(); }
这样依然能够实现功能,但是,其实CountDownLatch也被创建了三次。
那能不能重用呢?这个CountDownLatch只能在构造方法的时候,给一个初始值,以后就不能改了。
[ˈsaɪklɪk ˈbæriɚ] 循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行
CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行 ExecutorService service = Executors.newFixedThreadPool(2); service.submit(()->{ System.out.println("线程1开始.."+new Date()); try { cb.await(); // 当个数不足时,等待 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程1继续向下运行..."+new Date()); }); service.submit(()->{ System.out.println("线程2开始.."+new Date()); try { Thread.sleep(2000); } catch (InterruptedException e) { } try { cb.await(); // 2 秒后,线程个数够2,继续运行 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程2继续向下运行..."+new Date()); });
当await等于0的时候,此时就能够继续往下运行了。
CyclicBarrier cb = new CyclicBarrier(2,()->{ // 等到两个都执行完成后,就会执行这里面的方法 log.debug("finish"); }); // 个数为2时才会继续执行 ExecutorService service = Executors.newFixedThreadPool(2); service.submit(()->{ System.out.println("线程1开始.."+new Date()); try { cb.await(); // 当个数不足时,等待 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程1继续向下运行..."+new Date()); }); service.submit(()->{ System.out.println("线程2开始.."+new Date()); try { Thread.sleep(2000); } catch (InterruptedException e) { } try { cb.await(); // 2 秒后,线程个数够2,继续运行 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程2继续向下运行..."+new Date()); });
再次调用await就会恢复成2,在一个for循环中调用
CyclicBarrier cb = new CyclicBarrier(2,()->{ // 等到两个都执行完成后,就会执行这里面的方法 log.debug("finish"); }); // 个数为2时才会继续执行 ExecutorService service = Executors.newFixedThreadPool(2); for(int i = 0; i < 3 ; i++){ service.submit(()->{ System.out.println("线程1开始.."+new Date()); try { cb.await(); // 当个数不足时,等待 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程1继续向下运行..."+new Date()); }); service.submit(()->{ System.out.println("线程2开始.."+new Date()); try { Thread.sleep(2000); } catch (InterruptedException e) { } try { cb.await(); // 2 秒后,线程个数够2,继续运行 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程2继续向下运行..."+new Date()); }); }
这样就达到了CountDownLatch的效果啦
在这里面需要注意一下,就是 希望线程池的线程数 和 屏障数是一致的
如果不一致,比如说线程池是 三个核心线程,那么当继续走for循环的时候,由于cyclicbarrier的特性,此时就是两个task1 执行完成了,而task2比较慢。这样有可能会影响最终的效果。