Java多线程案例——阻塞队列

简介: 阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则.

1. 阻塞队列是什么


阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则.


阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:


  • 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
  • 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素.


阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.


2. 生产者消费者模型


生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合(关联关系紧密)问题。


生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.


这种生产者消费者模型有以下两种用途:

1) 削峰填谷


阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力.


比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求.

如果直接处理这些支付请求, 服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放到一个阻塞队列中,

然后再由消费者线程慢慢的来处理每个支付请求. 这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮.


另外一个更形象的例子:


三峡大坝:

汛期控制水量,防止水灾

旱期释放积攒的水,防止旱灾

这就类似于生产者消费者模型中的削峰填谷的作用


2) 解耦合

阻塞队列也能使生产者和消费者之间 解耦(减少两者之间的关联关系)


比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包. 擀饺子皮的人就是 “生产者”, 包饺子的人就是

“消费者”. 擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包),

包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的).


3.标准库中的阻塞队列


在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可.


  • BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
  • put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
  • BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.
public static void main(String[] args) throws InterruptedException {
     BlockingQueue<String> queue=new LinkedBlockingQueue<>();
     //入队列,put具有阻塞功能
     queue.put("hello");
     //出队列
     String elem=queue.take();
     System.out.println(elem);
     elem=queue.take();
     System.out.println(elem);
}
//因为队列中的元素被取出后队列为空,所以形成阻塞


微信图片_20230110220356.png

使用jconsole工具观察线程运行状态,可以发现该线程此时处于WAITING状态,线程阻塞在了第11行。


微信图片_20230110220353.png


微信图片_20230110220349.png



生产者消费者模型


public static void main(String[] args) {
        BlockingQueue<Integer> queue=new LinkedBlockingQueue<>();
        Thread consumer=new Thread() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Integer value=queue.take();
                        System.out.println("消费元素:"+value);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        consumer.start();
        Thread producer=new Thread() {
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    System.out.println("生产了元素:"+i);
                    try {
                        queue.put(i);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        producer.start();
        try {
            consumer.join();
            producer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
}


微信图片_20230110220345.png

4. 阻塞队列实现

public class Demo23 {
    static class BlockingQueue {
        //1000就相当于队列的最大容量,此处暂不考虑扩容问题
        private int[] items=new int[1000];
        private volatile int head=0;
        private volatile int tail=0;
        private volatile int size=0;
        private Object locker=new Object();
        //put用来入队列
        public void put(int item) throws InterruptedException {
          //因为队列中涉及修改操作,所以通过加锁来解决线程不安全问题(原子性)。
            synchronized (locker) {
                //使用while就是为了让wait被唤醒之后,再次确认条件是否成立
                while (size==items.length) {
                    //队列已经满了,对于阻塞队列来说就要阻塞
                    locker.wait();
                }
                items[tail]=item;
                tail++;
                //如果到达末尾,就回到起始位置
                if(tail>=items.length) {
                    tail=0;
                }
                size++;
                locker.notify();
            }
        }
        public int take() throws InterruptedException {
            int ret=0;
            synchronized (locker) {
                while (size==0) {
                    //对于阻塞队列来说,如果队列为空,在尝试获取元素,就要阻塞
                    locker.wait();
                }
                ret=items[head];
                head++;
                if(head>=items.length) {
                    head=0;
                }
                size--;
                //此处的notify用来唤醒put中的wait
                locker.notify();
            }
            return ret;
        }
    }
}


put和take都可能会出现阻塞的情况(wait)

由于这两个代码中的阻塞条件是对立的,因此这两边的wait不会同时触发


put来唤醒take的阻塞,take来唤醒put的阻塞


下边我们用生产者消费者模型来检验我们自己实现的阻塞队列:


public static void main(String[] args) throws InterruptedException {
        BlockingQueue queue=new BlockingQueue();
        //消费者模型
        Thread consumer=new Thread() {
            @Override
            public void run() {
                while (true) {
                    try {
                        int elem= queue.take();
                        System.out.println("消费者元素:"+elem);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        consumer.start();
        //生产者线程
        Thread producer=new Thread() {
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    System.out.println("生产元素:"+i);
                    try {
                        queue.put(i);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        producer.start();
        consumer.join();
        producer.join();
}

微信图片_20230110220332.png


我们可以发现和使用标准库中的阻塞队列的运行结果相同,说明代码实现成功。

相关文章
|
19天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
82 17
|
16天前
|
存储 监控 Java
JAVA线程池有哪些队列? 以及它们的适用场景案例
不同的线程池队列有着各自的特点和适用场景,在实际使用线程池时,需要根据具体的业务需求、系统资源状况以及对任务执行顺序、响应时间等方面的要求,合理选择相应的队列来构建线程池,以实现高效的任务处理。
97 12
|
30天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
15天前
|
缓存 安全 算法
Java 多线程 面试题
Java 多线程 相关基础面试题
|
1月前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
1月前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。
|
Java
Java中需要注意的一些案例
Java中需要注意的一些案例
125 0
|
1月前
|
安全 Java 编译器
深入理解Java中synchronized三种使用方式:助您写出线程安全的代码
`synchronized` 是 Java 中的关键字,用于实现线程同步,确保多个线程互斥访问共享资源。它通过内置的监视器锁机制,防止多个线程同时执行被 `synchronized` 修饰的方法或代码块。`synchronized` 可以修饰非静态方法、静态方法和代码块,分别锁定实例对象、类对象或指定的对象。其底层原理基于 JVM 的指令和对象的监视器,JDK 1.6 后引入了偏向锁、轻量级锁等优化措施,提高了性能。
60 3
|
1月前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
173 2