非凡的信号量

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 信号量的定义,意义在没有juc semaphore之前怎么实现信号量使用血管神经元实现
  1. 信号量的定义,意义
  2. 在没有juc semaphore之前怎么实现
  3. 信号量使用
  4. 血管神经元实现

信号量

实现进程同步与破题的机制:包括一个称为操作信号量的变量及对它进行的两个原语操作 (PV)

什么是信号量?

信号(semaphore)的数据结构为使用一个值和一个指针量,指向等待该信号量的下一个过程信号量。的值与相应资源的情况有关。

PV由P操作原语和V操作原语组成(原语是不可中断的过程)

(注,P是荷兰语的Passeren,相当于英文的pass,V是荷兰语的Verhoog,相当于中文中的incremnet)

对信号量进行操作,具体定义如下:

  • P(S):
  • ①将信号量S的值减1,即S=S-1;
  • ②如果该进程进入S>=0,则该进程为等待状态,等待有空闲
  • V(S):
  • ①将信号量S的值加1,即S=S+1;
  • ②如果S>0,则该进程继续执行;否则释放中第一个等待信号量的进程

PV操作的意义:我们用信号及PV操作来实现通信进程的同步和互斥。PV操作进程属于低级

使用PV操作实现进程互斥时应该注意的是:

用户程序中运行的P、V对,先实现P运行区,运行区操作,运行V,后对运行区进行操作,必须执行对区域的互操作性检查。
P、V操作有停泊区的头部,应有狭窄区域的代码应有相应的短处,不能执行死循环
互斥信号量的初值一般为1
//许可数量
private int permits = 1;
public synchronized void P() {
    permits--;
    if(permits < 0 ){
        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public synchronized void V(){
    permits++;
    if(permits <=0){
        notifyAll();
    }
}

JUC信号量

JUC提供了工具类之一就是Semaphore,提供了丰富的API,不再需要实现

// 创建具有给定的许可数和非公平的公平设置的 Semaphore。
Semaphore(int permits)
// 创建具有给定的许可数和给定的公平设置的 Semaphore。
Semaphore(int permits, boolean fair)
// 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
void acquire()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
void acquire(int permits)
// 从此信号量中获取许可,在有可用的许可前将其阻塞。
void acquireUninterruptibly()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
void acquireUninterruptibly(int permits)
// 返回此信号量中当前可用的许可数。
int availablePermits()
// 获取并返回立即可用的所有许可。
int drainPermits()
// 返回一个 collection,包含可能等待获取的线程。
protected Collection<Thread> getQueuedThreads()
// 返回正在等待获取的线程的估计数目。
int getQueueLength()
// 查询是否有线程正在等待获取。
boolean hasQueuedThreads()
// 如果此信号量的公平设置为 true,则返回 true。
boolean isFair()
// 根据指定的缩减量减小可用许可的数目。
protected void reducePermits(int reduction)
// 释放一个许可,将其返回给信号量。
void release()
// 释放给定数目的许可,将其返回到信号量。
void release(int permits)
// 返回标识此信号量的字符串,以及信号量的状态。
String toString()
// 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
boolean tryAcquire()
// 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
boolean tryAcquire(int permits)
// 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
// 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。
boolean tryAcquire(long timeout, TimeUnit unit)

J 的 Semaphore 这篇文章无法解释,另开新篇;但对 UC 而言,对于 Semaphore 倒是可以研究下

非凡的信号量

Redission在里面的RSemaphore

RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();

可过渡信号量

RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 获取一个信号,有效期只有2秒钟。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);

直接上操作脚本最基本的源码片段,lua 很简单,对信号量进行计数,获取时间,信号量减1,释放时间,信号量加1;主要是保证的原子性

@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return RedissonPromise.newSucceededFuture(true);
    }
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
              "local value = redis.call('get', KEYS[1]); " +
              "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
                  "local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
                  "return 1; " +
              "end; " +
              "return 0;",
              Collections.<Object>singletonList(getName()), permits);
}
@Override
public RFuture<Void> releaseAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return RedissonPromise.newSucceededFuture(null);
    }
    return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
        "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
        "redis.call('publish', KEYS[2], value); ",
        Arrays.<Object>asList(getName(), getChannelName()), permits);
}

在最根本的基础上,再生产看看还有什么东西,才能真正达到一个工业标准

尝试获取()

非阻尼式,有信息量就正常获取,刚快速没有,没有返回,没有做额外的事情

获得()

@Override
public void acquire(int permits) throws InterruptedException {
    if (tryAcquire(permits)) {
        return;
    }
    RFuture<RedissonLockEntry> future = subscribe();
    commandExecutor.syncSubscription(future);
    try {
        while (true) {
            if (tryAcquire(permits)) {
                return;
            }
            getEntry().getLatch().acquire(permits);
        }
    } finally {
        unsubscribe(future);
    }
}

阻尼式,相对非阻尼式就会有一些事情

  • 1.先tryAcquire,看能否获取到信号量
  • 2.订阅频道事件
  • 3.无限循环
  • 3.1. 先tryAcquire(),试一下
  • 3.2.getEntry().Latch(),通过jucSemaphore,acquire()就是阻尼
  • 4.取消订阅

事件内部细节,另外开篇订阅了,他的目的其实就是释放信号量

想像一下,一个客户的获取两个线程的信号量,A 需要同样的量,B 将被 Semaphore 驳回,如果此时停止,B 呢?

