Java多线程基础-9:代码案例之阻塞队列(一)

简介: 阻塞队列是一种遵循先进先出原则的线程安全数据结构,它在队列满时会阻塞入队操作,队列空时会阻塞出队操作,常用于多线程间的协作,简化同步代码编写。Java中提供了`BlockingQueue`接口及其实现类,如`ArrayBlockingQueue`和`LinkedBlockingQueue`,用于实现生产者-消费者模型,以实现负载均衡和资源的有效利用,如削峰填谷,降低系统压力。

阻塞队列是一种特殊的队列,带有“阻塞特性”,也遵守队列 “先进先出” 的原则。阻塞队列是一种线程安全的数据结构,并且具有以下特性:


当队列满时,继续入队列就会阻塞,直到有其他线程从队列中取走元素。

当队列空时,继续出队列也会阻塞,直到有其他线程往队列中插入元素。

在多线程代码编程中,多个线程之间要进行数据交互,我们可以使用阻塞队列来简化代码的编写。



一、Java标准库:BlockingQueue接口


1、使用标准库中的阻塞队列


Java 标准库中内置了阻塞队列。如果我们需要在一些程序中使用阻塞队列,可以直接使用标准库中的 BlockingQueue 。

与BlockingQueue相关的具体的继承和实现关系如下:


collection接口------>Queue接口------>BlockingQueue接口----->7个实现类


  1. ArrayBlockingQueue:由数组结构组成的有界阻塞队列。它在构造的时候用户必须传入一个capacity。



2.LinkedBlockingQueue:由链表结构组成的有界阻塞队列,默认capacity为

Integer.MAX_VALUE。也可以自行指定capacity。


  1. PriorityBlockingQueue:支持优先级排序的无界阻塞队列。


  1. DelayQueue:使用优先级队列实现的延迟无界阻塞队列。


  1. SynchronousQueue:不存储元素的阻塞队列,它的容量为0.


  1. LinkedTransferQueue:由链表结构组成的无界阻塞队列。


  1. LinkedBlockingDeque:由链表结构组成的双向阻塞队列。


BlockingQueue 是一个接口,真正实现的类是 LinkedBlockingQueue(链表实现)和 ArrayBlockingQueue (顺序表实现)。


BlockingDeque<String> queue1 = new LinkedBlockingDeque<>();
BlockingDeque<String> queue2 = new ArrayBlockingQueue<>();


阻塞队列的核心方法主要有两个:阻塞队列~带有阻塞特性!!!


1.如果队列空,尝试出队列,就会阻塞等待.等待到队列不空为止2.

put() 用于阻塞式的入队列。put()方法会抛出 InterruptedException 异常,因为该方法可能会带来阻塞。而一旦阻塞了,就有可能被interrupt方法提前唤醒,此时就会抛出该异常(会带来阻塞的方法往往会抛出 InterruptedException 异常)。如果队列已满,尝试put(),就会阻塞等待,直到队列不满为止。


2.take() 用于阻塞式的出队列。如果队列空,尝试take(),也会阻塞等待,直到队列不空为止。


如下面代码所示,put 5 次后,执行take 6 次的操作。由于第6次出队列时队列已空,线程就会阻塞:


import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
 
public class Test {
    public static void main(String[] args) throws InterruptedException {
        BlockingDeque<String> queue = new LinkedBlockingDeque<>();
        // 阻塞队列的和核心方法主要有两个
        // 1. put() 入队列
        queue.put("hello1");
        queue.put("hello2");
        queue.put("hello3");
        queue.put("hello4");
        queue.put("hello5");
 
        // 2.take() 出队列
        String ret = null;
        // 出队列 1 次
        ret = queue.take();
        System.out.println(ret);
        // 出队列 2 次
        ret = queue.take();
        System.out.println(ret);
        // 出队列 3 次
        ret = queue.take();
        System.out.println(ret);
        // 出队列 4 次
        ret = queue.take();
        System.out.println(ret);
        // 出队列 5 次
        ret = queue.take();
        System.out.println(ret);
        // 出队列 6 次
        ret = queue.take();
        System.out.println(ret);
    }
}



