Java高并发编程基础三大利器之Semaphore

简介: Java高并发编程基础三大利器之Semaphore

引言

最近可以进行个税申报了,还没有申报的同学可以赶紧去试试哦。不过我反正是从上午到下午一直都没有成功的进行申报,一进行申报
就返回“当前访问人数过多,请稍后再试”。为什么有些人就能够申报成功,有些人就直接返回失败。这很明显申报处理资源是有限的,
只能等别人处理完了在来处理你的,你如果运气好可能重试几次就轮到你了,如果运气不好可能重试一天也可能轮不到你。
我反正已经是放弃了,等到夜深人静的时候再来试试。作为一个程序员我们肯定知道这是个税申请app的限流操作,如果还有不懂什么
是限流操作的可以参考下这个文章《高并发系统三大利器之限流》
比如个税申报系统每台机器只最多分别只能处理1000个请求,再多的请求就会把机器打挂。如果是多余的请求就把这些请求拒绝掉。直接给你返回一句温馨提示:“当前访问人数过多,请稍后再试”,如果要实现这个功能大家想想可以通过哪些方法算法来实现。

共享锁、独占锁

学习semaphore之前我们必须要先了解下什么是共享锁。在上一篇文章《Java高并发编程基础之AQS》我们介绍了公平锁于非公平锁的区别。

  • 共享锁:它是允许多个线程同时获取锁,并发的访问共享资源
  • 独占锁:也有人把它叫做“独享锁”,它是是独占的,排他的,只能被一个线程可持有,

当独占锁已经被某个线程持有时,其他线程只能等待它被释放后,才能去争锁,并且同一时刻只有一个线程能争锁成功。

什么是Semaphore

在《Java并发编程艺术》(微信搜【java金融】回复电子书可以免费获取PDF版本)这一书中是这么说的:

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。很多年以来,我都觉得从字面上很难理解Semaphore所表达的含义,只能把它比作是控制流量的红绿灯,比如XX马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入XX马路,但是如果前一百辆中有五辆车已经离开了XX马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。
  • Semaphore机制是提供给线程抢占式获取许可,所以他可以实现公平或者非公平,类似于ReentrantLock

说了这么多我们来个实际的例子看一看,比如我们去停车场停车,停车场总共只有5个车位,但是现在有8辆汽车来停车,剩下的3辆汽车要么等其他汽车开走后进行停车,或者去找别的停车位?

/**
 * @author: 公众号【Java金融】
 */
public class SemaphoreTest {
    public static void main(String[] args) throws InterruptedException {
         // 初始化五个车位
        Semaphore semaphore = new Semaphore(5);
        // 等所有车子
        final CountDownLatch latch = new CountDownLatch(8);
        for (int i = 0; i < 8; i++) {
            int finalI = i;
            if (i == 5) {
                Thread.sleep(1000);
                new Thread(() -> {
                    stopCarNotWait(semaphore, finalI);
                    latch.countDown();
                }).start();
                continue;
            }
            new Thread(() -> {
                stopCarWait(semaphore, finalI);
                latch.countDown();
            }).start();
        }
        latch.await();
        log("总共还剩:" + semaphore.availablePermits() + "个车位");
    }

