多线程案例-阻塞队列

简介: 多线程案例-阻塞队列

阻塞队列是什么

阻塞队列是一种特殊的队列.也遵循"先进先出"的原则

阻塞队列能是一种线程安全数据结构,并且具有以下特性:

当队列满的时候,继续入队列就会阻塞,直到有其他线程从队列中取走元素.

当队列空的时候,继续出队列也会阻塞,直到有其他线程往队列中插入元素.

阻塞队列的一个典型应用场景就是"生产者消费者模型".这时一种非常典型的开发模型.

生产者消费者模型

实际开发中,经常会涉及到分布式系统.服务器整个功能不是由一个服务器全部完成的.而是每个服务器负责一部分功能.通过服务器间的网络通信,最终完成整个功能.

生产者消费者模型就是通过一个容器来解决生产者和消费者的强耦合问题.(更好地做到解耦合的能力).

生产者和消费者彼此之间不再进行通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据不用再等消费者处理,而是直接丢给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列中取.

示意图如下:

1.阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力.(削峰填谷)

比如在"秒杀"的场景下,服务器同一时刻可能会受到大量的支付请求.如果直接处理这些支付要求,服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程,即使一个请求消耗的资源少,但加到一起,总的消耗的资源就多了,任何一种硬件资源达到瓶颈,服务器都会挂).这个时候就可以把这些请求都放到一个阻塞队列中,然后再由消费者线程慢慢来处理每个支付请求.

这样做可以有效做到"削峰",防止服务器被突然来到的一波请求直接冲垮(挂的直观现象:给它发请求,无回应).

2.阻塞队列也能使生产者和消费者之间"解耦"

比如过年一家人一起包饺子.一般都是有明确分工,比如一个人负责擀饺子皮,其他人负责包.擀饺子皮的人就是"生产者",包饺子的人就是"消费者".

擀饺子皮的人并不关心包饺子的人是谁(能包就行,无论是手工,借助工具还是机器),包饺子的人也不需要关心擀饺子皮的人是谁(有饺子皮就行,无论是用擀面杖擀的,还是用ipadAir5擀的)

补充说明:

(1)上述描述的阻塞队列,并非是简单的数据结构,而是基于这个这个数据结构实现的服务器程序,又被部署到单独的主机上了(消息队列)

(2)整个系统的结构更复杂了.你要维护的服务器更多了

(3)效率.引入中间商,还是有差价的.比如在上面的图当中,请求从A出来到B收到.过程中的就经历队列的转发,这个过程有一定开销.

标准库中的阻塞队列

在Java标准库中内置了阻塞队列.如果我们需要在一些程序中使用阻塞队列,直接使用标准库中的即可.

譬如有:ArrayBlockingQueue, LinkedBlockingQueue,PriorityBlockingQueue.但最常用的是

LinkedBlockingQueue.

BlockingQueue是一个接口.真正实现的类是LinkedBlockingQueue.

put方法用于阻塞式的入队列,take用于阻塞式的出队列.

BlockingQueue也有offer,poll,peek等方法,但是这些方法不具有阻塞特性.

简单的代码示例:

public class BlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        //入队列
        queue.put("abc");
        //出队列.如果没有put直接take,会阻塞.
        String elem = queue.take();
        System.out.println(elem);
    }
}

生产者消费者模型

实际开发中,生产者消费者模型,往往是多个生产者多个消费者.

这里的生产者和消费者往往不仅是一个线程,也可能是独立的服务器程序.甚至是一组服务器程序.

代码示例如下:

public class TestCustomerAndProducer {
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
 
