多线程的线程工具的初步使用和原理详解

简介: 多线程的线程工具的初步使用和原理详解

一,线程之间的通信

1,BlockingQueue

这个主要就是通过这个阻塞队列实现,其CLH中的链表中的结点的状态为waitStatus为CONDITION:-2,为一个条件等待状态。之前在这篇https://blog.csdn.net/zhenghuishengq/article/details/125710294对BlockingQueue的源码有过具体的分析,主要是同步等待队列和这个条件等待队列的结合使用。


2,CountDownLatch

通过这个计数器实现,所有线程在通过这个countDown减法,每个线程执行完减1,然后进入这个等待状态,当这个值减为0时,所有的线程才能开始同时运行。


3,Semaphore

作用就是控制访问特定资源的线程数目,底层通过这个AQS实现,主要用于对这个线程的个数的控制,底层也是通过这个CLH队列实现,链表中结点的状态为-3,是一个广播状态。


4,CyclicBarrier

栅栏屏障,和countDownLatch的实现方式不同,所有线程都用加法实现,每个线程到达同一起跑线时,个数+1,然后进入这个等待状态,当全部线程到达同一起点时,那么所有线程才能同时开启。


二,Semaphore

1,概述

信号量,作用就是控制访问特定资源的线程数目,即主要可以用来限流等操作。如Hystrix里面就有用到这个组件。

//创建一个信号量,state的值为2
Semaphore semaphore = new Semaphore(2);
//获取公共资源
semaphore.acquire();
//如果获取资源需要等待的时间过长,可以尝试获取
semaphore.tryAcquire(500,TimeUnit.MILLISECONDS)
//释放公共资源
semaphore.release();

其含义如下,就是每次最多只允许两个线程过去

ec5ec022adab444fa8e66155722f3cf0.png


2,源码分析

这个Semaphore的底层实现和这个 ReentrantLock 的底层实现都差不多,里面主要是通过这个AQS实现,并且定义了一个Sync的类,实现了公平和非公平的两种方式去获取这个信号量。不过这个ReentrantLock 这个是一个独占模式,这个Semaphore是一个共享模式实现

public class Semaphore implements java.io.Serializable{
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
    }
    static final class NonfairSync extends Sync {
        ...
    }
    static final class FairSync extends Sync{
        ...
    }
}

默认这个信号量是一个非公平锁的实现

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

这个获取资源有两种实现,一个是无参构造,一个是有一个参数构造。

无参构造,默认数量为1

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

有参构造,这个值可以提供在外部传参

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

接下来主要讲解这个acquireSharedInterruptibly方法,

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //在获取信号的时候,不能发生中断,如果发生中断,那么就会直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //共享模式
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

公平锁

static final class FairSync extends Sync {
  protected int tryAcquireShared(int acquires) {
        //自旋
      for (;;) {
          if (hasQueuedPredecessors())
              return -1;
          int available = getState();
          int remaining = available - acquires;
            //通过这个cas的算法来获取这个信号量
          if (remaining < 0 ||
              compareAndSetState(available, remaining))
              return remaining;
      }
  }    
}

如果这个信号的中剩余的数量小于这个要获取的数量,那么会走这个doAcquireSharedInterruptibly逻辑。就是在这个CLH的同步阻塞队列里面有具体的操作

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    //共享结点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        //自旋
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

后面的这个源码和这个ReentrantLock的源码差不多,可以去看另外一篇https://blog.csdn.net/zhenghuishengq/article/details/125648495?spm=1001.2014.3001.5501


3,总结

其底层和这个ReentrantLock的实现差不多,只不过这个CLH同步等待队列的使用和这个ReentrantLock的不太一样。首先这个同步状态器是一个共享的,不像ReentrantLock里面时独占的;其次这个用法就是比如说同步状态器里面的state的值为5,那么就是说有5个信号量,那么允许这个链表中的可以有五个结点可以获取这个信号量,这个同步状态器里面每次被获取一个值,这个state就会减1,直到为0,然后这个链表中的结点就会去唤醒下一个结点,只要这个state这个状态不为0,那么这个结点就一直会唤醒下一个结点。并且每个结点的这个 waitStatues 的值为-3,为一个传播状态。

197412b88a7e4b4fbdc7bd5545054fdd.png


三,CountDownLatch

CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。


1,基本使用

