【JavaEE】多线程案例-阻塞队列

简介: 【JavaEE】多线程案例-阻塞队列


1. 前言

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:


  • 在队列为空时,获取元素的线程会等待队列变为非空
  • 当队列满时,存储元素的线程会等待队列可用

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。


2. 什么是生产者-消费者模型

生产者消费者模型是一种多线程并发协作的模型,由两类线程和一个缓冲区组成:生产者线程生产数据并把数据放在缓冲区,消费者线程从缓冲区取数据并消费。生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品。当存储空间为空时,消费者阻塞;当存储空间满时,生产者阻塞。

3. 生产者-消费者模型的意义

  1. 解耦合
  2. 削峰填谷

3.1 解耦合

两个模块的联系越紧密,耦合度就越高,耦合度越高就意味着两个模块的影响程度很大,当一个模块出现问题的时候,另一个模块也会受到影响而导致出现问题,特别是对分布式系统来说:解耦合是非常重要的。


假设上面是一个简单的分布式系统,服务器 A 和服务器 B 之间直接进行交互(服务器 A 向服务器 B 发送请求并接收服务器 B 返回的信息,服务器 B 向服务器 A 发送请求,以及接收服务器 A 返回的信息),服务器 A 和服务器 B 之间的耦合度比较高,当两个服务器之间的一个发生故障的时候就会导致两个服务器都无法使用。


不仅如此,当我们想要再添加一个服务器 C 与服务器 A 之间进行交互的时候,不仅需要对服务器 C 做出修改,还需要对服务器 A 作出修改。


相比上面的情况,如果我们使用生产者-消费者模型的话就可以解决上面的耦合度过高的问题。

服务器 A 接收到客户端发来的请求不是直接发送给服务器 B ,而是将接收到的请求加入到阻塞队列中,然后服务器 B 从阻塞队列中获取到请求,这样就避免了两个服务器之间进行直接的交互,降低了耦合性;不仅如此,当我们需要额外添加一个服务器 C 的时候,就不需要对服务器 A 做出修改,而是直接从阻塞队列获取请求信息。

3.2 削峰填谷


当客户端向服务器 A 短时间发出大量请求信息的话,那么当服务器 A 接收到客户端发来的请求的时候,就会立即将收到的所有信息都发送给服务器 B ,但是由于虽然服务器 A 能够接收的请求量可以很多,但是服务器 B 却不能一次接收这么多请求,就会导致服务器 B 会挂掉。


如果使用生产者-消费者莫模型的话,当客户端向服务器 A 短时间发送大量请求的话,服务器 A 不会将请求发送给 B ,而是发送给阻塞队列中,当阻塞队列满了的时候,服务器 A 就会停止向阻塞队列中发送请求,陷入阻塞状态,等服务器 B 向阻塞队列中受到请求使得阻塞队列容量减少的时候,服务器 A 才会继续向阻塞队列中发送收到的请求,这样就避免了服务器 B 短时间内受到大量的请求而挂掉的情况;如果阻塞队列中收到的请求信息被读取完的时候,服务器 B 就会停止从阻塞队列中读取请求,进入阻塞状态,直到服务器 A 向阻塞队列中发送请求。


4. 如何使用Java标准库提供的阻塞队列

当知道了生产者-消费者模型的意义之后,我们就来看看如何使用阻塞队列。在Java标准库中提供了阻塞队列 BlockingQueue 可以直接使用。

因为 BlockingQueu 是一个接口,无法实例化,所以需要创建出实现了 BlockingQueue 接口的类,而 ArrayBlockingQueue 和 LinkedBlockingQueue 实现了这个接口。


我们还可以观察到,BlockingQueue 还继承了 Queue ,也就是说我们也可以使用 Queue 中的方法,比如 offer 和 poll 等,但是在阻塞队列中不使用这两个方法,因为这两个方法不具有阻塞特性,而是使用 put 和 take 方法。

public class Demo1 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("123");
        queue.put("234");
        queue.put("345");
        queue.put("456");
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
    }
}

这里向阻塞队列中加入了四个数据,但是读取的时候读取了五次,所以看到线程进入了阻塞状态。

5. 自己实现一个阻塞队列


阻塞队列是建立在队列的基础上的,所以要想实现一个阻塞队列,首先需要实现出来一个队列,那么就先来看看如何实现出一个循环队列。

