LinkedBlockingQueue实现的生产者和消费者模型

简介: LinkedBlockingQueue实现的生产者和消费者模型

首先 LinkedBlockingQueue 是线程安全的阻塞队列,LinkedBlockingQueue实现的生产者和消费者模型

阻塞队列与我们平常接触的普通队列(LinkedList或ArrayList等)的最大不同点,在于阻塞队列支出阻塞添加和阻塞删除方法。

阻塞添加:所谓的阻塞添加是指当阻塞队列元素已满时,队列会阻塞加入元素的线程,直队列元素不满时才重新唤醒线程执行元素加入操作。

阻塞删除:阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作(一般都会返回被删除的元素)

BlockingQueue的核心方法:

放入数据:

  offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)

  offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。

  put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

获取数据:

  poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;

  poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。

  take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;

  drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

如果不指定队列的容量大小,也就是使用默认的Integer.MAX_VALUE,如果存在添加速度大于删除速度时候,有可能会内存溢出(OOM)

写法一:

生产者 Producer.java

package com.vipsoft.web.app;
import java.util.concurrent.LinkedBlockingQueue;
public class Producer extends Thread {
    //1、通过构造函数传入阻塞队列
    public static LinkedBlockingQueue<String> queue; 
    
    public Producer(LinkedBlockingQueue<String> queue) {
        this.queue = queue;
    }
    public void run() {
        int i = 0;
        while (true) {
            i++;
            try {
                String msg = "P" + i;
                queue.put(msg);
                System.out.println("我生产了 => " + msg + " 队列数量 " + queue.size());
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println("Producer queue.size => " + queue.size());
                e.printStackTrace();
            } 
        }
    }
}

消费者 Consumer.java

package com.vipsoft.web.app;
import java.util.concurrent.LinkedBlockingQueue;
public class Consumer extends Thread {
    public static LinkedBlockingQueue<String> queue;
    public Consumer(LinkedBlockingQueue<String> queue) {
        this.queue = queue;
    }
    public void run() { 
        while (true) {
            try {
                System.out.println("我消费了 => " + queue.take() + " 队列数量 " + queue.size());
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                System.out.println("Consumer queue.size() => " + queue.size());
                e.printStackTrace();
            }
        }
    }
}

主程序

package com.vipsoft.web.app;
 
import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueTest {
    public static void main(String[] args) {
        //1、创建一个BlockingQueue
        int MAX_NUM = 10;  //实际使用也需要指定大小,防止OOM
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(MAX_NUM);
        //2、创建一个生产者,一个消费者
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        //3、开启两个线程
        producer.start();
        consumer.start();
    }
}

写法二:

package com.vipsoft.web.app;
 