例如下面这个场景,会保证10个线程同时创建完再工作,即可以模拟一个并发的过程。

//线程操作工具
CountDownLatch countDownLatch = new CountDownLatch(1);
for(int i=0;i<10;i++){
    new Thread(()->{
        try {
            //线程全部创建成功则开始全部往下走
            //因为for循环需要一点的时间,因此通过这个等待,保证线程全部创建完
            countDownLatch.await();
            for(int j=0;j<1000;j++){
                try {
                    lock.lock();
                    total++;
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}
//等上面s所有的线程全部创建完
Thread.sleep(1000);
//让线程开启
countDownLatch.countDown();
Thread.sleep(2000);
System.out.println(total);

2,源码分析

其底层也是用了这个AQS的方式实现,构造方法需要一个整型的参数

public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {...}
    //构造方法
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

这个state就是这个同步状态器里面的这个state,表示外面一次性可以访问的这个,与此同时这里面没有公平锁和非公平锁的概念,并且里面的链表长度是一个固定长度。

Sync(int count) {
    setState(count);
}
protected final void setState(int newState) {
    state = newState;
}

95d1910709b24f71a9a240317425f539.png

其**await()**等待方法如下,;

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  //向队列中增加节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
           cancelAcquire(node);
    }
}

其**countDown()**等待方法如下,每个线程在同步状态器中获取一个资源,那么这个同步状态器中的state就会减1,直到减为0

public void countDown() {
    sync.releaseShared(1);
}
protected boolean tryReleaseShared(int releases) {
  for (;;) {
      int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
    }
}

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。


四,CyclicBarrier

栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。就是说所有的线程达到这个同一起跑线的时候,才会开始执行。

CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() {
    public void run() {
        System.out.println("所有特工到达屏障,准备开始执行秘密任务");
    }
});
for (int i = 0; i < 10; i++) {
    new Thread(new CyclicBarrierRunner(cyclicBarrier, i)).start();
}
cyclicBarrier.await();
System.out.println("全部到达屏障....1");
Thread.sleep(5000);
for (int i = 0; i < 10; i++) {
    new Thread(new CyclicBarrierRunner(cyclicBarrier, i)).start();
}
cyclicBarrier.await();
System.out.println("全部到达屏障....1");

底层原理和这个CountDownLatch都差不多,这个CountDownLatch使用的是减法,这个CyclicBarrier使用的是一个加法,并且这个CountDownLatch只能使用一次,而CyclicBarrier可以使用多次。


相关文章
|
2天前
|
存储 安全 UED
多线程在打包工具中的运用
【11月更文挑战第2天】本文介绍了多线程技术在打包工具中的应用,包括提高打包效率、优化用户体验和多线程安全考虑。通过并行处理文件和加速资源收集,多线程可以显著缩短打包时间。在用户体验方面,多线程使界面保持响应,并支持优先级处理。此外,文章还讨论了资源访问冲突和死锁预防的解决方案,确保多线程环境下的稳定性和安全性。
|
6天前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
11 3
|
6天前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
9 2
|
6天前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
16 2
|
6天前
|
Java 开发者
Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点
【10月更文挑战第20天】Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点,重点解析为何实现Runnable接口更具灵活性、资源共享及易于管理的优势。
16 1
|
6天前
|
安全 Java 开发者
Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用
本文深入解析了Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用。通过示例代码展示了如何正确使用这些方法,并分享了最佳实践,帮助开发者避免常见陷阱,提高多线程程序的稳定性和效率。
17 1
|
22天前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
36 1
C++ 多线程之初识多线程
|
6天前
|
Java
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件成立时被唤醒,从而有效解决数据一致性和同步问题。本文通过对比其他通信机制,展示了 `wait()` 和 `notify()` 的优势,并通过生产者-消费者模型的示例代码,详细说明了其使用方法和重要性。
14 1
|
2月前
|
数据采集 负载均衡 安全
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
本文提供了多个多线程编程问题的解决方案,包括设计有限阻塞队列、多线程网页爬虫、红绿灯路口等,每个问题都给出了至少一种实现方法,涵盖了互斥锁、条件变量、信号量等线程同步机制的使用。
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
|
22天前
|
存储 前端开发 C++
C++ 多线程之带返回值的线程处理函数
这篇文章介绍了在C++中使用`async`函数、`packaged_task`和`promise`三种方法来创建带返回值的线程处理函数。
41 6

相关实验场景

更多