5.1 实现出循环队列

队列比较容易实现,但是循环队列该如何实现呢?当数据到达数组的最后的时候,将数组的下标修改为0,这样就可以达到循环的目的。


当 tail == head 的时候有两种情况:


队列中没有数据的时候

队列满了的时候

为了区分这两种情况,我们可以使用两种方法:


浪费一个空间,当tail++之后,如果tail + 1 == head,则表示队列满了,将 tail 修改为 0

定义一个size变量来表示队列中有效数据的个数,当size == queue.length的时候,表示队列满了

class MyQueue {
    private final String[] data = new String[1000];
    private int size;
    private int head = 0;
    private int tail = 0;
    public void put(String str) {
        //当添加数据的时候需要判断队列的容量是否已经满了
        if(size == data.length) return;
        data[tail++] = str;
        size++;
        if(tail == data.length) tail = 0;
    }
    public String take() {
      //读取数据的时候需要判断队列是否为空
        if(size == 0) return null;
        String ret = data[head++];
        size--;
        if(head == data.length) head = 0;
        return ret;
    }
}

5.2 实现阻塞队列

阻塞队列就是在队列为空时,获取元素的线程会等待队列变为非空;当队列满时,存储元素的线程会等待队列可用。并且因为阻塞队列运用的环境是多线程,需要考虑到线程安全的问题。

5.2.1 加锁

当需要进行查询和修改的操作时,需要对该操作进行加锁。因为我们的 put 和 take 基本上都在查询和修改数据,所以可以将这两个操作直接进行加锁操作。

class MyBlockingQueue {
    private final String[] data = new String[1000];
    private int size;
    private int head = 0;
    private int tail = 0;
    public void put(String str) {
        synchronized (this) {
            if(size == data.length) return;
            data[tail++] = str;
            size++;
            if(tail == data.length) tail = 0;
        }
    }
    public String take() {
        synchronized (this) {
            if(size == 0) return null;
            String ret = data[head++];
            size--;
            if(head == data.length) head = 0;
            return ret;
        }
    }
}

5.2.2 进行阻塞操作

当进行完加锁操作之后,我们还需要实现阻塞的作用,当添加数据的时候,如果队列中容量满了的时候就进入阻塞等待状态,直到进行了 take 读取数据操作删除数据的时候,才停止等待;当读取数据的时候,如果队列为空,那么该线程就进入阻塞等待状态,直到进行了 put 操作。

class MyBlockingQueue {
    private final String[] data = new String[1000];
    private int size;
    private int head = 0;
    private int tail = 0;
    public void put(String str) throws InterruptedException {
        synchronized (this) {
            if(size == data.length) {
                this.wait();
            }
            data[tail++] = str;
            size++;
            if(tail == data.length) tail = 0;
            //这个 notify 用来唤醒 take 操作的等待
            this.notify();
        }
    }
    public String take() throws InterruptedException {
        synchronized (this) {
            if(size == 0) {
                this.wait();
            }
            String ret = data[head++];
            size--;
            if(head == data.length) head = 0;
            //这个 notify 用来唤醒 put 操作的等待
            this.notify();
            return ret;
        }
    }
}

5.2.3 解决因被 interrupt 唤醒 waiting 状态的问题


当使用了 wait 和 notify 对 put 和 take 操作进行了阻塞等待和唤醒操作之后,我们还需要注意,难道只有 notify 才会唤醒 WAITING 状态吗?前面我们学习了使用 interrupt 来终止线程,但是 interrup 还会唤醒处于 WAITING 状态的线程,也就是说这里的 WAITING 状态的线程不仅可以被 notify 唤醒,还可以被 interrupt 唤醒。


当线程是因为 put 操作队列满了的时候进入阻塞等待状态的时候,如果是因为被 interrupt 唤醒而不是 take 操作的 notify 唤醒的时候就意味着此时队列还是满的,当进行添加操作的时候,就会将有效的数据覆盖掉;

当线程是因为 take 操作队列为空的时候进入阻塞等待状态的时候,如果是因为被 interrupt 唤醒而不是 put 操作的 notify 唤醒的时候就意味着此时队列还是空的,如果进行删除操作,并没有意义。

