LinkedBlockingQueue实现的生产者和消费者模型

简介: LinkedBlockingQueue实现的生产者和消费者模型

首先 LinkedBlockingQueue 是线程安全的阻塞队列,LinkedBlockingQueue实现的生产者和消费者模型

阻塞队列与我们平常接触的普通队列(LinkedList或ArrayList等)的最大不同点,在于阻塞队列支出阻塞添加和阻塞删除方法。

阻塞添加:所谓的阻塞添加是指当阻塞队列元素已满时,队列会阻塞加入元素的线程,直队列元素不满时才重新唤醒线程执行元素加入操作。

阻塞删除:阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作(一般都会返回被删除的元素)

BlockingQueue的核心方法:

放入数据:

  offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)

  offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。

  put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

获取数据:

  poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;

  poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。

  take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;

  drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

如果不指定队列的容量大小,也就是使用默认的Integer.MAX_VALUE,如果存在添加速度大于删除速度时候,有可能会内存溢出(OOM)

写法一:

生产者 Producer.java

package com.vipsoft.web.app;
import java.util.concurrent.LinkedBlockingQueue;
public class Producer extends Thread {
    //1、通过构造函数传入阻塞队列
    public static LinkedBlockingQueue<String> queue; 
    
    public Producer(LinkedBlockingQueue<String> queue) {
        this.queue = queue;
    }
    public void run() {
        int i = 0;
        while (true) {
            i++;
            try {
                String msg = "P" + i;
                queue.put(msg);
                System.out.println("我生产了 => " + msg + " 队列数量 " + queue.size());
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println("Producer queue.size => " + queue.size());
                e.printStackTrace();
            } 
        }
    }
}

消费者 Consumer.java

package com.vipsoft.web.app;
import java.util.concurrent.LinkedBlockingQueue;
public class Consumer extends Thread {
    public static LinkedBlockingQueue<String> queue;
    public Consumer(LinkedBlockingQueue<String> queue) {
        this.queue = queue;
    }
    public void run() { 
        while (true) {
            try {
                System.out.println("我消费了 => " + queue.take() + " 队列数量 " + queue.size());
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                System.out.println("Consumer queue.size() => " + queue.size());
                e.printStackTrace();
            }
        }
    }
}

主程序

package com.vipsoft.web.app;
 
import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueTest {
    public static void main(String[] args) {
        //1、创建一个BlockingQueue
        int MAX_NUM = 10;  //实际使用也需要指定大小,防止OOM
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(MAX_NUM);
        //2、创建一个生产者,一个消费者
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        //3、开启两个线程
        producer.start();
        consumer.start();
    }
}

写法二:

package com.vipsoft.web.app;
 
import java.util.concurrent.LinkedBlockingDeque;
public class LinkedBlockingQueueTest {
    public static void main(String[] args) {
        final LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(10); //实际使用也需要指定大小,防止OOM
        Runnable producerRunnable = new Runnable() {
            public void run() {
                int i = 0;
                while (true) {
                    i++;
                    try {
                        String msg = "P" + i;
                        queue.put(msg);
                        System.out.println("我生产了 => " + msg + " 队列数量 " + queue.size());
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("Producer queue.size => " + queue.size());
                        e.printStackTrace();
                    }
                }
            }
        };
        Runnable customerRunnable = new Runnable() {
            public void run() {
                while (true) {
                    try {
                        System.out.println("我消费了 => " + queue.take() + " 队列数量 " + queue.size());
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        System.out.println("Consumer queue.size() => " + queue.size());
                        e.printStackTrace();
                    }
                }
            }
        };
        Thread thread1 = new Thread(producerRunnable);
        thread1.start();
        Thread thread2 = new Thread(customerRunnable);
        thread2.start();
    }
}
目录
相关文章
|
容器
多线程学习之生产者和消费者与阻塞队列的关系
多线程学习之生产者和消费者与阻塞队列的关系
48 0
|
3月前
|
Java
并发编程之生产者和消费者问题
该博客文章通过Java代码示例介绍了生产者和消费者问题的线程间通信解决方案,演示了如何使用synchronized关键字和wait/notifyAll方法来实现线程间的同步和资源的协调访问。
|
5月前
|
安全 Java 容器
线程池,定时器以及阻塞队列(生产者/消费者模型)
线程池,定时器以及阻塞队列(生产者/消费者模型)
40 0
|
安全 Java
【JUC基础】06. 生产者和消费者问题
学习JUC,就不得不提生产者消费者。生产者消费者模型是一种经典的多线程模型,用于解决生产者和消费者之间的数据交换问题。在生产者消费者模型中,生产者生产数据放入共享的缓冲区中,消费者从缓冲区中取出数据进行消费。在这个过程中,生产者和消费者之间需要保持同步,以避免数据出现错误或重复。今天我们就来说说生产者消费者模型,以及JUC中如何解决该模型的同步问题。
147 0
|
安全 数据处理
线程中的生产者和消费者模式
线程中的生产者和消费者模式
127 0
线程中的生产者和消费者模式
|
设计模式 安全
生产者与消费者模型
生产者与消费者模型
99 0
生产者与消费者模型
一个简单的生产者和消费者客服实现
一个简单的生产者和消费者客服实现
140 0