防止多线程同时操作一个资源,不能不学的JUC工具类: Semaphore详解

简介: 在工作中我们经常需要考虑对资源的使用,避免资源被过度使用或者资源没有被利用到而造成的问题,那我们该如何去限制访问某些资源的线程数目,从而对完成资源的保护。

前言

大家好,我是小郭,在工作中我们经常需要考虑对资源的使用,避免资源被过度使用或者资源没有被利用到而造成的问题,那我们该如何去限制访问某些资源的线程数目,从而对完成资源的保护。

1. 限制多线程同时操作的方式

concurrent包为我们提供了多种防止多线程同时操作一个资源的方法

  1. volatile
  2. 原子类
  3. Synchronized和Lock
  4. Semaphore

2. Semaphore是什么?

Semaphore被翻译为计数信号量,通常使用进行并发线程数量的限制,保证多个线程能够合理的使用资源。用大白话理解就是理解为红路灯。

官方的翻译:计数信号量。

从概念上讲,信号量维护一组许可证。 如有必要,每个acquire块都将阻塞直到获得许可为止,然后再获取它。 每个release添加一个许可证,从而有可能释放阻塞的获取者。 但是,没有使用实际的许可对象。 Semaphore只是保持可用数量的计数并采取相应措施。

3. 应用场景

公共资源有限的地方,我们就需要考虑限制的问题,防止过度的操作,带来的不良影响

例如:数据库连接

4. 如何使用Semaphore

没有进行控制的代码

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    IntStream.range(0,5).forEach(i -> executorService.submit(() ->{
        try {
            System.out.println(Thread.currentThread().getName() + "gogogo");
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + "正在操作");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "释放操作");
    }));
    executorService.shutdown();
}

输出结果

pool-1-thread-1gogogo
pool-1-thread-3gogogo
pool-1-thread-4gogogo
pool-1-thread-2gogogo
pool-1-thread-5gogogo
pool-1-thread-1正在操作
pool-1-thread-5正在操作
pool-1-thread-5释放操作
pool-1-thread-2正在操作
pool-1-thread-2释放操作
pool-1-thread-3正在操作
pool-1-thread-3释放操作
pool-1-thread-4正在操作
pool-1-thread-4释放操作
pool-1-thread-1释放操作

进行改造使用我们的并发工具Semaphore

private static Semaphore semaphore = new Semaphore(1,false);
public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    IntStream.range(0,5).forEach(i -> executorService.submit(() ->{
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + "gogogo");
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + "正在操作");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        semaphore.release();
        System.out.println(Thread.currentThread().getName() + "释放操作");
    }));
    executorService.shutdown();
}

输出结果

pool-1-thread-1gogogo
pool-1-thread-1正在操作
pool-1-thread-1释放操作
pool-1-thread-2gogogo
pool-1-thread-2正在操作
pool-1-thread-2释放操作
pool-1-thread-3gogogo
pool-1-thread-3正在操作
pool-1-thread-3释放操作
pool-1-thread-4gogogo
pool-1-thread-4正在操作
pool-1-thread-4释放操作
pool-1-thread-5gogogo
pool-1-thread-5正在操作
pool-1-thread-5释放操作

我们在Semaphore的初始化参数中,设置了允许并发线程数量为1,表示只允许1个线程通过,当线程拿到许可证的时候进行执行,线程完成之后进行许可证的归还,给下一个进来的线程使用,直到任务结束。

5. Semaphore的主要方法和核心参数

核心参数

参数 含义
permits 允许并发线程数量
fair 是否公平锁

构造方法

代码如下:

默认是非公平锁,只要传入并发线程数量

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

核心方法

//从此信号量获取许可,先休眠,直到获得可用线程或者被中断
void acquire() throws InterruptedException
//中断继续
void acquireUninterruptibly()
//从此信号量获取设定线程数许可,先休眠,直到获得可用线程或者被中断
void acquire(int permits)
//尝试获取许可,如果能够获取成功则立即返回true,否则,则返回false
boolean tryAcquire()
//和上面一样,设置了等待最长时间
boolean tryAcquire(long timeout, TimeUnit unit)
//释放许可
void release()
//返回当前可用的许可证数
int availablePermits()
//等待许可证数
int getQueueLength()
//返回正在等待线程的合集
Collection<Thread> getQueuedThreads()

6. Semaphore的原理

实现的核心还是AQS的共享模式

Sync extends AbstractQueuedSynchronizer

acquire()

