多线程的线程工具的初步使用和原理详解

简介: 多线程的线程工具的初步使用和原理详解

一,线程之间的通信

1,BlockingQueue

这个主要就是通过这个阻塞队列实现,其CLH中的链表中的结点的状态为waitStatus为CONDITION:-2,为一个条件等待状态。之前在这篇https://blog.csdn.net/zhenghuishengq/article/details/125710294对BlockingQueue的源码有过具体的分析,主要是同步等待队列和这个条件等待队列的结合使用。


2,CountDownLatch

通过这个计数器实现,所有线程在通过这个countDown减法,每个线程执行完减1,然后进入这个等待状态,当这个值减为0时,所有的线程才能开始同时运行。


3,Semaphore

作用就是控制访问特定资源的线程数目,底层通过这个AQS实现,主要用于对这个线程的个数的控制,底层也是通过这个CLH队列实现,链表中结点的状态为-3,是一个广播状态。


4,CyclicBarrier

栅栏屏障,和countDownLatch的实现方式不同,所有线程都用加法实现,每个线程到达同一起跑线时,个数+1,然后进入这个等待状态,当全部线程到达同一起点时,那么所有线程才能同时开启。


二,Semaphore

1,概述

信号量,作用就是控制访问特定资源的线程数目,即主要可以用来限流等操作。如Hystrix里面就有用到这个组件。

//创建一个信号量,state的值为2
Semaphore semaphore = new Semaphore(2);
//获取公共资源
semaphore.acquire();
//如果获取资源需要等待的时间过长,可以尝试获取
semaphore.tryAcquire(500,TimeUnit.MILLISECONDS)
//释放公共资源
semaphore.release();

其含义如下,就是每次最多只允许两个线程过去

ec5ec022adab444fa8e66155722f3cf0.png


2,源码分析

这个Semaphore的底层实现和这个 ReentrantLock 的底层实现都差不多,里面主要是通过这个AQS实现,并且定义了一个Sync的类,实现了公平和非公平的两种方式去获取这个信号量。不过这个ReentrantLock 这个是一个独占模式,这个Semaphore是一个共享模式实现

public class Semaphore implements java.io.Serializable{
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
    }
    static final class NonfairSync extends Sync {
        ...
    }
    static final class FairSync extends Sync{
        ...
    }
}

默认这个信号量是一个非公平锁的实现

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

这个获取资源有两种实现,一个是无参构造,一个是有一个参数构造。

无参构造,默认数量为1

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

有参构造,这个值可以提供在外部传参

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

接下来主要讲解这个acquireSharedInterruptibly方法,

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //在获取信号的时候,不能发生中断,如果发生中断,那么就会直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //共享模式
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

公平锁

static final class FairSync extends Sync {
  protected int tryAcquireShared(int acquires) {
        //自旋
      for (;;) {
          if (hasQueuedPredecessors())
              return -1;
          int available = getState();
          int remaining = available - acquires;
            //通过这个cas的算法来获取这个信号量
          if (remaining < 0 ||
              compareAndSetState(available, remaining))
              return remaining;
      }
  }    
}

如果这个信号的中剩余的数量小于这个要获取的数量,那么会走这个doAcquireSharedInterruptibly逻辑。就是在这个CLH的同步阻塞队列里面有具体的操作

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    //共享结点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        //自旋
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

后面的这个源码和这个ReentrantLock的源码差不多,可以去看另外一篇https://blog.csdn.net/zhenghuishengq/article/details/125648495?spm=1001.2014.3001.5501


3,总结

其底层和这个ReentrantLock的实现差不多,只不过这个CLH同步等待队列的使用和这个ReentrantLock的不太一样。首先这个同步状态器是一个共享的,不像ReentrantLock里面时独占的;其次这个用法就是比如说同步状态器里面的state的值为5,那么就是说有5个信号量,那么允许这个链表中的可以有五个结点可以获取这个信号量,这个同步状态器里面每次被获取一个值,这个state就会减1,直到为0,然后这个链表中的结点就会去唤醒下一个结点,只要这个state这个状态不为0,那么这个结点就一直会唤醒下一个结点。并且每个结点的这个 waitStatues 的值为-3,为一个传播状态。

197412b88a7e4b4fbdc7bd5545054fdd.png


三,CountDownLatch

CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。


1,基本使用

例如下面这个场景,会保证10个线程同时创建完再工作,即可以模拟一个并发的过程。