    private static void stopCarWait(Semaphore semaphore, int finalI) {
        String format = String.format("车牌号%d", finalI);
        try {
            semaphore.acquire(1);
            log(format + "找到车位了,去停车了");
            Thread.sleep(10000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            semaphore.release(1);
            log(format + "开走了");
        }
    }

    private static void stopCarNotWait(Semaphore semaphore, int finalI) {
         String format = String.format("车牌号%d", finalI);
        try {
            if (semaphore.tryAcquire()) {
                log(format + "找到车位了,去停车了");
                Thread.sleep(10000);
                log(format + "开走了");
                semaphore.release();
            } else {
                log(format + "没有停车位了,不在这里等了去其他地方停车去了");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void log(String content) {
        // 格式化
        DateTimeFormatter fmTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        // 当前时间
        LocalDateTime now = LocalDateTime.now();
        System.out.println(now.format(fmTime) + "  "+content);
    }
}
2021-03-01 18:54:57  车牌号0找到车位了,去停车了
2021-03-01 18:54:57  车牌号3找到车位了,去停车了
2021-03-01 18:54:57  车牌号2找到车位了,去停车了
2021-03-01 18:54:57  车牌号1找到车位了,去停车了
2021-03-01 18:54:57  车牌号4找到车位了,去停车了
2021-03-01 18:54:58  车牌号5没有停车位了,不在这里等了去其他地方停车去了
2021-03-01 18:55:07  车牌号7找到车位了,去停车了
2021-03-01 18:55:07  车牌号6找到车位了,去停车了
2021-03-01 18:55:07  车牌号2开走了
2021-03-01 18:55:07  车牌号0开走了
2021-03-01 18:55:07  车牌号3开走了
2021-03-01 18:55:07  车牌号4开走了
2021-03-01 18:55:07  车牌号1开走了
2021-03-01 18:55:17  车牌号7开走了
2021-03-01 18:55:17  车牌号6开走了
2021-03-01 18:55:17  总共还剩:5个车位

从输出结果我们可以看到车牌号5这辆车看见没有车位了,就不在这个地方傻傻的等了,而是去其他地方了,但是车牌号6车牌号7分别需要等到车库开出两辆车空出两个车位后才停进去。这就体现了Semaphoreacquire 方法如果没有获取到凭证它就会阻塞,而tryAcquire方法如果没有获取到凭证不会阻塞的。

semaphore在dubbo中的应用

Dubbo中可以给Provider配置线程池大小来控制系统提供服务的最大并行度,默认是200

<dubbo:provider  threads="200"/>

比如我现在这个订单系统有三个接口,分别为创单、取消订单、修改订单。这三个接口加起来的并发是200但是创单接口是核心接口,我想让它多分点线程来执行
让它可以有最大150个线程,取消订单和修改订单分别最大25个线程执行就可以了。dubbo提供了executes这一属性来实现这个功能

<dubbo:service interface="cn.javajr.service.CreateOrderService" executes="150"/>
<dubbo:service interface="cn.javajr.service.CancelOrderService" executes="25"/>
<dubbo:service interface="cn.javajr.service.EditOrderService" executes="25"/>

我们可以看看dubbo内部是如何来executes的,具体实现是在ExecuteLimitFilter这个类我们可以

 public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        Semaphore executesLimit = null;
        boolean acquireResult = false;
        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
        if (max > 0) {
            RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
            // 如果当前使用的线程数量已经大于等于设置的阈值,那么直接抛出异常
//            if (count.getActive() >= max) {
// throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service // using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
            /**
             * http://manzhizhen.iteye.com/blog/2386408
             * use semaphore for concurrency control (to limit thread number)
             */
             
            executesLimit = count.getSemaphore(max);
            if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
            }
        }
        long begin = System.currentTimeMillis();
        boolean isSuccess = true;
        // 计数器+1
        RpcStatus.beginCount(url, methodName);
        try {
            Result result = invoker.invoke(invocation);
            return result;
        } catch (Throwable t) {
            isSuccess = false;
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        } finally {
           // 计数器-1
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
            if(acquireResult) {
                executesLimit.release();
            }
        }
    }

从上述代码我们也可以看出早期这个是没有采用Semaphore来实现的,而是直接采用被注释的 if (count.getActive() >= max) 这个来来实现的,由于这个count.getActive() >= max 和这个计数加1不是原子性的,所以会有问题,具体bug号可以看https://github.com/apache/dubbo/pull/582后面才采用上述代码用Semaphore来修复非原子性问题。具体更详细的分析可以参见代码的链接。不过现在最新版本(2.7.9)我看是采用采用自旋加上和CAS来实现的。

Semaphore

上面就是对Semaphore一个简单的使用以及dubbo中用到的例子,说句实话Semaphore在工作中用的还是比较少的,不过面试又有可能会被问到,所以还是有必要来一起学习一下它。我们前面《Java高并发编程基础之AQS》通过ReentrantLock 一起学习了下AQS,其实Semaphore同样也是通过AQS来是实现的,我们可以一起来对照下独占锁的方法,基本上都是有方法一一相对应的。
在这里插入图片描述
这里有两点稍微需要注意的地方:

  • 在独占锁模式中,我们只有在获取了独占锁的节点释放锁时,才会唤醒后继节点,因为独占锁只能被一个线程持有,如果它还没有被释放,就没有必要去唤醒它的后继节点。
  • 在共享锁模式下,当一个节点获取到了共享锁,我们在获取成功后就可以唤醒后继节点了,而不需要等到该节点释放锁的时候,这是因为共享锁可以被多个线程同时持有,一个锁获取到了,则后继的节点都可以直接来获取。因此,在共享锁模式下,在获取锁和释放锁结束时,都会唤醒后继节点。

获取凭证

我们同样还是通过非公平锁的模式来老获取凭证
我们可以看下acquire的核心方法

  public final void acquireSharedInterruptibly(int arg)
           throws InterruptedException {
       if (Thread.interrupted())
           throw new InterruptedException();
       if (tryAcquireShared(arg) < 0)
           doAcquireSharedInterruptibly(arg);
   }
    protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
   }
    