BlockingQueue 也有 offer,poll, peek 等方法, 但是这些方法不带有阻塞特性。


2、生产者-消费者模型:多线程下阻塞队列应用


a.什么是生产者-消费者模型?


生产者-消费者模型这个概念很关键,是我们服务器开发中一种非常常见的写法。


可以用擀饺子皮和包饺子这两件事情来类比。假设过年有一桌子人围在一起准备饺子。每个人自己擀一个饺子皮,自己包一个;再擀一个,再自己包一个……这样的做法虽然能完成任务,但是并不高效:家里只有一根擀面杖,当一个人正在使用擀面杖的时候,剩下的几个人只能干等着,即使有多根擀面杖,但每个人都在不停地状态切换,也很费劲。


更常见的包法是一个人负责擀皮,另外几个人负责包(类似于流水线)。此时,就构成了生产者-消费者模型。


注意,生产者、消费者的角色是针对某个资源而言的,针对的资源不同,角色分配也就不同。对于饺子皮来说,负责擀皮的人就是生产饺子皮的,所以是生产者;负责包饺子的人是饺子皮的消耗饺子皮的,所以是消费者。



生产者和消费者之间交换数据,就需要用到一个交易场所。比如放饺子的盖帘:



这个交易场所就相当于是一个阻塞队列。如果擀饺子皮的人擀得很快,盖帘上一下子堆满了饺子皮,擀饺子皮的人就可以歇会儿;如果包饺子的包得很快,一下子把饺子皮都包完了,那包子饺子的就停下来等待擀饺子皮的人再擀点皮出来。


b.生产者-消费者模型是用来解决什么问题的?


生产者-消费者能解决很多问题,最主要的是两个方面:


第一,可以让上下游模块之间,更好地“解耦合”。

耦合指的是模块之间的关系是强还是弱,关联越强,耦合越高。而写代码追求的就是低耦合高内聚,避免代码牵一发动全身。(高内聚指的是将相关联的代码分门别类地规制起来。)


什么是耦合高的情况呢?考虑以下情景:有两台服务器A和B直接通信。



此时,如果要再加一个与A通信的C服务器,A的代码就必须作出很大调整。


而使用生产者-消费者模型,耦合就能够降低了。仍然有要进行通信的服务器A和服务器B,但此时它们不直接进行通信,A把它的请求发给一个阻塞队列服务器(也叫消息队列服务器。是把阻塞队列的功能单列出来,再扩充上更多的功能做成单独的服务器)。


A和B彼此不知道对方的存在,它们都只与阻塞队列服务器通信。此时B挂了,对A没有影响。如果再加一个C,此时直接让C从队列中获取请求即可,对于A的影响是非常小的。



A、B、C作为业务服务器,代码是不断在变化的;而阻塞队列服务器和业务无关,代码不太会变化,也就更稳定。因此,阻塞队列服务器挂的概率是远小于业务服务器的。


第二,使用生产者-消费者模型,可以削峰填谷。


仍然是两个服务器:



A与B直接通信的情况


如果此时A和B是直接调用的关系,A收到了请求峰值,B也同样会有这个峰值。假设A平时收到的请求是 10000个/s,而在某个时间点突然收到了 30000个/s,对于B来说,它的请求同样是 30000个/s。服务器在处理每个请求时,都要消耗一定的硬件资源,包括不限于CPU,内存,带宽等,如果某个硬件资源达到瓶颈,此时服务器就挂了。


如果B在设计时,没有考虑峰值处理的情况,B可能也就挂了,这就给系统的稳定性带来了极大的风险。但是,如果引入生产者-消费者模型,风险就大大降低了。



引入阻塞队列的情况

A不直接向B发送请求,而是向消息队列服务器发送请求。A收到的请求多了,队列中的元素也就多了,而此时的B仍然可以按照之前的速率来取元素。换句话说,队列帮B承担了压力。(队列没有业务,代码稳定,承担了压力也不容易挂。另外,A作为入口服务器,一般来说起到的效果就是调用一下其他服务武器,并把结果汇总,统一返回,业务比较简单,因此A服务器也不容易挂。而B服务器是有具体的业务的,B承担的工作量更大,单个请求吃的资源更多,也就更“娇贵”,更容易挂。)


