Java生产者和消费者模型的5种实现方式

简介: 生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。


q.png


现在用四种方式来实现生产者消费者模型


wait()和notify()方法的实现


这也是最简单最基础的实现,缓冲区满和为空时都调用wait()方法等待,当生产者生产了一个产品或者消费者消费了一个产品之后会唤醒所有线程。


/*** 生产者和消费者,wait()和notify()的实现* @author ZGJ* @date 2017年6月22日*/publicclassTest1 {
privatestaticIntegercount=0;
privatestaticfinalIntegerFULL=10;
privatestaticStringLOCK="lock";
publicstaticvoidmain(String[] args) {
Test1test1=newTest1();
newThread(test1.newProducer()).start();
newThread(test1.newConsumer()).start();
newThread(test1.newProducer()).start();
newThread(test1.newConsumer()).start();
newThread(test1.newProducer()).start();
newThread(test1.newConsumer()).start();
newThread(test1.newProducer()).start();
newThread(test1.newConsumer()).start();
    }
classProducerimplementsRunnable {
@Overridepublicvoidrun() {
for (inti=0; i<10; i++) {
try {
Thread.sleep(3000);
                } catch (Exceptione) {
e.printStackTrace();
                }
synchronized (LOCK) {
while (count==FULL) {
try {
LOCK.wait();
                        } catch (Exceptione) {
e.printStackTrace();
                        }
                    }
count++;
System.out.println(Thread.currentThread().getName() +"生产者生产,目前总共有"+count);
LOCK.notifyAll();
                }
            }
        }
    }
classConsumerimplementsRunnable {
@Overridepublicvoidrun() {
for (inti=0; i<10; i++) {
try {
Thread.sleep(3000);
                } catch (InterruptedExceptione) {
e.printStackTrace();
                }
synchronized (LOCK) {
while (count==0) {
try {
LOCK.wait();
                        } catch (Exceptione) {
                        }
                    }
count--;
System.out.println(Thread.currentThread().getName() +"消费者消费,目前总共有"+count);
LOCK.notifyAll();
                }
            }
        }
    }
}


结果:


Thread-0生产者生产,目前总共有1Thread-4生产者生产,目前总共有2Thread-3消费者消费,目前总共有1Thread-1消费者消费,目前总共有0Thread-2生产者生产,目前总共有1Thread-6生产者生产,目前总共有2Thread-7消费者消费,目前总共有1Thread-5消费者消费,目前总共有0Thread-0生产者生产,目前总共有1Thread-4生产者生产,目前总共有2Thread-3消费者消费,目前总共有1Thread-6生产者生产,目前总共有2Thread-1消费者消费,目前总共有1Thread-7消费者消费,目前总共有0Thread-2生产者生产,目前总共有1Thread-5消费者消费,目前总共有0Thread-0生产者生产,目前总共有1Thread-4生产者生产,目前总共有2Thread-3消费者消费,目前总共有1Thread-7消费者消费,目前总共有0Thread-6生产者生产,目前总共有1Thread-2生产者生产,目前总共有2Thread-1消费者消费,目前总共有1Thread-5消费者消费,目前总共有0Thread-0生产者生产,目前总共有1Thread-4生产者生产,目前总共有2Thread-3消费者消费,目前总共有1Thread-1消费者消费,目前总共有0Thread-6生产者生产,目前总共有1Thread-7消费者消费,目前总共有0Thread-2生产者生产,目前总共有1


可重入锁ReentrantLock的实现


java.util.concurrent.lock 中的 Lock 框架是锁定的一个抽象,通过对lock的lock()方法和unlock()方法实现了对锁的显示控制,而synchronize()则是对锁的隐性控制。

可重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响,简单来说,该锁维护这一个与获取锁相关的计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,函数调用结束计数器就减1,然后锁需要被释放两次才能获得真正释放。已经获取锁的线程进入其他需要相同锁的同步代码块不会被阻塞。


