Semaphore实现原理全面解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: Semaphore(信号量)是一个同步工具类,通过Semaphore可以控制同时访问共享资源的线程个数。

简介

Semaphore(信号量)是一个同步工具类,通过Semaphore可以控制同时访问共享资源的线程个数。

应用场景

Semaphore的主要应用场景:

  • 资源并发控制:Semaphore可以限制对资源的并发访问。如:管理数据库连接池或线程池中的资源。
  • 控制并发线程数:Semaphore可以控制同时执行的线程数量。如:控制同时访问某个接口的请求数量。
  • 实现互斥锁:Semaphore可以通过设置参数permits(信号量数量)的值为1来实现互斥锁的功能,保证同一时间只有一个线程可以访问临界区。
  • 控制任务流量:Semaphore可以限制任务的执行速率。如:控制某个任务在单位时间内的执行次数。

实现原理

在分析Semaphore的实现原理之前,先介绍一下信号量模型。

信号量模型

信号量模型是一个通用模型(即:与语言无关)。它主要由一个计数器、一个等待队列和三个方法组成,其中计数器和等待队列对外不可见,只能通过信号量模型提供的三个方法来访问它们。

信号量模型整体结构,如图所示:

图中:

  • init()方法:设置计数器的初始值。

  • down()方法:计数器的值-1,并根据当前计数器的值进行判断:

    • 如果当前计数器的值小于0,则阻塞当前线程(即:没有获得信号量)。
    • 如果当前计数器的值大于等于0,则执行当前线程(即:获得信号量)。
  • up()方法:计数器的值+1,并根据当前计数器的值进行判断:

    • 如果当前计数器的值小于等于0,说明释放信号量之前等待队列中存在处于阻塞状态的线程,则唤醒等待队列中的一个线程,并将其从等待队列中移除。

注意:以上三个方法均是原子性操作。

整体流程

在Java中,信号量模型是通过Semaphore同步工具类实现的。整体流程如图所示:

处理流程:

  • 1)某个线程尝试获取信号量,将当前可用的信号量数量-本次需要的信号量数得到当前最新的信号量数:

    • 如果当前最新的信号量数大于等于0且通过CAS的方式更新当前最新的信号量数成功,则表示获取信号量成功,执行业务处理。
    • 如果当前最新的信号量数小于0,则表示获取信号量失败,将当前线程封装成Node节点追加到等待队列的末尾,进入阻塞,等待被唤醒。
  • 2)业务处理完成,释放信号量,将当前信号量数+本次释放的信号量数得到当前最新的信号量数。如果此时等待队列中存在阻塞的线程,则唤醒等待队列中阻塞的线程。

内部结构

Semaphore类结构定义,如图所示:

可以看到,Semaphore底层是基于AQS来实现。其中:

  • AbstractQueuedSynchronizer:抽象队列同步器(简称AQS)。
  • Sync:Semaphore的静态内部类,用于实现Semaphore的公共同步逻辑。
  • FairSync:Semaphore的静态内部类,继承了Sync,用于实现公平模式。
  • NonfairSync:Semaphore的静态内部类,继承了Sync,用于实现非公平模式。

源码如下:

