Java多线程案例【阻塞队列】

简介: Java多线程案例【阻塞队列】

🍒一.阻塞队列介绍


🍎1.1阻塞队列特性


阻塞队列特性:


一.安全性


二.产生阻塞效果:


阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则.阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:

当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.

当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素.


阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.

3ecc3e277c9c44268f1e818120dcad31.png



a4f7e9a031404768a39c753b3ee28ffc.png



🍎1.2阻塞队列的优点


我们可以将阻塞队列比做成"生产者"和"消费者"的"交易平台".


我们可以把这个模型来比做成"包饺子"

A 的作用是擀饺子皮,也就是"生产者"

B 的作用是包饺子,也就是"消费者"

X 的作用一个当作放擀好饺子皮的一个盘中,也就是阻塞队列

这样我们根据A,B,X可以想象以下场景


场景一:

当A擀饺子皮的速度过快,X被A的杆好饺子皮放满了,这样A就需要停止擀饺子皮这一个操作,这时只能等待B来利用A提供的饺子皮包饺子后X所空出的空间,来给A提供生产的环境


场景二:

当B包饺子的速度过快,X被B的包饺子所用的饺子皮用空,这样B就需要停止包饺子这一个操作,这时只能等待A提供的饺子皮包饺子后X所存在饺子皮,来给B提供消费的环境


🍒二.生产者消费者模型


生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题


生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取


(1) 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力.

比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求,服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮.


(2) 阻塞队列也能使生产者和消费者之间 解耦.

比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包. 擀饺子皮的人就是 “生产者”, 包饺子的人就是 “消费者”.擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包), 包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的).


🍎2.1阻塞队列对生产者的优化


优化一:能够让多个服务器程序之间更充分的解耦合:


如果不使用生产者和消费者模型,此时A和B的耦合性比较强,如果A线程出现一些状况B就会挂,B线程出现一些状况A就会挂,这时当我们引入阻塞队列后我们就可以将A,B线程分开,如果A,B线程挂了有阻塞队列的存在下,是不会影响别的线程


优化二:能够对于请求进行"削峰填谷":


我们可以联想到我国的三峡大坝,三峡大坝就相当于阻塞队列,当我们遇到雨水大的季节,我们就可以关闭三峡大坝,利用三峡大坝来存水;当我们遇到干旱期,我们就可以打开三峡大坝的门,来放水解决干旱问题



🍒三.标准库中的阻塞队列


🍎3.1Java提供阻塞队列实现的标准类


java官方也提供了阻塞队列的标准类,主要有下面几个:


标准类 说明
ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列
LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列
PriorityBlockingQueue 一个支持优先级排序的无界阻塞队列
DelayQueue 一个使用优先级队列实现的无界阻塞队列
SynchronousQueue 一个不存储元素的阻塞队列
LinkedTransferQueue 一个由链表结构组成的无界阻塞队列
LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列
BlockingQueue接口 单向阻塞队列实现了该接口
BlockingDeque接口 双向阻塞队列实现了该接口



🍎3.2Blockingqueue基本使用


在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可.


BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.

BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();

🍒四.阻塞队列实现


🍎4.1阻塞队列的代码实现


我们通过 “循环队列” 的方式来实现

使用 synchronized 进行加锁控制.put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一定队列就不满了, 因为同时可能是唤醒了多个线程).take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)


我们在设计阻塞队列的时候可以将队列联想成一个圆


2806d7d7fb984c9690d533bf4b6219c0.png



class BlockingQueue{
    //队列里存放的个数
   volatile private int size = 0;
    //队列的头节点
    private int head = 0;
    //队列的尾节点
    private int prov = 0;
    //创建一个数组,我们来给这个数组的容量设置为100
    private int[] array = new int[100];
    //创建一个专业的锁对象
    private Object object = new Object();
    //实现阻塞队列中的put方法
    public void put(int value) throws InterruptedException {
        synchronized (object) {
            //当数组已经满了
            if (size == array.length) {
                object.wait();
            } else {
                //我们可以优化成prov = (prov + 1) % items.length
                array[prov] = value;
                prov ++;
               if (prov >= array.length) {
                   prov = 0;
               }
            }
            size++;
            object.notify();
        }
    }
    //实现阻塞队列中的take方法
    public int take() throws InterruptedException {
        synchronized (object) {
            if (size == 0) {
                object.wait();
            }
            int x = array[head];
            head++;
            if (head >= array.length) {
                head = 0;
            }
            size--;
            object.notify();
            return x;
        }
    }
}


🍎4.2阻塞队列搭配生产者与消费者的代码实现


class BlockingQueue{
    //队列里存放的个数
   volatile private int size = 0;
    //队列的头节点
    private int head = 0;
    //队列的尾节点
    private int prov = 0;
    //创建一个数组,我们来给这个数组的容量设置为100
    private int[] array = new int[100];
    //创建一个专业的锁对象
    private Object object = new Object();
    //实现阻塞队列中的put方法
    public void put(int value) throws InterruptedException {
        synchronized (object) {
            //当数组已经满了
            if (size == array.length) {
                object.wait();
            } else {
                //我们可以优化成prov = (prov + 1) % items.length
                array[prov] = value;
                prov ++;
               if (prov >= array.length) {
                   prov = 0;
               }
            }
            size++;
            object.notify();
        }
    }
    //实现阻塞队列中的take方法
    public int take() throws InterruptedException {
        synchronized (object) {
            if (size == 0) {
                object.wait();
            }
            int x = array[head];
            head++;
            if (head >= array.length) {
                head = 0;
            }
            size--;
            object.notify();
            return x;
        }
    }
}
public class Test {
    public static void main(String[] args) {
        BlockingQueue blockingQueue = new BlockingQueue();
        Thread thread1 = new Thread(()-> {
            while (true) {
                for (int i = 0; i < 100; i++) {
                    try {
                        blockingQueue.put(i);
                        System.out.println("生产了"+i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        Thread thread2 = new Thread(()->{
                while (true) {
                    try {
                        int b = blockingQueue.take();
                        System.out.println("消耗了"+b);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
        });
        thread1.start();
        thread2.start();
    }
}

01eacdfcb4914910a3ac8f608fb57fae.png

相关文章
|
15天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
71 17
|
12天前
|
存储 监控 Java
JAVA线程池有哪些队列? 以及它们的适用场景案例
不同的线程池队列有着各自的特点和适用场景,在实际使用线程池时,需要根据具体的业务需求、系统资源状况以及对任务执行顺序、响应时间等方面的要求,合理选择相应的队列来构建线程池,以实现高效的任务处理。
90 12
|
26天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
11天前
|
缓存 安全 算法
Java 多线程 面试题
Java 多线程 相关基础面试题
|
28天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
28天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。
|
Java
Java中需要注意的一些案例
Java中需要注意的一些案例
125 0
|
28天前
|
安全 Java 编译器
深入理解Java中synchronized三种使用方式:助您写出线程安全的代码
`synchronized` 是 Java 中的关键字,用于实现线程同步,确保多个线程互斥访问共享资源。它通过内置的监视器锁机制,防止多个线程同时执行被 `synchronized` 修饰的方法或代码块。`synchronized` 可以修饰非静态方法、静态方法和代码块,分别锁定实例对象、类对象或指定的对象。其底层原理基于 JVM 的指令和对象的监视器,JDK 1.6 后引入了偏向锁、轻量级锁等优化措施,提高了性能。
54 3
|
28天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
154 2

热门文章

最新文章