importjava.util.concurrent.locks.Condition;
importjava.util.concurrent.locks.Lock;
importjava.util.concurrent.locks.ReentrantLock;
/*** 生产者和消费者,ReentrantLock的实现* * @author ZGJ* @date 2017年6月22日*/publicclassTest2 {
privatestaticIntegercount=0;
privatestaticfinalIntegerFULL=10;
//创建一个锁对象privateLocklock=newReentrantLock();
//创建两个条件变量,一个为缓冲区非满,一个为缓冲区非空privatefinalConditionnotFull=lock.newCondition();
privatefinalConditionnotEmpty=lock.newCondition();
publicstaticvoidmain(String[] args) {
Test2test2=newTest2();
newThread(test2.newProducer()).start();
newThread(test2.newConsumer()).start();
newThread(test2.newProducer()).start();
newThread(test2.newConsumer()).start();
newThread(test2.newProducer()).start();
newThread(test2.newConsumer()).start();
newThread(test2.newProducer()).start();
newThread(test2.newConsumer()).start();
    }
classProducerimplementsRunnable {
@Overridepublicvoidrun() {
for (inti=0; i<10; i++) {
try {
Thread.sleep(3000);
                } catch (Exceptione) {
e.printStackTrace();
                }
//获取锁lock.lock();
try {
while (count==FULL) {
try {
notFull.await();
                        } catch (InterruptedExceptione) {
e.printStackTrace();
                        }
                    }
count++;
System.out.println(Thread.currentThread().getName()
+"生产者生产,目前总共有"+count);
//唤醒消费者notEmpty.signal();
                } finally {
//释放锁lock.unlock();
                }
            }
        }
    }
classConsumerimplementsRunnable {
@Overridepublicvoidrun() {
for (inti=0; i<10; i++) {
try {
Thread.sleep(3000);
                } catch (InterruptedExceptione1) {
e1.printStackTrace();
                }
lock.lock();
try {
while (count==0) {
try {
notEmpty.await();
                        } catch (Exceptione) {
e.printStackTrace();
                        }
                    }
count--;
System.out.println(Thread.currentThread().getName()
+"消费者消费,目前总共有"+count);
notFull.signal();
                } finally {
lock.unlock();
                }
            }
        }
    }
}


阻塞队列BlockingQueue的实现


BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。

被阻塞的情况主要有如下两种:


  1. 当队列满了的时候进行入队列操作
  2. 当队列空了的时候进行出队列操作因此,当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,除非有另外一个线    程进行了出队操作,当一个线程对一个空的阻塞队列进行出队操作时也会阻塞,除非有另外一个线程进行了入队操作。从上可知,
  3. 阻塞队列是线程安全的。下面是BlockingQueue接口的一些方法:


操作 抛异常 特定值 阻塞 超时
插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
移除 remove(o) poll(o) take(o) poll(timeout, timeunit)
检查 element(o) peek(o)    


这四类方法分别对应的是:

1 . ThrowsException:如果操作不能马上进行,则抛出异常

2 . SpecialValue:如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false

3 . Blocks:如果操作不能马上进行,操作会被阻塞

4 . TimesOut:如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值,一般是true或者false

下面来看由阻塞队列实现的生产者消费者模型,这里我们使用take()和put()方法,这里生产者和生产者,消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象


importjava.util.concurrent.ArrayBlockingQueue;
importjava.util.concurrent.BlockingQueue;
/*** 使用BlockingQueue实现生产者消费者模型* @author ZGJ* @date 2017年6月29日*/publicclassTest3 {
privatestaticIntegercount=0;
//创建一个阻塞队列finalBlockingQueue<Integer>blockingQueue=newArrayBlockingQueue<>(10);
publicstaticvoidmain(String[] args) {
Test3test3=newTest3();
newThread(test3.newProducer()).start();
newThread(test3.newConsumer()).start();
newThread(test3.newProducer()).start();
newThread(test3.newConsumer()).start();
newThread(test3.newProducer()).start();
newThread(test3.newConsumer()).start();
newThread(test3.newProducer()).start();
newThread(test3.newConsumer()).start();
    }
classProducerimplementsRunnable {
@Overridepublicvoidrun() {
for (inti=0; i<10; i++) {
try {
Thread.sleep(3000);
                } catch (Exceptione) {
e.printStackTrace();
                }
try {
blockingQueue.put(1);
count++;
System.out.println(Thread.currentThread().getName()
+"生产者生产,目前总共有"+count);
                } catch (InterruptedExceptione) {
e.printStackTrace();
                }
            }
        }
    }
classConsumerimplementsRunnable {
@Overridepublicvoidrun() {
for (inti=0; i<10; i++) {
try {
Thread.sleep(3000);
                } catch (InterruptedExceptione1) {
e1.printStackTrace();
                }
try {
blockingQueue.take();
count--;
System.out.println(Thread.currentThread().getName()
+"消费者消费,目前总共有"+count);
                } catch (InterruptedExceptione) {
e.printStackTrace();
                }
            }
        }
    }
}