        Thread customer = new Thread(() -> {
           while(true) {
               try {
                   int value = blockingQueue.take();
                   System.out.println("消费元素: " + value);
                   Thread.sleep(500);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
        }, "消费者");
        
        Thread producer = new Thread(() -> {
            Random r = new Random();
            while(true) {
                try {
                    int num = r.nextInt(1000);
                    System.out.println("生产元素: " + num);
                    blockingQueue.put(num);
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "生产者");
 
        customer.start();
        producer.start();
    }
}

阻塞队列的实现

通过"循环队列"的方式实现.

使用synchronized进行加锁控制.

put插入元素的时候,判定如果队列满了,就进行wait.(注意,要在循环中进行wait.被唤醒时不一定队列就不满了,因为同时可能是唤醒了多个线程).

take取出元素的时候如果判定队列为空,就进行wait(也是循环wait).

下面展示代码(注意注释中的重点):

public class MyBlockingQueue {
    //主题内容指定为一个含有1000个元素的数组
    public int[] elems = new int[1000];
    private volatile int size = 0;
    private volatile int head = 0;
    private volatile int tail = 0;
    //锁对象
    private Object locker = new Object();
 
    public synchronized int getSize() {
        return size;
    }
 
    public void put(int value) throws InterruptedException {
        //锁加到这里和加到方法上的本质是一样的,加到方法上是给this加锁,此处是给locker对象加锁.
        synchronized (locker) {
            while(size >= elems.length) {//1
                //队列满了
                //后续需要让这个代码能够阻塞
                locker.wait();
            }
            //新的元素要放到tail指向的位置上
            elems[tail] = value;
            tail = (tail + 1) % elems.length;
            size++;
            //入队之后唤醒(可能有阻塞的take方法)
            locker.notify();
        }
    }
 
    public int take() throws InterruptedException {
        int ret = 0;
        synchronized (locker) {
            while(size <= 0) {//1
                //队列空了
                //后续也需要让这个代码阻塞
                locker.wait();
            }
            //取出head位置的元素并返回
            ret = elems[head];
            head = (head + 1) % elems.length;
            size--;
            //元素出队列成功后,加上唤醒
            locker.notify();
        }
        return ret;
    }
}

我相信大家应该能了解锁是怎么加的,这里不过多赘述.

那可能就会有人问,1处的判断处为什么用的是while,而不是if?

这主要是因为put和take中使用的是同一把锁.我们可能会想到,如put中元素满了阻塞,然后take出元素了,这里就解锁.但我要说的是,如果是put成功了(这里put之后队列刚好满了),又唤醒了另一个阻塞的put,又进行put,显然会出错,那么就可以加上循环条件,如果队列一直是满的,就不再被唤醒,所以用while.

相关文章
|
3月前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
27天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
17 1
|
2月前
|
数据采集 负载均衡 安全
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
本文提供了多个多线程编程问题的解决方案,包括设计有限阻塞队列、多线程网页爬虫、红绿灯路口等,每个问题都给出了至少一种实现方法,涵盖了互斥锁、条件变量、信号量等线程同步机制的使用。
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
|
6月前
|
设计模式 监控 Java
Java多线程基础-11:工厂模式及代码案例之线程池(一)
本文介绍了Java并发框架中的线程池工具,特别是`java.util.concurrent`包中的`Executors`和`ThreadPoolExecutor`类。线程池通过预先创建并管理一组线程,可以提高多线程任务的效率和响应速度,减少线程创建和销毁的开销。
215 2
|
1月前
|
消息中间件 NoSQL 关系型数据库
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
25 0
|
2月前
|
安全 Java 调度
python3多线程实战(python3经典编程案例)
该文章提供了Python3中多线程的应用实例,展示了如何利用Python的threading模块来创建和管理线程,以实现并发执行任务。
44 0
|
3月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
171 4
|
6月前
|
设计模式 安全 Java
多线程(代码案例: 单例模式, 阻塞队列, 生产者消费者模型,定时器)
多线程(代码案例: 单例模式, 阻塞队列, 生产者消费者模型,定时器)
58 2
|
5月前
|
存储 Java API
java线程之阻塞队列
java线程之阻塞队列
|
5月前
|
安全 Java 容器
线程池,定时器以及阻塞队列(生产者/消费者模型)
线程池,定时器以及阻塞队列(生产者/消费者模型)
40 0