    // 主要看下这个方法,这个方法返回的值也就是tryAcquireShared返回的值,因为tryAcquireShared->nonfairTryAcquireShared
    final int nonfairTryAcquireShared(int acquires) {
          //自旋
          for (;;) {
               //Semaphore用AQS的state变量的值代表可用许可数
               int available = getState();
               //可用许可数减去本次需要获取的许可数即为剩余许可数
               int remaining = available - acquires;
               //如果剩余许可数小于0或者CAS将当前可用许可数设置为剩余许可数成功,则返回成功许可数
               if (remaining < 0 ||
                   compareAndSetState(available, remaining))
                   return remaining;
           }
  • tryAcquireShared 获取返回许可书小于0时说明获取许可失败需要进入doAcquireSharedInterruptibly这个方法去休眠。
  • tryAcquireShared 获取返回许可书小于0时说明获取许可成功直接结束。

doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg)

    throws InterruptedException {
    // 独占锁的acquireQueued调用的是addWaiter(Node.EXCLUSIVE),
    // 而共享锁调用的是addWaiter(Node.SHARED),表明了该节点处于共享模式
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
这个方法是不是跟我们上篇文章讲的`AQS`的独占锁的`acquireQueued`很像,不过独占锁它是直接调用了用了`setHead(node)`方法,而共享锁调用的是`setHeadAndPropagate(node, r)`
这个方法除了调用`setHead` 里面还调用了`doReleaseShared`(唤醒后继节点)
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
其他的方法基本上是和`ReentrantLock`来实现的独占锁差不多,我相信大家对源码分析感兴趣的应该也不多,其他更多细节问题还是需要自己亲自动手去看源码的。

# 总结
- 当信号量`Semaphore`初始化设置许可证为1 时,它也可以当作互斥锁使用。其中0、1就相当于它的状态,当=1时表示其他线程可以获取,当=0时,排他,即其他线程必须要等待。
- `Semaphore`是`JUC`包中的一个很简单的工具类,用来实现多线程下对于资源的同一时刻的访问线程数限制
- `Semaphore`中存在一个【许可】的概念,即访问资源之前,先要获得许可,如果当前许可数量为`0`,那么线程阻塞,直到获得许可
- `Semaphore`内部使用`AQS`实现,由抽象内部类`Sync`继承了`AQS`。因为`Semaphore`天生就是共享的场景,所以其内部实际上类似于共享锁的实现
- 共享锁的调用框架和独占锁很相似,它们最大的不同在于获取锁的逻辑——共享锁可以被多个线程同时持有,而独占锁同一时刻只能被一个线程持有。
- 由于共享锁同一时刻可以被多个线程持有,因此当头节点获取到共享锁时,可以立即唤醒后继节点来争锁,而不必等到释放锁的时候。因此,共享锁触发唤醒后继节点的行为可能有两处,一处在当前节点成功获得共享锁后,一处在当前节点释放共享锁后。
- 采用`semaphore`来进行限流的话会产生**突刺现象**。
> 指在一定时间内的一小段时间内就用完了所有资源,后大部分时间中无资源可用。
> 比如在限流方法中的计算器算法,设置1s内的最大请求数为100,在前100ms已经永远了100个请求,则后面900ms将无法处理请求,这就是突刺现象
### 结束
- 由于自己才疏学浅,难免会有纰漏,假如你发现了错误的地方,还望留言给我指出来,我会对其加以修正。
- 如果你觉得文章还不错,你的转发、分享、赞赏、点赞、留言就是对我最大的鼓励。
- 感谢您的阅读,十分欢迎并感谢您的关注。
站在巨人的肩膀上摘苹果:
https://segmentfault.com/a/1190000016447307

目录
相关文章
|
12天前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
20 0
|
14天前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
18天前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####
|
11天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
11天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
32 3
|
17天前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
58 6
|
16天前
|
开发框架 安全 Java
Java 反射机制:动态编程的强大利器
Java反射机制允许程序在运行时检查类、接口、字段和方法的信息,并能操作对象。它提供了一种动态编程的方式,使得代码更加灵活,能够适应未知的或变化的需求,是开发框架和库的重要工具。
34 2
|
17天前
|
安全 Java 开发者
Java中的多线程编程:从基础到实践
本文深入探讨了Java多线程编程的核心概念和实践技巧,旨在帮助读者理解多线程的工作原理,掌握线程的创建、管理和同步机制。通过具体示例和最佳实践,本文展示了如何在Java应用中有效地利用多线程技术,提高程序性能和响应速度。
52 1
|
6月前
|
Java C++
关于《Java并发编程之线程池十八问》的补充内容
【6月更文挑战第6天】关于《Java并发编程之线程池十八问》的补充内容
52 5
|
3月前
|
缓存 监控 Java
Java中的并发编程:理解并应用线程池
在Java的并发编程中,线程池是提高应用程序性能的关键工具。本文将深入探讨如何有效利用线程池来管理资源、提升效率和简化代码结构。我们将从基础概念出发,逐步介绍线程池的配置、使用场景以及最佳实践,帮助开发者更好地掌握并发编程的核心技巧。