多线程案例(2)-阻塞式队列

简介: 多线程案例(2)-阻塞式队列

大家好,我是晓星航。今天为大家带来的是 多线程案例二 相关的讲解!😀

多线程案例二

二、阻塞式队列

阻塞队列是什么

阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则.

阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:

  • 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
  • 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素.

阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.

生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取

1)阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力. (削峰填谷)

比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求, 服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放 到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.

这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮.

2)阻塞队列也能使生产者和消费者之间 解耦.

解耦:降低耦合的过程。

耦合:两个东西关联程度高不高,高叫做高耦合,低叫做低耦合。

比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包. 擀饺 子皮的人就是 “生产者”, 包饺子的人就是 “消费者”.

擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包), 包饺子的人 也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超 市买的).

使用生产者消费者模型前:

 

使用了生产者消费者模型之后:

 

标准库中的阻塞队列

在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可.

  • BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
  • put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
  • BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞. 
String elem = queue.take();

生产者消费者模型

public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
    Thread customer = new Thread(() -> {
        while (true) {
            try {
                int value = blockingQueue.take();
                System.out.println("消费元素: " + value);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }
       }
   }, "消费者");
customer.start();
    Thread producer = new Thread(() -> {
        Random random = new Random();
        while (true) {
            try {
                int num = random.nextInt(1000);
                System.out.println("生产元素: " + num);
                blockingQueue.put(num);
                Thread.sleep(1000);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }
       }
   }, "生产者");
    producer.start();
    customer.join();
    producer.join();
}

阻塞队列用法举例:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
//阻塞队列的使用
public class ThreadDemo21 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
        blockingQueue.put("hello");
        String res = blockingQueue.take();
        System.out.println(res);
        res = blockingQueue.take();
        System.out.println(res);
    }
}

这里我们的res取出了hello之后,我们的阻塞队列就变为空了,因此我们第一次打印res就输出结果hello,而第二次打印我们程序就回因为阻塞队列无值而进入wait阻塞状态。

阻塞队列实现

  • 通过 “循环队列” 的方式来实现.
  • 使用 synchronized 进行加锁控制.
  • put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一 定队列就不满了, 因为同时可能是唤醒了多个线程).
  • take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)
public class BlockingQueue {
    private int[] items = new int[1000];
    private volatile int size = 0;
    private int head = 0;
    private int tail = 0;
    public void put(int value) throws InterruptedException {
        synchronized (this) {
            // 此处最好使用 while.
            // 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
            // 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能又已经队列满了
            // 就只能继续等待
            while (size == items.length) {
                wait();
           }
            items[tail] = value;
            tail = (tail + 1) % items.length;
            size++;
            notifyAll();
       }
   }
    public int take() throws InterruptedException {
        int ret = 0;
        synchronized (this) {
            while (size == 0) {
             wait();
           }
            ret = items[head];
            head = (head + 1) % items.length;
            size--;
            notifyAll();
       }
        return ret;
   }
    public synchronized int size() {
        return size;
   }
    // 测试代码
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue blockingQueue = new BlockingQueue();
        Thread customer = new Thread(() -> {
            while (true) {
                try {
                    int value = blockingQueue.take();
                    System.out.println(value);
               } catch (InterruptedException e) {
                    e.printStackTrace();
               }
           }
       }, "消费者");
        customer.start();
        Thread producer = new Thread(() -> {
            Random random = new Random();
            while (true) {
                try {
                    blockingQueue.put(random.nextInt(10000));
               } catch (InterruptedException e) {
                    e.printStackTrace();
               }
           }
       }, "生产者");
        producer.start();
        customer.join();
        producer.join();
   }
}

自己实现阻塞队列:ThreadDemo23

//自己写的阻塞队列
//此处不考虑泛型
class MyBlockingQueue {
    private final int[] items = new int[1000];
    private int head = 0;
    private int tail = 0;
    private int size = 0;
    //入队列
    public void put (int value) throws InterruptedException {
        synchronized (this) {
            //这里的 while 是为了循环判断这里的队列到底是否已满 如果不满足就一直阻塞
            while (size == items.length) {
                //数组已满 ,此时要产生阻塞
                //return;
                this.wait();
            }
            items[tail] = value;
            tail++;
            if (tail >= items.length) {
                tail = 0;
            }
            size++;
            //这个notify 唤醒 take 中的wait
            this.notify();
        }
    }
    //出队列 - 先进先出
    public Integer take() throws InterruptedException {
        int result = 0;
        synchronized (this) {
            //这里的 while 是为了循环判断这里的队列到底是否为空 如果不满足就一直阻塞
            while (size == 0) {
                //队列为空 无法出元素
                //return null;
                this.wait();
            }
            result = items[head];
            head++;
            if (head >= items.length) {
                head = 0;
            }
            size--;
            //唤醒 put 中的 wait
            this.notify();
        }
        return result;
    }
}
public class ThreadDemo23 {
    public static void main(String[] args) throws InterruptedException {
        MyBlockingQueue queue = new MyBlockingQueue();
        queue.put(1);
        queue.put(2);
        queue.put(3);
        queue.put(4);
        queue.put(5);
        int result = 0;
        result = queue.take();
        System.out.println("result: " + result);
        result = queue.take();
        System.out.println("result: " + result);
        result = queue.take();
        System.out.println("result: " + result);
        result = queue.take();
        System.out.println("result: " + result);
        result = queue.take();
        System.out.println("result: " + result);
    }
}

运行结果:

 

这里我们 put 和 take 中的wait和notify操作是相互唤醒的。

那么这两个线程中的 wait 是否可能同时触发???(如果同时触发了,显然就不能正确相互唤醒了)

答:不会,针对同一个队列,不可能即使满又是空。

感谢各位读者的阅读,本文章有任何错误都可以在评论区发表你们的意见,我会对文章进行改正的。如果本文章对你有帮助请动一动你们敏捷的小手点一点赞,你的每一次鼓励都是作者创作的动力哦!😘

目录
相关文章
|
16天前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
30 7
|
16天前
|
消息中间件 存储 安全
|
23天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
15 1
|
6月前
|
设计模式 监控 Java
Java多线程基础-11:工厂模式及代码案例之线程池(一)
本文介绍了Java并发框架中的线程池工具,特别是`java.util.concurrent`包中的`Executors`和`ThreadPoolExecutor`类。线程池通过预先创建并管理一组线程,可以提高多线程任务的效率和响应速度,减少线程创建和销毁的开销。
197 2
|
1月前
|
存储 运维 API
源码解密协程队列和线程队列的实现原理(一)
源码解密协程队列和线程队列的实现原理(一)
35 1
|
1月前
|
存储 安全 API
源码解密协程队列和线程队列的实现原理(二)
源码解密协程队列和线程队列的实现原理(二)
33 1
|
2月前
|
安全 Java 调度
python3多线程实战(python3经典编程案例)
该文章提供了Python3中多线程的应用实例,展示了如何利用Python的threading模块来创建和管理线程,以实现并发执行任务。
39 0
|
3月前
|
存储 监控 Java
|
3月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
168 4
|
3月前
|
数据采集 Java Python
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器

相关实验场景

更多