import java.util.concurrent.LinkedBlockingDeque;
public class LinkedBlockingQueueTest {
    public static void main(String[] args) {
        final LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(10); //实际使用也需要指定大小,防止OOM
        Runnable producerRunnable = new Runnable() {
            public void run() {
                int i = 0;
                while (true) {
                    i++;
                    try {
                        String msg = "P" + i;
                        queue.put(msg);
                        System.out.println("我生产了 => " + msg + " 队列数量 " + queue.size());
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("Producer queue.size => " + queue.size());
                        e.printStackTrace();
                    }
                }
            }
        };
        Runnable customerRunnable = new Runnable() {
            public void run() {
                while (true) {
                    try {
                        System.out.println("我消费了 => " + queue.take() + " 队列数量 " + queue.size());
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        System.out.println("Consumer queue.size() => " + queue.size());
                        e.printStackTrace();
                    }
                }
            }
        };
        Thread thread1 = new Thread(producerRunnable);
        thread1.start();
        Thread thread2 = new Thread(customerRunnable);
        thread2.start();
    }
}
目录
相关文章
|
6月前
|
人工智能 JSON Java
Spring Boot 如何接收并处理不确定类型的请求参数?
在 Spring Boot 中,当需要处理结构不确定的 JSON 数据时,可以使用 `Map` 类型灵活接收键值对数据。对于更复杂的场景,可通过 Jackson 注解支持多态类型、自定义反序列化器,或在接收后动态解析 JSON 数据,提升处理灵活性和扩展性。
184 0
|
存储 设计模式 安全
使用BlockingQueue实现生产者-消费者模式
使用BlockingQueue实现生产者-消费者模式
|
12月前
|
SQL JavaScript Java
Spring Boot 3 整合 Mybatis-Plus 实现数据权限控制
本文介绍了如何在Spring Boot 3中整合MyBatis-Plus实现数据权限控制,通过使用MyBatis-Plus提供的`DataPermissionInterceptor`插件,在不破坏原有代码结构的基础上实现了细粒度的数据访问控制。文中详细描述了自定义注解`DataScope`的使用方法、`DataPermissionHandler`的具体实现逻辑,以及根据用户的不同角色和部门动态添加SQL片段来限制查询结果。此外,还展示了基于Spring Boot 3和Vue 3构建的前后端分离快速开发框架的实际应用案例,包括项目的核心功能模块如用户管理、角色管理等,并提供Gitee上的开源仓库
2368 11
|
12月前
|
机器学习/深度学习 人工智能 算法
人工智能的三大主义--——行为主义(actionism),连接主义 (connectionism)
这段内容涵盖了人工智能领域的重要概念和历史节点。首先介绍了布鲁克斯的六足行走机器人及Spot机器狗,被视为新一代“控制论动物”。接着解释了感知机作为最简单的人工神经网络,通过特征向量进行二分类。1974年,沃伯斯提出误差反向传播(BP)算法,利用梯度调整权重以优化模型。最后,阐述了符号主义、连接主义和行为主义三大学派的发展与融合,强调它们在持续学习中共同推动人工智能的进步。
人工智能的三大主义--——行为主义(actionism),连接主义 (connectionism)
|
Linux Windows
Installing, this may take a few minutes...WslRegisterDistribution failed with error: 0x80370114Err
Installing, this may take a few minutes...WslRegisterDistribution failed with error: 0x80370114Err
4408 3
|
存储 NoSQL Java
教程:Spring Boot与RocksDB本地存储的整合方法
教程:Spring Boot与RocksDB本地存储的整合方法
|
SQL NoSQL 关系型数据库
13 秒插入 30 万条数据,这才是批量插入正确的姿势!
【8月更文挑战第9天】在数据处理和数据库管理中,高效批量插入数据是一项至关重要的技能。无论是大数据分析、日志处理还是业务数据快速导入,高效的批量插入能力都能显著提升系统性能和用户体验。今天,我们就来深入探讨如何在极短时间内(如13秒内)向数据库批量插入30万条数据,分享那些被验证过的高效技术和最佳实践。
1478 0
|
XML API Android开发
android S 上 安装apk出现android.os.FileUriExposedException
android S 上 安装apk出现android.os.FileUriExposedException
467 6
|
人工智能 自然语言处理 文字识别
社区供稿 | 元象首个多模态大模型XVERSE-V开源,刷新权威大模型榜单,支持任意宽高比输入
元象公司发布了开源多模态大模型XVERSE-V,该模型在图像输入的宽高比方面具有灵活性,并在多项评测中展现出优越性能,超越了包括谷歌在内的多个知名模型。XVERSE-V采用创新方法结合全局和局部图像信息,适用于高清全景图识别、文字检测等任务,且已在Hugging Face、ModelScope和GitHub上开放下载。此外,模型在视障场景、内容创作、教育解题、百科问答和代码生成等领域有广泛应用,并在VizWiz等测试集中表现出色。元象致力于推动AI技术的普惠,支持中小企业、研究者和开发者进行研发和应用创新。
|
NoSQL Java Redis
Spring Boot + Redis 实现延时队列,写得太好了!
首先我们分析下这个流程 用户提交任务。首先将任务推送至延迟队列中。
Spring Boot + Redis 实现延时队列,写得太好了!