- 信号量的定义,意义
- 在没有juc semaphore之前怎么实现
- 信号量使用
- 血管神经元实现
信号量
实现进程同步与破题的机制:包括一个称为操作信号量的变量及对它进行的两个原语操作 (PV)
什么是信号量?
信号(semaphore)的数据结构为使用一个值和一个指针量,指向等待该信号量的下一个过程信号量。的值与相应资源的情况有关。
PV由P操作原语和V操作原语组成(原语是不可中断的过程)
(注,P是荷兰语的Passeren,相当于英文的pass,V是荷兰语的Verhoog,相当于中文中的incremnet)
对信号量进行操作,具体定义如下:
- P(S):
- ①将信号量S的值减1,即S=S-1;
- ②如果该进程进入S>=0,则该进程为等待状态,等待有空闲
- V(S):
- ①将信号量S的值加1,即S=S+1;
- ②如果S>0,则该进程继续执行;否则释放中第一个等待信号量的进程
PV操作的意义:我们用信号及PV操作来实现通信进程的同步和互斥。PV操作进程属于低级
使用PV操作实现进程互斥时应该注意的是:
用户程序中运行的P、V对,先实现P运行区,运行区操作,运行V,后对运行区进行操作,必须执行对区域的互操作性检查。 P、V操作有停泊区的头部,应有狭窄区域的代码应有相应的短处,不能执行死循环 互斥信号量的初值一般为1 //许可数量 private int permits = 1; public synchronized void P() { permits--; if(permits < 0 ){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } public synchronized void V(){ permits++; if(permits <=0){ notifyAll(); } }
JUC信号量
JUC提供了工具类之一就是Semaphore,提供了丰富的API,不再需要实现
// 创建具有给定的许可数和非公平的公平设置的 Semaphore。 Semaphore(int permits) // 创建具有给定的许可数和给定的公平设置的 Semaphore。 Semaphore(int permits, boolean fair) // 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。 void acquire() // 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。 void acquire(int permits) // 从此信号量中获取许可,在有可用的许可前将其阻塞。 void acquireUninterruptibly() // 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。 void acquireUninterruptibly(int permits) // 返回此信号量中当前可用的许可数。 int availablePermits() // 获取并返回立即可用的所有许可。 int drainPermits() // 返回一个 collection,包含可能等待获取的线程。 protected Collection<Thread> getQueuedThreads() // 返回正在等待获取的线程的估计数目。 int getQueueLength() // 查询是否有线程正在等待获取。 boolean hasQueuedThreads() // 如果此信号量的公平设置为 true,则返回 true。 boolean isFair() // 根据指定的缩减量减小可用许可的数目。 protected void reducePermits(int reduction) // 释放一个许可,将其返回给信号量。 void release() // 释放给定数目的许可,将其返回到信号量。 void release(int permits) // 返回标识此信号量的字符串,以及信号量的状态。 String toString() // 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。 boolean tryAcquire() // 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。 boolean tryAcquire(int permits) // 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。 boolean tryAcquire(int permits, long timeout, TimeUnit unit) // 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。 boolean tryAcquire(long timeout, TimeUnit unit)
J 的 Semaphore 这篇文章无法解释,另开新篇;但对 UC 而言,对于 Semaphore 倒是可以研究下
非凡的信号量
Redission在里面的RSemaphore
RSemaphore semaphore = redisson.getSemaphore("semaphore"); semaphore.acquire(); //或 semaphore.acquireAsync(); semaphore.acquire(23); semaphore.tryAcquire();
可过渡信号量
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore"); String permitId = semaphore.acquire(); // 获取一个信号,有效期只有2秒钟。 String permitId = semaphore.acquire(2, TimeUnit.SECONDS); // ... semaphore.release(permitId);
直接上操作脚本最基本的源码片段,lua 很简单,对信号量进行计数,获取时间,信号量减1,释放时间,信号量加1;主要是保证的原子性
@Override public RFuture<Boolean> tryAcquireAsync(int permits) { if (permits < 0) { throw new IllegalArgumentException("Permits amount can't be negative"); } if (permits == 0) { return RedissonPromise.newSucceededFuture(true); } return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('get', KEYS[1]); " + "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " + "local val = redis.call('decrby', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.<Object>singletonList(getName()), permits); } @Override public RFuture<Void> releaseAsync(int permits) { if (permits < 0) { throw new IllegalArgumentException("Permits amount can't be negative"); } if (permits == 0) { return RedissonPromise.newSucceededFuture(null); } return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, "local value = redis.call('incrby', KEYS[1], ARGV[1]); " + "redis.call('publish', KEYS[2], value); ", Arrays.<Object>asList(getName(), getChannelName()), permits); }
在最根本的基础上,再生产看看还有什么东西,才能真正达到一个工业标准
尝试获取()
非阻尼式,有信息量就正常获取,刚快速没有,没有返回,没有做额外的事情
获得()
@Override public void acquire(int permits) throws InterruptedException { if (tryAcquire(permits)) { return; } RFuture<RedissonLockEntry> future = subscribe(); commandExecutor.syncSubscription(future); try { while (true) { if (tryAcquire(permits)) { return; } getEntry().getLatch().acquire(permits); } } finally { unsubscribe(future); } }
阻尼式,相对非阻尼式就会有一些事情
- 1.先tryAcquire,看能否获取到信号量
- 2.订阅频道事件
- 3.无限循环
- 3.1. 先tryAcquire(),试一下
- 3.2.getEntry().Latch(),通过jucSemaphore,acquire()就是阻尼
- 4.取消订阅
事件内部细节,另外开篇订阅了,他的目的其实就是释放信号量
想像一下,一个客户的获取两个线程的信号量,A 需要同样的量,B 将被 Semaphore 驳回,如果此时停止,B 呢?
就在线程A进行release()之后,会发布,细节可以查看release()中的lua脚本,当B监听到事件的时候,调用Semaphore.release(),再次进行tryAcquire()
tryAcquire(int 允许,长等待时间,TimeUnit 单位)
如果在给定的这个订单的情况下,当前有希望获得所有的接收信号,则需要提供给定的许可使用量
@Override public boolean tryAcquire(int permits, long waitTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); if (tryAcquire(permits)) { return true; } time -= (System.currentTimeMillis() - current); if (time <= 0) { return false; } current = System.currentTimeMillis(); RFuture<RedissonLockEntry> future = subscribe(); if (!await(future, time, TimeUnit.MILLISECONDS)) { return false; } try { time -= (System.currentTimeMillis() - current); if (time <= 0) { return false; } while (true) { current = System.currentTimeMillis(); if (tryAcquire(permits)) { return true; } time -= (System.currentTimeMillis() - current); if (time <= 0) { return false; } // waiting for message current = System.currentTimeMillis(); getEntry().getLatch().tryAcquire(permits, time, TimeUnit.MILLISECONDS); time -= (System.currentTimeMillis() - current); if (time <= 0) { return false; } } } finally { unsubscribe(future); } // return get(tryAcquireAsync(permits, waitTime, unit)); }
这是await(future, time, TimeUnit.MILLISECONDS)是使用的CountDownLatch
如果到达终点,则返回 true;如果在到达终点之前超过等待时间,则返回 false
当前是第一个请求,或者直接进入释放,然后再往下循环
CountDownLatch.await()+Semaphore.tryAcquire()配合使用
每等待一段时间后,都需要检查是否超过等待时间
都需要引入CountDownLatch.await()呢?都使用Semaphore.tryAcquire()可以吗?
总结
重要信号量,原理很明了,还是通过lua保障redis的自动性
阅读redisson源码,发现里面的操作基本都是绕圈化,又是基于网络,大量使用了future模式,不知道future,会很,debug模式会晕掉,所以在开始redisson之前,如果需要再对未来模式温习一下
喜欢这个内容的人还喜欢
DDD之Repository对象生命周期管理
...