生产者和消费者问题是从操作系统中的许多实际同步问题中抽象出来的具有
代表性的问题。它反映了操作系统中典型的同步例子。
生产者进程(进程由多个线程组成)生产信息,例如它可以是计算进程。消费
者进程使用信息,它可以是输出打印进程。由于生产者和消费者彼此独立,且运
行速度不确定,所以很可能出现生产者已产生了信息而消费者却没有来得及接受
信息这种情况。为此,需要引入由一个或者若干个存储单元组成的临时存储区,
以便存放生产者所产生的信息,平滑进程间由于速度不确定所带来的问题。这个
临时存储区叫做缓冲区,通常用一维数组来表示。
由一个或若干个存储单元组成的缓冲区叫作“有穷缓冲区”。下面我们来分
析一下有穷缓冲的生产者和消费者的例子。
假设有多个生产者和多个消费者,它们共享一个具有n个存储单元的有穷缓冲
区Buffer(0……n-1),这是一个环形队列。其队尾指针Rear指向当前信息应存放
的位置(Buffer[Rear]),队首指针Front指向当前取出信息的位置(Buffer[front
])。生产者进程总是把信息存放在Buffer[Rear]中,消费者进程则总是从Buffer
[Rear]中取出信息。如果想使生产者进程和消费者进程协调合作,则必须使它们
遵循如下规则:
1) 只要缓冲区有存储单元,生产者都可往其中存放信息;当缓冲区已满时,
若任意生产者提出写要求,则都必须等待;
2) 只要缓冲区中有消息可取,消费者都可从缓冲区中取出消息;当缓冲区为
空时,若任意消费者想取出信息,则必须等待;
3) 生产者们和消费者们不能同时读、写缓冲区。
用JAVA 实现“生产者-消费者”问题的代码如下:
public class ProducerConsumer {
public static void main(String[] args) {
SyncStack ss = new SyncStack();
Producer p = new Producer(ss);
Consumer c = new Consumer(ss);
new Thread(p).start();
new Thread(p).start();
new Thread(p).start();
new Thread(c).start();
}
}
class WoTou {
int id;
WoTou(int id) {
this.id = id;
}
public String toString() {
return "WoTou : " + id;
}
}
class SyncStack {
int index = 0;
WoTou[] arrWT = new WoTou[6];
public synchronized void push(WoTou wt) {
while(index == arrWT.length) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.notifyAll();
arrWT[index] = wt;
index ++;
}
public synchronized WoTou pop() {
while(index == 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.notifyAll();
index--;
return arrWT[index];
}
}
class Producer implements Runnable {
SyncStack ss = null;
Producer(SyncStack ss) {
this.ss = ss;
}
public void run() {
for(int i=0; i<20; i++) {
WoTou wt = new WoTou(i);
ss.push(wt);
System.out.println("生产了:" + wt);
try {
Thread.sleep((int)(Math.random() * 200));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
SyncStack ss = null;
Consumer(SyncStack ss) {
this.ss = ss;
}
public void run() {
for(int i=0; i<20; i++) {
WoTou wt = ss.pop();
System.out.println("消费了: " + wt);
try {
Thread.sleep((int)(Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
生产者消费者问题是研究多线程程序时绕不开的问题,它的描述是有一块生产者和消费者共享的有界缓冲区,生产者往缓冲区放入产品,消费者从缓冲区取走产品,这个过程可以无休止的执行,不能因缓冲区满生产者放不进产品而终止,也不能因缓冲区空消费者无产品可取而终止。
public class Sycn1 {
private LinkedList<Object> myList =new LinkedList<Object>();
private int MAX = 10;
public Sycn1(){
}
public void start(){
new Producer().start();
new Consumer().start();
}
public static void main(String[] args) throws Exception{
Sycn1 s1 = new Sycn1();
s1.start();
}
class Producer extends Thread{
public void run(){
while(true){
synchronized(myList){
try{
while(myList.size() == MAX){
System.out.println("warning: it's full!");
myList.wait();
}
Object o = new Object();
if(myList.add(o)){
System.out.println("Producer: " + o);
myList.notify();
}
}catch(InterruptedException ie){
System.out.println("producer is interrupted!");
}
}
}
}
}
class Consumer extends Thread{
public void run(){
while(true){
synchronized(myList){
try{
while(myList.size() == 0){
System.out.println("warning: it's empty!");
myList.wait();
}
Object o = myList.removeLast();
System.out.println("Consumer: " + o);
myList.notify();
}catch(InterruptedException ie){
System.out.println("consumer is interrupted!");
}
}
}
}
}
}
import java.util.concurrent.locks. * ;
public class Sycn2 {
private LinkedList<Object> myList = new LinkedList<Object>();
private int MAX = 10;
private final Lock lock = new ReentrantLock();
private final Condition full = lock.newCondition();
private final Condition empty = lock.newCondition();
public Sycn2(){
}
public void start(){
new Producer().start();
new Consumer().start();
}
public static void main(String[] args) throws Exception{
Sycn2 s2 = new Sycn2();
s2.start();
}
class Producer extends Thread{
public void run(){
while(true){
lock.lock();
try{
while(myList.size() == MAX){
System.out.println("warning: it's full!");
full.await();
}
Object o = new Object();
if(myList.add(o)){
System.out.println("Producer: " + o);
empty.signal();
}
}catch(InterruptedException ie){
System.out.println("producer is interrupted!");
}finally{
lock.unlock();
}
}
}
}
class Consumer extends Thread{
public void run(){
while(true){
lock.lock();
try{
while(myList.size() == 0){
System.out.println("warning: it's empty!");
empty.await();
}
Object o = myList.removeLast();
System.out.println("Consumer: " + o);
full.signal();
}catch(InterruptedException ie){
System.out.println("consumer is interrupted!");
}finally{
lock.unlock();
}
}
}
}
}
public class Sycn3 {
private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(10);
private int MAX = 10;
public Sycn3(){
}
public void start(){
new Producer().start();
new Consumer().start();
}
public static void main(String[] args) throws Exception{
Sycn3 s3 = new Sycn3();
s3.start();
}
class Producer extends Thread{
public void run(){
while(true){
//synchronized(this){
try{
if(queue.size() == MAX)
System.out.println("warning: it's full!");
Object o = new Object();
queue.put(o);
System.out.println("Producer: " + o);
}catch(InterruptedException e){
System.out.println("producer is interrupted!");
}
//}
}
}
}
class Consumer extends Thread{
public void run(){
while(true){
//synchronized(this){
try{
if(queue.size() == 0)
System.out.println("warning: it's empty!");
Object o = queue.take();
System.out.println("Consumer: " + o);
}catch(InterruptedException e){
System.out.println("producer is interrupted!");
}
//}
}
}
}
}
public class Sycn4 {
private PipedOutputStream pos;
private PipedInputStream pis;
//private ObjectOutputStream oos;
//private ObjectInputStream ois;
public Sycn4(){
try{
pos = new PipedOutputStream();
pis = new PipedInputStream(pos);
//oos = new ObjectOutputStream(pos);
//ois = new ObjectInputStream(pis);
}catch(IOException e){
System.out.println(e);
}
}
public void start(){
new Producer().start();
new Consumer().start();
}
public static void main(String[] args) throws Exception{
Sycn4 s4 = new Sycn4();
s4.start();
}
class Producer extends Thread{
public void run() {
try{
while(true){
int b = (int) (Math.random() * 255);
System.out.println("Producer: a byte, the value is " + b);
pos.write(b);
pos.flush();
//Object o = new MyObject();
//oos.writeObject(o);
//oos.flush();
//System.out.println("Producer: " + o);
}
}catch(Exception e){
//System.out.println(e);
e.printStackTrace();
}finally{
try{
pos.close();
pis.close();
//oos.close();
//ois.close();
}catch(IOException e){
System.out.println(e);
}
}
}
}
class Consumer extends Thread{
public void run(){
try{
while(true){
int b = pis.read();
System.out.println("Consumer: a byte, the value is " + String.valueOf(b));
//Object o = ois.readObject();
//if(o != null)
//System.out.println("Consumer: " + o);
}
}catch(Exception e){
//System.out.println(e);
e.printStackTrace();
}finally{
try{
pos.close();
pis.close();
//oos.close();
//ois.close();
}catch(IOException e){
System.out.println(e);
}
}
}
}
//class MyObject implements Serializable {
//}
}