就在线程A进行release()之后,会发布,细节可以查看release()中的lua脚本,当B监听到事件的时候,调用Semaphore.release(),再次进行tryAcquire()

tryAcquire(int 允许,长等待时间,TimeUnit 单位)

如果在给定的这个订单的情况下,当前有希望获得所有的接收信号,则需要提供给定的许可使用量

@Override
public boolean tryAcquire(int permits, long waitTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    if (tryAcquire(permits)) {
        return true;
    }
    time -= (System.currentTimeMillis() - current);
    if (time <= 0) {
        return false;
    }
    current = System.currentTimeMillis();
    RFuture<RedissonLockEntry> future = subscribe();
    if (!await(future, time, TimeUnit.MILLISECONDS)) {
        return false;
    }
    try {
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            return false;
        }
        while (true) {
            current = System.currentTimeMillis();
            if (tryAcquire(permits)) {
                return true;
            }
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                return false;
            }
            // waiting for message
            current = System.currentTimeMillis();
            getEntry().getLatch().tryAcquire(permits, time, TimeUnit.MILLISECONDS);
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                return false;
            }
        }
    } finally {
        unsubscribe(future);
    }
//        return get(tryAcquireAsync(permits, waitTime, unit));
}

这是await(future, time, TimeUnit.MILLISECONDS)是使用的CountDownLatch

如果到达终点,则返回 true;如果在到达终点之前超过等待时间,则返回 false

当前是第一个请求,或者直接进入释放,然后再往下循环

CountDownLatch.await()+Semaphore.tryAcquire()配合使用

每等待一段时间后,都需要检查是否超过等待时间

都需要引入CountDownLatch.await()呢?都使用Semaphore.tryAcquire()可以吗?

总结

重要信号量,原理很明了,还是通过lua保障redis的自动性

阅读redisson源码,发现里面的操作基本都是绕圈化,又是基于网络,大量使用了future模式,不知道future,会很,debug模式会晕掉,所以在开始redisson之前,如果需要再对未来模式温习一下


喜欢这个内容的人还喜欢

DDD之Repository对象生命周期管理

...


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
并行计算 安全 Java
计算机原理探险系列(十)信号量和管程的一些理解
计算机原理探险系列(十)信号量和管程的一些理解
271 0
蜕变成蝶~Linux设备驱动中的阻塞和非阻塞I/O
  今天意外收到一个消息,真是惊呆我了,博客轩给我发了信息,说是俺的博客文章有特色可以出本书,,这简直让我受宠若惊,俺只是个大三的技术宅,写的博客也是自己所学的一些见解和在网上看到我一些博文以及帖子里综合起来写的,,总之这又给了额外的动力,让自己继续前进,,希望和大家能够分享一些自己的经验,,在最需要奋斗的年级以及在技术的领域踽踽独行的过程中有共同的伙伴继续前进~   今天写的是Linux设备驱动中的阻塞和非阻塞I/0,何谓阻塞与非阻塞I/O?简单来说就是对I/O操作的两种不同的方式,驱动程序可以灵活的支持用户空间对设备的这两种访问方式。
|
6月前
|
安全 Java 调度
震撼揭秘!手撕并发编程迷雾,Semaphore与CountDownLatch携手AQS共享模式,让你秒懂并发神器背后的惊天秘密!
【8月更文挑战第4天】在Java并发编程中,AbstractQueuedSynchronizer (AQS) 是核心框架,支持独占锁与共享锁的实现。本文以Semaphore与CountDownLatch为例,深入解析AQS共享模式的工作原理。Semaphore通过AQS管理许可数量,控制资源的并发访问;而CountDownLatch则利用共享计数器实现线程间同步。两者均依赖AQS提供的tryAcquireShared和tryReleaseShared方法进行状态管理和线程调度,展示了AQS的强大功能和灵活性。
50 0
|
7月前
|
安全 Java 开发者
Java并发编程:解锁多线程同步之谜
【7月更文挑战第2天】在Java的世界中,多线程编程如同精密的钟表机械,每一个齿轮和弹簧都必须精确配合以保障时间的准确传递。本文将深入探讨Java并发编程的核心概念,包括synchronized关键字、ReentrantLock类以及并发集合的使用,旨在为读者提供一把解开多线程同步谜团的钥匙。
|
3月前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
179 7
|
程序员
多线程专题之线程死锁原因之谜
引子:线程死锁曾是多少程序员的噩梦,每每为此食不甘味,夜不成寐,一句话:苦不堪言。本文从几个场景入手,试图解开产生死锁的原因之谜。 教科书:说的很具体,理解很抽象   关于死锁产生的原因《操作系统》中有比较好的说明:   (1)因为系统资源不足。
664 0
|
8月前
|
安全 Java 开发者
Java并发编程的艺术:解锁多线程同步的奥秘
本文将深入探讨Java并发编程的核心概念,揭示多线程环境下同步机制的工作原理与实践技巧。我们将从基础的synchronized关键字讲起,逐步过渡到高级的Lock接口和并发工具类,最后通过实例分析来加深理解。文章不仅旨在为初学者提供一个清晰的并发编程入门指南,同时也希望能够帮助有一定经验的开发者巩固和提升他们的并发处理能力。
|
Java 存储
Java并发编程的艺术(六)——线程间的通信
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_34173549/article/details/79612409 多条线程之间有时需要数据交互,下面介绍五种线程间数据交互的方式,他们的使用场景各有不同。
1074 0

热门文章

最新文章