聊一聊Java中那些常见的并发控制手段

简介: 单实例的并发控制,主要是针对JVM内,我们常规的手段即可满足需求,常见的手段大概有下面这些同步代码块CAS自旋锁阻塞队列,令牌桶等

单实例的并发控制,主要是针对JVM内,我们常规的手段即可满足需求,常见的手段大概有下面这些


  • 同步代码块
  • CAS自旋
  • 阻塞队列,令牌桶等


1.1 同步代码块



通过同步代码块,来确保同一时刻只会有一个线程执行对应的业务逻辑,常见的使用姿势如下


public synchronized doProcess() {
    // 同步代码块,只会有一个线程执行
}
复制代码


一般推荐使用最小区间使用原则,尽量不要直接在方法上加synchronized,比如经典的双重判定单例模式


public class Single {
  private static volatile Single instance;
  private Single() {}
  public static Single getInstance() {
      if (instance == null) {
          synchronized(Single.class) {
              if (instance == null) instance = new Single();
          }
      }
      return instance;
  }
}
复制代码


1.2 CAS自旋方式



比如AtomicXXX原子类中的很多实现,就是借助unsafe的CAS来实现的,如下


public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}
// unsafe 实现
// cas + 自选,不断的尝试更新设置,直到成功为止
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    return var5;
}
复制代码


1.3 锁



jdk本身提供了不少的锁,为了实现单实例的并发控制,我们需要选择写锁;如果支持多读,单实例写,则可以考虑读写锁;一般使用姿势也比较简单


