多线程的线程工具的初步使用和原理详解

简介: 多线程的线程工具的初步使用和原理详解

一,线程之间的通信

1,BlockingQueue

这个主要就是通过这个阻塞队列实现,其CLH中的链表中的结点的状态为waitStatus为CONDITION:-2,为一个条件等待状态。之前在这篇https://blog.csdn.net/zhenghuishengq/article/details/125710294对BlockingQueue的源码有过具体的分析,主要是同步等待队列和这个条件等待队列的结合使用。


2,CountDownLatch

通过这个计数器实现,所有线程在通过这个countDown减法,每个线程执行完减1,然后进入这个等待状态,当这个值减为0时,所有的线程才能开始同时运行。


3,Semaphore

作用就是控制访问特定资源的线程数目,底层通过这个AQS实现,主要用于对这个线程的个数的控制,底层也是通过这个CLH队列实现,链表中结点的状态为-3,是一个广播状态。


4,CyclicBarrier

栅栏屏障,和countDownLatch的实现方式不同,所有线程都用加法实现,每个线程到达同一起跑线时,个数+1,然后进入这个等待状态,当全部线程到达同一起点时,那么所有线程才能同时开启。


二,Semaphore

1,概述

信号量,作用就是控制访问特定资源的线程数目,即主要可以用来限流等操作。如Hystrix里面就有用到这个组件。

//创建一个信号量,state的值为2
Semaphore semaphore = new Semaphore(2);
//获取公共资源
semaphore.acquire();
//如果获取资源需要等待的时间过长,可以尝试获取
semaphore.tryAcquire(500,TimeUnit.MILLISECONDS)
//释放公共资源
semaphore.release();

其含义如下,就是每次最多只允许两个线程过去

ec5ec022adab444fa8e66155722f3cf0.png


2,源码分析

这个Semaphore的底层实现和这个 ReentrantLock 的底层实现都差不多,里面主要是通过这个AQS实现,并且定义了一个Sync的类,实现了公平和非公平的两种方式去获取这个信号量。不过这个ReentrantLock 这个是一个独占模式,这个Semaphore是一个共享模式实现

public class Semaphore implements java.io.Serializable{
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
    }
    static final class NonfairSync extends Sync {
        ...
    }
    static final class FairSync extends Sync{
        ...
    }
}

默认这个信号量是一个非公平锁的实现

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

这个获取资源有两种实现,一个是无参构造,一个是有一个参数构造。

无参构造,默认数量为1

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

有参构造,这个值可以提供在外部传参

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

接下来主要讲解这个acquireSharedInterruptibly方法,

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //在获取信号的时候,不能发生中断,如果发生中断,那么就会直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //共享模式
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

公平锁

static final class FairSync extends Sync {
  protected int tryAcquireShared(int acquires) {
        //自旋
      for (;;) {
          if (hasQueuedPredecessors())
              return -1;
          int available = getState();
          int remaining = available - acquires;
            //通过这个cas的算法来获取这个信号量
          if (remaining < 0 ||
              compareAndSetState(available, remaining))
              return remaining;
      }
  }    
}

如果这个信号的中剩余的数量小于这个要获取的数量,那么会走这个doAcquireSharedInterruptibly逻辑。就是在这个CLH的同步阻塞队列里面有具体的操作

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    //共享结点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        //自旋
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

后面的这个源码和这个ReentrantLock的源码差不多,可以去看另外一篇https://blog.csdn.net/zhenghuishengq/article/details/125648495?spm=1001.2014.3001.5501


3,总结

其底层和这个ReentrantLock的实现差不多,只不过这个CLH同步等待队列的使用和这个ReentrantLock的不太一样。首先这个同步状态器是一个共享的,不像ReentrantLock里面时独占的;其次这个用法就是比如说同步状态器里面的state的值为5,那么就是说有5个信号量,那么允许这个链表中的可以有五个结点可以获取这个信号量,这个同步状态器里面每次被获取一个值,这个state就会减1,直到为0,然后这个链表中的结点就会去唤醒下一个结点,只要这个state这个状态不为0,那么这个结点就一直会唤醒下一个结点。并且每个结点的这个 waitStatues 的值为-3,为一个传播状态。

197412b88a7e4b4fbdc7bd5545054fdd.png


三,CountDownLatch

CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。


1,基本使用

例如下面这个场景,会保证10个线程同时创建完再工作,即可以模拟一个并发的过程。