为了解决 WAITING 状态被 interrupt 唤醒而造成的问题,当线程被唤醒的时候,需要进行判断 size 是否还等于 0 或者 queue.length,如果还等于,就继续进入 WAITING 状态,但是光一次判断是不够的,因为还可能是被 interrupt 唤醒的,所以需要进行多次判断,可以用 while 循环来解决。

class MyBlockingQueue {
    private final String[] data = new String[1000];
    private int size;
    private int head = 0;
    private int tail = 0;
    public void put(String str) throws InterruptedException {
        synchronized (this) {
            while(size == data.length) {
                this.wait();
            }
            data[tail++] = str;
            size++;
            if(tail == data.length) tail = 0;
            //这个 notify 用来唤醒 take 操作的等待
            this.notify();
        }
    }
    public String take() throws InterruptedException {
        synchronized (this) {
            while(size == 0) {
                this.wait();
            }
            String ret = data[head++];
            size--;
            if(head == data.length) head = 0;
            //这个 notify 用来唤醒 put 操作的等待
            this.notify();
            return ret;
        }
    }
}

5.2.4 解决因指令重排序造成的问题

因为 put 和 take 操作要进行读和写的操作,可能会因为指令重排序的问题造成其他问题,这里就需要使用 volatile 解决指令重排序问题。

class MyBlockingQueue {
    private final String[] data = new String[1000];
    private volatile int size;
    private volatile int head = 0;
    private volatile int tail = 0;
    public void put(String str) throws InterruptedException {
        synchronized (this) {
            while(size == data.length) {
                this.wait();
            }
            data[tail++] = str;
            size++;
            if(tail == data.length) tail = 0;
            //这个 notify 用来唤醒 take 操作的等待
            this.notify();
        }
    }
    public String take() throws InterruptedException {
        synchronized (this) {
            while(size == 0) {
                this.wait();
            }
            String ret = data[head++];
            size--;
            if(head == data.length) head = 0;
            //这个 notify 用来唤醒 put 操作的等待
            this.notify();
            return ret;
        }
    }
}

测试实现的阻塞队列

public class Demo4 {
    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue();
        Thread t1 = new Thread(() -> {
            while(true) {
                try {
                    System.out.println("消费元素" + queue.take());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        Thread t2 = new Thread(() -> {
            int count = 1;
            while(true) {
                try {
                    queue.put(count + "");
                    System.out.println("生产元素" + count);
                    count++;
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
    }
}

让生产速度较慢,使得读取操作阻塞等待插入数据才执行。

public class Demo4 {
    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue();
        Thread t1 = new Thread(() -> {
            while(true) {
                try {
                    System.out.println("消费元素" + queue.take());
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        Thread t2 = new Thread(() -> {
            int count = 1;
            while(true) {
                try {
                    queue.put(count + "");
                    System.out.println("生产元素" + count);
                    count++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
    }
}

让生产 put 操作进入阻塞等待状态。

相关文章
|
3月前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
25天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
16 1
|
2月前
|
数据采集 负载均衡 安全
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
本文提供了多个多线程编程问题的解决方案,包括设计有限阻塞队列、多线程网页爬虫、红绿灯路口等,每个问题都给出了至少一种实现方法,涵盖了互斥锁、条件变量、信号量等线程同步机制的使用。
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
|
1月前
|
消息中间件 NoSQL 关系型数据库
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
23 0
|
2月前
|
安全 Java 调度
python3多线程实战(python3经典编程案例)
该文章提供了Python3中多线程的应用实例,展示了如何利用Python的threading模块来创建和管理线程,以实现并发执行任务。
43 0
|
3月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
169 4
|
6月前
|
设计模式 安全 Java
多线程(代码案例: 单例模式, 阻塞队列, 生产者消费者模型,定时器)
多线程(代码案例: 单例模式, 阻塞队列, 生产者消费者模型,定时器)
57 2
|
5月前
|
存储 Java API
java线程之阻塞队列
java线程之阻塞队列
|
5月前
|
安全 Java 容器
线程池,定时器以及阻塞队列(生产者/消费者模型)
线程池,定时器以及阻塞队列(生产者/消费者模型)
39 0
|
6月前
|
存储 Java 调度
Java多线程基础-11:工厂模式及代码案例之线程池(二)
这篇内容介绍了Java多线程基础,特别是线程池中的定时器和拒绝策略。
61 0

热门文章

最新文章