假设流量高峰后还有个波谷,此时B仍然可以按照原有速率消费队列中之前积压的数据。


阻塞队列的削峰填谷作用与水库、湖泊对河流水量削峰填谷的作用是相似的。


这就像一条河流,当雨季时,天降暴雨(降水量达到峰值),河流上游的水量激增,下游的水量也会随之暴增。此时如果下游没有相应的排洪泄洪设施,河流下游就可能因为河水决堤而引发洪灾;另一方面,当旱季降水不足,上游水量供给减少,也会直接导致下游旱灾。而在中游建设一个水库,在雨季用于蓄洪,在旱季用于给下游开闸放水,此时就能一直保持河流下游水量适中且变化平缓了。如三峡大坝就是如此。


阻塞队列、三峡大坝等起到的效果与生产者-消费者模型起到的效果也是一致的。


二、使用 BlockingQueue 代码实现生产者-消费者模型


下面是使用BlockingQueue实现生产者-消费者模型模型的一个代码案例。注意,其中的生产者和消费者不一定只有一个,可以同时存在多个消费者和生产者。


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
 
public class Main3 {
    public static void main(String[] args) {
        // 创建一个阻塞队列
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
 
        // 消费者
        Thread t1 = new Thread(() -> {
            while(true) {
                try {
                    int value = blockingQueue.take();
                    System.out.println("消费元素:" + value);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
 
        // 生产者
        Thread t2 = new Thread(() -> {
            int value = 0;
            while(true) {
                try {
                    System.out.println("生产元素: " + value);
                    blockingQueue.put(value);
                    value++;
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t2.start();
    }
}


运行结果:每隔一秒(1000ms)生产者会生产出一个元素,put进阻塞队列blockingQueue内;而消费者消费元素没有时间间隔,一旦blockingQueue中有元素,它就会take,立即进行消费;没有元素的时候消费者将阻塞等待生产者生产出元素。




生产者-消费者模型是阻塞队列BlockingQueue的应用。如何使用BlockingQueue是简单的,但关键还要掌握如何通过自己的代码实现BlockingQueue。



Java多线程基础-9:代码案例之阻塞队列(二)+

https://developer.aliyun.com/article/1520533?spm=a2c6h.13148508.setting.14.75194f0e9S8GyQ

相关文章
|
21小时前
|
Java
java线程之用户线程与守护线程
java线程之用户线程与守护线程
6 1
java线程之用户线程与守护线程
|
1天前
|
消息中间件 监控 Java
Java 框架界的‘巨星’Spring,让你的代码翩翩起舞!
【6月更文挑战第25天】Spring,Java框架的明星,以其依赖注入(DI)和面向切面编程(AOP)简化开发。DI协调类间协作,AOP提供日志、监控等附加功能。事务管理确保代码稳定性,注解如`@Transactional`自动化事务处理。Spring的集成能力使代码灵活适应多样技术场景,让编程变得优雅高效,犹如舞蹈般流畅。跟随Spring,让代码起舞!
|
22小时前
|
安全 IDE Java
使用MapStruct和Lombok简化代码
使用MapStruct和Lombok简化代码
14 2
|
21小时前
|
安全 Java
java线程之List集合并发安全问题及解决方案
java线程之List集合并发安全问题及解决方案
6 1
|
21小时前
|
Java
java线程之定制化通信(多轮顺序打印问题)
java线程之定制化通信(多轮顺序打印问题)
6 1
|
21小时前
|
安全 Java
java线程之线程安全
java线程之线程安全
13 1
|
21小时前
|
Java
java线程之异步回调
java线程之异步回调
4 0
|
21小时前
|
存储 Java API
java线程之阻塞队列
java线程之阻塞队列
7 0
|
21小时前
|
Java
java线程之读写锁
java线程之读写锁
5 0
|
21小时前
|
Java
java线程之信号同步
java线程之信号同步
7 0