public class Semaphore implements java.io.Serializable {
   
   
    // 同步变量,类型为Sync
    private final Sync sync;
    /**
     * Sync:Semaphore的静态内部类,它继承了AbstractQueuedSynchronizer,用于实现Semaphore的同步逻辑
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
   
   
        private static final long serialVersionUID = 1192457210091910933L;
        // 初始化信号量数量
        Sync(int permits) {
   
   
            setState(permits);
        }
        // 获取当前信号量数量
        final int getPermits() {
   
   
            return getState();
        }
        /**
         * 以非公平模式尝试获取信号量
         */
        final int nonfairTryAcquireShared(int acquires) {
   
   
            // 自旋
            for (;;) {
   
   
                // 获取当前可用的信号量数量
                int available = getState();
                // 获取剩余信号量数量(当前可用的信号量数量-本次需要的信号量数量)
                int remaining = available - acquires;
                // 如果剩余信号量数量小于0或者剩余信号量数量大于等于0且更新剩余信号量数量成功,则返回当前剩余信号量数量
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
        /**
         * 尝试释放信号量
         */
        protected final boolean tryReleaseShared(int releases) {
   
   
            // 自旋
            for (;;) {
   
   
                // 获取当前信号量数量
                int current = getState();
                // 当前信号量数量+本次释放的信号量数量
                int next = current + releases;
                // 当前信号量数量超过int类型的最大值(即:溢出)
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                // 当前信号量数量未超过int类型的最大值且更新信号量数量成功,则返回释放信号量成功
                if (compareAndSetState(current, next))
                    return true;
            }
        }
        // 扣减指定数量的信号量
        final void reducePermits(int reductions) {
   
   
            for (;;) {
   
   
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
        // 清空并返回当前信号量数量
        final int drainPermits() {
   
   
            for (;;) {
   
   
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
    /**
     * NonfairSync:Semaphore的静态内部类,它继承了Sync,用于实现非公平模式
     */
    static final class NonfairSync extends Sync {
   
   
        private static final long serialVersionUID = -2694183684443567898L;
        NonfairSync(int permits) {
   
   
            super(permits);
        }
        // 以非公平模式尝试获取信号量
        protected int tryAcquireShared(int acquires) {
   
   
            return nonfairTryAcquireShared(acquires);
        }
    }
    /**
     * FairSync:Semaphore的静态内部类,它继承了Sync,用于实现公平模式
     */
    static final class FairSync extends Sync {
   
   
        private static final long serialVersionUID = 2014338818796000944L;
        FairSync(int permits) {
   
   
            super(permits);
        }
        // 以公平模式尝试获取信号量
        protected int tryAcquireShared(int acquires) {
   
   
            // 自旋
            for (;;) {
   
   
                /**
                 * 当前线程对应的节点不是头节点的下一个节点或者当前线程不是持有信号量的线程,则获取信号量失败
                 * 其中:
                 *   当前线程对应的节点不是头节点的下一个节点,则获取信号量失败,体现的是公平原则(即:先到先得)
                 *   当前线程不是当前持有信号量的线程,则获取信号量失败,体现的是可重入
                 */
                if (hasQueuedPredecessors())
                    return -1;
                // 获取当前可用的信号量数量
                int available = getState();
                // 获取剩余信号量数量(当前可用的信号量数量-本次需要的信号量数量)
                int remaining = available - acquires;
                // 如果剩余信号量数量小于0或者剩余信号量数量大于等于0且更新剩余信号量数量成功,则返回当前剩余信号量数量
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
}

构造函数

Semaphore的构造函数:

/**
 * 构造函数1
 * permits:信号量数量,该参数值可能为负数,permits为负数时,必须要释放信号量后其他线程才能获取信号量
 */
public Semaphore(int permits) {
   
   
    // 默认为非公平模式
    sync = new NonfairSync(permits);
}
/**
 * 构造函数2
 * permits:信号量数量
 * fair:true-公平模式,false-非公平模式
 */
public Semaphore(int permits, boolean fair) {
   
   
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

其中:

  • 公平模式:获取信号量时按照先后顺序进行分配,保证等待最久的线程能够优先获得信号量。
  • 非公平锁模式:获取信号量时不保证等待最久的线程能够优先获得信号量,而是根据系统调度算法选择合适的线程获取信号量。

核心方法

Semaphore的常用方法:

// 获取一个信号量
public void acquire() throws InterruptedException {
   
   ...}
// 获取指定数量的信号量
public void acquire(int permits) throws InterruptedException {
   
   ...}
// 获取一个信号量(忽略中断)
public void acquireUninterruptibly() {
   
   ...}
// 获取指定数量的信号量(忽略中断)
public void acquireUninterruptibly(int permits) {
   
   ...}
// 尝试获取一个信号量
public boolean tryAcquire() {
   
   ...}
// 尝试在指定时间内获取一个信号量
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
   
   ...}
// 尝试获取指定数量的信号量
public boolean tryAcquire(int permits) {
   
   ...}
// 尝试在指定时间内获取指定数量的信号量
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
   
   ...}
// 释放一个信号量
public void release() {
   
   ...}
// 释放指定数量的信号量
public void release(int permits) {
   
   ...}
// 判断等待队列中是否存在阻塞的线程
public final boolean hasQueuedThreads() {
   
   ...}
// 获取等待队列中阻塞的线程数量
public final int getQueueLength() {
   
   ...}
// 获取等待队列中阻塞的线程集合
protected Collection<Thread> getQueuedThreads() {
   
   ...}
// 获取可用的信号量数量
public int availablePermits() {
   
   ...}
// 清空并返回当前信号量数量
public int drainPermits() {
   
   ...}

其中,最核心的是acquire()方法和release()方法。

acquire方法

Semaphore通过调用Semaphore#acquire()方法获取一个信号量:

  • 如果(当前可用的信号量数量-1)大于等于0且通过CAS的方式更新当前最新的信号量数成功,则表示获取信号量成功,执行业务处理。
  • 如果(当前可用的信号量数量-1)小于0,则表示获取信号量失败,将当前线程封装成Node节点添加到等待队列中,进入阻塞,等待被唤醒,如果线程阻塞过程中被其他线程中断,则抛出InterruptedException异常。

Semaphore#acquire()方法源码解析:

// java.util.concurrent.Semaphore#acquire()
// 获取一个信号量
public void acquire() throws InterruptedException {
   
   
    // 以共享模式获取一个信号量
    sync.acquireSharedInterruptibly(1);
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
// 以共享模式获取一个信号量(AQS)
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
   
   
    if (Thread.interrupted())
        throw new InterruptedException();
    // 尝试以共享模式获取一个信号量(由继承AbstractQueuedSynchronizer类的Semaphore.Sync类实现)
    if (tryAcquireShared(arg) < 0)
        // 如果获取信号量失败(即:当前可用的信号量数量-1<0),则将当前线程封装成Node节点添加到等待队列中
        doAcquireSharedInterruptibly(arg);
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
// 将当前线程封装成Node节点追加到等待队列的末尾,等待被唤醒(即:进入阻塞)
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
   
   
    // 将封装当前线程的Node节点追加到等待队列的末尾(保证追加成功)
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
   
   
        // 自旋
        for (;;) {
   
   
            // 获取Node节点的前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点是头节点,则再次尝试获取信号量
            if (p == head) {
   
   
                int r = tryAcquireShared(arg);
                if (r >= 0) {
   
   
                    // 设置当前节点为head节点,如果存在剩余资源,则唤醒下一个相邻的后继节点(即:向后传播)
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 如果前驱节点不是头节点或者获取信号量失败,则逆序遍历等待队列,找到可以唤醒自己的节点
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 将自己挂起(即:阻塞)
                parkAndCheckInterrupt())
                // 如果阻塞的线程被其他线程中断,则抛出InterruptedException异常
                throw new InterruptedException();
        }
    } finally {
   
   
        if (failed)
            cancelAcquire(node);
    }
}

release方法

Semaphore通过调用Semaphore#release()方法释放一个信号量:

  • 如果(当前可用的信号量数量+1)未超过int类型的最大值,则释放信号量成功。如果此时等待队列中存在阻塞的线程,则唤醒等待队列中的阻塞的线程。
  • 如果(当前可用的信号量数量+1)超过int类型的最大值,则释放信号量失败,抛出"Maximum permit count exceeded"异常信息。

Semaphore#release()方法源码解析:

// java.util.concurrent.Semaphore#release()
// 释放一个信号量
public void release() {
   
   
    // 以共享模式释放一个信号量
    sync.releaseShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
// 以共享模式获取一个信号量(AQS)
public final boolean releaseShared(int arg) {
   
   
    // 尝试释放一个信号量(由继承AbstractQueuedSynchronizer类的Semaphore.Sync类实现)
    if (tryReleaseShared(arg)) {
   
   
        // 释放信号量成功,唤醒后继节点
        doReleaseShared();
        return true;
    }
    return false;
}

doReleaseShared()方法源码解析请移步主页查阅->「一文搞懂」AQS(抽象队列同步器)实现原理及源码解析。

使用示例

假设某个停车场有5个停车位,有8辆汽车想要进入停车场停车。

Semaphore示例代码:

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Semaphore;

/**
 * @program: xxkfz-study
 * @ClassName MyTest.java
 * @author: xxkfz
 * @create: 2024-02-24 21:18
 * @description:
 **/
@Slf4j
public class SemaphoreExample {
   
   

    public static void main(String[] args) {
   
   
        // 创建信号量,初始化信号量数为5(即:5个停车位)
        Semaphore semaphore = new Semaphore(2, true);
        // 创建8个线程,模拟8辆汽车进入停车场
        for (int i = 0; i < 8; i++) {
   
   
            int n = i;
            new Thread(new Runnable() {
   
   
                @SneakyThrows
                @Override
                public void run() {
   
   
                    try {
   
   
                        // 获取信号量
                        semaphore.acquire();
                        long time = (long) (Math.random() * 10 + n);
                        log.info(Thread.currentThread().getName() + "进入停车场,停车时间:{}秒", time);
                        // 模拟停车时长
                        Thread.sleep(time * 1000);
                    } finally {
   
   
                        log.info(Thread.currentThread().getName() + "离开停车场");
                        // 释放信号量
                        semaphore.release();
                    }
                }
            }, "第" + n + "号汽车").start();
        }
    }

}

执行结果:

10:04:45.249 [第0号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第0号汽车进入停车场,停车时间:7秒
10:04:45.249 [第1号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第1号汽车进入停车场,停车时间:3秒
10:04:48.264 [第1号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第1号汽车离开停车场
10:04:48.264 [第2号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第2号汽车进入停车场,停车时间:7秒
10:04:52.258 [第0号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第0号汽车离开停车场
10:04:52.258 [第6号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第6号汽车进入停车场,停车时间:14秒
10:04:55.278 [第2号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第2号汽车离开停车场
10:04:55.278 [第7号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第7号汽车进入停车场,停车时间:9秒
10:05:04.286 [第7号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第7号汽车离开停车场
10:05:04.286 [第5号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第5号汽车进入停车场,停车时间:13秒
10:05:06.271 [第6号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第6号汽车离开停车场
10:05:06.271 [第4号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第4号汽车进入停车场,停车时间:13秒
10:05:17.290 [第5号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第5号汽车离开停车场
10:05:17.290 [第3号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第3号汽车进入停车场,停车时间:8秒
10:05:19.273 [第4号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第4号汽车离开停车场
10:05:25.292 [第3号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第3号汽车离开停车场
相关文章
CountDownLatch实现原理全面解析
CountDownLatch是一个同步工具类,用来协调多个线程之间的同步(即:用于线程之间的通信而不是互斥)。它允许一个或多个线程进入等待状态,直到其他线程执行完毕后,这些等待的线程才继续执行。
|
5月前
|
存储 缓存 算法
滚雪球学Java(62):HashSet的底层实现原理解析
【6月更文挑战第16天】🏆本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
43 3
滚雪球学Java(62):HashSet的底层实现原理解析
|
5月前
|
存储 并行计算 Java
Java8中JUC包同步工具类深度解析(Semaphore,CountDownLatch,CyclicBarrier,Phaser)
Java8中JUC包同步工具类深度解析(Semaphore,CountDownLatch,CyclicBarrier,Phaser)
53 2
|
4月前
|
Java
Java中多态的实现原理解析
Java中多态的实现原理解析
|
5月前
|
Java 数据库
深入解析Java并发包(JUC)中的Semaphore
深入解析Java并发包(JUC)中的Semaphore
|
6月前
|
监控 API 数据安全/隐私保护
屏幕监控软件开发指南:C++实现原理解析
在当今数字化时代,屏幕监控软件成为了企业管理和个人隐私保护的重要工具。本文将深入探讨如何使用C++语言实现屏幕监控软件,并解析其实现原理。我们将通过多个代码示例来说明其工作方式,最后将介绍如何将监控到的数据自动提交到网站。
196 3
|
6月前
|
存储 缓存 监控
深入理解Java线程池ThreadPoolExcutor实现原理、数据结构和算法(源码解析)
Java线程池的核心组件包括核心线程数、最大线程数、队列容量、拒绝策略等。核心线程数是线程池长期维持的线程数量,即使这些线程处于空闲状态也不会被销毁;最大线程数则是线程池允许的最大线程数量,当任务队列已满且当前线程数未达到最大线程数时,线程池会创建新线程执行任务;队列容量决定了任务队列的最大长度,当新任务到来时,如果当前线程数已达到核心线程数且队列未满,任务将被放入队列等待执行;拒绝策略则定义了当线程池无法处理新任务时的行为,如抛出异常、丢弃任务等。
109 1
|
6月前
|
存储 缓存 编译器
Go语言解析Tag:深入探究实现原理
【2月更文挑战第20天】
302 2
|
6月前
|
存储 安全 Java
Go Slice的底层实现原理深度解析
在Go语言的世界里,切片(Slice)是一种极其重要的数据结构,它以其灵活性和高效性在众多编程场景中扮演着核心角色。本文将深入探讨Go切片的底层实现原理,通过实例和源码分析,带你领略Go语言设计之美。
|
6月前
|
程序员 测试技术 Python
Python中的装饰器实现原理解析
在Python编程中,装饰器是一种强大的工具,通过装饰器可以在不改变函数源码的情况下增加额外的功能。本文将深入探讨Python中装饰器的实现原理,帮助读者更好地理解和运用装饰器。

推荐镜像

更多
下一篇
无影云桌面