//线程操作工具
CountDownLatch countDownLatch = new CountDownLatch(1);
for(int i=0;i<10;i++){
    new Thread(()->{
        try {
            //线程全部创建成功则开始全部往下走
            //因为for循环需要一点的时间,因此通过这个等待,保证线程全部创建完
            countDownLatch.await();
            for(int j=0;j<1000;j++){
                try {
                    lock.lock();
                    total++;
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}
//等上面s所有的线程全部创建完
Thread.sleep(1000);
//让线程开启
countDownLatch.countDown();
Thread.sleep(2000);
System.out.println(total);

2,源码分析

其底层也是用了这个AQS的方式实现,构造方法需要一个整型的参数

public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {...}
    //构造方法
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

这个state就是这个同步状态器里面的这个state,表示外面一次性可以访问的这个,与此同时这里面没有公平锁和非公平锁的概念,并且里面的链表长度是一个固定长度。

Sync(int count) {
    setState(count);
}
protected final void setState(int newState) {
    state = newState;
}

95d1910709b24f71a9a240317425f539.png

其**await()**等待方法如下,;

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  //向队列中增加节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
           cancelAcquire(node);
    }
}

其**countDown()**等待方法如下,每个线程在同步状态器中获取一个资源,那么这个同步状态器中的state就会减1,直到减为0

public void countDown() {
    sync.releaseShared(1);
}
protected boolean tryReleaseShared(int releases) {
  for (;;) {
      int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
    }
}

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。


四,CyclicBarrier

栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。就是说所有的线程达到这个同一起跑线的时候,才会开始执行。

CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() {
    public void run() {
        System.out.println("所有特工到达屏障,准备开始执行秘密任务");
    }
});
for (int i = 0; i < 10; i++) {
    new Thread(new CyclicBarrierRunner(cyclicBarrier, i)).start();
}
cyclicBarrier.await();
System.out.println("全部到达屏障....1");
Thread.sleep(5000);
for (int i = 0; i < 10; i++) {
    new Thread(new CyclicBarrierRunner(cyclicBarrier, i)).start();
}
cyclicBarrier.await();
System.out.println("全部到达屏障....1");

底层原理和这个CountDownLatch都差不多,这个CountDownLatch使用的是减法,这个CyclicBarrier使用的是一个加法,并且这个CountDownLatch只能使用一次,而CyclicBarrier可以使用多次。


相关文章
|
6天前
|
数据采集 负载均衡 安全
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
本文提供了多个多线程编程问题的解决方案,包括设计有限阻塞队列、多线程网页爬虫、红绿灯路口等,每个问题都给出了至少一种实现方法,涵盖了互斥锁、条件变量、信号量等线程同步机制的使用。
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
|
13天前
|
Java Spring
spring多线程实现+合理设置最大线程数和核心线程数
本文介绍了手动设置线程池时的最大线程数和核心线程数配置方法,建议根据CPU核数及程序类型(CPU密集型或IO密集型)来合理设定。对于IO密集型,核心线程数设为CPU核数的两倍;CPU密集型则设为CPU核数加一。此外,还讨论了`maxPoolSize`、`keepAliveTime`、`allowCoreThreadTimeout`和`queueCapacity`等参数的设置策略,以确保线程池高效稳定运行。
74 10
spring多线程实现+合理设置最大线程数和核心线程数
|
7天前
|
Python
5-5|python开启多线程入口必须在main,从python线程(而不是main线程)启动pyQt线程有什么坏处?...
5-5|python开启多线程入口必须在main,从python线程(而不是main线程)启动pyQt线程有什么坏处?...
|
4天前
|
NoSQL 网络协议 Unix
1)Redis 属于单线程还是多线程?不同版本之间有什么区别?
1)Redis 属于单线程还是多线程?不同版本之间有什么区别?
14 0
|
6天前
|
Java
COMATE插件实现使用线程池高级并发模型简化多线程编程
本文介绍了COMATE插件的使用,该插件通过线程池实现高级并发模型,简化了多线程编程的过程,并提供了生成结果和代码参考。
|
10天前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。
|
2月前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
64 1
|
22天前
|
Java 数据库 Android开发
一个Android App最少有几个线程?实现多线程的方式有哪些?
本文介绍了Android多线程编程的重要性及其实现方法,涵盖了基本概念、常见线程类型(如主线程、工作线程)以及多种多线程实现方式(如`Thread`、`HandlerThread`、`Executors`、Kotlin协程等)。通过合理的多线程管理,可大幅提升应用性能和用户体验。
37 15
一个Android App最少有几个线程?实现多线程的方式有哪些?
|
24天前
|
Java 数据库 Android开发
一个Android App最少有几个线程?实现多线程的方式有哪些?
本文介绍了Android应用开发中的多线程编程,涵盖基本概念、常见实现方式及最佳实践。主要内容包括主线程与工作线程的作用、多线程的多种实现方法(如 `Thread`、`HandlerThread`、`Executors` 和 Kotlin 协程),以及如何避免内存泄漏和合理使用线程池。通过有效的多线程管理,可以显著提升应用性能和用户体验。
38 10
|
1月前
|
存储 Ubuntu Linux
C语言 多线程编程(1) 初识线程和条件变量
本文档详细介绍了多线程的概念、相关命令及线程的操作方法。首先解释了线程的定义及其与进程的关系,接着对比了线程与进程的区别。随后介绍了如何在 Linux 系统中使用 `pidstat`、`top` 和 `ps` 命令查看线程信息。文档还探讨了多进程和多线程模式各自的优缺点及适用场景,并详细讲解了如何使用 POSIX 线程库创建、退出、等待和取消线程。此外,还介绍了线程分离的概念和方法,并提供了多个示例代码帮助理解。最后,深入探讨了线程间的通讯机制、互斥锁和条件变量的使用,通过具体示例展示了如何实现生产者与消费者的同步模型。