Java多线程案例——阻塞队列

简介: 阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则.

1. 阻塞队列是什么


阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则.


阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:


  • 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
  • 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素.


阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.


2. 生产者消费者模型


生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合(关联关系紧密)问题。


生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.


这种生产者消费者模型有以下两种用途:

1) 削峰填谷


阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力.


比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求.

如果直接处理这些支付请求, 服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放到一个阻塞队列中,

然后再由消费者线程慢慢的来处理每个支付请求. 这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮.


另外一个更形象的例子:


三峡大坝:

汛期控制水量,防止水灾

旱期释放积攒的水,防止旱灾

这就类似于生产者消费者模型中的削峰填谷的作用


2) 解耦合

阻塞队列也能使生产者和消费者之间 解耦(减少两者之间的关联关系)


比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包. 擀饺子皮的人就是 “生产者”, 包饺子的人就是

“消费者”. 擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包),

包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的).


3.标准库中的阻塞队列


在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可.


  • BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
  • put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
  • BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.
public static void main(String[] args) throws InterruptedException {
     BlockingQueue<String> queue=new LinkedBlockingQueue<>();
     //入队列,put具有阻塞功能
     queue.put("hello");
     //出队列
     String elem=queue.take();
     System.out.println(elem);
     elem=queue.take();
     System.out.println(elem);
}
//因为队列中的元素被取出后队列为空,所以形成阻塞


微信图片_20230110220356.png

使用jconsole工具观察线程运行状态,可以发现该线程此时处于WAITING状态,线程阻塞在了第11行。


微信图片_20230110220353.png


微信图片_20230110220349.png



生产者消费者模型


public static void main(String[] args) {
        BlockingQueue<Integer> queue=new LinkedBlockingQueue<>();
        Thread consumer=new Thread() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Integer value=queue.take();
                        System.out.println("消费元素:"+value);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        consumer.start();
        Thread producer=new Thread() {
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    System.out.println("生产了元素:"+i);
                    try {
                        queue.put(i);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        producer.start();
        try {
            consumer.join();
            producer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
}


微信图片_20230110220345.png

4. 阻塞队列实现

public class Demo23 {
    static class BlockingQueue {
        //1000就相当于队列的最大容量,此处暂不考虑扩容问题
        private int[] items=new int[1000];
        private volatile int head=0;
        private volatile int tail=0;
        private volatile int size=0;
        private Object locker=new Object();
        //put用来入队列
        public void put(int item) throws InterruptedException {
          //因为队列中涉及修改操作,所以通过加锁来解决线程不安全问题(原子性)。
            synchronized (locker) {
                //使用while就是为了让wait被唤醒之后,再次确认条件是否成立
                while (size==items.length) {
                    //队列已经满了,对于阻塞队列来说就要阻塞
                    locker.wait();
                }
                items[tail]=item;
                tail++;
                //如果到达末尾,就回到起始位置
                if(tail>=items.length) {
                    tail=0;
                }
                size++;
                locker.notify();
            }
        }
        public int take() throws InterruptedException {
            int ret=0;
            synchronized (locker) {
                while (size==0) {
                    //对于阻塞队列来说,如果队列为空,在尝试获取元素,就要阻塞
                    locker.wait();
                }
                ret=items[head];
                head++;
                if(head>=items.length) {
                    head=0;
                }
                size--;
                //此处的notify用来唤醒put中的wait
                locker.notify();
            }
            return ret;
        }
    }
}


put和take都可能会出现阻塞的情况(wait)

由于这两个代码中的阻塞条件是对立的,因此这两边的wait不会同时触发


put来唤醒take的阻塞,take来唤醒put的阻塞


下边我们用生产者消费者模型来检验我们自己实现的阻塞队列:


public static void main(String[] args) throws InterruptedException {
        BlockingQueue queue=new BlockingQueue();
        //消费者模型
        Thread consumer=new Thread() {
            @Override
            public void run() {
                while (true) {
                    try {
                        int elem= queue.take();
                        System.out.println("消费者元素:"+elem);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        consumer.start();
        //生产者线程
        Thread producer=new Thread() {
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    System.out.println("生产元素:"+i);
                    try {
                        queue.put(i);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        producer.start();
        consumer.join();
        producer.join();
}

微信图片_20230110220332.png


我们可以发现和使用标准库中的阻塞队列的运行结果相同,说明代码实现成功。

相关文章
|
2月前
|
JSON 网络协议 安全
【Java】(10)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
177 2
|
2月前
|
JSON 网络协议 安全
【Java基础】(1)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
203 1
|
3月前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案
Java 数据库 Spring
160 0
|
3月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
252 16
|
4月前
|
缓存 并行计算 安全
关于Java多线程详解
本文深入讲解Java多线程编程,涵盖基础概念、线程创建与管理、同步机制、并发工具类、线程池、线程安全集合、实战案例及常见问题解决方案,助你掌握高性能并发编程技巧,应对多线程开发中的挑战。
|
4月前
|
数据采集 存储 前端开发
Java爬虫性能优化:多线程抓取JSP动态数据实践
Java爬虫性能优化:多线程抓取JSP动态数据实践
|
Java
Java中需要注意的一些案例
Java中需要注意的一些案例
164 0
|
5月前
|
Java API 调度
从阻塞到畅通:Java虚拟线程开启并发新纪元
从阻塞到畅通:Java虚拟线程开启并发新纪元
363 83

热门文章

最新文章