//线程操作工具
CountDownLatch countDownLatch = new CountDownLatch(1);
for(int i=0;i<10;i++){
    new Thread(()->{
        try {
            //线程全部创建成功则开始全部往下走
            //因为for循环需要一点的时间,因此通过这个等待,保证线程全部创建完
            countDownLatch.await();
            for(int j=0;j<1000;j++){
                try {
                    lock.lock();
                    total++;
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}
//等上面s所有的线程全部创建完
Thread.sleep(1000);
//让线程开启
countDownLatch.countDown();
Thread.sleep(2000);
System.out.println(total);

2,源码分析

其底层也是用了这个AQS的方式实现,构造方法需要一个整型的参数

public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {...}
    //构造方法
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

这个state就是这个同步状态器里面的这个state,表示外面一次性可以访问的这个,与此同时这里面没有公平锁和非公平锁的概念,并且里面的链表长度是一个固定长度。

Sync(int count) {
    setState(count);
}
protected final void setState(int newState) {
    state = newState;
}

95d1910709b24f71a9a240317425f539.png

其**await()**等待方法如下,;

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  //向队列中增加节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
           cancelAcquire(node);
    }
}

其**countDown()**等待方法如下,每个线程在同步状态器中获取一个资源,那么这个同步状态器中的state就会减1,直到减为0

public void countDown() {
    sync.releaseShared(1);
}
protected boolean tryReleaseShared(int releases) {
  for (;;) {
      int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
    }
}

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。


四,CyclicBarrier

栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。就是说所有的线程达到这个同一起跑线的时候,才会开始执行。

CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() {
    public void run() {
        System.out.println("所有特工到达屏障,准备开始执行秘密任务");
    }
});
for (int i = 0; i < 10; i++) {
    new Thread(new CyclicBarrierRunner(cyclicBarrier, i)).start();
}
cyclicBarrier.await();
System.out.println("全部到达屏障....1");
Thread.sleep(5000);
for (int i = 0; i < 10; i++) {
    new Thread(new CyclicBarrierRunner(cyclicBarrier, i)).start();
}
cyclicBarrier.await();
System.out.println("全部到达屏障....1");

底层原理和这个CountDownLatch都差不多,这个CountDownLatch使用的是减法,这个CyclicBarrier使用的是一个加法,并且这个CountDownLatch只能使用一次,而CyclicBarrier可以使用多次。


相关文章
|
10月前
|
安全 算法 Java
Java 多线程:线程安全与同步控制的深度解析
本文介绍了 Java 多线程开发的关键技术,涵盖线程的创建与启动、线程安全问题及其解决方案,包括 synchronized 关键字、原子类和线程间通信机制。通过示例代码讲解了多线程编程中的常见问题与优化方法,帮助开发者提升程序性能与稳定性。
408 0
|
12月前
|
机器学习/深度学习 消息中间件 存储
【高薪程序员必看】万字长文拆解Java并发编程!(9-2):并发工具-线程池
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
418 0
|
7月前
|
存储 缓存 监控
什么是线程池?它的工作原理?
我是小假 期待与你的下一次相遇 ~
417 1
|
7月前
|
设计模式 缓存 安全
【JUC】(6)带你了解共享模型之 享元和不可变 模型并初步带你了解并发工具 线程池Pool,文章内还有饥饿问题、设计模式之工作线程的解决于实现
JUC专栏第六篇,本文带你了解两个共享模型:享元和不可变 模型,并初步带你了解并发工具 线程池Pool,文章中还有解决饥饿问题、设计模式之工作线程的实现
455 2
|
9月前
|
数据采集 消息中间件 并行计算
Python多线程与多进程性能对比:从原理到实战的深度解析
在Python编程中,多线程与多进程是提升并发性能的关键手段。本文通过实验数据、代码示例和通俗比喻,深入解析两者在不同任务类型下的性能表现,帮助开发者科学选择并发策略,优化程序效率。
686 1
|
10月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
11月前
|
Java 数据挖掘 调度
Java 多线程创建零基础入门新手指南:从零开始全面学习多线程创建方法
本文从零基础角度出发,深入浅出地讲解Java多线程的创建方式。内容涵盖继承`Thread`类、实现`Runnable`接口、使用`Callable`和`Future`接口以及线程池的创建与管理等核心知识点。通过代码示例与应用场景分析,帮助读者理解每种方式的特点及适用场景,理论结合实践,轻松掌握Java多线程编程 essentials。
740 5
|
11月前
|
数据采集 网络协议 前端开发
Python多线程爬虫模板:从原理到实战的完整指南
多线程爬虫通过并发请求大幅提升数据采集效率,适用于大规模网页抓取。本文详解其原理与实现,涵盖任务队列、线程池、会话保持、异常处理、反爬对抗等核心技术,并提供可扩展的Python模板代码,助力高效稳定的数据采集实践。
546 0
|
10月前
|
Java API 微服务
为什么虚拟线程将改变Java并发编程?
为什么虚拟线程将改变Java并发编程?
433 83
|
7月前
|
Java
如何在Java中进行多线程编程
Java多线程编程常用方式包括:继承Thread类、实现Runnable接口、Callable接口(可返回结果)及使用线程池。推荐线程池以提升性能,避免频繁创建线程。结合同步与通信机制,可有效管理并发任务。
287 6

热门文章

最新文章