private void doSome(ReentrantReadWriteLock.WriteLock writeLock) {
    try {
        writeLock.lock();
        System.out.println("持有锁成功 " + Thread.currentThread().getName());
        Thread.sleep(1000);
        System.out.println("执行完毕! " + Thread.currentThread().getName());
        writeLock.unlock();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
@Test
public void lock() throws InterruptedException {
    ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();
    new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();
    new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();
    Thread.sleep(20000);
}
复制代码


1.4 阻塞队列



借助同步阻塞队列,也可以实现并发控制的效果,比如队列中初始化n个元素,每次消费从队列中获取一个元素,如果拿不到则阻塞;执行完毕之后,重新塞入一个元素,这样就可以实现一个简单版的并发控制


demo版演示,下面指定队列长度为2,表示最大并发数控制为2;设置为1时,可以实现单线程的访问控制


AtomicInteger cnt = new AtomicInteger();
private void consumer(LinkedBlockingQueue<Integer> queue) {
    try {
        // 同步阻塞拿去数据
        int val = queue.take();
        Thread.sleep(2000);
        System.out.println("成功拿到: " + val + " Thread: " + Thread.currentThread());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        // 添加数据
        System.out.println("结束 " + Thread.currentThread());
        queue.offer(cnt.getAndAdd(1));
    }
}
@Test
public void blockQueue() throws InterruptedException {
    LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
    queue.add(cnt.getAndAdd(1));
    queue.add(cnt.getAndAdd(1));
    new Thread(() -> consumer(queue)).start();
    new Thread(() -> consumer(queue)).start();
    new Thread(() -> consumer(queue)).start();
    new Thread(() -> consumer(queue)).start();
    Thread.sleep(10000);
}
复制代码


1.5 信号量Semaphore



上面队列的实现方式,可以使用信号量Semaphore来完成,通过设置信号量,来控制并发数


private void semConsumer(Semaphore semaphore) {
    try {
        //同步阻塞,尝试获取信号
        semaphore.acquire(1);
        System.out.println("成功拿到信号,执行: " + Thread.currentThread());
        Thread.sleep(2000);
        System.out.println("执行完毕,释放信号: " + Thread.currentThread());
        semaphore.release(1);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
@Test
public void semaphore() throws InterruptedException {
    Semaphore semaphore = new Semaphore(2);
    new Thread(() -> semConsumer(semaphore)).start();
    new Thread(() -> semConsumer(semaphore)).start();
    new Thread(() -> semConsumer(semaphore)).start();
    new Thread(() -> semConsumer(semaphore)).start();
    new Thread(() -> semConsumer(semaphore)).start();
    Thread.sleep(20_000);
}
复制代码


1.6 计数器CountDownLatch


计数,应用场景更偏向于多线程的协同,比如多个线程执行完毕之后,再处理某些事情;不同于上面的并发数的控制,它和栅栏一样,更多的是行为结果的统一

这种场景下的使用姿势一般如下


  • 重点:countDownLatch 计数为0时放行
@Test
public void countDown() throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(2);
    new Thread(() -> {
        try {
            System.out.println("do something in " + Thread.currentThread());
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            countDownLatch.countDown();
        }
    }).start();
    new Thread(() -> {
        try {
            System.out.println("do something in t2: " + Thread.currentThread());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            countDownLatch.countDown();
        }
    }).start();
    countDownLatch.await();
    System.out.printf("结束");
}
复制代码


1.7 栅栏 CyclicBarrier



CyclicBarrier的作用与上面的CountDownLatch相似,区别在于正向计数+1, 只有达到条件才放行; 且支持通过调用reset()重置计数,而CountDownLatch则不行


一个简单的demo

private void cyclicBarrierLogic(CyclicBarrier barrier, long sleep) {
    // 等待达到条件才放行
    try {
        System.out.println("准备执行: " + Thread.currentThread() + " at: " + LocalDateTime.now());
        Thread.sleep(sleep);
        int index = barrier.await();
        System.out.println("开始执行: " + index + " thread: " + Thread.currentThread() + " at: " + LocalDateTime.now());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
@Test
public void testCyclicBarrier() throws InterruptedException {
    // 到达两个工作线程才能继续往后面执行
    CyclicBarrier barrier = new CyclicBarrier(2);
    // 三秒之后,下面两个线程的才会输出 开始执行
    new Thread(() -> cyclicBarrierLogic(barrier, 1000)).start();
    new Thread(() -> cyclicBarrierLogic(barrier, 3000)).start();
    Thread.sleep(4000);
    // 重置,可以再次使用
    barrier.reset();
    new Thread(() -> cyclicBarrierLogic(barrier, 1)).start();
    new Thread(() -> cyclicBarrierLogic(barrier, 1)).start();
    Thread.sleep(10000);
}
复制代码


1.8 guava令牌桶



guava封装了非常简单的并发控制工具类RateLimiter,作为单机的并发控制首选

一个控制qps为2的简单demo如下:


private void guavaProcess(RateLimiter rateLimiter) {
    try {
        // 同步阻塞方式获取
        System.out.println("准备执行: " + Thread.currentThread() + " > " + LocalDateTime.now());
        rateLimiter.acquire();
        System.out.println("执行中: " + Thread.currentThread() + " > " + LocalDateTime.now());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
@Test
public void testGuavaRate() throws InterruptedException {
    // 1s 中放行两个请求
    RateLimiter rateLimiter = RateLimiter.create(2.0d);
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    new Thread(() -> guavaProcess(rateLimiter)).start();
    Thread.sleep(20_000);
}
复制代码


输出:

准备执行: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.263
准备执行: Thread[Thread-1,5,main] > 2021-04-13T10:18:05.263
准备执行: Thread[Thread-5,5,main] > 2021-04-13T10:18:05.264
准备执行: Thread[Thread-7,5,main] > 2021-04-13T10:18:05.264
准备执行: Thread[Thread-3,5,main] > 2021-04-13T10:18:05.263
准备执行: Thread[Thread-4,5,main] > 2021-04-13T10:18:05.264
准备执行: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.263
执行中: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.267
执行中: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.722
执行中: Thread[Thread-4,5,main] > 2021-04-13T10:18:06.225
执行中: Thread[Thread-3,5,main] > 2021-04-13T10:18:06.721
执行中: Thread[Thread-7,5,main] > 2021-04-13T10:18:07.221
执行中: Thread[Thread-5,5,main] > 2021-04-13T10:18:07.720
执行中: Thread[Thread-1,5,main] > 2021-04-13T10:18:08.219
复制代码


1.9 滑动窗口TimeWindow



没有找到通用的滑动窗口jar包,一般来讲滑动窗口更适用于平滑的限流,解决瞬时高峰问题


一个供参考的实现方式:


固定大小队列,队列中每个数据代表一个时间段的计数,

访问 -》 队列头拿数据(注意不出队)-》判断是否跨时间段 -》 同一时间段,计数+1 -》跨时间段,新增数据入队,若扔不进去,表示时间窗满,队尾数据出队

问题:当流量稀疏时,导致不会自动释放过期的数据 解决方案:根据时间段设置定时任务,模拟访问操作,只是将计数改为 + 0


1.10 小结



本文给出了几种单机版的并发控制的技术手段,主要目的是介绍了一些可选的方案,技术细节待后续补全完善,当然如果有其他的建议,欢迎评论交流



相关文章
|
5月前
|
安全 Java 编译器
揭秘JAVA深渊:那些让你头大的最晦涩知识点,从泛型迷思到并发陷阱,你敢挑战吗?
【8月更文挑战第22天】Java中的难点常隐藏在其高级特性中,如泛型与类型擦除、并发编程中的内存可见性及指令重排,以及反射与动态代理等。这些特性虽强大却也晦涩,要求开发者深入理解JVM运作机制及计算机底层细节。例如,泛型在编译时检查类型以增强安全性,但在运行时因类型擦除而丢失类型信息,可能导致类型安全问题。并发编程中,内存可见性和指令重排对同步机制提出更高要求,不当处理会导致数据不一致。反射与动态代理虽提供运行时行为定制能力,但也增加了复杂度和性能开销。掌握这些知识需深厚的技术底蕴和实践经验。
117 2
|
5月前
|
安全 Java 调度
解锁Java并发编程高阶技能:深入剖析无锁CAS机制、揭秘魔法类Unsafe、精通原子包Atomic,打造高效并发应用
【8月更文挑战第4天】在Java并发编程中,无锁编程以高性能和低延迟应对高并发挑战。核心在于无锁CAS(Compare-And-Swap)机制,它基于硬件支持,确保原子性更新;Unsafe类提供底层内存操作,实现CAS;原子包java.util.concurrent.atomic封装了CAS操作,简化并发编程。通过`AtomicInteger`示例,展现了线程安全的自增操作,突显了这些技术在构建高效并发程序中的关键作用。
85 1
|
1月前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
68 3
|
2月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
86 7
|
2月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
2月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
2月前
|
Java 数据库连接 数据库
如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面
本文介绍了如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面。通过合理配置初始连接数、最大连接数和空闲连接超时时间,确保系统性能和稳定性。文章还探讨了同步阻塞、异步回调和信号量等并发控制策略,并提供了异常处理的最佳实践。最后,给出了一个简单的连接池示例代码,并推荐使用成熟的连接池框架(如HikariCP、C3P0)以简化开发。
77 2
|
3月前
|
Java
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
51 1
|
4月前
|
Java API 容器
JAVA并发编程系列(10)Condition条件队列-并发协作者
本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。
|
5月前
|
存储 Java
Java 中 ConcurrentHashMap 的并发级别
【8月更文挑战第22天】
78 5