//以共享模式获取,如果中断则中止.
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
            //1. 通过首先检查中断状态,中断返回异常
    if (Thread.interrupted())
        throw new InterruptedException();
        // 2. 以共享模式获取,获取到了锁,接下去,执行,没有就排队
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
//非公平模式共享锁获取
protected int tryAcquireShared(int acquires) {
    for (;;) {
        //判断当前节点在同步队列中是否有前驱节点的判断,获取不到返回-1
        if (hasQueuedPredecessors())
            return -1;
        //Semaphore用AQS的state变量的值代表可用许可数    
        int available = getState();
        int remaining = available - acquires;
        //如果剩余许可数小于0或者CAS将当前可用许可数设置为剩余许可数成功,则返回成功许可数
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
        //加入等待队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        //自旋过程中的退出条件是是当前节点的前驱节点是头结点并且tryAcquireShared(arg)
        //返回值大于等于0即能成功获得同步状态
        for (;;) {
            //获取前驱节点
            final Node p = node.predecessor();
            if (p == head) {
                //争夺锁
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //先把 head 给占了,然后唤醒队列中其他的线程
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        //失败的话,取消状态清除该节点
        if (failed)
            cancelAcquire(node);
    }
}
//设置head的值,完成初始化工作
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

7. Semaphore需要注意的问题

  1. release()不能随便调用,调用一次就增加一次

permits the initial number of permits available. This value may be negative, in which case releases must occur before any acquires will be granted.

public class SemaphoreTest {
    private static Semaphore semaphore = new Semaphore(1,false);
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        IntStream.range(0,5).forEach(i -> executorService.submit(() ->{
            try {
                System.out.println("after:" + semaphore.availablePermits());
                //semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + "gogogo");
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + "正在操作");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
                System.out.println("before:" + semaphore.availablePermits());
                System.out.println(Thread.currentThread().getName() + "释放操作");
}
            semaphore.release();
            System.out.println("before:" + semaphore.availablePermits());
            System.out.println(Thread.currentThread().getName() + "释放操作");
        }));
        executorService.shutdown();
    }
}
after:1
pool-1-thread-1gogogo
pool-1-thread-1正在操作
before:2
pool-1-thread-1释放操作
after:2
pool-1-thread-1gogogo
pool-1-thread-1正在操作
before:3
pool-1-thread-1释放操作
after:3
pool-1-thread-1gogogo
pool-1-thread-1正在操作
before:4
pool-1-thread-1释放操作
  1. 信号量泄露,指的是申请了但是没有释放,这会导致进入临界区的线程数量就会越来越少,随着时间的推移,最后许可证数量不够用,会导致线程卡死。

建议:在操作的时候,我们尽可能在finally中进行 semaphore.release() 的操作。

总结

以上就是关于信号量的全部内容,总体看来,用法比较简单,在实际的工作中需要对线程进行控制的场景,我们可以将他作为一个方案。

Semaphere的使用就总结就到这里!如有问题,欢迎讨论,我们一起进步!

相关文章
|
20天前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)
|
1月前
|
监控 Java 调度
【Java学习】多线程&JUC万字超详解
本文详细介绍了多线程的概念和三种实现方式,还有一些常见的成员方法,CPU的调动方式,多线程的生命周期,还有线程安全问题,锁和死锁的概念,以及等待唤醒机制,阻塞队列,多线程的六种状态,线程池等
105 6
【Java学习】多线程&JUC万字超详解
|
2月前
|
算法 Java
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
该博客文章综合介绍了Java并发编程的基础知识,包括线程与进程的区别、并发与并行的概念、线程的生命周期状态、`sleep`与`wait`方法的差异、`Lock`接口及其实现类与`synchronized`关键字的对比,以及生产者和消费者问题的解决方案和使用`Condition`对象替代`synchronized`关键字的方法。
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
|
2月前
|
设计模式 Java 调度
JUC线程池: ScheduledThreadPoolExecutor详解
`ScheduledThreadPoolExecutor`是Java标准库提供的一个强大的定时任务调度工具,它让并发编程中的任务调度变得简单而可靠。这个类的设计兼顾了灵活性与功能性,使其成为实现复杂定时任务逻辑的理想选择。不过,使用时仍需留意任务的执行时间以及系统的实际响应能力,以避免潜在的调度问题影响应用程序的行为。
61 1
|
2月前
|
Java API 调度
JUC线程池: FutureTask详解
总而言之,FutureTask是Java并发编程中一个非常实用的类,它在异步任务执行及结果处理方面提供了优雅的解决方案。在实现细节方面可以搭配线程池的使用,以及与Callable接口的配合使用,来完成高效的并发任务执行和结果处理。
29 0
|
2月前
|
Java 程序员 容器
【多线程面试题二十四】、 说说你对JUC的了解
这篇文章介绍了Java并发包java.util.concurrent(简称JUC),它是JSR 166规范的实现,提供了并发编程所需的基础组件,包括原子更新类、锁与条件变量、线程池、阻塞队列、并发容器和同步器等多种工具。
|
2月前
使用通义灵码写了一个多线程工具类,通义灵码处于什么水平
当方法间无依赖需提升执行效率时,可采用并行执行。示例通过`MultiThreadTaskExecutor`类实现多线程并发,其中`executeParallelDynamicMethods`方法接收一系列`Callable`任务并返回所有任务的结果列表。测试显示,四个耗时方法并行执行仅需4秒,相较于串行执行的12秒显著提升效率。该代码展示了良好的多线程编程实践。
42 0
|
2月前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
64 1
|
6天前
|
数据采集 负载均衡 安全
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
本文提供了多个多线程编程问题的解决方案,包括设计有限阻塞队列、多线程网页爬虫、红绿灯路口等,每个问题都给出了至少一种实现方法,涵盖了互斥锁、条件变量、信号量等线程同步机制的使用。
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
|
13天前
|
Java Spring
spring多线程实现+合理设置最大线程数和核心线程数
本文介绍了手动设置线程池时的最大线程数和核心线程数配置方法,建议根据CPU核数及程序类型(CPU密集型或IO密集型)来合理设定。对于IO密集型,核心线程数设为CPU核数的两倍;CPU密集型则设为CPU核数加一。此外,还讨论了`maxPoolSize`、`keepAliveTime`、`allowCoreThreadTimeout`和`queueCapacity`等参数的设置策略,以确保线程池高效稳定运行。
74 10
spring多线程实现+合理设置最大线程数和核心线程数