信号量Semaphore的实现


Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源,在操作系统中是一个非常重要的问题,可以用来解决哲学家就餐问题。Java中的Semaphore维护了一个许可集,一开始先设定这个许可集的数量,可以使用acquire()方法获得一个许可,当许可不足时会被阻塞,release()添加一个许可。

在下列代码中,还加入了另外一个mutex信号量,维护生产者消费者之间的同步关系,保证生产者和消费者之间的交替进行


importjava.util.concurrent.Semaphore;
/*** 使用semaphore信号量实现* @author ZGJ* @date 2017年6月29日*/publicclassTest4 {
privatestaticIntegercount=0;
//创建三个信号量finalSemaphorenotFull=newSemaphore(10);
finalSemaphorenotEmpty=newSemaphore(0);
finalSemaphoremutex=newSemaphore(1);
publicstaticvoidmain(String[] args) {
Test4test4=newTest4();
newThread(test4.newProducer()).start();
newThread(test4.newConsumer()).start();
newThread(test4.newProducer()).start();
newThread(test4.newConsumer()).start();
newThread(test4.newProducer()).start();
newThread(test4.newConsumer()).start();
newThread(test4.newProducer()).start();
newThread(test4.newConsumer()).start();
    }
classProducerimplementsRunnable {
@Overridepublicvoidrun() {
for (inti=0; i<10; i++) {
try {
Thread.sleep(3000);
                } catch (InterruptedExceptione) {
e.printStackTrace();
                }
try {
notFull.acquire();
mutex.acquire();
count++;
System.out.println(Thread.currentThread().getName()
+"生产者生产,目前总共有"+count);
                } catch (InterruptedExceptione) {
e.printStackTrace();
                } finally {
mutex.release();
notEmpty.release();
                }
            }
        }
    }
classConsumerimplementsRunnable {
@Overridepublicvoidrun() {
for (inti=0; i<10; i++) {
try {
Thread.sleep(3000);
                } catch (InterruptedExceptione1) {
e1.printStackTrace();
                }
try {
notEmpty.acquire();
mutex.acquire();
count--;
System.out.println(Thread.currentThread().getName()
+"消费者消费,目前总共有"+count);
                } catch (InterruptedExceptione) {
e.printStackTrace();
                } finally {
mutex.release();
notFull.release();
                }
            }
        }
    }
}


管道输入输出流PipedInputStream和PipedOutputStream实现


在java的io包下,PipedOutputStream和PipedInputStream分别是管道输出流和管道输入流。

它们的作用是让多线程可以通过管道进行线程间的通讯。在使用管道通信时,必须将PipedOutputStream和PipedInputStream配套使用。

使用方法:先创建一个管道输入流和管道输出流,然后将输入流和输出流进行连接,用生产者线程往管道输出流中写入数据,消费者在管道输入流中读取数据,这样就可以实现了不同线程间的相互通讯,但是这种方式在生产者和生产者、消费者和消费者之间不能保证同步,也就是说在一个生产者和一个消费者的情况下是可以生产者和消费者之间交替运行的,多个生成者和多个消费者者之间则不行


/*** 使用管道实现生产者消费者模型* @author ZGJ* @date 2017年6月30日*/publicclassTest5 {
finalPipedInputStreampis=newPipedInputStream();
finalPipedOutputStreampos=newPipedOutputStream();
    {
try {
pis.connect(pos);
        } catch (IOExceptione) {
e.printStackTrace();
        }
    }
classProducerimplementsRunnable {
@Overridepublicvoidrun() {
try {
while(true) {
Thread.sleep(1000);
intnum= (int) (Math.random() *255);
System.out.println(Thread.currentThread().getName() +"生产者生产了一个数字,该数字为: "+num);
pos.write(num);
pos.flush();
                } 
            } catch (Exceptione) {
e.printStackTrace();
            } finally {
try {
pos.close();
pis.close();
                } catch (IOExceptione) {
e.printStackTrace();
                }
            }
        }
    }
classConsumerimplementsRunnable {
@Overridepublicvoidrun() {
try {
while(true) {
Thread.sleep(1000);
intnum=pis.read();
System.out.println("消费者消费了一个数字,该数字为:"+num);
                }
            } catch (Exceptione) {
e.printStackTrace();
            } finally {
try {
pos.close();
pis.close();
                } catch (IOExceptione) {
e.printStackTrace();
                }
            }
        }
    }
publicstaticvoidmain(String[] args) {
Test5test5=newTest5();
newThread(test5.newProducer()).start();
newThread(test5.newConsumer()).start();
    }
}
相关文章
|
3月前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
5月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
53 1
|
7月前
|
安全 Java
Java模拟生产者-消费者问题。生产者不断的往仓库中存放产品,消费者从仓库中消费产品。其中生产者和消费者都可以有若干个。在这里,生产者是一个线程,消费者是一个线程。仓库容量有限,只有库满时生产者不能存
该博客文章通过Java代码示例演示了生产者-消费者问题,其中生产者在仓库未满时生产产品,消费者在仓库有产品时消费产品,通过同步机制确保多线程环境下的线程安全和有效通信。
|
7月前
|
网络协议 Java 关系型数据库
16 Java网络编程(计算机网络+网络模型OSI/TCP/IP+通信协议等)
16 Java网络编程(计算机网络+网络模型OSI/TCP/IP+通信协议等)
125 2
|
7月前
|
算法 Java
HanLP — HMM隐马尔可夫模型 -- 维特比(Viterbi)算法 --示例代码 - Java
HanLP — HMM隐马尔可夫模型 -- 维特比(Viterbi)算法 --示例代码 - Java
88 0
|
8月前
|
存储 Java Unix
(八)Java网络编程之IO模型篇-内核Select、Poll、Epoll多路复用函数源码深度历险!
select/poll、epoll这些词汇相信诸位都不陌生,因为在Redis/Nginx/Netty等一些高性能技术栈的底层原理中,大家应该都见过它们的身影,接下来重点讲解这块内容。
138 0
|
26天前
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
157 60
【Java并发】【线程池】带你从0-1入门线程池
|
15天前
|
存储 网络协议 安全
Java网络编程,多线程,IO流综合小项目一一ChatBoxes
**项目介绍**:本项目实现了一个基于TCP协议的C/S架构控制台聊天室,支持局域网内多客户端同时聊天。用户需注册并登录,用户名唯一,密码格式为字母开头加纯数字。登录后可实时聊天,服务端负责验证用户信息并转发消息。 **项目亮点**: - **C/S架构**:客户端与服务端通过TCP连接通信。 - **多线程**:采用多线程处理多个客户端的并发请求,确保实时交互。 - **IO流**:使用BufferedReader和BufferedWriter进行数据传输,确保高效稳定的通信。 - **线程安全**:通过同步代码块和锁机制保证共享数据的安全性。
66 23
|
22天前
|
Java 调度
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
当我们创建一个`ThreadPoolExecutor`的时候,你是否会好奇🤔,它到底发生了什么?比如:我传的拒绝策略、线程工厂是啥时候被使用的? 核心线程数是个啥?最大线程数和它又有什么关系?线程池,它是怎么调度,我们传入的线程?...不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界...
92 0
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
|
1月前
|
Java 程序员 开发者
Java社招面试题:一个线程运行时发生异常会怎样?
大家好,我是小米。今天分享一个经典的 Java 面试题:线程运行时发生异常,程序会怎样处理?此问题考察 Java 线程和异常处理机制的理解。线程发生异常,默认会导致线程终止,但可以通过 try-catch 捕获并处理,避免影响其他线程。未捕获的异常可通过 Thread.UncaughtExceptionHandler 处理。线程池中的异常会被自动处理,不影响任务执行。希望这篇文章能帮助你深入理解 Java 线程异常处理机制,为面试做好准备。如果你觉得有帮助,欢迎收藏、转发!
130 14